Bläddra i källkod

ADD Redis

tags/v1.2.5
DengBiao 2 år sedan
förälder
incheckning
8b150606b9
8 ändrade filer med 2421 tillägg och 1 borttagningar
  1. +4
    -1
      go.mod
  2. +11
    -0
      md/kudian.go
  3. +19
    -0
      pay/pay_by_kudian.go
  4. +421
    -0
      utils/cache/base.go
  5. +403
    -0
      utils/cache/redis.go
  6. +622
    -0
      utils/cache/redis_cluster.go
  7. +324
    -0
      utils/cache/redis_pool.go
  8. +617
    -0
      utils/cache/redis_pool_cluster.go

+ 4
- 1
go.mod Visa fil

@@ -4,11 +4,14 @@ go 1.15

require (
github.com/gin-gonic/gin v1.8.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.6.0
github.com/gomodule/redigo v1.8.8
github.com/iGoogle-ink/gopay v1.5.36
github.com/iGoogle-ink/gotil v1.0.20
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.2.0
go.uber.org/zap v1.16.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
xorm.io/builder v0.3.10 // indirect
xorm.io/xorm v1.3.0
)

+ 11
- 0
md/kudian.go Visa fil

@@ -35,6 +35,17 @@ type KuDianAliAppPayParams struct {
} `json:"must_params"`
}

type MakePayParamsByKuDian struct {
RequestUrl string `json:"request_url"`
Params interface{} `json:"params"`
}

const (
WX_APPLET_PAY_URL = "zy-base-widget-sub/pages/cashier/cashier?unique_identifier="
WX_APP_TO_APPLET_PAY_UNIQUE_IDENTIFIER = "%s:wx_app_to_applet_pay_unique_identifier:%s:%s" // 酷点支付 - App跳转小程序支付 (mid + store_id + 纳秒时间戳)
WX_APP_TO_APPLET_PAY_UNIQUE_IDENTIFIER_CACHE_TIME = 60 * 10 //缓存10分钟
)

// ########################################### 酷点支付 - 回调结构体 ###############################################

type KuDianPayCallback struct {


+ 19
- 0
pay/pay_by_kudian.go Visa fil

@@ -3,10 +3,12 @@ package pay
import (
"code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git/md"
zhios_pay_utils "code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git/utils"
"code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git/utils/cache"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"time"
)

//酷点 - 支付宝APP支付
@@ -120,3 +122,20 @@ func CheckAllCallbackParamsByKuDian(c *gin.Context) (interface{}, error) {
zhios_pay_utils.FilePutContents("CheckAllCallbackParamsByKuDian", zhios_pay_utils.SerializeStr(dataPay))
return dataPay, nil
}

func MakePayParamsByKuDian(url, uniqueIdentifierOne, uniqueIdentifierTwo, appletOriginalId, queryParams string, params map[string]interface{}) (map[string]string, error) {
if params == nil {
return nil, errors.New("参数解析错误")
}
data := md.MakePayParamsByKuDian{
RequestUrl: url,
Params: params,
}
redisKey := fmt.Sprintf(md.WX_APP_TO_APPLET_PAY_UNIQUE_IDENTIFIER, uniqueIdentifierOne, uniqueIdentifierTwo, zhios_pay_utils.AnyToString(time.Now().UnixNano()))
cache.SetJson(redisKey, data, md.WX_APP_TO_APPLET_PAY_UNIQUE_IDENTIFIER_CACHE_TIME)
var r = map[string]string{}
r["use_pay_method"] = "third_party"
r["applet_original_id"] = appletOriginalId
r["wx_url"] = md.WX_APPLET_PAY_URL + redisKey + "&" + queryParams
return r, nil
}

+ 421
- 0
utils/cache/base.go Visa fil

@@ -0,0 +1,421 @@
package cache

import (
"errors"
"fmt"
"strconv"
"time"
)

const (
redisDialTTL = 10 * time.Second
redisReadTTL = 3 * time.Second
redisWriteTTL = 3 * time.Second
redisIdleTTL = 10 * time.Second
redisPoolTTL = 10 * time.Second
redisPoolSize int = 512
redisMaxIdleConn int = 64
redisMaxActive int = 512
)

var (
ErrNil = errors.New("nil return")
ErrWrongArgsNum = errors.New("args num error")
ErrNegativeInt = errors.New("redis cluster: unexpected value for Uint64")
)

// 以下为提供类型转换

func Int(reply interface{}, err error) (int, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int:
return reply, nil
case int8:
return int(reply), nil
case int16:
return int(reply), nil
case int32:
return int(reply), nil
case int64:
x := int(reply)
if int64(x) != reply {
return 0, strconv.ErrRange
}
return x, nil
case uint:
n := int(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case uint8:
return int(reply), nil
case uint16:
return int(reply), nil
case uint32:
n := int(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case uint64:
n := int(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case []byte:
data := string(reply)
if len(data) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseInt(data, 10, 0)
return int(n), err
case string:
if len(reply) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseInt(reply, 10, 0)
return int(n), err
case nil:
return 0, ErrNil
case error:
return 0, reply
}
return 0, fmt.Errorf("redis cluster: unexpected type for Int, got type %T", reply)
}

func Int64(reply interface{}, err error) (int64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int:
return int64(reply), nil
case int8:
return int64(reply), nil
case int16:
return int64(reply), nil
case int32:
return int64(reply), nil
case int64:
return reply, nil
case uint:
n := int64(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case uint8:
return int64(reply), nil
case uint16:
return int64(reply), nil
case uint32:
return int64(reply), nil
case uint64:
n := int64(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case []byte:
data := string(reply)
if len(data) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseInt(data, 10, 64)
return n, err
case string:
if len(reply) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseInt(reply, 10, 64)
return n, err
case nil:
return 0, ErrNil
case error:
return 0, reply
}
return 0, fmt.Errorf("redis cluster: unexpected type for Int64, got type %T", reply)
}

func Uint64(reply interface{}, err error) (uint64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case uint:
return uint64(reply), nil
case uint8:
return uint64(reply), nil
case uint16:
return uint64(reply), nil
case uint32:
return uint64(reply), nil
case uint64:
return reply, nil
case int:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case int8:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case int16:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case int32:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case int64:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case []byte:
data := string(reply)
if len(data) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseUint(data, 10, 64)
return n, err
case string:
if len(reply) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseUint(reply, 10, 64)
return n, err
case nil:
return 0, ErrNil
case error:
return 0, reply
}
return 0, fmt.Errorf("redis cluster: unexpected type for Uint64, got type %T", reply)
}

func Float64(reply interface{}, err error) (float64, error) {
if err != nil {
return 0, err
}

var value float64
err = nil
switch v := reply.(type) {
case float32:
value = float64(v)
case float64:
value = v
case int:
value = float64(v)
case int8:
value = float64(v)
case int16:
value = float64(v)
case int32:
value = float64(v)
case int64:
value = float64(v)
case uint:
value = float64(v)
case uint8:
value = float64(v)
case uint16:
value = float64(v)
case uint32:
value = float64(v)
case uint64:
value = float64(v)
case []byte:
data := string(v)
if len(data) == 0 {
return 0, ErrNil
}
value, err = strconv.ParseFloat(string(v), 64)
case string:
if len(v) == 0 {
return 0, ErrNil
}
value, err = strconv.ParseFloat(v, 64)
case nil:
err = ErrNil
case error:
err = v
default:
err = fmt.Errorf("redis cluster: unexpected type for Float64, got type %T", v)
}

return value, err
}

func Bool(reply interface{}, err error) (bool, error) {
if err != nil {
return false, err
}
switch reply := reply.(type) {
case bool:
return reply, nil
case int64:
return reply != 0, nil
case []byte:
data := string(reply)
if len(data) == 0 {
return false, ErrNil
}

return strconv.ParseBool(data)
case string:
if len(reply) == 0 {
return false, ErrNil
}

return strconv.ParseBool(reply)
case nil:
return false, ErrNil
case error:
return false, reply
}
return false, fmt.Errorf("redis cluster: unexpected type for Bool, got type %T", reply)
}

func Bytes(reply interface{}, err error) ([]byte, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []byte:
if len(reply) == 0 {
return nil, ErrNil
}
return reply, nil
case string:
data := []byte(reply)
if len(data) == 0 {
return nil, ErrNil
}
return data, nil
case nil:
return nil, ErrNil
case error:
return nil, reply
}
return nil, fmt.Errorf("redis cluster: unexpected type for Bytes, got type %T", reply)
}

func String(reply interface{}, err error) (string, error) {
if err != nil {
return "", err
}

value := ""
err = nil
switch v := reply.(type) {
case string:
if len(v) == 0 {
return "", ErrNil
}

value = v
case []byte:
if len(v) == 0 {
return "", ErrNil
}

value = string(v)
case int:
value = strconv.FormatInt(int64(v), 10)
case int8:
value = strconv.FormatInt(int64(v), 10)
case int16:
value = strconv.FormatInt(int64(v), 10)
case int32:
value = strconv.FormatInt(int64(v), 10)
case int64:
value = strconv.FormatInt(v, 10)
case uint:
value = strconv.FormatUint(uint64(v), 10)
case uint8:
value = strconv.FormatUint(uint64(v), 10)
case uint16:
value = strconv.FormatUint(uint64(v), 10)
case uint32:
value = strconv.FormatUint(uint64(v), 10)
case uint64:
value = strconv.FormatUint(v, 10)
case float32:
value = strconv.FormatFloat(float64(v), 'f', -1, 32)
case float64:
value = strconv.FormatFloat(v, 'f', -1, 64)
case bool:
value = strconv.FormatBool(v)
case nil:
err = ErrNil
case error:
err = v
default:
err = fmt.Errorf("redis cluster: unexpected type for String, got type %T", v)
}

return value, err
}

func Strings(reply interface{}, err error) ([]string, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
result := make([]string, len(reply))
for i := range reply {
if reply[i] == nil {
continue
}
switch subReply := reply[i].(type) {
case string:
result[i] = subReply
case []byte:
result[i] = string(subReply)
default:
return nil, fmt.Errorf("redis cluster: unexpected element type for String, got type %T", reply[i])
}
}
return result, nil
case []string:
return reply, nil
case nil:
return nil, ErrNil
case error:
return nil, reply
}
return nil, fmt.Errorf("redis cluster: unexpected type for Strings, got type %T", reply)
}

func Values(reply interface{}, err error) ([]interface{}, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
return reply, nil
case nil:
return nil, ErrNil
case error:
return nil, reply
}
return nil, fmt.Errorf("redis cluster: unexpected type for Values, got type %T", reply)
}

