Parcourir la source

add Reverse:for v1.1.3

tags/v1.1.3
huangjuajun il y a 2 ans
Parent
révision
94020bb218
9 fichiers modifiés avec 3058 ajouts et 0 suppressions
  1. +421
    -0
      utils/cache/base.go
  2. +107
    -0
      utils/cache/cache/cache.go
  3. +86
    -0
      utils/cache/cache/conv.go
  4. +241
    -0
      utils/cache/cache/file.go
  5. +239
    -0
      utils/cache/cache/memory.go
  6. +402
    -0
      utils/cache/redis.go
  7. +622
    -0
      utils/cache/redis_cluster.go
  8. +323
    -0
      utils/cache/redis_pool.go
  9. +617
    -0
      utils/cache/redis_pool_cluster.go

+ 421
- 0
utils/cache/base.go Voir le fichier

@@ -0,0 +1,421 @@
package zhios_tool_cache

import (
"errors"
"fmt"
"strconv"
"time"
)

const (
redisDialTTL = 10 * time.Second
redisReadTTL = 3 * time.Second
redisWriteTTL = 3 * time.Second
redisIdleTTL = 10 * time.Second
redisPoolTTL = 10 * time.Second
redisPoolSize int = 512
redisMaxIdleConn int = 64
redisMaxActive int = 512
)

var (
ErrNil = errors.New("nil return")
ErrWrongArgsNum = errors.New("args num error")
ErrNegativeInt = errors.New("redis cluster: unexpected value for Uint64")
)

// 以下为提供类型转换

func Int(reply interface{}, err error) (int, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int:
return reply, nil
case int8:
return int(reply), nil
case int16:
return int(reply), nil
case int32:
return int(reply), nil
case int64:
x := int(reply)
if int64(x) != reply {
return 0, strconv.ErrRange
}
return x, nil
case uint:
n := int(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case uint8:
return int(reply), nil
case uint16:
return int(reply), nil
case uint32:
n := int(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case uint64:
n := int(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case []byte:
data := string(reply)
if len(data) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseInt(data, 10, 0)
return int(n), err
case string:
if len(reply) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseInt(reply, 10, 0)
return int(n), err
case nil:
return 0, ErrNil
case error:
return 0, reply
}
return 0, fmt.Errorf("redis cluster: unexpected type for Int, got type %T", reply)
}

func Int64(reply interface{}, err error) (int64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int:
return int64(reply), nil
case int8:
return int64(reply), nil
case int16:
return int64(reply), nil
case int32:
return int64(reply), nil
case int64:
return reply, nil
case uint:
n := int64(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case uint8:
return int64(reply), nil
case uint16:
return int64(reply), nil
case uint32:
return int64(reply), nil
case uint64:
n := int64(reply)
if n < 0 {
return 0, strconv.ErrRange
}
return n, nil
case []byte:
data := string(reply)
if len(data) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseInt(data, 10, 64)
return n, err
case string:
if len(reply) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseInt(reply, 10, 64)
return n, err
case nil:
return 0, ErrNil
case error:
return 0, reply
}
return 0, fmt.Errorf("redis cluster: unexpected type for Int64, got type %T", reply)
}

func Uint64(reply interface{}, err error) (uint64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case uint:
return uint64(reply), nil
case uint8:
return uint64(reply), nil
case uint16:
return uint64(reply), nil
case uint32:
return uint64(reply), nil
case uint64:
return reply, nil
case int:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case int8:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case int16:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case int32:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case int64:
if reply < 0 {
return 0, ErrNegativeInt
}
return uint64(reply), nil
case []byte:
data := string(reply)
if len(data) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseUint(data, 10, 64)
return n, err
case string:
if len(reply) == 0 {
return 0, ErrNil
}

n, err := strconv.ParseUint(reply, 10, 64)
return n, err
case nil:
return 0, ErrNil
case error:
return 0, reply
}
return 0, fmt.Errorf("redis cluster: unexpected type for Uint64, got type %T", reply)
}

func Float64(reply interface{}, err error) (float64, error) {
if err != nil {
return 0, err
}

var value float64
err = nil
switch v := reply.(type) {
case float32:
value = float64(v)
case float64:
value = v
case int:
value = float64(v)
case int8:
value = float64(v)
case int16:
value = float64(v)
case int32:
value = float64(v)
case int64:
value = float64(v)
case uint:
value = float64(v)
case uint8:
value = float64(v)
case uint16:
value = float64(v)
case uint32:
value = float64(v)
case uint64:
value = float64(v)
case []byte:
data := string(v)
if len(data) == 0 {
return 0, ErrNil
}
value, err = strconv.ParseFloat(string(v), 64)
case string:
if len(v) == 0 {
return 0, ErrNil
}
value, err = strconv.ParseFloat(v, 64)
case nil:
err = ErrNil
case error:
err = v
default:
err = fmt.Errorf("redis cluster: unexpected type for Float64, got type %T", v)
}

return value, err
}

func Bool(reply interface{}, err error) (bool, error) {
if err != nil {
return false, err
}
switch reply := reply.(type) {
case bool:
return reply, nil
case int64:
return reply != 0, nil
case []byte:
data := string(reply)
if len(data) == 0 {
return false, ErrNil
}

return strconv.ParseBool(data)
case string:
if len(reply) == 0 {
return false, ErrNil
}

return strconv.ParseBool(reply)
case nil:
return false, ErrNil
case error:
return false, reply
}
return false, fmt.Errorf("redis cluster: unexpected type for Bool, got type %T", reply)
}

func Bytes(reply interface{}, err error) ([]byte, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []byte:
if len(reply) == 0 {
return nil, ErrNil
}
return reply, nil
case string:
data := []byte(reply)
if len(data) == 0 {
return nil, ErrNil
}
return data, nil
case nil:
return nil, ErrNil
case error:
return nil, reply
}
return nil, fmt.Errorf("redis cluster: unexpected type for Bytes, got type %T", reply)
}

func String(reply interface{}, err error) (string, error) {
if err != nil {
return "", err
}

value := ""
err = nil
switch v := reply.(type) {
case string:
if len(v) == 0 {
return "", ErrNil
}

value = v
case []byte:
if len(v) == 0 {
return "", ErrNil
}

value = string(v)
case int:
value = strconv.FormatInt(int64(v), 10)
case int8:
value = strconv.FormatInt(int64(v), 10)
case int16:
value = strconv.FormatInt(int64(v), 10)
case int32:
value = strconv.FormatInt(int64(v), 10)
case int64:
value = strconv.FormatInt(v, 10)
case uint:
value = strconv.FormatUint(uint64(v), 10)
case uint8:
value = strconv.FormatUint(uint64(v), 10)
case uint16:
value = strconv.FormatUint(uint64(v), 10)
case uint32:
value = strconv.FormatUint(uint64(v), 10)
case uint64:
value = strconv.FormatUint(v, 10)
case float32:
value = strconv.FormatFloat(float64(v), 'f', -1, 32)
case float64:
value = strconv.FormatFloat(v, 'f', -1, 64)
case bool:
value = strconv.FormatBool(v)
case nil:
err = ErrNil
case error:
err = v
default:
err = fmt.Errorf("redis cluster: unexpected type for String, got type %T", v)
}

return value, err
}

func Strings(reply interface{}, err error) ([]string, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
result := make([]string, len(reply))
for i := range reply {
if reply[i] == nil {
continue
}
switch subReply := reply[i].(type) {
case string:
result[i] = subReply
case []byte:
result[i] = string(subReply)
default:
return nil, fmt.Errorf("redis cluster: unexpected element type for String, got type %T", reply[i])
}
}
return result, nil
case []string:
return reply, nil
case nil:
return nil, ErrNil
case error:
return nil, reply
}
return nil, fmt.Errorf("redis cluster: unexpected type for Strings, got type %T", reply)
}

func Values(reply interface{}, err error) ([]interface{}, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
return reply, nil
case nil:
return nil, ErrNil
case error:
return nil, reply
}
return nil, fmt.Errorf("redis cluster: unexpected type for Values, got type %T", reply)
}

+ 107
- 0
utils/cache/cache/cache.go Voir le fichier

@@ -0,0 +1,107 @@
package zhios_tool_cache

import (
"fmt"
"time"
)

var c Cache

type Cache interface {
// get cached value by key.
Get(key string) interface{}
// GetMulti is a batch version of Get.
GetMulti(keys []string) []interface{}
// set cached value with key and expire time.
Put(key string, val interface{}, timeout time.Duration) error
// delete cached value by key.
Delete(key string) error
// increase cached int value by key, as a counter.
Incr(key string) error
// decrease cached int value by key, as a counter.
Decr(key string) error
// check if cached value exists or not.
IsExist(key string) bool
// clear all cache.
ClearAll() error
// start gc routine based on config string settings.
StartAndGC(config string) error
}

// Instance is a function create a new Cache Instance
type Instance func() Cache

var adapters = make(map[string]Instance)

// Register makes a cache adapter available by the adapter name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
func Register(name string, adapter Instance) {
if adapter == nil {
panic("cache: Register adapter is nil")
}
if _, ok := adapters[name]; ok {
panic("cache: Register called twice for adapter " + name)
}
adapters[name] = adapter
}

