package cache

import (
	"strconv"
	"time"

	"github.com/go-redis/redis"
)

// pools is the package-wide cluster client. It is assigned by NewRedisCluster
// and used by every RC* helper in this file.
var pools *redis.ClusterClient

// NewRedisCluster creates the shared cluster client for addrs using the
// package-level pool and timeout settings, then pings the cluster.
// It returns the ping error, if any. The client is stored in pools even when
// the ping fails.
func NewRedisCluster(addrs []string) error {
	opt := &redis.ClusterOptions{
		Addrs:        addrs,
		PoolSize:     redisPoolSize,
		PoolTimeout:  redisPoolTTL,
		IdleTimeout:  redisIdleTTL,
		DialTimeout:  redisDialTTL,
		ReadTimeout:  redisReadTTL,
		WriteTimeout: redisWriteTTL,
	}
	pools = redis.NewClusterClient(opt)
	return pools.Ping().Err()
}

// rcBoolToInt64 maps true to 1 and false to 0, the integer form several
// helpers below expose instead of the client's bool result.
func rcBoolToInt64(ok bool) int64 {
	if ok {
		return 1
	}
	return 0
}

// RCGet runs GET and returns the value as []byte.
// If the client reports redis.Nil, both results are nil (see convertError).
func RCGet(key string) (interface{}, error) {
	res, err := pools.Get(key).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return []byte(res), nil
}

// RCSet runs SET without an expiration.
func RCSet(key string, value interface{}) error {
	return convertError(pools.Set(key, value, 0).Err())
}

// RCGetSet runs GETSET and returns the previous value as []byte.
// If the client reports redis.Nil, both results are nil (see convertError).
func RCGetSet(key string, value interface{}) (interface{}, error) {
	res, err := pools.GetSet(key, value).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return []byte(res), nil
}

// RCSetNx runs SETNX without an expiration. It returns 1 when the key was
// set and 0 otherwise.
func RCSetNx(key string, value interface{}) (int64, error) {
	res, err := pools.SetNX(key, value, 0).Result()
	if err != nil {
		return 0, convertError(err)
	}
	return rcBoolToInt64(res), nil
}

// RCSetEx runs SET with an expiration of timeout seconds.
func RCSetEx(key string, value interface{}, timeout int64) error {
	return convertError(pools.Set(key, value, time.Duration(timeout)*time.Second).Err())
}

// RCSetNxEx runs SETNX with an expiration of timeout seconds.
// nil means success, ErrNil means the key already exists in the database,
// and any other error means the database operation failed.
func RCSetNxEx(key string, value interface{}, timeout int64) error {
	res, err := pools.SetNX(key, value, time.Duration(timeout)*time.Second).Result()
	if err != nil {
		return convertError(err)
	}
	if !res {
		return ErrNil
	}
	return nil
}

// RCMGet runs MGET for keys and returns the client's result slice.
func RCMGet(keys ...string) ([]interface{}, error) {
	res, err := pools.MGet(keys...).Result()
	return res, convertError(err)
}

// RCMSet runs MSET with the entries of kvs, converting each value with String.
// To make sure multiple keys map to the same slot, each key should carry a
// hash tag, e.g. {test}.
func RCMSet(kvs map[string]interface{}) error {
	pairs := make([]interface{}, 0, len(kvs)*2)
	for k, v := range kvs {
		val, err := String(v, nil)
		if err != nil {
			return err
		}
		pairs = append(pairs, k, val)
	}
	// Spread the pairs so each one is its own command argument.
	return convertError(pools.MSet(pairs...).Err())
}

// RCMSetNX runs MSETNX with the entries of kvs, converting each value with
// String, and returns the client's bool result.
// To make sure multiple keys map to the same slot, each key should carry a
// hash tag, e.g. {test}.
func RCMSetNX(kvs map[string]interface{}) (bool, error) {
	pairs := make([]interface{}, 0, len(kvs)*2)
	for k, v := range kvs {
		val, err := String(v, nil)
		if err != nil {
			return false, err
		}
		pairs = append(pairs, k, val)
	}
	res, err := pools.MSetNX(pairs...).Result()
	return res, convertError(err)
}