+ 403
- 0
utils/cache/redis.go Visa fil

@@ -0,0 +1,403 @@
package cache

import (
"encoding/json"
"errors"
"log"
"strings"
"time"

redigo "github.com/gomodule/redigo/redis"
)

// configuration
type Config struct {
Server string
Password string
MaxIdle int // Maximum number of idle connections in the pool.

// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int

// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
IdleTimeout time.Duration

// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
Wait bool
KeyPrefix string // prefix to all keys; example is "dev environment name"
KeyDelimiter string // delimiter to be used while appending keys; example is ":"
KeyPlaceholder string // placeholder to be parsed using given arguments to obtain a final key; example is "?"
}

var pool *redigo.Pool
var conf *Config

func NewRedis(addr string) {
if addr == "" {
panic("\nredis connect string cannot be empty\n")
}
pool = &redigo.Pool{
MaxIdle: redisMaxIdleConn,
IdleTimeout: redisIdleTTL,
MaxActive: redisMaxActive,
// MaxConnLifetime: redisDialTTL,
Wait: true,
Dial: func() (redigo.Conn, error) {
c, err := redigo.Dial("tcp", addr,
redigo.DialConnectTimeout(redisDialTTL),
redigo.DialReadTimeout(redisReadTTL),
redigo.DialWriteTimeout(redisWriteTTL),
)
if err != nil {
log.Println("Redis Dial failed: ", err)
return nil, err
}
return c, err
},
TestOnBorrow: func(c redigo.Conn, t time.Time) error {
_, err := c.Do("PING")
if err != nil {
log.Println("Unable to ping to redis server:", err)
}
return err
},
}
conn := pool.Get()
defer conn.Close()
if conn.Err() != nil {
println("\nredis connect " + addr + " error: " + conn.Err().Error())
} else {
println("\nredis connect " + addr + " success!\n")
}
}

func Do(cmd string, args ...interface{}) (reply interface{}, err error) {
conn := pool.Get()
defer conn.Close()
return conn.Do(cmd, args...)
}

func GetPool() *redigo.Pool {
return pool
}

func ParseKey(key string, vars []string) (string, error) {
arr := strings.Split(key, conf.KeyPlaceholder)
actualKey := ""
if len(arr) != len(vars)+1 {
return "", errors.New("redis/connection.go: Insufficient arguments to parse key")
} else {
for index, val := range arr {
if index == 0 {
actualKey = arr[index]
} else {
actualKey += vars[index-1] + val
}
}
}
return getPrefixedKey(actualKey), nil
}

func getPrefixedKey(key string) string {
return conf.KeyPrefix + conf.KeyDelimiter + key
}
func StripEnvKey(key string) string {
return strings.TrimLeft(key, conf.KeyPrefix+conf.KeyDelimiter)
}
func SplitKey(key string) []string {
return strings.Split(key, conf.KeyDelimiter)
}
func Expire(key string, ttl int) (interface{}, error) {
return Do("EXPIRE", key, ttl)
}
func Persist(key string) (interface{}, error) {
return Do("PERSIST", key)
}

func Del(key string) (interface{}, error) {
return Do("DEL", key)
}
func Set(key string, data interface{}) (interface{}, error) {
// set
return Do("SET", key, data)
}
func SetNX(key string, data interface{}) (interface{}, error) {
return Do("SETNX", key, data)
}
func SetEx(key string, data interface{}, ttl int) (interface{}, error) {
return Do("SETEX", key, ttl, data)
}

func SetJson(key string, data interface{}, ttl int) bool {
c, err := json.Marshal(data)
if err != nil {
return false
}
if ttl < 1 {
_, err = Set(key, c)
} else {
_, err = SetEx(key, c, ttl)
}
if err != nil {
return false
}
return true
}

func GetJson(key string, dst interface{}) error {
b, err := GetBytes(key)
if err != nil {
return err
}
if err = json.Unmarshal(b, dst); err != nil {
return err
}
return nil
}