// NewCache Create a new cache driver by adapter name and config string.
// config need to be correct JSON as string: {"interval":360}.
// it will start gc automatically.
func NewCache(adapterName, config string) (adapter Cache, err error) {
instanceFunc, ok := adapters[adapterName]
if !ok {
err = fmt.Errorf("cache: unknown adapter name %q (forgot to import?)", adapterName)
return
}
adapter = instanceFunc()
err = adapter.StartAndGC(config)
if err != nil {
adapter = nil
}
return
}

func InitCache(adapterName, config string) (err error) {
instanceFunc, ok := adapters[adapterName]
if !ok {
err = fmt.Errorf("cache: unknown adapter name %q (forgot to import?)", adapterName)
return
}
c = instanceFunc()
err = c.StartAndGC(config)
if err != nil {
c = nil
}
return
}

func Get(key string) interface{} {
return c.Get(key)
}

func GetMulti(keys []string) []interface{} {
return c.GetMulti(keys)
}
func Put(key string, val interface{}, ttl time.Duration) error {
return c.Put(key, val, ttl)
}
func Delete(key string) error {
return c.Delete(key)
}
func Incr(key string) error {
return c.Incr(key)
}
func Decr(key string) error {
return c.Decr(key)
}
func IsExist(key string) bool {
return c.IsExist(key)
}
func ClearAll() error {
return c.ClearAll()
}
func StartAndGC(cfg string) error {
return c.StartAndGC(cfg)
}

+ 86
- 0
utils/cache/cache/conv.go Voir le fichier

@@ -0,0 +1,86 @@
package zhios_tool_cache

import (
"fmt"
"strconv"
)

// GetString convert interface to string.
func GetString(v interface{}) string {
switch result := v.(type) {
case string:
return result
case []byte:
return string(result)
default:
if v != nil {
return fmt.Sprint(result)
}
}
return ""
}

// GetInt convert interface to int.
func GetInt(v interface{}) int {
switch result := v.(type) {
case int:
return result
case int32:
return int(result)
case int64:
return int(result)
default:
if d := GetString(v); d != "" {
value, _ := strconv.Atoi(d)
return value
}
}
return 0
}

// GetInt64 convert interface to int64.
func GetInt64(v interface{}) int64 {
switch result := v.(type) {
case int:
return int64(result)
case int32:
return int64(result)
case int64:
return result
default:

if d := GetString(v); d != "" {
value, _ := strconv.ParseInt(d, 10, 64)
return value
}
}
return 0
}

// GetFloat64 convert interface to float64.
func GetFloat64(v interface{}) float64 {
switch result := v.(type) {
case float64:
return result
default:
if d := GetString(v); d != "" {
value, _ := strconv.ParseFloat(d, 64)
return value
}
}
return 0
}

// GetBool convert interface to bool.
func GetBool(v interface{}) bool {
switch result := v.(type) {
case bool:
return result
default:
if d := GetString(v); d != "" {
value, _ := strconv.ParseBool(d)
return value
}
}
return false
}

+ 241
- 0
utils/cache/cache/file.go Voir le fichier

@@ -0,0 +1,241 @@
package zhios_tool_cache

import (
"bytes"
"crypto/md5"
"encoding/gob"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strconv"
"time"
)

// FileCacheItem is basic unit of file cache adapter.
// it contains data and expire time.
type FileCacheItem struct {
Data interface{}
LastAccess time.Time
Expired time.Time
}

// FileCache Config
var (
FileCachePath = "cache" // cache directory
FileCacheFileSuffix = ".bin" // cache file suffix
FileCacheDirectoryLevel = 2 // cache file deep level if auto generated cache files.
FileCacheEmbedExpiry time.Duration // cache expire time, default is no expire forever.
)

// FileCache is cache adapter for file storage.
type FileCache struct {
CachePath string
FileSuffix string
DirectoryLevel int
EmbedExpiry int
}

// NewFileCache Create new file cache with no config.
// the level and expiry need set in method StartAndGC as config string.
func NewFileCache() Cache {
// return &FileCache{CachePath:FileCachePath, FileSuffix:FileCacheFileSuffix}
return &FileCache{}
}

// StartAndGC will start and begin gc for file cache.
// the config need to be like {CachePath:"/cache","FileSuffix":".bin","DirectoryLevel":2,"EmbedExpiry":0}
func (fc *FileCache) StartAndGC(config string) error {

var cfg map[string]string
json.Unmarshal([]byte(config), &cfg)
if _, ok := cfg["CachePath"]; !ok {
cfg["CachePath"] = FileCachePath
}
if _, ok := cfg["FileSuffix"]; !ok {
cfg["FileSuffix"] = FileCacheFileSuffix
}
if _, ok := cfg["DirectoryLevel"]; !ok {
cfg["DirectoryLevel"] = strconv.Itoa(FileCacheDirectoryLevel)
}
if _, ok := cfg["EmbedExpiry"]; !ok {
cfg["EmbedExpiry"] = strconv.FormatInt(int64(FileCacheEmbedExpiry.Seconds()), 10)
}
fc.CachePath = cfg["CachePath"]
fc.FileSuffix = cfg["FileSuffix"]
fc.DirectoryLevel, _ = strconv.Atoi(cfg["DirectoryLevel"])
fc.EmbedExpiry, _ = strconv.Atoi(cfg["EmbedExpiry"])

fc.Init()
return nil
}

// Init will make new dir for file cache if not exist.
func (fc *FileCache) Init() {
if ok, _ := exists(fc.CachePath); !ok { // todo : error handle
_ = os.MkdirAll(fc.CachePath, os.ModePerm) // todo : error handle
}
}

// get cached file name. it's md5 encoded.
func (fc *FileCache) getCacheFileName(key string) string {
m := md5.New()
io.WriteString(m, key)
keyMd5 := hex.EncodeToString(m.Sum(nil))
cachePath := fc.CachePath
switch fc.DirectoryLevel {
case 2:
cachePath = filepath.Join(cachePath, keyMd5[0:2], keyMd5[2:4])
case 1:
cachePath = filepath.Join(cachePath, keyMd5[0:2])
}

if ok, _ := exists(cachePath); !ok { // todo : error handle
_ = os.MkdirAll(cachePath, os.ModePerm) // todo : error handle
}

return filepath.Join(cachePath, fmt.Sprintf("%s%s", keyMd5, fc.FileSuffix))
}

// Get value from file cache.
// if non-exist or expired, return empty string.
func (fc *FileCache) Get(key string) interface{} {
fileData, err := FileGetContents(fc.getCacheFileName(key))
if err != nil {
return ""
}
var to FileCacheItem
GobDecode(fileData, &to)
if to.Expired.Before(time.Now()) {
return ""
}
return to.Data
}

// GetMulti gets values from file cache.
// if non-exist or expired, return empty string.
func (fc *FileCache) GetMulti(keys []string) []interface{} {
var rc []interface{}
for _, key := range keys {
rc = append(rc, fc.Get(key))
}
return rc
}

// Put value into file cache.
// timeout means how long to keep this file, unit of ms.
// if timeout equals FileCacheEmbedExpiry(default is 0), cache this item forever.
func (fc *FileCache) Put(key string, val interface{}, timeout time.Duration) error {
gob.Register(val)

item := FileCacheItem{Data: val}
if timeout == FileCacheEmbedExpiry {
item.Expired = time.Now().Add((86400 * 365 * 10) * time.Second) // ten years
} else {
item.Expired = time.Now().Add(timeout)
}
item.LastAccess = time.Now()
data, err := GobEncode(item)
if err != nil {
return err
}
return FilePutContents(fc.getCacheFileName(key), data)
}

// Delete file cache value.
func (fc *FileCache) Delete(key string) error {
filename := fc.getCacheFileName(key)
if ok, _ := exists(filename); ok {
return os.Remove(filename)
}
return nil
}

// Incr will increase cached int value.
// fc value is saving forever unless Delete.
func (fc *FileCache) Incr(key string) error {
data := fc.Get(key)
var incr int
if reflect.TypeOf(data).Name() != "int" {
incr = 0
} else {
incr = data.(int) + 1
}
fc.Put(key, incr, FileCacheEmbedExpiry)
return nil
}

// Decr will decrease cached int value.
func (fc *FileCache) Decr(key string) error {
data := fc.Get(key)
var decr int
if reflect.TypeOf(data).Name() != "int" || data.(int)-1 <= 0 {
decr = 0
} else {
decr = data.(int) - 1
}
fc.Put(key, decr, FileCacheEmbedExpiry)
return nil
}

// IsExist check value is exist.
func (fc *FileCache) IsExist(key string) bool {
ret, _ := exists(fc.getCacheFileName(key))
return ret
}

// ClearAll will clean cached files.
// not implemented.
func (fc *FileCache) ClearAll() error {
return nil
}

// check file exist.
func exists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}

// FileGetContents Get bytes to file.
// if non-exist, create this file.
func FileGetContents(filename string) (data []byte, e error) {
return ioutil.ReadFile(filename)
}

// FilePutContents Put bytes to file.
// if non-exist, create this file.
func FilePutContents(filename string, content []byte) error {
return ioutil.WriteFile(filename, content, os.ModePerm)
}