// RCExpireAt runs EXPIREAT with timestamp interpreted as Unix seconds.
// It returns 1 when the client reports true and 0 otherwise.
func RCExpireAt(key string, timestamp int64) (int64, error) {
	res, err := pools.ExpireAt(key, time.Unix(timestamp, 0)).Result()
	if err != nil {
		return 0, convertError(err)
	}
	return rcBoolToInt64(res), nil
}

// RCDel runs DEL for keys and returns the client's integer result.
func RCDel(keys ...string) (int64, error) {
	res, err := pools.Del(keys...).Result()
	return res, convertError(err)
}

// RCIncr runs INCR and returns the client's integer result.
func RCIncr(key string) (int64, error) {
	res, err := pools.Incr(key).Result()
	return res, convertError(err)
}

// RCIncrBy runs INCRBY with delta and returns the client's integer result.
func RCIncrBy(key string, delta int64) (int64, error) {
	res, err := pools.IncrBy(key, delta).Result()
	return res, convertError(err)
}

// RCExpire runs EXPIRE with duration interpreted as seconds.
// It returns 1 when the client reports true and 0 otherwise.
func RCExpire(key string, duration int64) (int64, error) {
	res, err := pools.Expire(key, time.Duration(duration)*time.Second).Result()
	if err != nil {
		return 0, convertError(err)
	}
	return rcBoolToInt64(res), nil
}

// RCExists runs EXISTS and reports whether the returned count is positive.
func RCExists(key string) (bool, error) {
	res, err := pools.Exists(key).Result()
	if err != nil {
		return false, convertError(err)
	}
	return res > 0, nil
}

// RCHGet runs HGET and returns the value as []byte.
// If the client reports redis.Nil, both results are nil (see convertError).
func RCHGet(key string, field string) (interface{}, error) {
	res, err := pools.HGet(key, field).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return []byte(res), nil
}

// RCHLen runs HLEN and returns the client's integer result.
func RCHLen(key string) (int64, error) {
	res, err := pools.HLen(key).Result()
	return res, convertError(err)
}

// RCHSet runs HSET after converting val with String. An ErrNil from the
// conversion is tolerated and the converted value is stored as returned.
func RCHSet(key string, field string, val interface{}) error {
	value, err := String(val, nil)
	if err != nil && err != ErrNil {
		return err
	}
	return convertError(pools.HSet(key, field, value).Err())
}

// RCHDel runs HDEL for fields and returns the client's integer result
// (0 on error).
func RCHDel(key string, fields ...string) (int64, error) {
	res, err := pools.HDel(key, fields...).Result()
	if err != nil {
		return 0, convertError(err)
	}
	return res, nil
}