func Get(key string) (interface{}, error) {
// get
return Do("GET", key)
}
func GetTTL(key string) (time.Duration, error) {
ttl, err := redigo.Int64(Do("TTL", key))
return time.Duration(ttl) * time.Second, err
}
func GetBytes(key string) ([]byte, error) {
return redigo.Bytes(Do("GET", key))
}
func GetString(key string) (string, error) {
return redigo.String(Do("GET", key))
}
func GetStringMap(key string) (map[string]string, error) {
return redigo.StringMap(Do("GET", key))
}
func GetInt(key string) (int, error) {
return redigo.Int(Do("GET", key))
}
func GetInt64(key string) (int64, error) {
return redigo.Int64(Do("GET", key))
}
func GetStringLength(key string) (int, error) {
return redigo.Int(Do("STRLEN", key))
}
func ZAdd(key string, score float64, data interface{}) (interface{}, error) {
return Do("ZADD", key, score, data)
}
func ZAddNX(key string, score float64, data interface{}) (interface{}, error) {
return Do("ZADD", key, "NX", score, data)
}
func ZRem(key string, data interface{}) (interface{}, error) {
return Do("ZREM", key, data)
}
func ZRange(key string, start int, end int, withScores bool) ([]interface{}, error) {
if withScores {
return redigo.Values(Do("ZRANGE", key, start, end, "WITHSCORES"))
}
return redigo.Values(Do("ZRANGE", key, start, end))
}
func ZRemRangeByScore(key string, start int64, end int64) ([]interface{}, error) {
return redigo.Values(Do("ZREMRANGEBYSCORE", key, start, end))
}
func ZCard(setName string) (int64, error) {
return redigo.Int64(Do("ZCARD", setName))
}
func ZScan(setName string) (int64, error) {
return redigo.Int64(Do("ZCARD", setName))
}
func SAdd(setName string, data interface{}) (interface{}, error) {
return Do("SADD", setName, data)
}
func SCard(setName string) (int64, error) {
return redigo.Int64(Do("SCARD", setName))
}
func SIsMember(setName string, data interface{}) (bool, error) {
return redigo.Bool(Do("SISMEMBER", setName, data))
}
func SMembers(setName string) ([]string, error) {
return redigo.Strings(Do("SMEMBERS", setName))
}
func SRem(setName string, data interface{}) (interface{}, error) {
return Do("SREM", setName, data)
}
func HSet(key string, HKey string, data interface{}) (interface{}, error) {
return Do("HSET", key, HKey, data)
}

func HGet(key string, HKey string) (interface{}, error) {
return Do("HGET", key, HKey)
}

func HMGet(key string, hashKeys ...string) ([]interface{}, error) {
ret, err := Do("HMGET", key, hashKeys)
if err != nil {
return nil, err
}
reta, ok := ret.([]interface{})
if !ok {
return nil, errors.New("result not an array")
}
return reta, nil
}

func HMSet(key string, hashKeys []string, vals []interface{}) (interface{}, error) {
if len(hashKeys) == 0 || len(hashKeys) != len(vals) {
var ret interface{}
return ret, errors.New("bad length")
}
input := []interface{}{key}
for i, v := range hashKeys {
input = append(input, v, vals[i])
}
return Do("HMSET", input...)
}

func HGetString(key string, HKey string) (string, error) {
return redigo.String(Do("HGET", key, HKey))
}
func HGetFloat(key string, HKey string) (float64, error) {
f, err := redigo.Float64(Do("HGET", key, HKey))
return f, err
}
func HGetInt(key string, HKey string) (int, error) {
return redigo.Int(Do("HGET", key, HKey))
}
func HGetInt64(key string, HKey string) (int64, error) {
return redigo.Int64(Do("HGET", key, HKey))
}
func HGetBool(key string, HKey string) (bool, error) {
return redigo.Bool(Do("HGET", key, HKey))
}
func HDel(key string, HKey string) (interface{}, error) {
return Do("HDEL", key, HKey)
}

func HGetAll(key string) (map[string]interface{}, error) {
vals, err := redigo.Values(Do("HGETALL", key))
if err != nil {
return nil, err
}
num := len(vals) / 2
result := make(map[string]interface{}, num)
for i := 0; i < num; i++ {
key, _ := redigo.String(vals[2*i], nil)
result[key] = vals[2*i+1]
}
return result, nil
}

func FlushAll() bool {
res, _ := redigo.String(Do("FLUSHALL"))
if res == "" {
return false
}
return true
}

// NOTE: Use this in production environment with extreme care.
// Read more here:https://redigo.io/commands/keys
func Keys(pattern string) ([]string, error) {
return redigo.Strings(Do("KEYS", pattern))
}

func HKeys(key string) ([]string, error) {
return redigo.Strings(Do("HKEYS", key))
}

func Exists(key string) bool {
count, err := redigo.Int(Do("EXISTS", key))
if count == 0 || err != nil {
return false
}
return true
}

func Incr(key string) (int64, error) {
return redigo.Int64(Do("INCR", key))
}

func Decr(key string) (int64, error) {
return redigo.Int64(Do("DECR", key))
}

func IncrBy(key string, incBy int64) (int64, error) {
return redigo.Int64(Do("INCRBY", key, incBy))
}

func DecrBy(key string, decrBy int64) (int64, error) {
return redigo.Int64(Do("DECRBY", key))
}

func IncrByFloat(key string, incBy float64) (float64, error) {
return redigo.Float64(Do("INCRBYFLOAT", key, incBy))
}

func DecrByFloat(key string, decrBy float64) (float64, error) {
return redigo.Float64(Do("DECRBYFLOAT", key, decrBy))
}

// use for message queue
func LPush(key string, data interface{}) (interface{}, error) {
// set
return Do("LPUSH", key, data)
}

func LPop(key string) (interface{}, error) {
return Do("LPOP", key)
}

func LPopString(key string) (string, error) {
return redigo.String(Do("LPOP", key))
}
func LPopFloat(key string) (float64, error) {
f, err := redigo.Float64(Do("LPOP", key))
return f, err
}
func LPopInt(key string) (int, error) {
return redigo.Int(Do("LPOP", key))
}
func LPopInt64(key string) (int64, error) {
return redigo.Int64(Do("LPOP", key))
}

func RPush(key string, data interface{}) (interface{}, error) {
// set
return Do("RPUSH", key, data)
}

func RPop(key string) (interface{}, error) {
return Do("RPOP", key)
}

func RPopString(key string) (string, error) {
return redigo.String(Do("RPOP", key))
}
func RPopFloat(key string) (float64, error) {
f, err := redigo.Float64(Do("RPOP", key))
return f, err
}
func RPopInt(key string) (int, error) {
return redigo.Int(Do("RPOP", key))
}
func RPopInt64(key string) (int64, error) {
return redigo.Int64(Do("RPOP", key))
}

func Scan(cursor int64, pattern string, count int64) (int64, []string, error) {
var items []string
var newCursor int64

values, err := redigo.Values(Do("SCAN", cursor, "MATCH", pattern, "COUNT", count))
if err != nil {
return 0, nil, err
}
values, err = redigo.Scan(values, &newCursor, &items)
if err != nil {
return 0, nil, err
}
return newCursor, items, nil
}

+ 622
- 0
utils/cache/redis_cluster.go Visa fil

@@ -0,0 +1,622 @@
package cache

import (
"strconv"
"time"

"github.com/go-redis/redis"
)

var pools *redis.ClusterClient

func NewRedisCluster(addrs []string) error {
opt := &redis.ClusterOptions{
Addrs: addrs,
PoolSize: redisPoolSize,
PoolTimeout: redisPoolTTL,
IdleTimeout: redisIdleTTL,
DialTimeout: redisDialTTL,
ReadTimeout: redisReadTTL,
WriteTimeout: redisWriteTTL,
}
pools = redis.NewClusterClient(opt)
if err := pools.Ping().Err(); err != nil {
return err
}
return nil
}