// GobEncode Gob encodes file cache item.
func GobEncode(data interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
enc := gob.NewEncoder(buf)
err := enc.Encode(data)
if err != nil {
return nil, err
}
return buf.Bytes(), err
}

// GobDecode Gob decodes file cache item.
func GobDecode(data []byte, to *FileCacheItem) error {
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
return dec.Decode(&to)
}

func init() {
Register("file", NewFileCache)
}

+ 239
- 0
utils/cache/cache/memory.go Voir le fichier

@@ -0,0 +1,239 @@
package zhios_tool_cache

import (
"encoding/json"
"errors"
"sync"
"time"
)

var (
// DefaultEvery means the clock time of recycling the expired cache items in memory.
DefaultEvery = 60 // 1 minute
)

// MemoryItem store memory cache item.
type MemoryItem struct {
val interface{}
createdTime time.Time
lifespan time.Duration
}

func (mi *MemoryItem) isExpire() bool {
// 0 means forever
if mi.lifespan == 0 {
return false
}
return time.Now().Sub(mi.createdTime) > mi.lifespan
}

// MemoryCache is Memory cache adapter.
// it contains a RW locker for safe map storage.
type MemoryCache struct {
sync.RWMutex
dur time.Duration
items map[string]*MemoryItem
Every int // run an expiration check Every clock time
}

// NewMemoryCache returns a new MemoryCache.
func NewMemoryCache() Cache {
cache := MemoryCache{items: make(map[string]*MemoryItem)}
return &cache
}

// Get cache from memory.
// if non-existed or expired, return nil.
func (bc *MemoryCache) Get(name string) interface{} {
bc.RLock()
defer bc.RUnlock()
if itm, ok := bc.items[name]; ok {
if itm.isExpire() {
return nil
}
return itm.val
}
return nil
}

// GetMulti gets caches from memory.
// if non-existed or expired, return nil.
func (bc *MemoryCache) GetMulti(names []string) []interface{} {
var rc []interface{}
for _, name := range names {
rc = append(rc, bc.Get(name))
}
return rc
}

// Put cache to memory.
// if lifespan is 0, it will be forever till restart.
func (bc *MemoryCache) Put(name string, value interface{}, lifespan time.Duration) error {
bc.Lock()
defer bc.Unlock()
bc.items[name] = &MemoryItem{
val: value,
createdTime: time.Now(),
lifespan: lifespan,
}
return nil
}

// Delete cache in memory.
func (bc *MemoryCache) Delete(name string) error {
bc.Lock()
defer bc.Unlock()
if _, ok := bc.items[name]; !ok {
return errors.New("key not exist")
}
delete(bc.items, name)
if _, ok := bc.items[name]; ok {
return errors.New("delete key error")
}
return nil
}

// Incr increase cache counter in memory.
// it supports int,int32,int64,uint,uint32,uint64.
func (bc *MemoryCache) Incr(key string) error {
bc.RLock()
defer bc.RUnlock()
itm, ok := bc.items[key]
if !ok {
return errors.New("key not exist")
}
switch itm.val.(type) {
case int:
itm.val = itm.val.(int) + 1
case int32:
itm.val = itm.val.(int32) + 1
case int64:
itm.val = itm.val.(int64) + 1
case uint:
itm.val = itm.val.(uint) + 1
case uint32:
itm.val = itm.val.(uint32) + 1
case uint64:
itm.val = itm.val.(uint64) + 1
default:
return errors.New("item val is not (u)int (u)int32 (u)int64")
}
return nil
}

// Decr decrease counter in memory.
func (bc *MemoryCache) Decr(key string) error {
bc.RLock()
defer bc.RUnlock()
itm, ok := bc.items[key]
if !ok {
return errors.New("key not exist")
}
switch itm.val.(type) {
case int:
itm.val = itm.val.(int) - 1
case int64:
itm.val = itm.val.(int64) - 1
case int32:
itm.val = itm.val.(int32) - 1
case uint:
if itm.val.(uint) > 0 {
itm.val = itm.val.(uint) - 1
} else {
return errors.New("item val is less than 0")
}
case uint32:
if itm.val.(uint32) > 0 {
itm.val = itm.val.(uint32) - 1
} else {
return errors.New("item val is less than 0")
}
case uint64:
if itm.val.(uint64) > 0 {
itm.val = itm.val.(uint64) - 1
} else {
return errors.New("item val is less than 0")
}
default:
return errors.New("item val is not int int64 int32")
}
return nil
}

// IsExist check cache exist in memory.
func (bc *MemoryCache) IsExist(name string) bool {
bc.RLock()
defer bc.RUnlock()
if v, ok := bc.items[name]; ok {
return !v.isExpire()
}
return false
}

// ClearAll will delete all cache in memory.
func (bc *MemoryCache) ClearAll() error {
bc.Lock()
defer bc.Unlock()
bc.items = make(map[string]*MemoryItem)
return nil
}

// StartAndGC start memory cache. it will check expiration in every clock time.
func (bc *MemoryCache) StartAndGC(config string) error {
var cf map[string]int
json.Unmarshal([]byte(config), &cf)
if _, ok := cf["interval"]; !ok {
cf = make(map[string]int)
cf["interval"] = DefaultEvery
}
dur := time.Duration(cf["interval"]) * time.Second
bc.Every = cf["interval"]
bc.dur = dur
go bc.vacuum()
return nil
}

// check expiration.
func (bc *MemoryCache) vacuum() {
bc.RLock()
every := bc.Every
bc.RUnlock()

if every < 1 {
return
}
for {
<-time.After(bc.dur)
if bc.items == nil {
return
}
if keys := bc.expiredKeys(); len(keys) != 0 {
bc.clearItems(keys)
}
}
}

// expiredKeys returns key list which are expired.
func (bc *MemoryCache) expiredKeys() (keys []string) {
bc.RLock()
defer bc.RUnlock()
for key, itm := range bc.items {
if itm.isExpire() {
keys = append(keys, key)
}
}
return
}

// clearItems removes all the items which key in keys.
func (bc *MemoryCache) clearItems(keys []string) {
bc.Lock()
defer bc.Unlock()
for _, key := range keys {
delete(bc.items, key)
}
}

func init() {
Register("memory", NewMemoryCache)
}

+ 402
- 0
utils/cache/redis.go Voir le fichier

@@ -0,0 +1,402 @@
package zhios_tool_cache

import (
"encoding/json"
"errors"
"github.com/gomodule/redigo/redis"
"log"
"strings"
"time"
)

// configuration
type Config struct {
Server string
Password string
MaxIdle int // Maximum number of idle connections in the pool.

// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int

// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
IdleTimeout time.Duration

// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
Wait bool
KeyPrefix string // prefix to all keys; example is "dev environment name"
KeyDelimiter string // delimiter to be used while appending keys; example is ":"
KeyPlaceholder string // placeholder to be parsed using given arguments to obtain a final key; example is "?"
}

var pool *redis.Pool
var conf *Config

func NewRedis(addr string) {
if addr == "" {
panic("\nredis connect string cannot be empty\n")
}
pool = &redis.Pool{
MaxIdle: redisMaxIdleConn,
IdleTimeout: redisIdleTTL,
MaxActive: redisMaxActive,
// MaxConnLifetime: redisDialTTL,
Wait: true,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", addr,
redis.DialConnectTimeout(redisDialTTL),
redis.DialReadTimeout(redisReadTTL),
redis.DialWriteTimeout(redisWriteTTL),
)
if err != nil {
log.Println("Redis Dial failed: ", err)
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
if err != nil {
log.Println("Unable to ping to redis server:", err)
}
return err
},
}
conn := pool.Get()
defer conn.Close()
if conn.Err() != nil {
println("\nredis connect " + addr + " error: " + conn.Err().Error())
} else {
println("\nredis connect " + addr + " success!\n")
}
}

func Do(cmd string, args ...interface{}) (reply interface{}, err error) {
conn := pool.Get()
defer conn.Close()
return conn.Do(cmd, args...)
}

func GetPool() *redis.Pool {
return pool
}

func ParseKey(key string, vars []string) (string, error) {
arr := strings.Split(key, conf.KeyPlaceholder)
actualKey := ""
if len(arr) != len(vars)+1 {
return "", errors.New("redis/connection.go: Insufficient arguments to parse key")
} else {
for index, val := range arr {
if index == 0 {
actualKey = arr[index]
} else {
actualKey += vars[index-1] + val
}
}
}
return getPrefixedKey(actualKey), nil
}

func getPrefixedKey(key string) string {
return conf.KeyPrefix + conf.KeyDelimiter + key
}
func StripEnvKey(key string) string {
return strings.TrimLeft(key, conf.KeyPrefix+conf.KeyDelimiter)
}
func SplitKey(key string) []string {
return strings.Split(key, conf.KeyDelimiter)
}
func Expire(key string, ttl int) (interface{}, error) {
return Do("EXPIRE", key, ttl)
}
func Persist(key string) (interface{}, error) {
return Do("PERSIST", key)
}

func Del(key string) (interface{}, error) {
return Do("DEL", key)
}
func Set(key string, data interface{}) (interface{}, error) {
// set
return Do("SET", key, data)
}
func SetNX(key string, data interface{}) (interface{}, error) {
return Do("SETNX", key, data)
}
func SetEx(key string, data interface{}, ttl int) (interface{}, error) {
return Do("SETEX", key, ttl, data)
}