// RCHMGet runs HMGET for fields. It returns ErrNil without contacting the
// server when no fields are given.
func RCHMGet(key string, fields ...string) (interface{}, error) {
	if len(fields) == 0 {
		return nil, ErrNil
	}
	res, err := pools.HMGet(key, fields...).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCHMSet runs HMSET with kvs interpreted as alternating field, value
// entries; each entry is converted with String (ErrNil from the conversion is
// tolerated). It is a no-op for empty kvs and returns ErrWrongArgsNum when
// kvs has an odd length.
func RCHMSet(key string, kvs ...interface{}) error {
	if len(kvs) == 0 {
		return nil
	}
	if len(kvs)%2 != 0 {
		return ErrWrongArgsNum
	}

	fields := make(map[string]interface{}, len(kvs)/2)
	for i := 0; i < len(kvs); i += 2 {
		field, err := String(kvs[i], nil)
		if err != nil && err != ErrNil {
			return err
		}
		value, err := String(kvs[i+1], nil)
		if err != nil && err != ErrNil {
			return err
		}
		// Each pair becomes one hash field; a repeated field keeps its
		// last value.
		fields[field] = value
	}
	return convertError(pools.HMSet(key, fields).Err())
}

// RCHKeys runs HKEYS and returns the client's result.
func RCHKeys(key string) ([]string, error) {
	res, err := pools.HKeys(key).Result()
	return res, convertError(err)
}

// RCHVals runs HVALS and returns the values as a []interface{} of strings.
func RCHVals(key string) ([]interface{}, error) {
	vals, err := pools.HVals(key).Result()
	if err != nil {
		return nil, convertError(err)
	}
	out := make([]interface{}, 0, len(vals))
	for _, v := range vals {
		out = append(out, v)
	}
	return out, nil
}

// RCHGetAll runs HGETALL and returns the field/value map.
func RCHGetAll(key string) (map[string]string, error) {
	vals, err := pools.HGetAll(key).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return vals, nil
}

// RCHIncrBy runs HINCRBY with delta and returns the client's integer result.
func RCHIncrBy(key, field string, delta int64) (int64, error) {
	res, err := pools.HIncrBy(key, field, delta).Result()
	return res, convertError(err)
}

// RCZAdd runs ZADD with kvs interpreted as alternating score, member entries.
// Scores are converted with Float64 (ErrNil from the conversion is tolerated).
// It returns 0, nil for empty kvs and ErrWrongArgsNum for an odd length.
func RCZAdd(key string, kvs ...interface{}) (int64, error) {
	if len(kvs) == 0 {
		return 0, nil
	}
	if len(kvs)%2 != 0 {
		return 0, ErrWrongArgsNum
	}

	zs := make([]redis.Z, 0, len(kvs)/2)
	for i := 0; i < len(kvs); i += 2 {
		score, err := Float64(kvs[i], nil)
		if err != nil && err != ErrNil {
			return 0, err
		}
		zs = append(zs, redis.Z{Score: score, Member: kvs[i+1]})
	}
	res, err := pools.ZAdd(key, zs...).Result()
	return res, convertError(err)
}

// RCZRem runs ZREM for members and returns the client's integer result.
func RCZRem(key string, members ...string) (int64, error) {
	ms := make([]interface{}, 0, len(members))
	for _, member := range members {
		ms = append(ms, member)
	}
	res, err := pools.ZRem(key, ms...).Result()
	return res, convertError(err)
}

// RCZRange runs ZRANGE over the index range [min, max]. The result is a
// []interface{} of members; with withScores each member is followed by its
// score formatted as a decimal string.
func RCZRange(key string, min, max int64, withScores bool) (interface{}, error) {
	res := make([]interface{}, 0)
	if !withScores {
		members, err := pools.ZRange(key, min, max).Result()
		if err != nil {
			return nil, convertError(err)
		}
		for _, m := range members {
			res = append(res, m)
		}
		return res, nil
	}

	zs, err := pools.ZRangeWithScores(key, min, max).Result()
	if err != nil {
		return nil, convertError(err)
	}
	for _, z := range zs {
		res = append(res, z.Member, strconv.FormatFloat(z.Score, 'f', -1, 64))
	}
	return res, nil
}

// RCZRangeByScoreWithScore runs ZRANGEBYSCORE ... WITHSCORES for scores in
// [min, max] and returns a member -> score map. Scores are truncated to
// int64, and members are converted with String (ErrNil is tolerated).
func RCZRangeByScoreWithScore(key string, min, max int64) (map[string]int64, error) {
	opt := redis.ZRangeBy{
		Min:    strconv.FormatInt(min, 10),
		Max:    strconv.FormatInt(max, 10),
		Offset: 0,
		Count:  -1,
	}
	vals, err := pools.ZRangeByScoreWithScores(key, opt).Result()
	if err != nil {
		return nil, convertError(err)
	}

	res := make(map[string]int64, len(vals))
	for _, val := range vals {
		member, err := String(val.Member, nil)
		if err != nil && err != ErrNil {
			return nil, err
		}
		res[member] = int64(val.Score)
	}
	return res, nil
}

// RCLRange runs LRANGE over [start, stop] and returns the client's result.
func RCLRange(key string, start, stop int64) (interface{}, error) {
	res, err := pools.LRange(key, start, stop).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCLSet runs LSET for the element at index.
func RCLSet(key string, index int, value interface{}) error {
	return convertError(pools.LSet(key, int64(index), value).Err())
}

// RCLLen runs LLEN and returns the client's integer result.
func RCLLen(key string) (int64, error) {
	res, err := pools.LLen(key).Result()
	return res, convertError(err)
}

// RCLRem runs LREM with count and value, and returns the client's integer
// result. value is converted with String, the same conversion RCLPush and
// RCRPush apply, so non-string values are no longer silently replaced by "".
func RCLRem(key string, count int, value interface{}) (int, error) {
	val, err := String(value, nil)
	if err != nil && err != ErrNil {
		return 0, err
	}
	res, err := pools.LRem(key, int64(count), val).Result()
	return int(res), convertError(err)
}

// RCTTl runs TTL and returns the client's duration in whole seconds.
func RCTTl(key string) (int64, error) {
	duration, err := pools.TTL(key).Result()
	return int64(duration.Seconds()), convertError(err)
}

// RCLPop runs LPOP and returns the popped value.
// If the client reports redis.Nil, both results are nil (see convertError).
func RCLPop(key string) (interface{}, error) {
	res, err := pools.LPop(key).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCRPop runs RPOP and returns the popped value.
// If the client reports redis.Nil, both results are nil (see convertError).
func RCRPop(key string) (interface{}, error) {
	res, err := pools.RPop(key).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCBLPop runs BLPOP on key with a timeout in seconds and returns the second
// element of the reply. Unlike most helpers here it returns ErrNil when the
// client reports redis.Nil.
func RCBLPop(key string, timeout int) (interface{}, error) {
	res, err := pools.BLPop(time.Duration(timeout)*time.Second, key).Result()
	if err != nil {
		// For compatibility with redis 2.x.
		if err == redis.Nil {
			return nil, ErrNil
		}
		return nil, convertError(err)
	}
	return res[1], nil
}

// RCBRPop runs BRPOP on key with a timeout in seconds and returns the second
// element of the reply. Unlike most helpers here it returns ErrNil when the
// client reports redis.Nil.
func RCBRPop(key string, timeout int) (interface{}, error) {
	res, err := pools.BRPop(time.Duration(timeout)*time.Second, key).Result()
	if err != nil {
		// For compatibility with redis 2.x.
		if err == redis.Nil {
			return nil, ErrNil
		}
		return nil, convertError(err)
	}
	return res[1], nil
}

// RCLPush runs LPUSH with every value converted by String (ErrNil from the
// conversion is tolerated).
func RCLPush(key string, value ...interface{}) error {
	vals := make([]interface{}, 0, len(value))
	for _, v := range value {
		val, err := String(v, nil)
		if err != nil && err != ErrNil {
			return err
		}
		vals = append(vals, val)
	}
	return convertError(pools.LPush(key, vals...).Err())
}

// RCRPush runs RPUSH with every value converted by String (ErrNil from the
// conversion is tolerated). Values that convert to the empty string are
// skipped.
func RCRPush(key string, value ...interface{}) error {
	vals := make([]interface{}, 0, len(value))
	for _, v := range value {
		val, err := String(v, nil)
		if err != nil && err != ErrNil {
			return err
		}
		if val == "" {
			continue
		}
		vals = append(vals, val)
	}
	return convertError(pools.RPush(key, vals...).Err())
}

// RCBRPopLPush runs BRPOPLPUSH with a timeout in seconds and returns the
// moved element.
// To make sure srcKey and destKey map to the same slot, both need a hash tag,
// e.g. {test}.
func RCBRPopLPush(srcKey string, destKey string, timeout int) (interface{}, error) {
	res, err := pools.BRPopLPush(srcKey, destKey, time.Duration(timeout)*time.Second).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCRPopLPush runs RPOPLPUSH and returns the moved element.
// To make sure srcKey and destKey map to the same slot, both need a hash tag,
// e.g. {test}.
func RCRPopLPush(srcKey string, destKey string) (interface{}, error) {
	res, err := pools.RPopLPush(srcKey, destKey).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCSAdd runs SADD with every member converted by String (ErrNil from the
// conversion is tolerated) and returns the client's integer result.
func RCSAdd(key string, members ...interface{}) (int64, error) {
	ms := make([]interface{}, 0, len(members))
	for _, member := range members {
		m, err := String(member, nil)
		if err != nil && err != ErrNil {
			return 0, err
		}
		ms = append(ms, m)
	}
	res, err := pools.SAdd(key, ms...).Result()
	return res, convertError(err)
}

// RCSPop runs SPOP and returns the popped member as []byte.
// If the client reports redis.Nil, both results are nil (see convertError).
func RCSPop(key string) ([]byte, error) {
	res, err := pools.SPop(key).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return []byte(res), nil
}

// RCSIsMember runs SISMEMBER and returns the client's bool result. member is
// converted with String, the same conversion RCSAdd and RCSRem apply, so
// non-string members are no longer silently replaced by "".
func RCSIsMember(key string, member interface{}) (bool, error) {
	m, err := String(member, nil)
	if err != nil && err != ErrNil {
		return false, err
	}
	res, err := pools.SIsMember(key, m).Result()
	return res, convertError(err)
}

// RCSRem runs SREM with every member converted by String (ErrNil from the
// conversion is tolerated) and returns the client's integer result.
func RCSRem(key string, members ...interface{}) (int64, error) {
	ms := make([]interface{}, 0, len(members))
	for _, member := range members {
		m, err := String(member, nil)
		if err != nil && err != ErrNil {
			return 0, err
		}
		ms = append(ms, m)
	}
	res, err := pools.SRem(key, ms...).Result()
	return res, convertError(err)
}

// RCSMembers runs SMEMBERS and returns the members.
func RCSMembers(key string) ([]string, error) {
	res, err := pools.SMembers(key).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCScriptLoad runs SCRIPT LOAD for luaScript and returns the client's result.
func RCScriptLoad(luaScript string) (interface{}, error) {
	res, err := pools.ScriptLoad(luaScript).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// rcSplitKeysArgs converts every entry of keysArgs with String (ErrNil from
// the conversion is tolerated) and splits the result into the first
// numberKeys entries (keys) and the remainder (script arguments).
// It returns ErrWrongArgsNum when numberKeys is negative or exceeds
// len(keysArgs), instead of panicking on a negative slice capacity.
func rcSplitKeysArgs(numberKeys int, keysArgs []interface{}) ([]string, []interface{}, error) {
	if numberKeys < 0 || numberKeys > len(keysArgs) {
		return nil, nil, ErrWrongArgsNum
	}
	keys := make([]string, 0, numberKeys)
	args := make([]interface{}, 0, len(keysArgs)-numberKeys)
	for i, value := range keysArgs {
		val, err := String(value, nil)
		if err != nil && err != ErrNil {
			return nil, nil, err
		}
		if i < numberKeys {
			keys = append(keys, val)
		} else {
			args = append(args, val)
		}
	}
	return keys, args, nil
}

// RCEvalSha runs EVALSHA for sha1. The first numberKeys entries of keysArgs
// are passed as keys and the rest as script arguments. It returns
// ErrWrongArgsNum when numberKeys is negative or exceeds len(keysArgs).
func RCEvalSha(sha1 string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
	keys, args, err := rcSplitKeysArgs(numberKeys, keysArgs)
	if err != nil {
		return nil, err
	}
	res, err := pools.EvalSha(sha1, keys, args...).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCEval runs EVAL for luaScript. The first numberKeys entries of keysArgs
// are passed as keys and the rest as script arguments. It returns
// ErrWrongArgsNum when numberKeys is negative or exceeds len(keysArgs).
func RCEval(luaScript string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
	keys, args, err := rcSplitKeysArgs(numberKeys, keysArgs)
	if err != nil {
		return nil, err
	}
	res, err := pools.Eval(luaScript, keys, args...).Result()
	if err != nil {
		return nil, convertError(err)
	}
	return res, nil
}

// RCGetBit runs GETBIT at offset and returns the client's integer result.
func RCGetBit(key string, offset int64) (int64, error) {
	res, err := pools.GetBit(key, offset).Result()
	return res, convertError(err)
}

// RCSetBit runs SETBIT at offset and returns the client's integer result.
func RCSetBit(key string, offset uint32, value int) (int, error) {
	res, err := pools.SetBit(key, int64(offset), value).Result()
	return int(res), convertError(err)
}

// RCGetClient returns the shared cluster client (nil until NewRedisCluster
// has been called).
func RCGetClient() *redis.ClusterClient {
	return pools
}

// convertError maps redis.Nil to nil and passes every other error through.
func convertError(err error) error {
	if err == redis.Nil {
		// For compatibility with redis 2.x, ErrNil is not returned here;
		// ErrNil is only returned when the redis_cluster_reply functions
		// are called.
		return nil
	}
	return err
}