func RCGet(key string) (interface{}, error) {
res, err := pools.Get(key).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func RCSet(key string, value interface{}) error {
err := pools.Set(key, value, 0).Err()
return convertError(err)
}
func RCGetSet(key string, value interface{}) (interface{}, error) {
res, err := pools.GetSet(key, value).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func RCSetNx(key string, value interface{}) (int64, error) {
res, err := pools.SetNX(key, value, 0).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func RCSetEx(key string, value interface{}, timeout int64) error {
_, err := pools.Set(key, value, time.Duration(timeout)*time.Second).Result()
if err != nil {
return convertError(err)
}
return nil
}

// nil表示成功,ErrNil表示数据库内已经存在这个key,其他表示数据库发生错误
func RCSetNxEx(key string, value interface{}, timeout int64) error {
res, err := pools.SetNX(key, value, time.Duration(timeout)*time.Second).Result()
if err != nil {
return convertError(err)
}
if res {
return nil
}
return ErrNil
}
func RCMGet(keys ...string) ([]interface{}, error) {
res, err := pools.MGet(keys...).Result()
return res, convertError(err)
}

// 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
func RCMSet(kvs map[string]interface{}) error {
pairs := make([]string, 0, len(kvs)*2)
for k, v := range kvs {
val, err := String(v, nil)
if err != nil {
return err
}
pairs = append(pairs, k, val)
}
return convertError(pools.MSet(pairs).Err())
}

// 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
func RCMSetNX(kvs map[string]interface{}) (bool, error) {
pairs := make([]string, 0, len(kvs)*2)
for k, v := range kvs {
val, err := String(v, nil)
if err != nil {
return false, err
}
pairs = append(pairs, k, val)
}
res, err := pools.MSetNX(pairs).Result()
return res, convertError(err)
}
func RCExpireAt(key string, timestamp int64) (int64, error) {
res, err := pools.ExpireAt(key, time.Unix(timestamp, 0)).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func RCDel(keys ...string) (int64, error) {
args := make([]interface{}, 0, len(keys))
for _, key := range keys {
args = append(args, key)
}
res, err := pools.Del(keys...).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCIncr(key string) (int64, error) {
res, err := pools.Incr(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCIncrBy(key string, delta int64) (int64, error) {
res, err := pools.IncrBy(key, delta).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCExpire(key string, duration int64) (int64, error) {
res, err := pools.Expire(key, time.Duration(duration)*time.Second).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func RCExists(key string) (bool, error) {
res, err := pools.Exists(key).Result()
if err != nil {
return false, convertError(err)
}
if res > 0 {
return true, nil
}
return false, nil
}
func RCHGet(key string, field string) (interface{}, error) {
res, err := pools.HGet(key, field).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func RCHLen(key string) (int64, error) {
res, err := pools.HLen(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCHSet(key string, field string, val interface{}) error {
value, err := String(val, nil)
if err != nil && err != ErrNil {
return err
}
_, err = pools.HSet(key, field, value).Result()
if err != nil {
return convertError(err)
}
return nil
}
func RCHDel(key string, fields ...string) (int64, error) {
args := make([]interface{}, 0, len(fields)+1)
args = append(args, key)
for _, field := range fields {
args = append(args, field)
}
res, err := pools.HDel(key, fields...).Result()
if err != nil {
return 0, convertError(err)
}
return res, nil
}

func RCHMGet(key string, fields ...string) (interface{}, error) {
args := make([]interface{}, 0, len(fields)+1)
args = append(args, key)
for _, field := range fields {
args = append(args, field)
}
if len(fields) == 0 {
return nil, ErrNil
}
res, err := pools.HMGet(key, fields...).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCHMSet(key string, kvs ...interface{}) error {
if len(kvs) == 0 {
return nil
}
if len(kvs)%2 != 0 {
return ErrWrongArgsNum
}
var err error
v := map[string]interface{}{} // todo change
v["field"], err = String(kvs[0], nil)
if err != nil && err != ErrNil {
return err
}
v["value"], err = String(kvs[1], nil)
if err != nil && err != ErrNil {
return err
}
pairs := make([]string, 0, len(kvs)-2)
if len(kvs) > 2 {
for _, kv := range kvs[2:] {
kvString, err := String(kv, nil)
if err != nil && err != ErrNil {
return err
}
pairs = append(pairs, kvString)
}
}
v["paris"] = pairs
_, err = pools.HMSet(key, v).Result()
if err != nil {
return convertError(err)
}
return nil
}

func RCHKeys(key string) ([]string, error) {
res, err := pools.HKeys(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCHVals(key string) ([]interface{}, error) {
res, err := pools.HVals(key).Result()
if err != nil {
return nil, convertError(err)
}
rs := make([]interface{}, 0, len(res))
for _, res := range res {
rs = append(rs, res)
}
return rs, nil
}
func RCHGetAll(key string) (map[string]string, error) {
vals, err := pools.HGetAll(key).Result()
if err != nil {
return nil, convertError(err)
}
return vals, nil
}
func RCHIncrBy(key, field string, delta int64) (int64, error) {
res, err := pools.HIncrBy(key, field, delta).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCZAdd(key string, kvs ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(kvs)+1)
args = append(args, key)
args = append(args, kvs...)
if len(kvs) == 0 {
return 0, nil
}
if len(kvs)%2 != 0 {
return 0, ErrWrongArgsNum
}
zs := make([]redis.Z, len(kvs)/2)
for i := 0; i < len(kvs); i += 2 {
idx := i / 2
score, err := Float64(kvs[i], nil)
if err != nil && err != ErrNil {
return 0, err
}
zs[idx].Score = score
zs[idx].Member = kvs[i+1]
}
res, err := pools.ZAdd(key, zs...).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCZRem(key string, members ...string) (int64, error) {
args := make([]interface{}, 0, len(members))
args = append(args, key)
for _, member := range members {
args = append(args, member)
}
res, err := pools.ZRem(key, members).Result()
if err != nil {
return res, convertError(err)
}
return res, err
}

func RCZRange(key string, min, max int64, withScores bool) (interface{}, error) {
res := make([]interface{}, 0)
if withScores {
zs, err := pools.ZRangeWithScores(key, min, max).Result()
if err != nil {
return nil, convertError(err)
}
for _, z := range zs {
res = append(res, z.Member, strconv.FormatFloat(z.Score, 'f', -1, 64))
}
} else {
ms, err := pools.ZRange(key, min, max).Result()
if err != nil {
return nil, convertError(err)
}
for _, m := range ms {
res = append(res, m)
}
}
return res, nil
}
func RCZRangeByScoreWithScore(key string, min, max int64) (map[string]int64, error) {
opt := new(redis.ZRangeBy)
opt.Min = strconv.FormatInt(int64(min), 10)
opt.Max = strconv.FormatInt(int64(max), 10)
opt.Count = -1
opt.Offset = 0
vals, err := pools.ZRangeByScoreWithScores(key, *opt).Result()
if err != nil {
return nil, convertError(err)
}
res := make(map[string]int64, len(vals))
for _, val := range vals {
key, err := String(val.Member, nil)
if err != nil && err != ErrNil {
return nil, err
}
res[key] = int64(val.Score)
}
return res, nil
}
func RCLRange(key string, start, stop int64) (interface{}, error) {
res, err := pools.LRange(key, start, stop).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCLSet(key string, index int, value interface{}) error {
err := pools.LSet(key, int64(index), value).Err()
return convertError(err)
}
func RCLLen(key string) (int64, error) {
res, err := pools.LLen(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCLRem(key string, count int, value interface{}) (int, error) {
val, _ := value.(string)
res, err := pools.LRem(key, int64(count), val).Result()
if err != nil {
return int(res), convertError(err)
}
return int(res), nil
}
func RCTTl(key string) (int64, error) {
duration, err := pools.TTL(key).Result()
if err != nil {
return int64(duration.Seconds()), convertError(err)
}
return int64(duration.Seconds()), nil
}
func RCLPop(key string) (interface{}, error) {
res, err := pools.LPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCRPop(key string) (interface{}, error) {
res, err := pools.RPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCBLPop(key string, timeout int) (interface{}, error) {
res, err := pools.BLPop(time.Duration(timeout)*time.Second, key).Result()
if err != nil {
// 兼容redis 2.x
if err == redis.Nil {
return nil, ErrNil
}
return nil, err
}
return res[1], nil
}
func RCBRPop(key string, timeout int) (interface{}, error) {
res, err := pools.BRPop(time.Duration(timeout)*time.Second, key).Result()
if err != nil {
// 兼容redis 2.x
if err == redis.Nil {
return nil, ErrNil
}
return nil, convertError(err)
}
return res[1], nil
}
func RCLPush(key string, value ...interface{}) error {
args := make([]interface{}, 0, len(value)+1)
args = append(args, key)
args = append(args, value...)
vals := make([]string, 0, len(value))
for _, v := range value {
val, err := String(v, nil)
if err != nil && err != ErrNil {
return err
}
vals = append(vals, val)
}
_, err := pools.LPush(key, vals).Result() // todo ...
if err != nil {
return convertError(err)
}
return nil
}
func RCRPush(key string, value ...interface{}) error {
args := make([]interface{}, 0, len(value)+1)
args = append(args, key)
args = append(args, value...)
vals := make([]string, 0, len(value))
for _, v := range value {
val, err := String(v, nil)
if err != nil && err != ErrNil {
if err == ErrNil {
continue
}
return err
}
if val == "" {
continue
}
vals = append(vals, val)
}
_, err := pools.RPush(key, vals).Result() // todo ...
if err != nil {
return convertError(err)
}
return nil
}

// 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
func RCBRPopLPush(srcKey string, destKey string, timeout int) (interface{}, error) {
res, err := pools.BRPopLPush(srcKey, destKey, time.Duration(timeout)*time.Second).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}

// 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
func RCRPopLPush(srcKey string, destKey string) (interface{}, error) {
res, err := pools.RPopLPush(srcKey, destKey).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCSAdd(key string, members ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(members)+1)
args = append(args, key)
args = append(args, members...)
ms := make([]string, 0, len(members))
for _, member := range members {
m, err := String(member, nil)
if err != nil && err != ErrNil {
return 0, err
}
ms = append(ms, m)
}
res, err := pools.SAdd(key, ms).Result() // todo ...
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCSPop(key string) ([]byte, error) {
res, err := pools.SPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func RCSIsMember(key string, member interface{}) (bool, error) {
m, _ := member.(string)
res, err := pools.SIsMember(key, m).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCSRem(key string, members ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(members)+1)
args = append(args, key)
args = append(args, members...)
ms := make([]string, 0, len(members))
for _, member := range members {
m, err := String(member, nil)
if err != nil && err != ErrNil {
return 0, err
}
ms = append(ms, m)
}
res, err := pools.SRem(key, ms).Result() // todo ...
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCSMembers(key string) ([]string, error) {
res, err := pools.SMembers(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCScriptLoad(luaScript string) (interface{}, error) {
res, err := pools.ScriptLoad(luaScript).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCEvalSha(sha1 string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
vals := make([]interface{}, 0, len(keysArgs)+2)
vals = append(vals, sha1, numberKeys)
vals = append(vals, keysArgs...)
keys := make([]string, 0, numberKeys)
args := make([]string, 0, len(keysArgs)-numberKeys)
for i, value := range keysArgs {
val, err := String(value, nil)
if err != nil && err != ErrNil {
return nil, err
}
if i < numberKeys {
keys = append(keys, val)
} else {
args = append(args, val)
}
}
res, err := pools.EvalSha(sha1, keys, args).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCEval(luaScript string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
vals := make([]interface{}, 0, len(keysArgs)+2)
vals = append(vals, luaScript, numberKeys)
vals = append(vals, keysArgs...)
keys := make([]string, 0, numberKeys)
args := make([]string, 0, len(keysArgs)-numberKeys)
for i, value := range keysArgs {
val, err := String(value, nil)
if err != nil && err != ErrNil {
return nil, err
}
if i < numberKeys {
keys = append(keys, val)
} else {
args = append(args, val)
}
}
res, err := pools.Eval(luaScript, keys, args).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCGetBit(key string, offset int64) (int64, error) {
res, err := pools.GetBit(key, offset).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCSetBit(key string, offset uint32, value int) (int, error) {
res, err := pools.SetBit(key, int64(offset), value).Result()
return int(res), convertError(err)
}
func RCGetClient() *redis.ClusterClient {
return pools
}
func convertError(err error) error {
if err == redis.Nil {
// 为了兼容redis 2.x,这里不返回 ErrNil,ErrNil在调用redis_cluster_reply函数时才返回
return nil
}
return err
}

+ 324
- 0
utils/cache/redis_pool.go Visa fil

@@ -0,0 +1,324 @@
package cache

import (
"errors"
"log"
"strings"
"time"

redigo "github.com/gomodule/redigo/redis"
)

type RedisPool struct {
*redigo.Pool
}

func NewRedisPool(cfg *Config) *RedisPool {
return &RedisPool{&redigo.Pool{
MaxIdle: cfg.MaxIdle,
IdleTimeout: cfg.IdleTimeout,
MaxActive: cfg.MaxActive,
Wait: cfg.Wait,
Dial: func() (redigo.Conn, error) {
c, err := redigo.Dial("tcp", cfg.Server)
if err != nil {
log.Println("Redis Dial failed: ", err)
return nil, err
}
if cfg.Password != "" {
if _, err := c.Do("AUTH", cfg.Password); err != nil {
c.Close()
log.Println("Redis AUTH failed: ", err)
return nil, err
}
}
return c, err
},
TestOnBorrow: func(c redigo.Conn, t time.Time) error {
_, err := c.Do("PING")
if err != nil {
log.Println("Unable to ping to redis server:", err)
}
return err
},
}}
}

func (p *RedisPool) Do(cmd string, args ...interface{}) (reply interface{}, err error) {
conn := pool.Get()
defer conn.Close()
return conn.Do(cmd, args...)
}

func (p *RedisPool) GetPool() *redigo.Pool {
return pool
}

func (p *RedisPool) ParseKey(key string, vars []string) (string, error) {
arr := strings.Split(key, conf.KeyPlaceholder)
actualKey := ""
if len(arr) != len(vars)+1 {
return "", errors.New("redis/connection.go: Insufficient arguments to parse key")
} else {
for index, val := range arr {
if index == 0 {
actualKey = arr[index]
} else {
actualKey += vars[index-1] + val
}
}
}
return getPrefixedKey(actualKey), nil
}

func (p *RedisPool) getPrefixedKey(key string) string {
return conf.KeyPrefix + conf.KeyDelimiter + key
}
func (p *RedisPool) StripEnvKey(key string) string {
return strings.TrimLeft(key, conf.KeyPrefix+conf.KeyDelimiter)
}
func (p *RedisPool) SplitKey(key string) []string {
return strings.Split(key, conf.KeyDelimiter)
}
func (p *RedisPool) Expire(key string, ttl int) (interface{}, error) {
return Do("EXPIRE", key, ttl)
}
func (p *RedisPool) Persist(key string) (interface{}, error) {
return Do("PERSIST", key)
}

func (p *RedisPool) Del(key string) (interface{}, error) {
return Do("DEL", key)
}
func (p *RedisPool) Set(key string, data interface{}) (interface{}, error) {
// set
return Do("SET", key, data)
}
func (p *RedisPool) SetNX(key string, data interface{}) (interface{}, error) {
return Do("SETNX", key, data)
}
func (p *RedisPool) SetEx(key string, data interface{}, ttl int) (interface{}, error) {
return Do("SETEX", key, ttl, data)
}
func (p *RedisPool) Get(key string) (interface{}, error) {
// get
return Do("GET", key)
}
func (p *RedisPool) GetStringMap(key string) (map[string]string, error) {
// get
return redigo.StringMap(Do("GET", key))
}

func (p *RedisPool) GetTTL(key string) (time.Duration, error) {
ttl, err := redigo.Int64(Do("TTL", key))
return time.Duration(ttl) * time.Second, err
}
func (p *RedisPool) GetBytes(key string) ([]byte, error) {
return redigo.Bytes(Do("GET", key))
}
func (p *RedisPool) GetString(key string) (string, error) {
return redigo.String(Do("GET", key))
}
func (p *RedisPool) GetInt(key string) (int, error) {
return redigo.Int(Do("GET", key))
}
func (p *RedisPool) GetStringLength(key string) (int, error) {
return redigo.Int(Do("STRLEN", key))
}
func (p *RedisPool) ZAdd(key string, score float64, data interface{}) (interface{}, error) {
return Do("ZADD", key, score, data)
}
func (p *RedisPool) ZRem(key string, data interface{}) (interface{}, error) {
return Do("ZREM", key, data)
}
func (p *RedisPool) ZRange(key string, start int, end int, withScores bool) ([]interface{}, error) {
if withScores {
return redigo.Values(Do("ZRANGE", key, start, end, "WITHSCORES"))
}
return redigo.Values(Do("ZRANGE", key, start, end))
}
func (p *RedisPool) SAdd(setName string, data interface{}) (interface{}, error) {
return Do("SADD", setName, data)
}
func (p *RedisPool) SCard(setName string) (int64, error) {
return redigo.Int64(Do("SCARD", setName))
}
func (p *RedisPool) SIsMember(setName string, data interface{}) (bool, error) {
return redigo.Bool(Do("SISMEMBER", setName, data))
}
func (p *RedisPool) SMembers(setName string) ([]string, error) {
return redigo.Strings(Do("SMEMBERS", setName))
}
func (p *RedisPool) SRem(setName string, data interface{}) (interface{}, error) {
return Do("SREM", setName, data)
}
func (p *RedisPool) HSet(key string, HKey string, data interface{}) (interface{}, error) {
return Do("HSET", key, HKey, data)
}

func (p *RedisPool) HGet(key string, HKey string) (interface{}, error) {
return Do("HGET", key, HKey)
}

func (p *RedisPool) HMGet(key string, hashKeys ...string) ([]interface{}, error) {
ret, err := Do("HMGET", key, hashKeys)
if err != nil {
return nil, err
}
reta, ok := ret.([]interface{})
if !ok {
return nil, errors.New("result not an array")
}
return reta, nil
}

func (p *RedisPool) HMSet(key string, hashKeys []string, vals []interface{}) (interface{}, error) {
if len(hashKeys) == 0 || len(hashKeys) != len(vals) {
var ret interface{}
return ret, errors.New("bad length")
}
input := []interface{}{key}
for i, v := range hashKeys {
input = append(input, v, vals[i])
}
return Do("HMSET", input...)
}

func (p *RedisPool) HGetString(key string, HKey string) (string, error) {
return redigo.String(Do("HGET", key, HKey))
}
func (p *RedisPool) HGetFloat(key string, HKey string) (float64, error) {
f, err := redigo.Float64(Do("HGET", key, HKey))
return float64(f), err
}
func (p *RedisPool) HGetInt(key string, HKey string) (int, error) {
return redigo.Int(Do("HGET", key, HKey))
}
func (p *RedisPool) HGetInt64(key string, HKey string) (int64, error) {
return redigo.Int64(Do("HGET", key, HKey))
}
func (p *RedisPool) HGetBool(key string, HKey string) (bool, error) {
return redigo.Bool(Do("HGET", key, HKey))
}
func (p *RedisPool) HDel(key string, HKey string) (interface{}, error) {
return Do("HDEL", key, HKey)
}
func (p *RedisPool) HGetAll(key string) (map[string]interface{}, error) {
vals, err := redigo.Values(Do("HGETALL", key))
if err != nil {
return nil, err
}
num := len(vals) / 2
result := make(map[string]interface{}, num)
for i := 0; i < num; i++ {
key, _ := redigo.String(vals[2*i], nil)
result[key] = vals[2*i+1]
}
return result, nil
}

// NOTE: Use this in production environment with extreme care.
// Read more here:https://redigo.io/commands/keys
func (p *RedisPool) Keys(pattern string) ([]string, error) {
return redigo.Strings(Do("KEYS", pattern))
}

func (p *RedisPool) HKeys(key string) ([]string, error) {
return redigo.Strings(Do("HKEYS", key))
}

func (p *RedisPool) Exists(key string) (bool, error) {
count, err := redigo.Int(Do("EXISTS", key))
if count == 0 {
return false, err
} else {
return true, err
}
}

func (p *RedisPool) Incr(key string) (int64, error) {
return redigo.Int64(Do("INCR", key))
}

func (p *RedisPool) Decr(key string) (int64, error) {
return redigo.Int64(Do("DECR", key))
}

func (p *RedisPool) IncrBy(key string, incBy int64) (int64, error) {
return redigo.Int64(Do("INCRBY", key, incBy))
}

func (p *RedisPool) DecrBy(key string, decrBy int64) (int64, error) {
return redigo.Int64(Do("DECRBY", key))
}

func (p *RedisPool) IncrByFloat(key string, incBy float64) (float64, error) {
return redigo.Float64(Do("INCRBYFLOAT", key, incBy))
}

func (p *RedisPool) DecrByFloat(key string, decrBy float64) (float64, error) {
return redigo.Float64(Do("DECRBYFLOAT", key, decrBy))
}

// use for message queue
func (p *RedisPool) LPush(key string, data interface{}) (interface{}, error) {
// set
return Do("LPUSH", key, data)
}

func (p *RedisPool) LPop(key string) (interface{}, error) {
return Do("LPOP", key)
}

func (p *RedisPool) LPopString(key string) (string, error) {
return redigo.String(Do("LPOP", key))
}
func (p *RedisPool) LPopFloat(key string) (float64, error) {
f, err := redigo.Float64(Do("LPOP", key))
return float64(f), err
}
func (p *RedisPool) LPopInt(key string) (int, error) {
return redigo.Int(Do("LPOP", key))
}
func (p *RedisPool) LPopInt64(key string) (int64, error) {
return redigo.Int64(Do("LPOP", key))
}

func (p *RedisPool) RPush(key string, data interface{}) (interface{}, error) {
// set
return Do("RPUSH", key, data)
}

func (p *RedisPool) RPop(key string) (interface{}, error) {
return Do("RPOP", key)
}

func (p *RedisPool) RPopString(key string) (string, error) {
return redigo.String(Do("RPOP", key))
}
func (p *RedisPool) RPopFloat(key string) (float64, error) {
f, err := redigo.Float64(Do("RPOP", key))
return float64(f), err
}
func (p *RedisPool) RPopInt(key string) (int, error) {
return redigo.Int(Do("RPOP", key))
}
func (p *RedisPool) RPopInt64(key string) (int64, error) {
return redigo.Int64(Do("RPOP", key))
}

func (p *RedisPool) Scan(cursor int64, pattern string, count int64) (int64, []string, error) {
var items []string
var newCursor int64

values, err := redigo.Values(Do("SCAN", cursor, "MATCH", pattern, "COUNT", count))
if err != nil {
return 0, nil, err
}
values, err = redigo.Scan(values, &newCursor, &items)
if err != nil {
return 0, nil, err
}

return newCursor, items, nil
}

+ 617
- 0
utils/cache/redis_pool_cluster.go Visa fil

@@ -0,0 +1,617 @@
package cache

import (
"strconv"
"time"

"github.com/go-redis/redis"
)

type RedisClusterPool struct {
client *redis.ClusterClient
}

func NewRedisClusterPool(addrs []string) (*RedisClusterPool, error) {
opt := &redis.ClusterOptions{
Addrs: addrs,
PoolSize: 512,
PoolTimeout: 10 * time.Second,
IdleTimeout: 10 * time.Second,
DialTimeout: 10 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
}
c := redis.NewClusterClient(opt)
if err := c.Ping().Err(); err != nil {
return nil, err
}
return &RedisClusterPool{client: c}, nil
}

func (p *RedisClusterPool) Get(key string) (interface{}, error) {
res, err := p.client.Get(key).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func (p *RedisClusterPool) Set(key string, value interface{}) error {
err := p.client.Set(key, value, 0).Err()
return convertError(err)
}
func (p *RedisClusterPool) GetSet(key string, value interface{}) (interface{}, error) {
res, err := p.client.GetSet(key, value).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func (p *RedisClusterPool) SetNx(key string, value interface{}) (int64, error) {
res, err := p.client.SetNX(key, value, 0).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func (p *RedisClusterPool) SetEx(key string, value interface{}, timeout int64) error {
_, err := p.client.Set(key, value, time.Duration(timeout)*time.Second).Result()
if err != nil {
return convertError(err)
}
return nil
}

// nil表示成功,ErrNil表示数据库内已经存在这个key,其他表示数据库发生错误
func (p *RedisClusterPool) SetNxEx(key string, value interface{}, timeout int64) error {
res, err := p.client.SetNX(key, value, time.Duration(timeout)*time.Second).Result()
if err != nil {
return convertError(err)
}
if res {
return nil
}
return ErrNil
}
func (p *RedisClusterPool) MGet(keys ...string) ([]interface{}, error) {
res, err := p.client.MGet(keys...).Result()
return res, convertError(err)
}

// 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
func (p *RedisClusterPool) MSet(kvs map[string]interface{}) error {
pairs := make([]string, 0, len(kvs)*2)
for k, v := range kvs {
val, err := String(v, nil)
if err != nil {
return err
}
pairs = append(pairs, k, val)
}
return convertError(p.client.MSet(pairs).Err())
}

// 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
func (p *RedisClusterPool) MSetNX(kvs map[string]interface{}) (bool, error) {
pairs := make([]string, 0, len(kvs)*2)
for k, v := range kvs {
val, err := String(v, nil)
if err != nil {
return false, err
}
pairs = append(pairs, k, val)
}
res, err := p.client.MSetNX(pairs).Result()
return res, convertError(err)
}
func (p *RedisClusterPool) ExpireAt(key string, timestamp int64) (int64, error) {
res, err := p.client.ExpireAt(key, time.Unix(timestamp, 0)).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func (p *RedisClusterPool) Del(keys ...string) (int64, error) {
args := make([]interface{}, 0, len(keys))
for _, key := range keys {
args = append(args, key)
}
res, err := p.client.Del(keys...).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) Incr(key string) (int64, error) {
res, err := p.client.Incr(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) IncrBy(key string, delta int64) (int64, error) {
res, err := p.client.IncrBy(key, delta).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) Expire(key string, duration int64) (int64, error) {
res, err := p.client.Expire(key, time.Duration(duration)*time.Second).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func (p *RedisClusterPool) Exists(key string) (bool, error) { // todo (bool, error)
res, err := p.client.Exists(key).Result()
if err != nil {
return false, convertError(err)
}
if res > 0 {
return true, nil
}
return false, nil
}
func (p *RedisClusterPool) HGet(key string, field string) (interface{}, error) {
res, err := p.client.HGet(key, field).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func (p *RedisClusterPool) HLen(key string) (int64, error) {
res, err := p.client.HLen(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) HSet(key string, field string, val interface{}) error {
value, err := String(val, nil)
if err != nil && err != ErrNil {
return err
}
_, err = p.client.HSet(key, field, value).Result()
if err != nil {
return convertError(err)
}
return nil
}
func (p *RedisClusterPool) HDel(key string, fields ...string) (int64, error) {
args := make([]interface{}, 0, len(fields)+1)
args = append(args, key)
for _, field := range fields {
args = append(args, field)
}
res, err := p.client.HDel(key, fields...).Result()
if err != nil {
return 0, convertError(err)
}
return res, nil
}

func (p *RedisClusterPool) HMGet(key string, fields ...string) (interface{}, error) {
args := make([]interface{}, 0, len(fields)+1)
args = append(args, key)
for _, field := range fields {
args = append(args, field)
}
if len(fields) == 0 {
return nil, ErrNil
}
res, err := p.client.HMGet(key, fields...).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) HMSet(key string, kvs ...interface{}) error {
if len(kvs) == 0 {
return nil
}
if len(kvs)%2 != 0 {
return ErrWrongArgsNum
}
var err error
v := map[string]interface{}{} // todo change
v["field"], err = String(kvs[0], nil)
if err != nil && err != ErrNil {
return err
}
v["value"], err = String(kvs[1], nil)
if err != nil && err != ErrNil {
return err
}
pairs := make([]string, 0, len(kvs)-2)
if len(kvs) > 2 {
for _, kv := range kvs[2:] {
kvString, err := String(kv, nil)
if err != nil && err != ErrNil {
return err
}
pairs = append(pairs, kvString)
}
}
v["paris"] = pairs
_, err = p.client.HMSet(key, v).Result()
if err != nil {
return convertError(err)
}
return nil
}

func (p *RedisClusterPool) HKeys(key string) ([]string, error) {
res, err := p.client.HKeys(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) HVals(key string) ([]interface{}, error) {
res, err := p.client.HVals(key).Result()
if err != nil {
return nil, convertError(err)
}
rs := make([]interface{}, 0, len(res))
for _, res := range res {
rs = append(rs, res)
}
return rs, nil
}
func (p *RedisClusterPool) HGetAll(key string) (map[string]string, error) {
vals, err := p.client.HGetAll(key).Result()
if err != nil {
return nil, convertError(err)
}
return vals, nil
}
func (p *RedisClusterPool) HIncrBy(key, field string, delta int64) (int64, error) {
res, err := p.client.HIncrBy(key, field, delta).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) ZAdd(key string, kvs ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(kvs)+1)
args = append(args, key)
args = append(args, kvs...)
if len(kvs) == 0 {
return 0, nil
}
if len(kvs)%2 != 0 {
return 0, ErrWrongArgsNum
}
zs := make([]redis.Z, len(kvs)/2)
for i := 0; i < len(kvs); i += 2 {
idx := i / 2
score, err := Float64(kvs[i], nil)
if err != nil && err != ErrNil {
return 0, err
}
zs[idx].Score = score
zs[idx].Member = kvs[i+1]
}
res, err := p.client.ZAdd(key, zs...).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) ZRem(key string, members ...string) (int64, error) {
args := make([]interface{}, 0, len(members))
args = append(args, key)
for _, member := range members {
args = append(args, member)
}
res, err := p.client.ZRem(key, members).Result()
if err != nil {
return res, convertError(err)
}
return res, err
}

func (p *RedisClusterPool) ZRange(key string, min, max int64, withScores bool) (interface{}, error) {
res := make([]interface{}, 0)
if withScores {
zs, err := p.client.ZRangeWithScores(key, min, max).Result()
if err != nil {
return nil, convertError(err)
}
for _, z := range zs {
res = append(res, z.Member, strconv.FormatFloat(z.Score, 'f', -1, 64))
}
} else {
ms, err := p.client.ZRange(key, min, max).Result()
if err != nil {
return nil, convertError(err)
}
for _, m := range ms {
res = append(res, m)
}
}
return res, nil
}
func (p *RedisClusterPool) ZRangeByScoreWithScore(key string, min, max int64) (map[string]int64, error) {
opt := new(redis.ZRangeBy)
opt.Min = strconv.FormatInt(int64(min), 10)
opt.Max = strconv.FormatInt(int64(max), 10)
opt.Count = -1
opt.Offset = 0
vals, err := p.client.ZRangeByScoreWithScores(key, *opt).Result()
if err != nil {
return nil, convertError(err)
}
res := make(map[string]int64, len(vals))
for _, val := range vals {
key, err := String(val.Member, nil)
if err != nil && err != ErrNil {
return nil, err
}
res[key] = int64(val.Score)
}
return res, nil
}
func (p *RedisClusterPool) LRange(key string, start, stop int64) (interface{}, error) {
res, err := p.client.LRange(key, start, stop).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) LSet(key string, index int, value interface{}) error {
err := p.client.LSet(key, int64(index), value).Err()
return convertError(err)
}
func (p *RedisClusterPool) LLen(key string) (int64, error) {
res, err := p.client.LLen(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) LRem(key string, count int, value interface{}) (int, error) {
val, _ := value.(string)
res, err := p.client.LRem(key, int64(count), val).Result()
if err != nil {
return int(res), convertError(err)
}
return int(res), nil
}
func (p *RedisClusterPool) TTl(key string) (int64, error) {
duration, err := p.client.TTL(key).Result()
if err != nil {
return int64(duration.Seconds()), convertError(err)
}
return int64(duration.Seconds()), nil
}
func (p *RedisClusterPool) LPop(key string) (interface{}, error) {
res, err := p.client.LPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) RPop(key string) (interface{}, error) {
res, err := p.client.RPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) BLPop(key string, timeout int) (interface{}, error) {
res, err := p.client.BLPop(time.Duration(timeout)*time.Second, key).Result()
if err != nil {
// 兼容redis 2.x
if err == redis.Nil {
return nil, ErrNil
}
return nil, err
}
return res[1], nil
}
func (p *RedisClusterPool) BRPop(key string, timeout int) (interface{}, error) {
res, err := p.client.BRPop(time.Duration(timeout)*time.Second, key).Result()
if err != nil {
// 兼容redis 2.x
if err == redis.Nil {
return nil, ErrNil
}
return nil, convertError(err)
}
return res[1], nil
}
func (p *RedisClusterPool) LPush(key string, value ...interface{}) error {
args := make([]interface{}, 0, len(value)+1)
args = append(args, key)
args = append(args, value...)
vals := make([]string, 0, len(value))
for _, v := range value {
val, err := String(v, nil)
if err != nil && err != ErrNil {
return err
}
vals = append(vals, val)
}
_, err := p.client.LPush(key, vals).Result() // todo ...
if err != nil {
return convertError(err)
}
return nil
}
func (p *RedisClusterPool) RPush(key string, value ...interface{}) error {
args := make([]interface{}, 0, len(value)+1)
args = append(args, key)
args = append(args, value...)
vals := make([]string, 0, len(value))
for _, v := range value {
val, err := String(v, nil)
if err != nil && err != ErrNil {
if err == ErrNil {
continue
}
return err
}
if val == "" {
continue
}
vals = append(vals, val)
}
_, err := p.client.RPush(key, vals).Result() // todo ...
if err != nil {
return convertError(err)
}
return nil
}

// 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
func (p *RedisClusterPool) BRPopLPush(srcKey string, destKey string, timeout int) (interface{}, error) {
res, err := p.client.BRPopLPush(srcKey, destKey, time.Duration(timeout)*time.Second).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}

// 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
func (p *RedisClusterPool) RPopLPush(srcKey string, destKey string) (interface{}, error) {
res, err := p.client.RPopLPush(srcKey, destKey).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SAdd(key string, members ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(members)+1)
args = append(args, key)
args = append(args, members...)
ms := make([]string, 0, len(members))
for _, member := range members {
m, err := String(member, nil)
if err != nil && err != ErrNil {
return 0, err
}
ms = append(ms, m)
}
res, err := p.client.SAdd(key, ms).Result() // todo ...
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SPop(key string) ([]byte, error) {
res, err := p.client.SPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func (p *RedisClusterPool) SIsMember(key string, member interface{}) (bool, error) {
m, _ := member.(string)
res, err := p.client.SIsMember(key, m).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SRem(key string, members ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(members)+1)
args = append(args, key)
args = append(args, members...)
ms := make([]string, 0, len(members))
for _, member := range members {
m, err := String(member, nil)
if err != nil && err != ErrNil {
return 0, err
}
ms = append(ms, m)
}
res, err := p.client.SRem(key, ms).Result() // todo ...
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SMembers(key string) ([]string, error) {
res, err := p.client.SMembers(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) ScriptLoad(luaScript string) (interface{}, error) {
res, err := p.client.ScriptLoad(luaScript).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) EvalSha(sha1 string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
vals := make([]interface{}, 0, len(keysArgs)+2)
vals = append(vals, sha1, numberKeys)
vals = append(vals, keysArgs...)
keys := make([]string, 0, numberKeys)
args := make([]string, 0, len(keysArgs)-numberKeys)
for i, value := range keysArgs {
val, err := String(value, nil)
if err != nil && err != ErrNil {
return nil, err
}
if i < numberKeys {
keys = append(keys, val)
} else {
args = append(args, val)
}
}
res, err := p.client.EvalSha(sha1, keys, args).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) Eval(luaScript string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
vals := make([]interface{}, 0, len(keysArgs)+2)
vals = append(vals, luaScript, numberKeys)
vals = append(vals, keysArgs...)
keys := make([]string, 0, numberKeys)
args := make([]string, 0, len(keysArgs)-numberKeys)
for i, value := range keysArgs {
val, err := String(value, nil)
if err != nil && err != ErrNil {
return nil, err
}
if i < numberKeys {
keys = append(keys, val)
} else {
args = append(args, val)
}
}
res, err := p.client.Eval(luaScript, keys, args).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) GetBit(key string, offset int64) (int64, error) {
res, err := p.client.GetBit(key, offset).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SetBit(key string, offset uint32, value int) (int, error) {
res, err := p.client.SetBit(key, int64(offset), value).Result()
return int(res), convertError(err)
}
func (p *RedisClusterPool) GetClient() *redis.ClusterClient {
return pools
}

Laddar…
Avbryt
Spara