func SetJson(key string, data interface{}, ttl int) bool {
c, err := json.Marshal(data)
if err != nil {
return false
}
if ttl < 1 {
_, err = Set(key, c)
} else {
_, err = SetEx(key, c, ttl)
}
if err != nil {
return false
}
return true
}

func GetJson(key string, dst interface{}) error {
b, err := GetBytes(key)
if err != nil {
return err
}
if err = json.Unmarshal(b, dst); err != nil {
return err
}
return nil
}

func Get(key string) (interface{}, error) {
// get
return Do("GET", key)
}
func GetTTL(key string) (time.Duration, error) {
ttl, err := redis.Int64(Do("TTL", key))
return time.Duration(ttl) * time.Second, err
}
func GetBytes(key string) ([]byte, error) {
return redis.Bytes(Do("GET", key))
}
func GetString(key string) (string, error) {
return redis.String(Do("GET", key))
}
func GetStringMap(key string) (map[string]string, error) {
return redis.StringMap(Do("GET", key))
}
func GetInt(key string) (int, error) {
return redis.Int(Do("GET", key))
}
func GetInt64(key string) (int64, error) {
return redis.Int64(Do("GET", key))
}
func GetStringLength(key string) (int, error) {
return redis.Int(Do("STRLEN", key))
}
func ZAdd(key string, score float64, data interface{}) (interface{}, error) {
return Do("ZADD", key, score, data)
}
func ZAddNX(key string, score float64, data interface{}) (interface{}, error) {
return Do("ZADD", key, "NX", score, data)
}
func ZRem(key string, data interface{}) (interface{}, error) {
return Do("ZREM", key, data)
}
func ZRange(key string, start int, end int, withScores bool) ([]interface{}, error) {
if withScores {
return redis.Values(Do("ZRANGE", key, start, end, "WITHSCORES"))
}
return redis.Values(Do("ZRANGE", key, start, end))
}
func ZRemRangeByScore(key string, start int64, end int64) ([]interface{}, error) {
return redis.Values(Do("ZREMRANGEBYSCORE", key, start, end))
}
func ZCard(setName string) (int64, error) {
return redis.Int64(Do("ZCARD", setName))
}
func ZScan(setName string) (int64, error) {
return redis.Int64(Do("ZCARD", setName))
}
func SAdd(setName string, data interface{}) (interface{}, error) {
return Do("SADD", setName, data)
}
func SCard(setName string) (int64, error) {
return redis.Int64(Do("SCARD", setName))
}
func SIsMember(setName string, data interface{}) (bool, error) {
return redis.Bool(Do("SISMEMBER", setName, data))
}
func SMembers(setName string) ([]string, error) {
return redis.Strings(Do("SMEMBERS", setName))
}
func SRem(setName string, data interface{}) (interface{}, error) {
return Do("SREM", setName, data)
}
func HSet(key string, HKey string, data interface{}) (interface{}, error) {
return Do("HSET", key, HKey, data)
}

func HGet(key string, HKey string) (interface{}, error) {
return Do("HGET", key, HKey)
}

func HMGet(key string, hashKeys ...string) ([]interface{}, error) {
ret, err := Do("HMGET", key, hashKeys)
if err != nil {
return nil, err
}
reta, ok := ret.([]interface{})
if !ok {
return nil, errors.New("result not an array")
}
return reta, nil
}

func HMSet(key string, hashKeys []string, vals []interface{}) (interface{}, error) {
if len(hashKeys) == 0 || len(hashKeys) != len(vals) {
var ret interface{}
return ret, errors.New("bad length")
}
input := []interface{}{key}
for i, v := range hashKeys {
input = append(input, v, vals[i])
}
return Do("HMSET", input...)
}

func HGetString(key string, HKey string) (string, error) {
return redis.String(Do("HGET", key, HKey))
}
func HGetFloat(key string, HKey string) (float64, error) {
f, err := redis.Float64(Do("HGET", key, HKey))
return f, err
}
func HGetInt(key string, HKey string) (int, error) {
return redis.Int(Do("HGET", key, HKey))
}
func HGetInt64(key string, HKey string) (int64, error) {
return redis.Int64(Do("HGET", key, HKey))
}
func HGetBool(key string, HKey string) (bool, error) {
return redis.Bool(Do("HGET", key, HKey))
}
func HDel(key string, HKey string) (interface{}, error) {
return Do("HDEL", key, HKey)
}

func HGetAll(key string) (map[string]interface{}, error) {
vals, err := redis.Values(Do("HGETALL", key))
if err != nil {
return nil, err
}
num := len(vals) / 2
result := make(map[string]interface{}, num)
for i := 0; i < num; i++ {
key, _ := redis.String(vals[2*i], nil)
result[key] = vals[2*i+1]
}
return result, nil
}

func FlushAll() bool {
res, _ := redis.String(Do("FLUSHALL"))
if res == "" {
return false
}
return true
}

// NOTE: Use this in production environment with extreme care.
// Read more here:https://redis.io/commands/keys
func Keys(pattern string) ([]string, error) {
return redis.Strings(Do("KEYS", pattern))
}

func HKeys(key string) ([]string, error) {
return redis.Strings(Do("HKEYS", key))
}

func Exists(key string) bool {
count, err := redis.Int(Do("EXISTS", key))
if count == 0 || err != nil {
return false
}
return true
}

func Incr(key string) (int64, error) {
return redis.Int64(Do("INCR", key))
}

func Decr(key string) (int64, error) {
return redis.Int64(Do("DECR", key))
}

func IncrBy(key string, incBy int64) (int64, error) {
return redis.Int64(Do("INCRBY", key, incBy))
}

func DecrBy(key string, decrBy int64) (int64, error) {
return redis.Int64(Do("DECRBY", key))
}

func IncrByFloat(key string, incBy float64) (float64, error) {
return redis.Float64(Do("INCRBYFLOAT", key, incBy))
}

func DecrByFloat(key string, decrBy float64) (float64, error) {
return redis.Float64(Do("DECRBYFLOAT", key, decrBy))
}

// use for message queue
func LPush(key string, data interface{}) (interface{}, error) {
// set
return Do("LPUSH", key, data)
}

func LPop(key string) (interface{}, error) {
return Do("LPOP", key)
}

func LPopString(key string) (string, error) {
return redis.String(Do("LPOP", key))
}
func LPopFloat(key string) (float64, error) {
f, err := redis.Float64(Do("LPOP", key))
return f, err
}
func LPopInt(key string) (int, error) {
return redis.Int(Do("LPOP", key))
}
func LPopInt64(key string) (int64, error) {
return redis.Int64(Do("LPOP", key))
}

func RPush(key string, data interface{}) (interface{}, error) {
// set
return Do("RPUSH", key, data)
}

func RPop(key string) (interface{}, error) {
return Do("RPOP", key)
}

func RPopString(key string) (string, error) {
return redis.String(Do("RPOP", key))
}
func RPopFloat(key string) (float64, error) {
f, err := redis.Float64(Do("RPOP", key))
return f, err
}
func RPopInt(key string) (int, error) {
return redis.Int(Do("RPOP", key))
}
func RPopInt64(key string) (int64, error) {
return redis.Int64(Do("RPOP", key))
}

func Scan(cursor int64, pattern string, count int64) (int64, []string, error) {
var items []string
var newCursor int64

values, err := redis.Values(Do("SCAN", cursor, "MATCH", pattern, "COUNT", count))
if err != nil {
return 0, nil, err
}
values, err = redis.Scan(values, &newCursor, &items)
if err != nil {
return 0, nil, err
}
return newCursor, items, nil
}

+ 622
- 0
utils/cache/redis_cluster.go Voir le fichier

@@ -0,0 +1,622 @@
package zhios_tool_cache

import (
"strconv"
"time"

"github.com/go-redis/redis"
)

var pools *redis.ClusterClient

func NewRedisCluster(addrs []string) error {
opt := &redis.ClusterOptions{
Addrs: addrs,
PoolSize: redisPoolSize,
PoolTimeout: redisPoolTTL,
IdleTimeout: redisIdleTTL,
DialTimeout: redisDialTTL,
ReadTimeout: redisReadTTL,
WriteTimeout: redisWriteTTL,
}
pools = redis.NewClusterClient(opt)
if err := pools.Ping().Err(); err != nil {
return err
}
return nil
}

