|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622 |
- package cache
-
- import (
- "strconv"
- "time"
-
- "github.com/go-redis/redis"
- )
-
- var pools *redis.ClusterClient
-
- func NewRedisCluster(addrs []string) error {
- opt := &redis.ClusterOptions{
- Addrs: addrs,
- PoolSize: redisPoolSize,
- PoolTimeout: redisPoolTTL,
- IdleTimeout: redisIdleTTL,
- DialTimeout: redisDialTTL,
- ReadTimeout: redisReadTTL,
- WriteTimeout: redisWriteTTL,
- }
- pools = redis.NewClusterClient(opt)
- if err := pools.Ping().Err(); err != nil {
- return err
- }
- return nil
- }
-
- func RCGet(key string) (interface{}, error) {
- res, err := pools.Get(key).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return []byte(res), nil
- }
- func RCSet(key string, value interface{}) error {
- err := pools.Set(key, value, 0).Err()
- return convertError(err)
- }
- func RCGetSet(key string, value interface{}) (interface{}, error) {
- res, err := pools.GetSet(key, value).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return []byte(res), nil
- }
- func RCSetNx(key string, value interface{}) (int64, error) {
- res, err := pools.SetNX(key, value, 0).Result()
- if err != nil {
- return 0, convertError(err)
- }
- if res {
- return 1, nil
- }
- return 0, nil
- }
- func RCSetEx(key string, value interface{}, timeout int64) error {
- _, err := pools.Set(key, value, time.Duration(timeout)*time.Second).Result()
- if err != nil {
- return convertError(err)
- }
- return nil
- }
-
- // nil表示成功,ErrNil表示数据库内已经存在这个key,其他表示数据库发生错误
- func RCSetNxEx(key string, value interface{}, timeout int64) error {
- res, err := pools.SetNX(key, value, time.Duration(timeout)*time.Second).Result()
- if err != nil {
- return convertError(err)
- }
- if res {
- return nil
- }
- return ErrNil
- }
- func RCMGet(keys ...string) ([]interface{}, error) {
- res, err := pools.MGet(keys...).Result()
- return res, convertError(err)
- }
-
- // 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
- func RCMSet(kvs map[string]interface{}) error {
- pairs := make([]string, 0, len(kvs)*2)
- for k, v := range kvs {
- val, err := String(v, nil)
- if err != nil {
- return err
- }
- pairs = append(pairs, k, val)
- }
- return convertError(pools.MSet(pairs).Err())
- }
-
- // 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
- func RCMSetNX(kvs map[string]interface{}) (bool, error) {
- pairs := make([]string, 0, len(kvs)*2)
- for k, v := range kvs {
- val, err := String(v, nil)
- if err != nil {
- return false, err
- }
- pairs = append(pairs, k, val)
- }
- res, err := pools.MSetNX(pairs).Result()
- return res, convertError(err)
- }
- func RCExpireAt(key string, timestamp int64) (int64, error) {
- res, err := pools.ExpireAt(key, time.Unix(timestamp, 0)).Result()
- if err != nil {
- return 0, convertError(err)
- }
- if res {
- return 1, nil
- }
- return 0, nil
- }
- func RCDel(keys ...string) (int64, error) {
- args := make([]interface{}, 0, len(keys))
- for _, key := range keys {
- args = append(args, key)
- }
- res, err := pools.Del(keys...).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCIncr(key string) (int64, error) {
- res, err := pools.Incr(key).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCIncrBy(key string, delta int64) (int64, error) {
- res, err := pools.IncrBy(key, delta).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCExpire(key string, duration int64) (int64, error) {
- res, err := pools.Expire(key, time.Duration(duration)*time.Second).Result()
- if err != nil {
- return 0, convertError(err)
- }
- if res {
- return 1, nil
- }
- return 0, nil
- }
- func RCExists(key string) (bool, error) {
- res, err := pools.Exists(key).Result()
- if err != nil {
- return false, convertError(err)
- }
- if res > 0 {
- return true, nil
- }
- return false, nil
- }
- func RCHGet(key string, field string) (interface{}, error) {
- res, err := pools.HGet(key, field).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return []byte(res), nil
- }
- func RCHLen(key string) (int64, error) {
- res, err := pools.HLen(key).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCHSet(key string, field string, val interface{}) error {
- value, err := String(val, nil)
- if err != nil && err != ErrNil {
- return err
- }
- _, err = pools.HSet(key, field, value).Result()
- if err != nil {
- return convertError(err)
- }
- return nil
- }
- func RCHDel(key string, fields ...string) (int64, error) {
- args := make([]interface{}, 0, len(fields)+1)
- args = append(args, key)
- for _, field := range fields {
- args = append(args, field)
- }
- res, err := pools.HDel(key, fields...).Result()
- if err != nil {
- return 0, convertError(err)
- }
- return res, nil
- }
-
- func RCHMGet(key string, fields ...string) (interface{}, error) {
- args := make([]interface{}, 0, len(fields)+1)
- args = append(args, key)
- for _, field := range fields {
- args = append(args, field)
- }
- if len(fields) == 0 {
- return nil, ErrNil
- }
- res, err := pools.HMGet(key, fields...).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCHMSet(key string, kvs ...interface{}) error {
- if len(kvs) == 0 {
- return nil
- }
- if len(kvs)%2 != 0 {
- return ErrWrongArgsNum
- }
- var err error
- v := map[string]interface{}{} // todo change
- v["field"], err = String(kvs[0], nil)
- if err != nil && err != ErrNil {
- return err
- }
- v["value"], err = String(kvs[1], nil)
- if err != nil && err != ErrNil {
- return err
- }
- pairs := make([]string, 0, len(kvs)-2)
- if len(kvs) > 2 {
- for _, kv := range kvs[2:] {
- kvString, err := String(kv, nil)
- if err != nil && err != ErrNil {
- return err
- }
- pairs = append(pairs, kvString)
- }
- }
- v["paris"] = pairs
- _, err = pools.HMSet(key, v).Result()
- if err != nil {
- return convertError(err)
- }
- return nil
- }
-
- func RCHKeys(key string) ([]string, error) {
- res, err := pools.HKeys(key).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCHVals(key string) ([]interface{}, error) {
- res, err := pools.HVals(key).Result()
- if err != nil {
- return nil, convertError(err)
- }
- rs := make([]interface{}, 0, len(res))
- for _, res := range res {
- rs = append(rs, res)
- }
- return rs, nil
- }
- func RCHGetAll(key string) (map[string]string, error) {
- vals, err := pools.HGetAll(key).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return vals, nil
- }
- func RCHIncrBy(key, field string, delta int64) (int64, error) {
- res, err := pools.HIncrBy(key, field, delta).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCZAdd(key string, kvs ...interface{}) (int64, error) {
- args := make([]interface{}, 0, len(kvs)+1)
- args = append(args, key)
- args = append(args, kvs...)
- if len(kvs) == 0 {
- return 0, nil
- }
- if len(kvs)%2 != 0 {
- return 0, ErrWrongArgsNum
- }
- zs := make([]redis.Z, len(kvs)/2)
- for i := 0; i < len(kvs); i += 2 {
- idx := i / 2
- score, err := Float64(kvs[i], nil)
- if err != nil && err != ErrNil {
- return 0, err
- }
- zs[idx].Score = score
- zs[idx].Member = kvs[i+1]
- }
- res, err := pools.ZAdd(key, zs...).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCZRem(key string, members ...string) (int64, error) {
- args := make([]interface{}, 0, len(members))
- args = append(args, key)
- for _, member := range members {
- args = append(args, member)
- }
- res, err := pools.ZRem(key, members).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, err
- }
-
- func RCZRange(key string, min, max int64, withScores bool) (interface{}, error) {
- res := make([]interface{}, 0)
- if withScores {
- zs, err := pools.ZRangeWithScores(key, min, max).Result()
- if err != nil {
- return nil, convertError(err)
- }
- for _, z := range zs {
- res = append(res, z.Member, strconv.FormatFloat(z.Score, 'f', -1, 64))
- }
- } else {
- ms, err := pools.ZRange(key, min, max).Result()
- if err != nil {
- return nil, convertError(err)
- }
- for _, m := range ms {
- res = append(res, m)
- }
- }
- return res, nil
- }
- func RCZRangeByScoreWithScore(key string, min, max int64) (map[string]int64, error) {
- opt := new(redis.ZRangeBy)
- opt.Min = strconv.FormatInt(int64(min), 10)
- opt.Max = strconv.FormatInt(int64(max), 10)
- opt.Count = -1
- opt.Offset = 0
- vals, err := pools.ZRangeByScoreWithScores(key, *opt).Result()
- if err != nil {
- return nil, convertError(err)
- }
- res := make(map[string]int64, len(vals))
- for _, val := range vals {
- key, err := String(val.Member, nil)
- if err != nil && err != ErrNil {
- return nil, err
- }
- res[key] = int64(val.Score)
- }
- return res, nil
- }
- func RCLRange(key string, start, stop int64) (interface{}, error) {
- res, err := pools.LRange(key, start, stop).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCLSet(key string, index int, value interface{}) error {
- err := pools.LSet(key, int64(index), value).Err()
- return convertError(err)
- }
- func RCLLen(key string) (int64, error) {
- res, err := pools.LLen(key).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCLRem(key string, count int, value interface{}) (int, error) {
- val, _ := value.(string)
- res, err := pools.LRem(key, int64(count), val).Result()
- if err != nil {
- return int(res), convertError(err)
- }
- return int(res), nil
- }
- func RCTTl(key string) (int64, error) {
- duration, err := pools.TTL(key).Result()
- if err != nil {
- return int64(duration.Seconds()), convertError(err)
- }
- return int64(duration.Seconds()), nil
- }
- func RCLPop(key string) (interface{}, error) {
- res, err := pools.LPop(key).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCRPop(key string) (interface{}, error) {
- res, err := pools.RPop(key).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCBLPop(key string, timeout int) (interface{}, error) {
- res, err := pools.BLPop(time.Duration(timeout)*time.Second, key).Result()
- if err != nil {
- // 兼容redis 2.x
- if err == redis.Nil {
- return nil, ErrNil
- }
- return nil, err
- }
- return res[1], nil
- }
- func RCBRPop(key string, timeout int) (interface{}, error) {
- res, err := pools.BRPop(time.Duration(timeout)*time.Second, key).Result()
- if err != nil {
- // 兼容redis 2.x
- if err == redis.Nil {
- return nil, ErrNil
- }
- return nil, convertError(err)
- }
- return res[1], nil
- }
- func RCLPush(key string, value ...interface{}) error {
- args := make([]interface{}, 0, len(value)+1)
- args = append(args, key)
- args = append(args, value...)
- vals := make([]string, 0, len(value))
- for _, v := range value {
- val, err := String(v, nil)
- if err != nil && err != ErrNil {
- return err
- }
- vals = append(vals, val)
- }
- _, err := pools.LPush(key, vals).Result() // todo ...
- if err != nil {
- return convertError(err)
- }
- return nil
- }
- func RCRPush(key string, value ...interface{}) error {
- args := make([]interface{}, 0, len(value)+1)
- args = append(args, key)
- args = append(args, value...)
- vals := make([]string, 0, len(value))
- for _, v := range value {
- val, err := String(v, nil)
- if err != nil && err != ErrNil {
- if err == ErrNil {
- continue
- }
- return err
- }
- if val == "" {
- continue
- }
- vals = append(vals, val)
- }
- _, err := pools.RPush(key, vals).Result() // todo ...
- if err != nil {
- return convertError(err)
- }
- return nil
- }
-
- // 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
- func RCBRPopLPush(srcKey string, destKey string, timeout int) (interface{}, error) {
- res, err := pools.BRPopLPush(srcKey, destKey, time.Duration(timeout)*time.Second).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
-
- // 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
- func RCRPopLPush(srcKey string, destKey string) (interface{}, error) {
- res, err := pools.RPopLPush(srcKey, destKey).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCSAdd(key string, members ...interface{}) (int64, error) {
- args := make([]interface{}, 0, len(members)+1)
- args = append(args, key)
- args = append(args, members...)
- ms := make([]string, 0, len(members))
- for _, member := range members {
- m, err := String(member, nil)
- if err != nil && err != ErrNil {
- return 0, err
- }
- ms = append(ms, m)
- }
- res, err := pools.SAdd(key, ms).Result() // todo ...
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCSPop(key string) ([]byte, error) {
- res, err := pools.SPop(key).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return []byte(res), nil
- }
- func RCSIsMember(key string, member interface{}) (bool, error) {
- m, _ := member.(string)
- res, err := pools.SIsMember(key, m).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCSRem(key string, members ...interface{}) (int64, error) {
- args := make([]interface{}, 0, len(members)+1)
- args = append(args, key)
- args = append(args, members...)
- ms := make([]string, 0, len(members))
- for _, member := range members {
- m, err := String(member, nil)
- if err != nil && err != ErrNil {
- return 0, err
- }
- ms = append(ms, m)
- }
- res, err := pools.SRem(key, ms).Result() // todo ...
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCSMembers(key string) ([]string, error) {
- res, err := pools.SMembers(key).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCScriptLoad(luaScript string) (interface{}, error) {
- res, err := pools.ScriptLoad(luaScript).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCEvalSha(sha1 string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
- vals := make([]interface{}, 0, len(keysArgs)+2)
- vals = append(vals, sha1, numberKeys)
- vals = append(vals, keysArgs...)
- keys := make([]string, 0, numberKeys)
- args := make([]string, 0, len(keysArgs)-numberKeys)
- for i, value := range keysArgs {
- val, err := String(value, nil)
- if err != nil && err != ErrNil {
- return nil, err
- }
- if i < numberKeys {
- keys = append(keys, val)
- } else {
- args = append(args, val)
- }
- }
- res, err := pools.EvalSha(sha1, keys, args).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCEval(luaScript string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
- vals := make([]interface{}, 0, len(keysArgs)+2)
- vals = append(vals, luaScript, numberKeys)
- vals = append(vals, keysArgs...)
- keys := make([]string, 0, numberKeys)
- args := make([]string, 0, len(keysArgs)-numberKeys)
- for i, value := range keysArgs {
- val, err := String(value, nil)
- if err != nil && err != ErrNil {
- return nil, err
- }
- if i < numberKeys {
- keys = append(keys, val)
- } else {
- args = append(args, val)
- }
- }
- res, err := pools.Eval(luaScript, keys, args).Result()
- if err != nil {
- return nil, convertError(err)
- }
- return res, nil
- }
- func RCGetBit(key string, offset int64) (int64, error) {
- res, err := pools.GetBit(key, offset).Result()
- if err != nil {
- return res, convertError(err)
- }
- return res, nil
- }
- func RCSetBit(key string, offset uint32, value int) (int, error) {
- res, err := pools.SetBit(key, int64(offset), value).Result()
- return int(res), convertError(err)
- }
- func RCGetClient() *redis.ClusterClient {
- return pools
- }
- func convertError(err error) error {
- if err == redis.Nil {
- // 为了兼容redis 2.x,这里不返回 ErrNil,ErrNil在调用redis_cluster_reply函数时才返回
- return nil
- }
- return err
- }
|