rabbitmq 操作库
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

623 wiersze
15 KiB

  1. package cache
  2. import (
  3. "strconv"
  4. "time"
  5. "github.com/go-redis/redis"
  6. )
  7. var pools *redis.ClusterClient
  8. func NewRedisCluster(addrs []string) error {
  9. opt := &redis.ClusterOptions{
  10. Addrs: addrs,
  11. PoolSize: redisPoolSize,
  12. PoolTimeout: redisPoolTTL,
  13. IdleTimeout: redisIdleTTL,
  14. DialTimeout: redisDialTTL,
  15. ReadTimeout: redisReadTTL,
  16. WriteTimeout: redisWriteTTL,
  17. }
  18. pools = redis.NewClusterClient(opt)
  19. if err := pools.Ping().Err(); err != nil {
  20. return err
  21. }
  22. return nil
  23. }
  24. func RCGet(key string) (interface{}, error) {
  25. res, err := pools.Get(key).Result()
  26. if err != nil {
  27. return nil, convertError(err)
  28. }
  29. return []byte(res), nil
  30. }
  31. func RCSet(key string, value interface{}) error {
  32. err := pools.Set(key, value, 0).Err()
  33. return convertError(err)
  34. }
  35. func RCGetSet(key string, value interface{}) (interface{}, error) {
  36. res, err := pools.GetSet(key, value).Result()
  37. if err != nil {
  38. return nil, convertError(err)
  39. }
  40. return []byte(res), nil
  41. }
  42. func RCSetNx(key string, value interface{}) (int64, error) {
  43. res, err := pools.SetNX(key, value, 0).Result()
  44. if err != nil {
  45. return 0, convertError(err)
  46. }
  47. if res {
  48. return 1, nil
  49. }
  50. return 0, nil
  51. }
  52. func RCSetEx(key string, value interface{}, timeout int64) error {
  53. _, err := pools.Set(key, value, time.Duration(timeout)*time.Second).Result()
  54. if err != nil {
  55. return convertError(err)
  56. }
  57. return nil
  58. }
  59. // nil表示成功,ErrNil表示数据库内已经存在这个key,其他表示数据库发生错误
  60. func RCSetNxEx(key string, value interface{}, timeout int64) error {
  61. res, err := pools.SetNX(key, value, time.Duration(timeout)*time.Second).Result()
  62. if err != nil {
  63. return convertError(err)
  64. }
  65. if res {
  66. return nil
  67. }
  68. return ErrNil
  69. }
  70. func RCMGet(keys ...string) ([]interface{}, error) {
  71. res, err := pools.MGet(keys...).Result()
  72. return res, convertError(err)
  73. }
  74. // 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
  75. func RCMSet(kvs map[string]interface{}) error {
  76. pairs := make([]string, 0, len(kvs)*2)
  77. for k, v := range kvs {
  78. val, err := String(v, nil)
  79. if err != nil {
  80. return err
  81. }
  82. pairs = append(pairs, k, val)
  83. }
  84. return convertError(pools.MSet(pairs).Err())
  85. }
  86. // 为确保多个key映射到同一个slot,每个key最好加上hash tag,如:{test}
  87. func RCMSetNX(kvs map[string]interface{}) (bool, error) {
  88. pairs := make([]string, 0, len(kvs)*2)
  89. for k, v := range kvs {
  90. val, err := String(v, nil)
  91. if err != nil {
  92. return false, err
  93. }
  94. pairs = append(pairs, k, val)
  95. }
  96. res, err := pools.MSetNX(pairs).Result()
  97. return res, convertError(err)
  98. }
  99. func RCExpireAt(key string, timestamp int64) (int64, error) {
  100. res, err := pools.ExpireAt(key, time.Unix(timestamp, 0)).Result()
  101. if err != nil {
  102. return 0, convertError(err)
  103. }
  104. if res {
  105. return 1, nil
  106. }
  107. return 0, nil
  108. }
  109. func RCDel(keys ...string) (int64, error) {
  110. args := make([]interface{}, 0, len(keys))
  111. for _, key := range keys {
  112. args = append(args, key)
  113. }
  114. res, err := pools.Del(keys...).Result()
  115. if err != nil {
  116. return res, convertError(err)
  117. }
  118. return res, nil
  119. }
  120. func RCIncr(key string) (int64, error) {
  121. res, err := pools.Incr(key).Result()
  122. if err != nil {
  123. return res, convertError(err)
  124. }
  125. return res, nil
  126. }
  127. func RCIncrBy(key string, delta int64) (int64, error) {
  128. res, err := pools.IncrBy(key, delta).Result()
  129. if err != nil {
  130. return res, convertError(err)
  131. }
  132. return res, nil
  133. }
  134. func RCExpire(key string, duration int64) (int64, error) {
  135. res, err := pools.Expire(key, time.Duration(duration)*time.Second).Result()
  136. if err != nil {
  137. return 0, convertError(err)
  138. }
  139. if res {
  140. return 1, nil
  141. }
  142. return 0, nil
  143. }
  144. func RCExists(key string) (bool, error) {
  145. res, err := pools.Exists(key).Result()
  146. if err != nil {
  147. return false, convertError(err)
  148. }
  149. if res > 0 {
  150. return true, nil
  151. }
  152. return false, nil
  153. }
  154. func RCHGet(key string, field string) (interface{}, error) {
  155. res, err := pools.HGet(key, field).Result()
  156. if err != nil {
  157. return nil, convertError(err)
  158. }
  159. return []byte(res), nil
  160. }
  161. func RCHLen(key string) (int64, error) {
  162. res, err := pools.HLen(key).Result()
  163. if err != nil {
  164. return res, convertError(err)
  165. }
  166. return res, nil
  167. }
  168. func RCHSet(key string, field string, val interface{}) error {
  169. value, err := String(val, nil)
  170. if err != nil && err != ErrNil {
  171. return err
  172. }
  173. _, err = pools.HSet(key, field, value).Result()
  174. if err != nil {
  175. return convertError(err)
  176. }
  177. return nil
  178. }
  179. func RCHDel(key string, fields ...string) (int64, error) {
  180. args := make([]interface{}, 0, len(fields)+1)
  181. args = append(args, key)
  182. for _, field := range fields {
  183. args = append(args, field)
  184. }
  185. res, err := pools.HDel(key, fields...).Result()
  186. if err != nil {
  187. return 0, convertError(err)
  188. }
  189. return res, nil
  190. }
  191. func RCHMGet(key string, fields ...string) (interface{}, error) {
  192. args := make([]interface{}, 0, len(fields)+1)
  193. args = append(args, key)
  194. for _, field := range fields {
  195. args = append(args, field)
  196. }
  197. if len(fields) == 0 {
  198. return nil, ErrNil
  199. }
  200. res, err := pools.HMGet(key, fields...).Result()
  201. if err != nil {
  202. return nil, convertError(err)
  203. }
  204. return res, nil
  205. }
  206. func RCHMSet(key string, kvs ...interface{}) error {
  207. if len(kvs) == 0 {
  208. return nil
  209. }
  210. if len(kvs)%2 != 0 {
  211. return ErrWrongArgsNum
  212. }
  213. var err error
  214. v := map[string]interface{}{} // todo change
  215. v["field"], err = String(kvs[0], nil)
  216. if err != nil && err != ErrNil {
  217. return err
  218. }
  219. v["value"], err = String(kvs[1], nil)
  220. if err != nil && err != ErrNil {
  221. return err
  222. }
  223. pairs := make([]string, 0, len(kvs)-2)
  224. if len(kvs) > 2 {
  225. for _, kv := range kvs[2:] {
  226. kvString, err := String(kv, nil)
  227. if err != nil && err != ErrNil {
  228. return err
  229. }
  230. pairs = append(pairs, kvString)
  231. }
  232. }
  233. v["paris"] = pairs
  234. _, err = pools.HMSet(key, v).Result()
  235. if err != nil {
  236. return convertError(err)
  237. }
  238. return nil
  239. }
  240. func RCHKeys(key string) ([]string, error) {
  241. res, err := pools.HKeys(key).Result()
  242. if err != nil {
  243. return res, convertError(err)
  244. }
  245. return res, nil
  246. }
  247. func RCHVals(key string) ([]interface{}, error) {
  248. res, err := pools.HVals(key).Result()
  249. if err != nil {
  250. return nil, convertError(err)
  251. }
  252. rs := make([]interface{}, 0, len(res))
  253. for _, res := range res {
  254. rs = append(rs, res)
  255. }
  256. return rs, nil
  257. }
  258. func RCHGetAll(key string) (map[string]string, error) {
  259. vals, err := pools.HGetAll(key).Result()
  260. if err != nil {
  261. return nil, convertError(err)
  262. }
  263. return vals, nil
  264. }
  265. func RCHIncrBy(key, field string, delta int64) (int64, error) {
  266. res, err := pools.HIncrBy(key, field, delta).Result()
  267. if err != nil {
  268. return res, convertError(err)
  269. }
  270. return res, nil
  271. }
  272. func RCZAdd(key string, kvs ...interface{}) (int64, error) {
  273. args := make([]interface{}, 0, len(kvs)+1)
  274. args = append(args, key)
  275. args = append(args, kvs...)
  276. if len(kvs) == 0 {
  277. return 0, nil
  278. }
  279. if len(kvs)%2 != 0 {
  280. return 0, ErrWrongArgsNum
  281. }
  282. zs := make([]redis.Z, len(kvs)/2)
  283. for i := 0; i < len(kvs); i += 2 {
  284. idx := i / 2
  285. score, err := Float64(kvs[i], nil)
  286. if err != nil && err != ErrNil {
  287. return 0, err
  288. }
  289. zs[idx].Score = score
  290. zs[idx].Member = kvs[i+1]
  291. }
  292. res, err := pools.ZAdd(key, zs...).Result()
  293. if err != nil {
  294. return res, convertError(err)
  295. }
  296. return res, nil
  297. }
  298. func RCZRem(key string, members ...string) (int64, error) {
  299. args := make([]interface{}, 0, len(members))
  300. args = append(args, key)
  301. for _, member := range members {
  302. args = append(args, member)
  303. }
  304. res, err := pools.ZRem(key, members).Result()
  305. if err != nil {
  306. return res, convertError(err)
  307. }
  308. return res, err
  309. }
  310. func RCZRange(key string, min, max int64, withScores bool) (interface{}, error) {
  311. res := make([]interface{}, 0)
  312. if withScores {
  313. zs, err := pools.ZRangeWithScores(key, min, max).Result()
  314. if err != nil {
  315. return nil, convertError(err)
  316. }
  317. for _, z := range zs {
  318. res = append(res, z.Member, strconv.FormatFloat(z.Score, 'f', -1, 64))
  319. }
  320. } else {
  321. ms, err := pools.ZRange(key, min, max).Result()
  322. if err != nil {
  323. return nil, convertError(err)
  324. }
  325. for _, m := range ms {
  326. res = append(res, m)
  327. }
  328. }
  329. return res, nil
  330. }
  331. func RCZRangeByScoreWithScore(key string, min, max int64) (map[string]int64, error) {
  332. opt := new(redis.ZRangeBy)
  333. opt.Min = strconv.FormatInt(int64(min), 10)
  334. opt.Max = strconv.FormatInt(int64(max), 10)
  335. opt.Count = -1
  336. opt.Offset = 0
  337. vals, err := pools.ZRangeByScoreWithScores(key, *opt).Result()
  338. if err != nil {
  339. return nil, convertError(err)
  340. }
  341. res := make(map[string]int64, len(vals))
  342. for _, val := range vals {
  343. key, err := String(val.Member, nil)
  344. if err != nil && err != ErrNil {
  345. return nil, err
  346. }
  347. res[key] = int64(val.Score)
  348. }
  349. return res, nil
  350. }
  351. func RCLRange(key string, start, stop int64) (interface{}, error) {
  352. res, err := pools.LRange(key, start, stop).Result()
  353. if err != nil {
  354. return nil, convertError(err)
  355. }
  356. return res, nil
  357. }
  358. func RCLSet(key string, index int, value interface{}) error {
  359. err := pools.LSet(key, int64(index), value).Err()
  360. return convertError(err)
  361. }
  362. func RCLLen(key string) (int64, error) {
  363. res, err := pools.LLen(key).Result()
  364. if err != nil {
  365. return res, convertError(err)
  366. }
  367. return res, nil
  368. }
  369. func RCLRem(key string, count int, value interface{}) (int, error) {
  370. val, _ := value.(string)
  371. res, err := pools.LRem(key, int64(count), val).Result()
  372. if err != nil {
  373. return int(res), convertError(err)
  374. }
  375. return int(res), nil
  376. }
  377. func RCTTl(key string) (int64, error) {
  378. duration, err := pools.TTL(key).Result()
  379. if err != nil {
  380. return int64(duration.Seconds()), convertError(err)
  381. }
  382. return int64(duration.Seconds()), nil
  383. }
  384. func RCLPop(key string) (interface{}, error) {
  385. res, err := pools.LPop(key).Result()
  386. if err != nil {
  387. return nil, convertError(err)
  388. }
  389. return res, nil
  390. }
  391. func RCRPop(key string) (interface{}, error) {
  392. res, err := pools.RPop(key).Result()
  393. if err != nil {
  394. return nil, convertError(err)
  395. }
  396. return res, nil
  397. }
  398. func RCBLPop(key string, timeout int) (interface{}, error) {
  399. res, err := pools.BLPop(time.Duration(timeout)*time.Second, key).Result()
  400. if err != nil {
  401. // 兼容redis 2.x
  402. if err == redis.Nil {
  403. return nil, ErrNil
  404. }
  405. return nil, err
  406. }
  407. return res[1], nil
  408. }
  409. func RCBRPop(key string, timeout int) (interface{}, error) {
  410. res, err := pools.BRPop(time.Duration(timeout)*time.Second, key).Result()
  411. if err != nil {
  412. // 兼容redis 2.x
  413. if err == redis.Nil {
  414. return nil, ErrNil
  415. }
  416. return nil, convertError(err)
  417. }
  418. return res[1], nil
  419. }
  420. func RCLPush(key string, value ...interface{}) error {
  421. args := make([]interface{}, 0, len(value)+1)
  422. args = append(args, key)
  423. args = append(args, value...)
  424. vals := make([]string, 0, len(value))
  425. for _, v := range value {
  426. val, err := String(v, nil)
  427. if err != nil && err != ErrNil {
  428. return err
  429. }
  430. vals = append(vals, val)
  431. }
  432. _, err := pools.LPush(key, vals).Result() // todo ...
  433. if err != nil {
  434. return convertError(err)
  435. }
  436. return nil
  437. }
  438. func RCRPush(key string, value ...interface{}) error {
  439. args := make([]interface{}, 0, len(value)+1)
  440. args = append(args, key)
  441. args = append(args, value...)
  442. vals := make([]string, 0, len(value))
  443. for _, v := range value {
  444. val, err := String(v, nil)
  445. if err != nil && err != ErrNil {
  446. if err == ErrNil {
  447. continue
  448. }
  449. return err
  450. }
  451. if val == "" {
  452. continue
  453. }
  454. vals = append(vals, val)
  455. }
  456. _, err := pools.RPush(key, vals).Result() // todo ...
  457. if err != nil {
  458. return convertError(err)
  459. }
  460. return nil
  461. }
  462. // 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
  463. func RCBRPopLPush(srcKey string, destKey string, timeout int) (interface{}, error) {
  464. res, err := pools.BRPopLPush(srcKey, destKey, time.Duration(timeout)*time.Second).Result()
  465. if err != nil {
  466. return nil, convertError(err)
  467. }
  468. return res, nil
  469. }
  470. // 为确保srcKey跟destKey映射到同一个slot,srcKey和destKey需要加上hash tag,如:{test}
  471. func RCRPopLPush(srcKey string, destKey string) (interface{}, error) {
  472. res, err := pools.RPopLPush(srcKey, destKey).Result()
  473. if err != nil {
  474. return nil, convertError(err)
  475. }
  476. return res, nil
  477. }
  478. func RCSAdd(key string, members ...interface{}) (int64, error) {
  479. args := make([]interface{}, 0, len(members)+1)
  480. args = append(args, key)
  481. args = append(args, members...)
  482. ms := make([]string, 0, len(members))
  483. for _, member := range members {
  484. m, err := String(member, nil)
  485. if err != nil && err != ErrNil {
  486. return 0, err
  487. }
  488. ms = append(ms, m)
  489. }
  490. res, err := pools.SAdd(key, ms).Result() // todo ...
  491. if err != nil {
  492. return res, convertError(err)
  493. }
  494. return res, nil
  495. }
  496. func RCSPop(key string) ([]byte, error) {
  497. res, err := pools.SPop(key).Result()
  498. if err != nil {
  499. return nil, convertError(err)
  500. }
  501. return []byte(res), nil
  502. }
  503. func RCSIsMember(key string, member interface{}) (bool, error) {
  504. m, _ := member.(string)
  505. res, err := pools.SIsMember(key, m).Result()
  506. if err != nil {
  507. return res, convertError(err)
  508. }
  509. return res, nil
  510. }
  511. func RCSRem(key string, members ...interface{}) (int64, error) {
  512. args := make([]interface{}, 0, len(members)+1)
  513. args = append(args, key)
  514. args = append(args, members...)
  515. ms := make([]string, 0, len(members))
  516. for _, member := range members {
  517. m, err := String(member, nil)
  518. if err != nil && err != ErrNil {
  519. return 0, err
  520. }
  521. ms = append(ms, m)
  522. }
  523. res, err := pools.SRem(key, ms).Result() // todo ...
  524. if err != nil {
  525. return res, convertError(err)
  526. }
  527. return res, nil
  528. }
  529. func RCSMembers(key string) ([]string, error) {
  530. res, err := pools.SMembers(key).Result()
  531. if err != nil {
  532. return nil, convertError(err)
  533. }
  534. return res, nil
  535. }
  536. func RCScriptLoad(luaScript string) (interface{}, error) {
  537. res, err := pools.ScriptLoad(luaScript).Result()
  538. if err != nil {
  539. return nil, convertError(err)
  540. }
  541. return res, nil
  542. }
  543. func RCEvalSha(sha1 string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
  544. vals := make([]interface{}, 0, len(keysArgs)+2)
  545. vals = append(vals, sha1, numberKeys)
  546. vals = append(vals, keysArgs...)
  547. keys := make([]string, 0, numberKeys)
  548. args := make([]string, 0, len(keysArgs)-numberKeys)
  549. for i, value := range keysArgs {
  550. val, err := String(value, nil)
  551. if err != nil && err != ErrNil {
  552. return nil, err
  553. }
  554. if i < numberKeys {
  555. keys = append(keys, val)
  556. } else {
  557. args = append(args, val)
  558. }
  559. }
  560. res, err := pools.EvalSha(sha1, keys, args).Result()
  561. if err != nil {
  562. return nil, convertError(err)
  563. }
  564. return res, nil
  565. }
  566. func RCEval(luaScript string, numberKeys int, keysArgs ...interface{}) (interface{}, error) {
  567. vals := make([]interface{}, 0, len(keysArgs)+2)
  568. vals = append(vals, luaScript, numberKeys)
  569. vals = append(vals, keysArgs...)
  570. keys := make([]string, 0, numberKeys)
  571. args := make([]string, 0, len(keysArgs)-numberKeys)
  572. for i, value := range keysArgs {
  573. val, err := String(value, nil)
  574. if err != nil && err != ErrNil {
  575. return nil, err
  576. }
  577. if i < numberKeys {
  578. keys = append(keys, val)
  579. } else {
  580. args = append(args, val)
  581. }
  582. }
  583. res, err := pools.Eval(luaScript, keys, args).Result()
  584. if err != nil {
  585. return nil, convertError(err)
  586. }
  587. return res, nil
  588. }
  589. func RCGetBit(key string, offset int64) (int64, error) {
  590. res, err := pools.GetBit(key, offset).Result()
  591. if err != nil {
  592. return res, convertError(err)
  593. }
  594. return res, nil
  595. }
  596. func RCSetBit(key string, offset uint32, value int) (int, error) {
  597. res, err := pools.SetBit(key, int64(offset), value).Result()
  598. return int(res), convertError(err)
  599. }
  600. func RCGetClient() *redis.ClusterClient {
  601. return pools
  602. }
  603. func convertError(err error) error {
  604. if err == redis.Nil {
  605. // 为了兼容redis 2.x,这里不返回 ErrNil,ErrNil在调用redis_cluster_reply函数时才返回
  606. return nil
  607. }
  608. return err
  609. }