func RCGet(key string) (interface{}, error) {
res, err := pools.Get(key).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func RCSet(key string, value interface{}) error {
err := pools.Set(key, value, 0).Err()
return convertError(err)
}
func RCGetSet(key string, value interface{}) (interface{}, error) {
res, err := pools.GetSet(key, value).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func RCSetNx(key string, value interface{}) (int64, error) {
res, err := pools.SetNX(key, value, 0).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func RCSetEx(key string, value interface{}, timeout int64) error {
_, err := pools.Set(key, value, time.Duration(timeout)*time.Second).Result()
if err != nil {
return convertError(err)
}
return nil
}

// nil表示成功,ErrNil表示数据库内已经存在这个key,其他表示数据库发生错误
func RCSetNxEx(key string, value interface{}, timeout int64) error {
res, err := pools.SetNX(key, value, time.Duration(timeout)*time.Second).Result()
if err != nil {
return convertError(err)
}
if res {
return nil
}
return ErrNil
}
func RCMGet(keys ...string) ([]interface{}, error) {
res, err := pools.MGet(keys...).Result()
return res, convertError(err)
}

// 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
func RCMSet(kvs map[string]interface{}) error {
pairs := make([]string, 0, len(kvs)*2)
for k, v := range kvs {
val, err := String(v, nil)
if err != nil {
return err
}
pairs = append(pairs, k, val)
}
return convertError(pools.MSet(pairs).Err())
}

// 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
func RCMSetNX(kvs map[string]interface{}) (bool, error) {
pairs := make([]string, 0, len(kvs)*2)
for k, v := range kvs {
val, err := String(v, nil)
if err != nil {
return false, err
}
pairs = append(pairs, k, val)
}
res, err := pools.MSetNX(pairs).Result()
return res, convertError(err)
}
func RCExpireAt(key string, timestamp int64) (int64, error) {
res, err := pools.ExpireAt(key, time.Unix(timestamp, 0)).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func RCDel(keys ...string) (int64, error) {
args := make([]interface{}, 0, len(keys))
for _, key := range keys {
args = append(args, key)
}
res, err := pools.Del(keys...).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCIncr(key string) (int64, error) {
res, err := pools.Incr(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCIncrBy(key string, delta int64) (int64, error) {
res, err := pools.IncrBy(key, delta).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCExpire(key string, duration int64) (int64, error) {
res, err := pools.Expire(key, time.Duration(duration)*time.Second).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func RCExists(key string) (bool, error) {
res, err := pools.Exists(key).Result()
if err != nil {
return false, convertError(err)
}
if res > 0 {
return true, nil
}
return false, nil
}
func RCHGet(key string, field string) (interface{}, error) {
res, err := pools.HGet(key, field).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func RCHLen(key string) (int64, error) {
res, err := pools.HLen(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCHSet(key string, field string, val interface{}) error {
value, err := String(val, nil)
if err != nil && err != ErrNil {
return err
}
_, err = pools.HSet(key, field, value).Result()
if err != nil {
return convertError(err)
}
return nil
}
func RCHDel(key string, fields ...string) (int64, error) {
args := make([]interface{}, 0, len(fields)+1)
args = append(args, key)
for _, field := range fields {
args = append(args, field)
}
res, err := pools.HDel(key, fields...).Result()
if err != nil {
return 0, convertError(err)
}
return res, nil
}

func RCHMGet(key string, fields ...string) (interface{}, error) {
args := make([]interface{}, 0, len(fields)+1)
args = append(args, key)
for _, field := range fields {
args = append(args, field)
}
if len(fields) == 0 {
return nil, ErrNil
}
res, err := pools.HMGet(key, fields...).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCHMSet(key string, kvs ...interface{}) error {
if len(kvs) == 0 {
return nil
}
if len(kvs)%2 != 0 {
return ErrWrongArgsNum
}
var err error
v := map[string]interface{}{} // todo change
v["field"], err = String(kvs[0], nil)
if err != nil && err != ErrNil {
return err
}
v["value"], err = String(kvs[1], nil)
if err != nil && err != ErrNil {
return err
}
pairs := make([]string, 0, len(kvs)-2)
if len(kvs) > 2 {
for _, kv := range kvs[2:] {
kvString, err := String(kv, nil)
if err != nil && err != ErrNil {
return err
}
pairs = append(pairs, kvString)
}
}
v["paris"] = pairs
_, err = pools.HMSet(key, v).Result()
if err != nil {
return convertError(err)
}
return nil
}

func RCHKeys(key string) ([]string, error) {
res, err := pools.HKeys(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCHVals(key string) ([]interface{}, error) {
res, err := pools.HVals(key).Result()
if err != nil {
return nil, convertError(err)
}
rs := make([]interface{}, 0, len(res))
for _, res := range res {
rs = append(rs, res)
}
return rs, nil
}
func RCHGetAll(key string) (map[string]string, error) {
vals, err := pools.HGetAll(key).Result()
if err != nil {
return nil, convertError(err)
}
return vals, nil
}
func RCHIncrBy(key, field string, delta int64) (int64, error) {
res, err := pools.HIncrBy(key, field, delta).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCZAdd(key string, kvs ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(kvs)+1)
args = append(args, key)
args = append(args, kvs...)
if len(kvs) == 0 {
return 0, nil
}
if len(kvs)%2 != 0 {
return 0, ErrWrongArgsNum
}
zs := make([]redis.Z, len(kvs)/2)
for i := 0; i < len(kvs); i += 2 {
idx := i / 2
score, err := Float64(kvs[i], nil)
if err != nil && err != ErrNil {
return 0, err
}
zs[idx].Score = score
zs[idx].Member = kvs[i+1]
}
res, err := pools.ZAdd(key, zs...).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCZRem(key string, members ...string) (int64, error) {
args := make([]interface{}, 0, len(members))
args = append(args, key)
for _, member := range members {
args = append(args, member)
}
res, err := pools.ZRem(key, members).Result()
if err != nil {
return res, convertError(err)
}
return res, err
}

func RCZRange(key string, min, max int64, withScores bool) (interface{}, error) {
res := make([]interface{}, 0)
if withScores {
zs, err := pools.ZRangeWithScores(key, min, max).Result()
if err != nil {
return nil, convertError(err)
}
for _, z := range zs {
res = append(res, z.Member, strconv.FormatFloat(z.Score, 'f', -1, 64))
}
} else {
ms, err := pools.ZRange(key, min, max).Result()
if err != nil {
return nil, convertError(err)
}
for _, m := range ms {
res = append(res, m)
}
}
return res, nil
}
func RCZRangeByScoreWithScore(key string, min, max int64) (map[string]int64, error) {
opt := new(redis.ZRangeBy)
opt.Min = strconv.FormatInt(int64(min), 10)
opt.Max = strconv.FormatInt(int64(max), 10)
opt.Count = -1
opt.Offset = 0
vals, err := pools.ZRangeByScoreWithScores(key, *opt).Result()
if err != nil {
return nil, convertError(err)
}
res := make(map[string]int64, len(vals))
for _, val := range vals {
key, err := String(val.Member, nil)
if err != nil && err != ErrNil {
return nil, err
}
res[key] = int64(val.Score)
}
return res, nil
}
func RCLRange(key string, start, stop int64) (interface{}, error) {
res, err := pools.LRange(key, start, stop).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCLSet(key string, index int, value interface{}) error {
err := pools.LSet(key, int64(index), value).Err()
return convertError(err)
}
func RCLLen(key string) (int64, error) {
res, err := pools.LLen(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCLRem(key string, count int, value interface{}) (int, error) {
val, _ := value.(string)
res, err := pools.LRem(key, int64(count), val).Result()
if err != nil {
return int(res), convertError(err)
}
return int(res), nil
}
func RCTTl(key string) (int64, error) {
duration, err := pools.TTL(key).Result()
if err != nil {
return int64(duration.Seconds()), convertError(err)
}
return int64(duration.Seconds()), nil
}
func RCLPop(key string) (interface{}, error) {
res, err := pools.LPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCRPop(key string) (interface{}, error) {
res, err := pools.RPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCBLPop(key string, timeout int) (interface{}, error) {
res, err := pools.BLPop(time.Duration(timeout)*time.Second, key).Result()
if err != nil {
// 兼容redis 2.x
if err == redis.Nil {
return nil, ErrNil
}
return nil, err
}
return res[1], nil
}
func RCBRPop(key string, timeout int) (interface{}, error) {
res, err := pools.BRPop(time.Duration(timeout)*time.Second, key).Result()
if err != nil {
// 兼容redis 2.x
if err == redis.Nil {
return nil, ErrNil
}
return nil, convertError(err)
}
return res[1], nil
}
func RCLPush(key string, value ...interface{}) error {
args := make([]interface{}, 0, len(value)+1)
args = append(args, key)
args = append(args, value...)
vals := make([]string, 0, len(value))
for _, v := range value {
val, err := String(v, nil)
if err != nil && err != ErrNil {
return err
}
vals = append(vals, val)
}
_, err := pools.LPush(key, vals).Result() // todo ...
if err != nil {
return convertError(err)
}
return nil
}
func RCRPush(key string, value ...interface{}) error {
args := make([]interface{}, 0, len(value)+1)
args = append(args, key)
args = append(args, value...)
vals := make([]string, 0, len(value))
for _, v := range value {
val, err := String(v, nil)
if err != nil && err != ErrNil {
if err == ErrNil {
continue
}
return err
}
if val == "" {
continue
}
vals = append(vals, val)
}
_, err := pools.RPush(key, vals).Result() // todo ...
if err != nil {
return convertError(err)
}
return nil
}

// 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
func RCBRPopLPush(srcKey string, destKey string, timeout int) (interface{}, error) {
res, err := pools.BRPopLPush(srcKey, destKey, time.Duration(timeout)*time.Second).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}

// 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
func RCRPopLPush(srcKey string, destKey string) (interface{}, error) {
res, err := pools.RPopLPush(srcKey, destKey).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCSAdd(key string, members ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(members)+1)
args = append(args, key)
args = append(args, members...)
ms := make([]string, 0, len(members))
for _, member := range members {
m, err := String(member, nil)
if err != nil && err != ErrNil {
return 0, err
}
ms = append(ms, m)
}
res, err := pools.SAdd(key, ms).Result() // todo ...
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCSPop(key string) ([]byte, error) {
res, err := pools.SPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func RCSIsMember(key string, member interface{}) (bool, error) {
m, _ := member.(string)
res, err := pools.SIsMember(key, m).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCSRem(key string, members ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(members)+1)
args = append(args, key)
args = append(args, members...)
ms := make([]string, 0, len(members))
for _, member := range members {
m, err := String(member, nil)
if err != nil && err != ErrNil {
return 0, err
}
ms = append(ms, m)
}
res, err := pools.SRem(key, ms).Result() // todo ...
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCSMembers(key string) ([]string, error) {
res, err := pools.SMembers(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCScriptLoad(luaScript string) (interface{}, error) {
res, err := pools.ScriptLoad(luaScript).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCEvalSha(sha1 string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
vals := make([]interface{}, 0, len(keysArgs)+2)
vals = append(vals, sha1, numberKeys)
vals = append(vals, keysArgs...)
keys := make([]string, 0, numberKeys)
args := make([]string, 0, len(keysArgs)-numberKeys)
for i, value := range keysArgs {
val, err := String(value, nil)
if err != nil && err != ErrNil {
return nil, err
}
if i < numberKeys {
keys = append(keys, val)
} else {
args = append(args, val)
}
}
res, err := pools.EvalSha(sha1, keys, args).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCEval(luaScript string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
vals := make([]interface{}, 0, len(keysArgs)+2)
vals = append(vals, luaScript, numberKeys)
vals = append(vals, keysArgs...)
keys := make([]string, 0, numberKeys)
args := make([]string, 0, len(keysArgs)-numberKeys)
for i, value := range keysArgs {
val, err := String(value, nil)
if err != nil && err != ErrNil {
return nil, err
}
if i < numberKeys {
keys = append(keys, val)
} else {
args = append(args, val)
}
}
res, err := pools.Eval(luaScript, keys, args).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func RCGetBit(key string, offset int64) (int64, error) {
res, err := pools.GetBit(key, offset).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func RCSetBit(key string, offset uint32, value int) (int, error) {
res, err := pools.SetBit(key, int64(offset), value).Result()
return int(res), convertError(err)
}
func RCGetClient() *redis.ClusterClient {
return pools
}
func convertError(err error) error {
if err == redis.Nil {
// 为了兼容redis 2.x,这里不返回 ErrNil,ErrNil在调用redis_cluster_reply函数时才返回
return nil
}
return err
}

+ 323
- 0
utils/cache/redis_pool.go Voir le fichier

@@ -0,0 +1,323 @@
package zhios_tool_cache

import (
"errors"
"github.com/gomodule/redigo/redis"
"log"
"strings"
"time"
)

type RedisPool struct {
*redis.Pool
}

func NewRedisPool(cfg *Config) *RedisPool {
return &RedisPool{&redis.Pool{
MaxIdle: cfg.MaxIdle,
IdleTimeout: cfg.IdleTimeout,
MaxActive: cfg.MaxActive,
Wait: cfg.Wait,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", cfg.Server)
if err != nil {
log.Println("Redis Dial failed: ", err)
return nil, err
}
if cfg.Password != "" {
if _, err := c.Do("AUTH", cfg.Password); err != nil {
c.Close()
log.Println("Redis AUTH failed: ", err)
return nil, err
}
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
if err != nil {
log.Println("Unable to ping to redis server:", err)
}
return err
},
}}
}

func (p *RedisPool) Do(cmd string, args ...interface{}) (reply interface{}, err error) {
conn := pool.Get()
defer conn.Close()
return conn.Do(cmd, args...)
}

func (p *RedisPool) GetPool() *redis.Pool {
return pool
}

func (p *RedisPool) ParseKey(key string, vars []string) (string, error) {
arr := strings.Split(key, conf.KeyPlaceholder)
actualKey := ""
if len(arr) != len(vars)+1 {
return "", errors.New("redis/connection.go: Insufficient arguments to parse key")
} else {
for index, val := range arr {
if index == 0 {
actualKey = arr[index]
} else {
actualKey += vars[index-1] + val
}
}
}
return getPrefixedKey(actualKey), nil
}

func (p *RedisPool) getPrefixedKey(key string) string {
return conf.KeyPrefix + conf.KeyDelimiter + key
}
func (p *RedisPool) StripEnvKey(key string) string {
return strings.TrimLeft(key, conf.KeyPrefix+conf.KeyDelimiter)
}
func (p *RedisPool) SplitKey(key string) []string {
return strings.Split(key, conf.KeyDelimiter)
}
func (p *RedisPool) Expire(key string, ttl int) (interface{}, error) {
return Do("EXPIRE", key, ttl)
}
func (p *RedisPool) Persist(key string) (interface{}, error) {
return Do("PERSIST", key)
}

func (p *RedisPool) Del(key string) (interface{}, error) {
return Do("DEL", key)
}
func (p *RedisPool) Set(key string, data interface{}) (interface{}, error) {
// set
return Do("SET", key, data)
}
func (p *RedisPool) SetNX(key string, data interface{}) (interface{}, error) {
return Do("SETNX", key, data)
}
func (p *RedisPool) SetEx(key string, data interface{}, ttl int) (interface{}, error) {
return Do("SETEX", key, ttl, data)
}
func (p *RedisPool) Get(key string) (interface{}, error) {
// get
return Do("GET", key)
}
func (p *RedisPool) GetStringMap(key string) (map[string]string, error) {
// get
return redis.StringMap(Do("GET", key))
}

func (p *RedisPool) GetTTL(key string) (time.Duration, error) {
ttl, err := redis.Int64(Do("TTL", key))
return time.Duration(ttl) * time.Second, err
}
func (p *RedisPool) GetBytes(key string) ([]byte, error) {
return redis.Bytes(Do("GET", key))
}
func (p *RedisPool) GetString(key string) (string, error) {
return redis.String(Do("GET", key))
}
func (p *RedisPool) GetInt(key string) (int, error) {
return redis.Int(Do("GET", key))
}
func (p *RedisPool) GetStringLength(key string) (int, error) {
return redis.Int(Do("STRLEN", key))
}
func (p *RedisPool) ZAdd(key string, score float64, data interface{}) (interface{}, error) {
return Do("ZADD", key, score, data)
}
func (p *RedisPool) ZRem(key string, data interface{}) (interface{}, error) {
return Do("ZREM", key, data)
}
func (p *RedisPool) ZRange(key string, start int, end int, withScores bool) ([]interface{}, error) {
if withScores {
return redis.Values(Do("ZRANGE", key, start, end, "WITHSCORES"))
}
return redis.Values(Do("ZRANGE", key, start, end))
}
func (p *RedisPool) SAdd(setName string, data interface{}) (interface{}, error) {
return Do("SADD", setName, data)
}
func (p *RedisPool) SCard(setName string) (int64, error) {
return redis.Int64(Do("SCARD", setName))
}
func (p *RedisPool) SIsMember(setName string, data interface{}) (bool, error) {
return redis.Bool(Do("SISMEMBER", setName, data))
}
func (p *RedisPool) SMembers(setName string) ([]string, error) {
return redis.Strings(Do("SMEMBERS", setName))
}
func (p *RedisPool) SRem(setName string, data interface{}) (interface{}, error) {
return Do("SREM", setName, data)
}
func (p *RedisPool) HSet(key string, HKey string, data interface{}) (interface{}, error) {
return Do("HSET", key, HKey, data)
}

func (p *RedisPool) HGet(key string, HKey string) (interface{}, error) {
return Do("HGET", key, HKey)
}

func (p *RedisPool) HMGet(key string, hashKeys ...string) ([]interface{}, error) {
ret, err := Do("HMGET", key, hashKeys)
if err != nil {
return nil, err
}
reta, ok := ret.([]interface{})
if !ok {
return nil, errors.New("result not an array")
}
return reta, nil
}

func (p *RedisPool) HMSet(key string, hashKeys []string, vals []interface{}) (interface{}, error) {
if len(hashKeys) == 0 || len(hashKeys) != len(vals) {
var ret interface{}
return ret, errors.New("bad length")
}
input := []interface{}{key}
for i, v := range hashKeys {
input = append(input, v, vals[i])
}
return Do("HMSET", input...)
}

func (p *RedisPool) HGetString(key string, HKey string) (string, error) {
return redis.String(Do("HGET", key, HKey))
}
func (p *RedisPool) HGetFloat(key string, HKey string) (float64, error) {
f, err := redis.Float64(Do("HGET", key, HKey))
return float64(f), err
}
func (p *RedisPool) HGetInt(key string, HKey string) (int, error) {
return redis.Int(Do("HGET", key, HKey))
}
func (p *RedisPool) HGetInt64(key string, HKey string) (int64, error) {
return redis.Int64(Do("HGET", key, HKey))
}
func (p *RedisPool) HGetBool(key string, HKey string) (bool, error) {
return redis.Bool(Do("HGET", key, HKey))
}
func (p *RedisPool) HDel(key string, HKey string) (interface{}, error) {
return Do("HDEL", key, HKey)
}
func (p *RedisPool) HGetAll(key string) (map[string]interface{}, error) {
vals, err := redis.Values(Do("HGETALL", key))
if err != nil {
return nil, err
}
num := len(vals) / 2
result := make(map[string]interface{}, num)
for i := 0; i < num; i++ {
key, _ := redis.String(vals[2*i], nil)
result[key] = vals[2*i+1]
}
return result, nil
}

// NOTE: Use this in production environment with extreme care.
// Read more here:https://redis.io/commands/keys
func (p *RedisPool) Keys(pattern string) ([]string, error) {
return redis.Strings(Do("KEYS", pattern))
}

func (p *RedisPool) HKeys(key string) ([]string, error) {
return redis.Strings(Do("HKEYS", key))
}

func (p *RedisPool) Exists(key string) (bool, error) {
count, err := redis.Int(Do("EXISTS", key))
if count == 0 {
return false, err
} else {
return true, err
}
}

func (p *RedisPool) Incr(key string) (int64, error) {
return redis.Int64(Do("INCR", key))
}

func (p *RedisPool) Decr(key string) (int64, error) {
return redis.Int64(Do("DECR", key))
}

func (p *RedisPool) IncrBy(key string, incBy int64) (int64, error) {
return redis.Int64(Do("INCRBY", key, incBy))
}

func (p *RedisPool) DecrBy(key string, decrBy int64) (int64, error) {
return redis.Int64(Do("DECRBY", key))
}

func (p *RedisPool) IncrByFloat(key string, incBy float64) (float64, error) {
return redis.Float64(Do("INCRBYFLOAT", key, incBy))
}

func (p *RedisPool) DecrByFloat(key string, decrBy float64) (float64, error) {
return redis.Float64(Do("DECRBYFLOAT", key, decrBy))
}

// use for message queue
func (p *RedisPool) LPush(key string, data interface{}) (interface{}, error) {
// set
return Do("LPUSH", key, data)
}

func (p *RedisPool) LPop(key string) (interface{}, error) {
return Do("LPOP", key)
}

func (p *RedisPool) LPopString(key string) (string, error) {
return redis.String(Do("LPOP", key))
}
func (p *RedisPool) LPopFloat(key string) (float64, error) {
f, err := redis.Float64(Do("LPOP", key))
return float64(f), err
}
func (p *RedisPool) LPopInt(key string) (int, error) {
return redis.Int(Do("LPOP", key))
}
func (p *RedisPool) LPopInt64(key string) (int64, error) {
return redis.Int64(Do("LPOP", key))
}

func (p *RedisPool) RPush(key string, data interface{}) (interface{}, error) {
// set
return Do("RPUSH", key, data)
}

func (p *RedisPool) RPop(key string) (interface{}, error) {
return Do("RPOP", key)
}

func (p *RedisPool) RPopString(key string) (string, error) {
return redis.String(Do("RPOP", key))
}
func (p *RedisPool) RPopFloat(key string) (float64, error) {
f, err := redis.Float64(Do("RPOP", key))
return float64(f), err
}
func (p *RedisPool) RPopInt(key string) (int, error) {
return redis.Int(Do("RPOP", key))
}
func (p *RedisPool) RPopInt64(key string) (int64, error) {
return redis.Int64(Do("RPOP", key))
}

func (p *RedisPool) Scan(cursor int64, pattern string, count int64) (int64, []string, error) {
var items []string
var newCursor int64

values, err := redis.Values(Do("SCAN", cursor, "MATCH", pattern, "COUNT", count))
if err != nil {
return 0, nil, err
}
values, err = redis.Scan(values, &newCursor, &items)
if err != nil {
return 0, nil, err
}

return newCursor, items, nil
}

+ 617
- 0
utils/cache/redis_pool_cluster.go Voir le fichier

@@ -0,0 +1,617 @@
package zhios_tool_cache

import (
"strconv"
"time"

"github.com/go-redis/redis"
)

type RedisClusterPool struct {
client *redis.ClusterClient
}

func NewRedisClusterPool(addrs []string) (*RedisClusterPool, error) {
opt := &redis.ClusterOptions{
Addrs: addrs,
PoolSize: 512,
PoolTimeout: 10 * time.Second,
IdleTimeout: 10 * time.Second,
DialTimeout: 10 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
}
c := redis.NewClusterClient(opt)
if err := c.Ping().Err(); err != nil {
return nil, err
}
return &RedisClusterPool{client: c}, nil
}

func (p *RedisClusterPool) Get(key string) (interface{}, error) {
res, err := p.client.Get(key).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func (p *RedisClusterPool) Set(key string, value interface{}) error {
err := p.client.Set(key, value, 0).Err()
return convertError(err)
}
func (p *RedisClusterPool) GetSet(key string, value interface{}) (interface{}, error) {
res, err := p.client.GetSet(key, value).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func (p *RedisClusterPool) SetNx(key string, value interface{}) (int64, error) {
res, err := p.client.SetNX(key, value, 0).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func (p *RedisClusterPool) SetEx(key string, value interface{}, timeout int64) error {
_, err := p.client.Set(key, value, time.Duration(timeout)*time.Second).Result()
if err != nil {
return convertError(err)
}
return nil
}

// nil表示成功,ErrNil表示数据库内已经存在这个key,其他表示数据库发生错误
func (p *RedisClusterPool) SetNxEx(key string, value interface{}, timeout int64) error {
res, err := p.client.SetNX(key, value, time.Duration(timeout)*time.Second).Result()
if err != nil {
return convertError(err)
}
if res {
return nil
}
return ErrNil
}
func (p *RedisClusterPool) MGet(keys ...string) ([]interface{}, error) {
res, err := p.client.MGet(keys...).Result()
return res, convertError(err)
}

// 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
func (p *RedisClusterPool) MSet(kvs map[string]interface{}) error {
pairs := make([]string, 0, len(kvs)*2)
for k, v := range kvs {
val, err := String(v, nil)
if err != nil {
return err
}
pairs = append(pairs, k, val)
}
return convertError(p.client.MSet(pairs).Err())
}

// 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
func (p *RedisClusterPool) MSetNX(kvs map[string]interface{}) (bool, error) {
pairs := make([]string, 0, len(kvs)*2)
for k, v := range kvs {
val, err := String(v, nil)
if err != nil {
return false, err
}
pairs = append(pairs, k, val)
}
res, err := p.client.MSetNX(pairs).Result()
return res, convertError(err)
}
func (p *RedisClusterPool) ExpireAt(key string, timestamp int64) (int64, error) {
res, err := p.client.ExpireAt(key, time.Unix(timestamp, 0)).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func (p *RedisClusterPool) Del(keys ...string) (int64, error) {
args := make([]interface{}, 0, len(keys))
for _, key := range keys {
args = append(args, key)
}
res, err := p.client.Del(keys...).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) Incr(key string) (int64, error) {
res, err := p.client.Incr(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) IncrBy(key string, delta int64) (int64, error) {
res, err := p.client.IncrBy(key, delta).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) Expire(key string, duration int64) (int64, error) {
res, err := p.client.Expire(key, time.Duration(duration)*time.Second).Result()
if err != nil {
return 0, convertError(err)
}
if res {
return 1, nil
}
return 0, nil
}
func (p *RedisClusterPool) Exists(key string) (bool, error) { // todo (bool, error)
res, err := p.client.Exists(key).Result()
if err != nil {
return false, convertError(err)
}
if res > 0 {
return true, nil
}
return false, nil
}
func (p *RedisClusterPool) HGet(key string, field string) (interface{}, error) {
res, err := p.client.HGet(key, field).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func (p *RedisClusterPool) HLen(key string) (int64, error) {
res, err := p.client.HLen(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) HSet(key string, field string, val interface{}) error {
value, err := String(val, nil)
if err != nil && err != ErrNil {
return err
}
_, err = p.client.HSet(key, field, value).Result()
if err != nil {
return convertError(err)
}
return nil
}
func (p *RedisClusterPool) HDel(key string, fields ...string) (int64, error) {
args := make([]interface{}, 0, len(fields)+1)
args = append(args, key)
for _, field := range fields {
args = append(args, field)
}
res, err := p.client.HDel(key, fields...).Result()
if err != nil {
return 0, convertError(err)
}
return res, nil
}

func (p *RedisClusterPool) HMGet(key string, fields ...string) (interface{}, error) {
args := make([]interface{}, 0, len(fields)+1)
args = append(args, key)
for _, field := range fields {
args = append(args, field)
}
if len(fields) == 0 {
return nil, ErrNil
}
res, err := p.client.HMGet(key, fields...).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) HMSet(key string, kvs ...interface{}) error {
if len(kvs) == 0 {
return nil
}
if len(kvs)%2 != 0 {
return ErrWrongArgsNum
}
var err error
v := map[string]interface{}{} // todo change
v["field"], err = String(kvs[0], nil)
if err != nil && err != ErrNil {
return err
}
v["value"], err = String(kvs[1], nil)
if err != nil && err != ErrNil {
return err
}
pairs := make([]string, 0, len(kvs)-2)
if len(kvs) > 2 {
for _, kv := range kvs[2:] {
kvString, err := String(kv, nil)
if err != nil && err != ErrNil {
return err
}
pairs = append(pairs, kvString)
}
}
v["paris"] = pairs
_, err = p.client.HMSet(key, v).Result()
if err != nil {
return convertError(err)
}
return nil
}

func (p *RedisClusterPool) HKeys(key string) ([]string, error) {
res, err := p.client.HKeys(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) HVals(key string) ([]interface{}, error) {
res, err := p.client.HVals(key).Result()
if err != nil {
return nil, convertError(err)
}
rs := make([]interface{}, 0, len(res))
for _, res := range res {
rs = append(rs, res)
}
return rs, nil
}
func (p *RedisClusterPool) HGetAll(key string) (map[string]string, error) {
vals, err := p.client.HGetAll(key).Result()
if err != nil {
return nil, convertError(err)
}
return vals, nil
}
func (p *RedisClusterPool) HIncrBy(key, field string, delta int64) (int64, error) {
res, err := p.client.HIncrBy(key, field, delta).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) ZAdd(key string, kvs ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(kvs)+1)
args = append(args, key)
args = append(args, kvs...)
if len(kvs) == 0 {
return 0, nil
}
if len(kvs)%2 != 0 {
return 0, ErrWrongArgsNum
}
zs := make([]redis.Z, len(kvs)/2)
for i := 0; i < len(kvs); i += 2 {
idx := i / 2
score, err := Float64(kvs[i], nil)
if err != nil && err != ErrNil {
return 0, err
}
zs[idx].Score = score
zs[idx].Member = kvs[i+1]
}
res, err := p.client.ZAdd(key, zs...).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) ZRem(key string, members ...string) (int64, error) {
args := make([]interface{}, 0, len(members))
args = append(args, key)
for _, member := range members {
args = append(args, member)
}
res, err := p.client.ZRem(key, members).Result()
if err != nil {
return res, convertError(err)
}
return res, err
}

func (p *RedisClusterPool) ZRange(key string, min, max int64, withScores bool) (interface{}, error) {
res := make([]interface{}, 0)
if withScores {
zs, err := p.client.ZRangeWithScores(key, min, max).Result()
if err != nil {
return nil, convertError(err)
}
for _, z := range zs {
res = append(res, z.Member, strconv.FormatFloat(z.Score, 'f', -1, 64))
}
} else {
ms, err := p.client.ZRange(key, min, max).Result()
if err != nil {
return nil, convertError(err)
}
for _, m := range ms {
res = append(res, m)
}
}
return res, nil
}
func (p *RedisClusterPool) ZRangeByScoreWithScore(key string, min, max int64) (map[string]int64, error) {
opt := new(redis.ZRangeBy)
opt.Min = strconv.FormatInt(int64(min), 10)
opt.Max = strconv.FormatInt(int64(max), 10)
opt.Count = -1
opt.Offset = 0
vals, err := p.client.ZRangeByScoreWithScores(key, *opt).Result()
if err != nil {
return nil, convertError(err)
}
res := make(map[string]int64, len(vals))
for _, val := range vals {
key, err := String(val.Member, nil)
if err != nil && err != ErrNil {
return nil, err
}
res[key] = int64(val.Score)
}
return res, nil
}
func (p *RedisClusterPool) LRange(key string, start, stop int64) (interface{}, error) {
res, err := p.client.LRange(key, start, stop).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) LSet(key string, index int, value interface{}) error {
err := p.client.LSet(key, int64(index), value).Err()
return convertError(err)
}
func (p *RedisClusterPool) LLen(key string) (int64, error) {
res, err := p.client.LLen(key).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) LRem(key string, count int, value interface{}) (int, error) {
val, _ := value.(string)
res, err := p.client.LRem(key, int64(count), val).Result()
if err != nil {
return int(res), convertError(err)
}
return int(res), nil
}
func (p *RedisClusterPool) TTl(key string) (int64, error) {
duration, err := p.client.TTL(key).Result()
if err != nil {
return int64(duration.Seconds()), convertError(err)
}
return int64(duration.Seconds()), nil
}
func (p *RedisClusterPool) LPop(key string) (interface{}, error) {
res, err := p.client.LPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) RPop(key string) (interface{}, error) {
res, err := p.client.RPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) BLPop(key string, timeout int) (interface{}, error) {
res, err := p.client.BLPop(time.Duration(timeout)*time.Second, key).Result()
if err != nil {
// 兼容redis 2.x
if err == redis.Nil {
return nil, ErrNil
}
return nil, err
}
return res[1], nil
}
func (p *RedisClusterPool) BRPop(key string, timeout int) (interface{}, error) {
res, err := p.client.BRPop(time.Duration(timeout)*time.Second, key).Result()
if err != nil {
// 兼容redis 2.x
if err == redis.Nil {
return nil, ErrNil
}
return nil, convertError(err)
}
return res[1], nil
}
func (p *RedisClusterPool) LPush(key string, value ...interface{}) error {
args := make([]interface{}, 0, len(value)+1)
args = append(args, key)
args = append(args, value...)
vals := make([]string, 0, len(value))
for _, v := range value {
val, err := String(v, nil)
if err != nil && err != ErrNil {
return err
}
vals = append(vals, val)
}
_, err := p.client.LPush(key, vals).Result() // todo ...
if err != nil {
return convertError(err)
}
return nil
}
func (p *RedisClusterPool) RPush(key string, value ...interface{}) error {
args := make([]interface{}, 0, len(value)+1)
args = append(args, key)
args = append(args, value...)
vals := make([]string, 0, len(value))
for _, v := range value {
val, err := String(v, nil)
if err != nil && err != ErrNil {
if err == ErrNil {
continue
}
return err
}
if val == "" {
continue
}
vals = append(vals, val)
}
_, err := p.client.RPush(key, vals).Result() // todo ...
if err != nil {
return convertError(err)
}
return nil
}

// 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
func (p *RedisClusterPool) BRPopLPush(srcKey string, destKey string, timeout int) (interface{}, error) {
res, err := p.client.BRPopLPush(srcKey, destKey, time.Duration(timeout)*time.Second).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}

// 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
func (p *RedisClusterPool) RPopLPush(srcKey string, destKey string) (interface{}, error) {
res, err := p.client.RPopLPush(srcKey, destKey).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SAdd(key string, members ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(members)+1)
args = append(args, key)
args = append(args, members...)
ms := make([]string, 0, len(members))
for _, member := range members {
m, err := String(member, nil)
if err != nil && err != ErrNil {
return 0, err
}
ms = append(ms, m)
}
res, err := p.client.SAdd(key, ms).Result() // todo ...
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SPop(key string) ([]byte, error) {
res, err := p.client.SPop(key).Result()
if err != nil {
return nil, convertError(err)
}
return []byte(res), nil
}
func (p *RedisClusterPool) SIsMember(key string, member interface{}) (bool, error) {
m, _ := member.(string)
res, err := p.client.SIsMember(key, m).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SRem(key string, members ...interface{}) (int64, error) {
args := make([]interface{}, 0, len(members)+1)
args = append(args, key)
args = append(args, members...)
ms := make([]string, 0, len(members))
for _, member := range members {
m, err := String(member, nil)
if err != nil && err != ErrNil {
return 0, err
}
ms = append(ms, m)
}
res, err := p.client.SRem(key, ms).Result() // todo ...
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SMembers(key string) ([]string, error) {
res, err := p.client.SMembers(key).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) ScriptLoad(luaScript string) (interface{}, error) {
res, err := p.client.ScriptLoad(luaScript).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) EvalSha(sha1 string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
vals := make([]interface{}, 0, len(keysArgs)+2)
vals = append(vals, sha1, numberKeys)
vals = append(vals, keysArgs...)
keys := make([]string, 0, numberKeys)
args := make([]string, 0, len(keysArgs)-numberKeys)
for i, value := range keysArgs {
val, err := String(value, nil)
if err != nil && err != ErrNil {
return nil, err
}
if i < numberKeys {
keys = append(keys, val)
} else {
args = append(args, val)
}
}
res, err := p.client.EvalSha(sha1, keys, args).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) Eval(luaScript string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
vals := make([]interface{}, 0, len(keysArgs)+2)
vals = append(vals, luaScript, numberKeys)
vals = append(vals, keysArgs...)
keys := make([]string, 0, numberKeys)
args := make([]string, 0, len(keysArgs)-numberKeys)
for i, value := range keysArgs {
val, err := String(value, nil)
if err != nil && err != ErrNil {
return nil, err
}
if i < numberKeys {
keys = append(keys, val)
} else {
args = append(args, val)
}
}
res, err := p.client.Eval(luaScript, keys, args).Result()
if err != nil {
return nil, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) GetBit(key string, offset int64) (int64, error) {
res, err := p.client.GetBit(key, offset).Result()
if err != nil {
return res, convertError(err)
}
return res, nil
}
func (p *RedisClusterPool) SetBit(key string, offset uint32, value int) (int, error) {
res, err := p.client.SetBit(key, int64(offset), value).Result()
return int(res), convertError(err)
}
func (p *RedisClusterPool) GetClient() *redis.ClusterClient {
return pools
}

Chargement…
Annuler
Enregistrer