智盟项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

618 lines
16 KiB

  1. package cache
  2. import (
  3. "strconv"
  4. "time"
  5. "github.com/go-redis/redis"
  6. )
  7. type RedisClusterPool struct {
  8. client *redis.ClusterClient
  9. }
  10. func NewRedisClusterPool(addrs []string) (*RedisClusterPool, error) {
  11. opt := &redis.ClusterOptions{
  12. Addrs: addrs,
  13. PoolSize: 512,
  14. PoolTimeout: 10 * time.Second,
  15. IdleTimeout: 10 * time.Second,
  16. DialTimeout: 10 * time.Second,
  17. ReadTimeout: 3 * time.Second,
  18. WriteTimeout: 3 * time.Second,
  19. }
  20. c := redis.NewClusterClient(opt)
  21. if err := c.Ping().Err(); err != nil {
  22. return nil, err
  23. }
  24. return &RedisClusterPool{client: c}, nil
  25. }
  26. func (p *RedisClusterPool) Get(key string) (interface{}, error) {
  27. res, err := p.client.Get(key).Result()
  28. if err != nil {
  29. return nil, convertError(err)
  30. }
  31. return []byte(res), nil
  32. }
  33. func (p *RedisClusterPool) Set(key string, value interface{}) error {
  34. err := p.client.Set(key, value, 0).Err()
  35. return convertError(err)
  36. }
  37. func (p *RedisClusterPool) GetSet(key string, value interface{}) (interface{}, error) {
  38. res, err := p.client.GetSet(key, value).Result()
  39. if err != nil {
  40. return nil, convertError(err)
  41. }
  42. return []byte(res), nil
  43. }
  44. func (p *RedisClusterPool) SetNx(key string, value interface{}) (int64, error) {
  45. res, err := p.client.SetNX(key, value, 0).Result()
  46. if err != nil {
  47. return 0, convertError(err)
  48. }
  49. if res {
  50. return 1, nil
  51. }
  52. return 0, nil
  53. }
  54. func (p *RedisClusterPool) SetEx(key string, value interface{}, timeout int64) error {
  55. _, err := p.client.Set(key, value, time.Duration(timeout)*time.Second).Result()
  56. if err != nil {
  57. return convertError(err)
  58. }
  59. return nil
  60. }
  61. // nil表示成功,ErrNil表示数据库内已经存在这个key,其他表示数据库发生错误
  62. func (p *RedisClusterPool) SetNxEx(key string, value interface{}, timeout int64) error {
  63. res, err := p.client.SetNX(key, value, time.Duration(timeout)*time.Second).Result()
  64. if err != nil {
  65. return convertError(err)
  66. }
  67. if res {
  68. return nil
  69. }
  70. return ErrNil
  71. }
  72. func (p *RedisClusterPool) MGet(keys ...string) ([]interface{}, error) {
  73. res, err := p.client.MGet(keys...).Result()
  74. return res, convertError(err)
  75. }
  76. // 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
  77. func (p *RedisClusterPool) MSet(kvs map[string]interface{}) error {
  78. pairs := make([]string, 0, len(kvs)*2)
  79. for k, v := range kvs {
  80. val, err := String(v, nil)
  81. if err != nil {
  82. return err
  83. }
  84. pairs = append(pairs, k, val)
  85. }
  86. return convertError(p.client.MSet(pairs).Err())
  87. }
  88. // 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
  89. func (p *RedisClusterPool) MSetNX(kvs map[string]interface{}) (bool, error) {
  90. pairs := make([]string, 0, len(kvs)*2)
  91. for k, v := range kvs {
  92. val, err := String(v, nil)
  93. if err != nil {
  94. return false, err
  95. }
  96. pairs = append(pairs, k, val)
  97. }
  98. res, err := p.client.MSetNX(pairs).Result()
  99. return res, convertError(err)
  100. }
  101. func (p *RedisClusterPool) ExpireAt(key string, timestamp int64) (int64, error) {
  102. res, err := p.client.ExpireAt(key, time.Unix(timestamp, 0)).Result()
  103. if err != nil {
  104. return 0, convertError(err)
  105. }
  106. if res {
  107. return 1, nil
  108. }
  109. return 0, nil
  110. }
  111. func (p *RedisClusterPool) Del(keys ...string) (int64, error) {
  112. args := make([]interface{}, 0, len(keys))
  113. for _, key := range keys {
  114. args = append(args, key)
  115. }
  116. res, err := p.client.Del(keys...).Result()
  117. if err != nil {
  118. return res, convertError(err)
  119. }
  120. return res, nil
  121. }
  122. func (p *RedisClusterPool) Incr(key string) (int64, error) {
  123. res, err := p.client.Incr(key).Result()
  124. if err != nil {
  125. return res, convertError(err)
  126. }
  127. return res, nil
  128. }
  129. func (p *RedisClusterPool) IncrBy(key string, delta int64) (int64, error) {
  130. res, err := p.client.IncrBy(key, delta).Result()
  131. if err != nil {
  132. return res, convertError(err)
  133. }
  134. return res, nil
  135. }
  136. func (p *RedisClusterPool) Expire(key string, duration int64) (int64, error) {
  137. res, err := p.client.Expire(key, time.Duration(duration)*time.Second).Result()
  138. if err != nil {
  139. return 0, convertError(err)
  140. }
  141. if res {
  142. return 1, nil
  143. }
  144. return 0, nil
  145. }
  146. func (p *RedisClusterPool) Exists(key string) (bool, error) { // todo (bool, error)
  147. res, err := p.client.Exists(key).Result()
  148. if err != nil {
  149. return false, convertError(err)
  150. }
  151. if res > 0 {
  152. return true, nil
  153. }
  154. return false, nil
  155. }
  156. func (p *RedisClusterPool) HGet(key string, field string) (interface{}, error) {
  157. res, err := p.client.HGet(key, field).Result()
  158. if err != nil {
  159. return nil, convertError(err)
  160. }
  161. return []byte(res), nil
  162. }
  163. func (p *RedisClusterPool) HLen(key string) (int64, error) {
  164. res, err := p.client.HLen(key).Result()
  165. if err != nil {
  166. return res, convertError(err)
  167. }
  168. return res, nil
  169. }
  170. func (p *RedisClusterPool) HSet(key string, field string, val interface{}) error {
  171. value, err := String(val, nil)
  172. if err != nil && err != ErrNil {
  173. return err
  174. }
  175. _, err = p.client.HSet(key, field, value).Result()
  176. if err != nil {
  177. return convertError(err)
  178. }
  179. return nil
  180. }
  181. func (p *RedisClusterPool) HDel(key string, fields ...string) (int64, error) {
  182. args := make([]interface{}, 0, len(fields)+1)
  183. args = append(args, key)
  184. for _, field := range fields {
  185. args = append(args, field)
  186. }
  187. res, err := p.client.HDel(key, fields...).Result()
  188. if err != nil {
  189. return 0, convertError(err)
  190. }
  191. return res, nil
  192. }
  193. func (p *RedisClusterPool) HMGet(key string, fields ...string) (interface{}, error) {
  194. args := make([]interface{}, 0, len(fields)+1)
  195. args = append(args, key)
  196. for _, field := range fields {
  197. args = append(args, field)
  198. }
  199. if len(fields) == 0 {
  200. return nil, ErrNil
  201. }
  202. res, err := p.client.HMGet(key, fields...).Result()
  203. if err != nil {
  204. return nil, convertError(err)
  205. }
  206. return res, nil
  207. }
  208. func (p *RedisClusterPool) HMSet(key string, kvs ...interface{}) error {
  209. if len(kvs) == 0 {
  210. return nil
  211. }
  212. if len(kvs)%2 != 0 {
  213. return ErrWrongArgsNum
  214. }
  215. var err error
  216. v := map[string]interface{}{} // todo change
  217. v["field"], err = String(kvs[0], nil)
  218. if err != nil && err != ErrNil {
  219. return err
  220. }
  221. v["value"], err = String(kvs[1], nil)
  222. if err != nil && err != ErrNil {
  223. return err
  224. }
  225. pairs := make([]string, 0, len(kvs)-2)
  226. if len(kvs) > 2 {
  227. for _, kv := range kvs[2:] {
  228. kvString, err := String(kv, nil)
  229. if err != nil && err != ErrNil {
  230. return err
  231. }
  232. pairs = append(pairs, kvString)
  233. }
  234. }
  235. v["paris"] = pairs
  236. _, err = p.client.HMSet(key, v).Result()
  237. if err != nil {
  238. return convertError(err)
  239. }
  240. return nil
  241. }
  242. func (p *RedisClusterPool) HKeys(key string) ([]string, error) {
  243. res, err := p.client.HKeys(key).Result()
  244. if err != nil {
  245. return res, convertError(err)
  246. }
  247. return res, nil
  248. }
  249. func (p *RedisClusterPool) HVals(key string) ([]interface{}, error) {
  250. res, err := p.client.HVals(key).Result()
  251. if err != nil {
  252. return nil, convertError(err)
  253. }
  254. rs := make([]interface{}, 0, len(res))
  255. for _, res := range res {
  256. rs = append(rs, res)
  257. }
  258. return rs, nil
  259. }
  260. func (p *RedisClusterPool) HGetAll(key string) (map[string]string, error) {
  261. vals, err := p.client.HGetAll(key).Result()
  262. if err != nil {
  263. return nil, convertError(err)
  264. }
  265. return vals, nil
  266. }
  267. func (p *RedisClusterPool) HIncrBy(key, field string, delta int64) (int64, error) {
  268. res, err := p.client.HIncrBy(key, field, delta).Result()
  269. if err != nil {
  270. return res, convertError(err)
  271. }
  272. return res, nil
  273. }
  274. func (p *RedisClusterPool) ZAdd(key string, kvs ...interface{}) (int64, error) {
  275. args := make([]interface{}, 0, len(kvs)+1)
  276. args = append(args, key)
  277. args = append(args, kvs...)
  278. if len(kvs) == 0 {
  279. return 0, nil
  280. }
  281. if len(kvs)%2 != 0 {
  282. return 0, ErrWrongArgsNum
  283. }
  284. zs := make([]redis.Z, len(kvs)/2)
  285. for i := 0; i < len(kvs); i += 2 {
  286. idx := i / 2
  287. score, err := Float64(kvs[i], nil)
  288. if err != nil && err != ErrNil {
  289. return 0, err
  290. }
  291. zs[idx].Score = score
  292. zs[idx].Member = kvs[i+1]
  293. }
  294. res, err := p.client.ZAdd(key, zs...).Result()
  295. if err != nil {
  296. return res, convertError(err)
  297. }
  298. return res, nil
  299. }
  300. func (p *RedisClusterPool) ZRem(key string, members ...string) (int64, error) {
  301. args := make([]interface{}, 0, len(members))
  302. args = append(args, key)
  303. for _, member := range members {
  304. args = append(args, member)
  305. }
  306. res, err := p.client.ZRem(key, members).Result()
  307. if err != nil {
  308. return res, convertError(err)
  309. }
  310. return res, err
  311. }
  312. func (p *RedisClusterPool) ZRange(key string, min, max int64, withScores bool) (interface{}, error) {
  313. res := make([]interface{}, 0)
  314. if withScores {
  315. zs, err := p.client.ZRangeWithScores(key, min, max).Result()
  316. if err != nil {
  317. return nil, convertError(err)
  318. }
  319. for _, z := range zs {
  320. res = append(res, z.Member, strconv.FormatFloat(z.Score, 'f', -1, 64))
  321. }
  322. } else {
  323. ms, err := p.client.ZRange(key, min, max).Result()
  324. if err != nil {
  325. return nil, convertError(err)
  326. }
  327. for _, m := range ms {
  328. res = append(res, m)
  329. }
  330. }
  331. return res, nil
  332. }
  333. func (p *RedisClusterPool) ZRangeByScoreWithScore(key string, min, max int64) (map[string]int64, error) {
  334. opt := new(redis.ZRangeBy)
  335. opt.Min = strconv.FormatInt(int64(min), 10)
  336. opt.Max = strconv.FormatInt(int64(max), 10)
  337. opt.Count = -1
  338. opt.Offset = 0
  339. vals, err := p.client.ZRangeByScoreWithScores(key, *opt).Result()
  340. if err != nil {
  341. return nil, convertError(err)
  342. }
  343. res := make(map[string]int64, len(vals))
  344. for _, val := range vals {
  345. key, err := String(val.Member, nil)
  346. if err != nil && err != ErrNil {
  347. return nil, err
  348. }
  349. res[key] = int64(val.Score)
  350. }
  351. return res, nil
  352. }
  353. func (p *RedisClusterPool) LRange(key string, start, stop int64) (interface{}, error) {
  354. res, err := p.client.LRange(key, start, stop).Result()
  355. if err != nil {
  356. return nil, convertError(err)
  357. }
  358. return res, nil
  359. }
  360. func (p *RedisClusterPool) LSet(key string, index int, value interface{}) error {
  361. err := p.client.LSet(key, int64(index), value).Err()
  362. return convertError(err)
  363. }
  364. func (p *RedisClusterPool) LLen(key string) (int64, error) {
  365. res, err := p.client.LLen(key).Result()
  366. if err != nil {
  367. return res, convertError(err)
  368. }
  369. return res, nil
  370. }
  371. func (p *RedisClusterPool) LRem(key string, count int, value interface{}) (int, error) {
  372. val, _ := value.(string)
  373. res, err := p.client.LRem(key, int64(count), val).Result()
  374. if err != nil {
  375. return int(res), convertError(err)
  376. }
  377. return int(res), nil
  378. }
  379. func (p *RedisClusterPool) TTl(key string) (int64, error) {
  380. duration, err := p.client.TTL(key).Result()
  381. if err != nil {
  382. return int64(duration.Seconds()), convertError(err)
  383. }
  384. return int64(duration.Seconds()), nil
  385. }
  386. func (p *RedisClusterPool) LPop(key string) (interface{}, error) {
  387. res, err := p.client.LPop(key).Result()
  388. if err != nil {
  389. return nil, convertError(err)
  390. }
  391. return res, nil
  392. }
  393. func (p *RedisClusterPool) RPop(key string) (interface{}, error) {
  394. res, err := p.client.RPop(key).Result()
  395. if err != nil {
  396. return nil, convertError(err)
  397. }
  398. return res, nil
  399. }
  400. func (p *RedisClusterPool) BLPop(key string, timeout int) (interface{}, error) {
  401. res, err := p.client.BLPop(time.Duration(timeout)*time.Second, key).Result()
  402. if err != nil {
  403. // 兼容redis 2.x
  404. if err == redis.Nil {
  405. return nil, ErrNil
  406. }
  407. return nil, err
  408. }
  409. return res[1], nil
  410. }
  411. func (p *RedisClusterPool) BRPop(key string, timeout int) (interface{}, error) {
  412. res, err := p.client.BRPop(time.Duration(timeout)*time.Second, key).Result()
  413. if err != nil {
  414. // 兼容redis 2.x
  415. if err == redis.Nil {
  416. return nil, ErrNil
  417. }
  418. return nil, convertError(err)
  419. }
  420. return res[1], nil
  421. }
  422. func (p *RedisClusterPool) LPush(key string, value ...interface{}) error {
  423. args := make([]interface{}, 0, len(value)+1)
  424. args = append(args, key)
  425. args = append(args, value...)
  426. vals := make([]string, 0, len(value))
  427. for _, v := range value {
  428. val, err := String(v, nil)
  429. if err != nil && err != ErrNil {
  430. return err
  431. }
  432. vals = append(vals, val)
  433. }
  434. _, err := p.client.LPush(key, vals).Result() // todo ...
  435. if err != nil {
  436. return convertError(err)
  437. }
  438. return nil
  439. }
  440. func (p *RedisClusterPool) RPush(key string, value ...interface{}) error {
  441. args := make([]interface{}, 0, len(value)+1)
  442. args = append(args, key)
  443. args = append(args, value...)
  444. vals := make([]string, 0, len(value))
  445. for _, v := range value {
  446. val, err := String(v, nil)
  447. if err != nil && err != ErrNil {
  448. if err == ErrNil {
  449. continue
  450. }
  451. return err
  452. }
  453. if val == "" {
  454. continue
  455. }
  456. vals = append(vals, val)
  457. }
  458. _, err := p.client.RPush(key, vals).Result() // todo ...
  459. if err != nil {
  460. return convertError(err)
  461. }
  462. return nil
  463. }
  464. // 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
  465. func (p *RedisClusterPool) BRPopLPush(srcKey string, destKey string, timeout int) (interface{}, error) {
  466. res, err := p.client.BRPopLPush(srcKey, destKey, time.Duration(timeout)*time.Second).Result()
  467. if err != nil {
  468. return nil, convertError(err)
  469. }
  470. return res, nil
  471. }
  472. // 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
  473. func (p *RedisClusterPool) RPopLPush(srcKey string, destKey string) (interface{}, error) {
  474. res, err := p.client.RPopLPush(srcKey, destKey).Result()
  475. if err != nil {
  476. return nil, convertError(err)
  477. }
  478. return res, nil
  479. }
  480. func (p *RedisClusterPool) SAdd(key string, members ...interface{}) (int64, error) {
  481. args := make([]interface{}, 0, len(members)+1)
  482. args = append(args, key)
  483. args = append(args, members...)
  484. ms := make([]string, 0, len(members))
  485. for _, member := range members {
  486. m, err := String(member, nil)
  487. if err != nil && err != ErrNil {
  488. return 0, err
  489. }
  490. ms = append(ms, m)
  491. }
  492. res, err := p.client.SAdd(key, ms).Result() // todo ...
  493. if err != nil {
  494. return res, convertError(err)
  495. }
  496. return res, nil
  497. }
  498. func (p *RedisClusterPool) SPop(key string) ([]byte, error) {
  499. res, err := p.client.SPop(key).Result()
  500. if err != nil {
  501. return nil, convertError(err)
  502. }
  503. return []byte(res), nil
  504. }
  505. func (p *RedisClusterPool) SIsMember(key string, member interface{}) (bool, error) {
  506. m, _ := member.(string)
  507. res, err := p.client.SIsMember(key, m).Result()
  508. if err != nil {
  509. return res, convertError(err)
  510. }
  511. return res, nil
  512. }
  513. func (p *RedisClusterPool) SRem(key string, members ...interface{}) (int64, error) {
  514. args := make([]interface{}, 0, len(members)+1)
  515. args = append(args, key)
  516. args = append(args, members...)
  517. ms := make([]string, 0, len(members))
  518. for _, member := range members {
  519. m, err := String(member, nil)
  520. if err != nil && err != ErrNil {
  521. return 0, err
  522. }
  523. ms = append(ms, m)
  524. }
  525. res, err := p.client.SRem(key, ms).Result() // todo ...
  526. if err != nil {
  527. return res, convertError(err)
  528. }
  529. return res, nil
  530. }
  531. func (p *RedisClusterPool) SMembers(key string) ([]string, error) {
  532. res, err := p.client.SMembers(key).Result()
  533. if err != nil {
  534. return nil, convertError(err)
  535. }
  536. return res, nil
  537. }
  538. func (p *RedisClusterPool) ScriptLoad(luaScript string) (interface{}, error) {
  539. res, err := p.client.ScriptLoad(luaScript).Result()
  540. if err != nil {
  541. return nil, convertError(err)
  542. }
  543. return res, nil
  544. }
  545. func (p *RedisClusterPool) EvalSha(sha1 string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
  546. vals := make([]interface{}, 0, len(keysArgs)+2)
  547. vals = append(vals, sha1, numberKeys)
  548. vals = append(vals, keysArgs...)
  549. keys := make([]string, 0, numberKeys)
  550. args := make([]string, 0, len(keysArgs)-numberKeys)
  551. for i, value := range keysArgs {
  552. val, err := String(value, nil)
  553. if err != nil && err != ErrNil {
  554. return nil, err
  555. }
  556. if i < numberKeys {
  557. keys = append(keys, val)
  558. } else {
  559. args = append(args, val)
  560. }
  561. }
  562. res, err := p.client.EvalSha(sha1, keys, args).Result()
  563. if err != nil {
  564. return nil, convertError(err)
  565. }
  566. return res, nil
  567. }
  568. func (p *RedisClusterPool) Eval(luaScript string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
  569. vals := make([]interface{}, 0, len(keysArgs)+2)
  570. vals = append(vals, luaScript, numberKeys)
  571. vals = append(vals, keysArgs...)
  572. keys := make([]string, 0, numberKeys)
  573. args := make([]string, 0, len(keysArgs)-numberKeys)
  574. for i, value := range keysArgs {
  575. val, err := String(value, nil)
  576. if err != nil && err != ErrNil {
  577. return nil, err
  578. }
  579. if i < numberKeys {
  580. keys = append(keys, val)
  581. } else {
  582. args = append(args, val)
  583. }
  584. }
  585. res, err := p.client.Eval(luaScript, keys, args).Result()
  586. if err != nil {
  587. return nil, convertError(err)
  588. }
  589. return res, nil
  590. }
  591. func (p *RedisClusterPool) GetBit(key string, offset int64) (int64, error) {
  592. res, err := p.client.GetBit(key, offset).Result()
  593. if err != nil {
  594. return res, convertError(err)
  595. }
  596. return res, nil
  597. }
  598. func (p *RedisClusterPool) SetBit(key string, offset uint32, value int) (int, error) {
  599. res, err := p.client.SetBit(key, int64(offset), value).Result()
  600. return int(res), convertError(err)
  601. }
  602. func (p *RedisClusterPool) GetClient() *redis.ClusterClient {
  603. return pools
  604. }