附近小店
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

101 lines
2.7 KiB

  1. package svc
  2. import (
  3. "applet/app/md"
  4. "applet/app/utils"
  5. "applet/app/utils/cache"
  6. "errors"
  7. "fmt"
  8. "math/rand"
  9. "reflect"
  10. "time"
  11. )
  12. const redisMutexLockExpTime = 15
  13. // TryGetDistributedLock 分布式锁获取
  14. // requestId 用于标识请求客户端,可以是随机字符串,需确保唯一
  15. func TryGetDistributedLock(lockKey, requestId string, isNegative bool) bool {
  16. if isNegative { // 多次尝试获取
  17. retry := 1
  18. for {
  19. ok, err := cache.Do("SET", lockKey, requestId, "EX", redisMutexLockExpTime, "NX")
  20. // 获取锁成功
  21. if err == nil && ok == "OK" {
  22. return true
  23. }
  24. // 尝试多次没获取成功
  25. if retry > 10 {
  26. return false
  27. }
  28. time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  29. retry += 1
  30. }
  31. } else { // 只尝试一次
  32. ok, err := cache.Do("SET", lockKey, requestId, "EX", redisMutexLockExpTime, "NX")
  33. // 获取锁成功
  34. if err == nil && ok == "OK" {
  35. return true
  36. }
  37. return false
  38. }
  39. }
  40. // ReleaseDistributedLock 释放锁,通过比较requestId,用于确保客户端只释放自己的锁,使用lua脚本保证操作的原子型
  41. func ReleaseDistributedLock(lockKey, requestId string) (bool, error) {
  42. luaScript := `
  43. if redis.call("get",KEYS[1]) == ARGV[1]
  44. then
  45. return redis.call("del",KEYS[1])
  46. else
  47. return 0
  48. end`
  49. do, err := cache.Do("eval", luaScript, 1, lockKey, requestId)
  50. fmt.Println(reflect.TypeOf(do))
  51. fmt.Println(do)
  52. if utils.AnyToInt64(do) == 1 {
  53. return true, err
  54. } else {
  55. return false, err
  56. }
  57. }
  58. func GetDistributedLockRequestId(prefix string) string {
  59. return prefix + utils.IntToStr(rand.Intn(100000000))
  60. }
  61. // HandleBalanceDistributedLock 处理余额更新时获取锁和释放锁 如果加锁成功,使用语句 ` defer cb() ` 释放锁
  62. func HandleBalanceDistributedLock(masterId, uid, requestIdPrefix string) (cb func(), err error) {
  63. // 获取余额更新锁
  64. balanceLockKey := fmt.Sprintf(md.UserFinValidUpdateLock, masterId, uid)
  65. requestId := GetDistributedLockRequestId(requestIdPrefix)
  66. balanceLockOk := TryGetDistributedLock(balanceLockKey, requestId, true)
  67. if !balanceLockOk {
  68. return nil, errors.New("系统繁忙,请稍后再试")
  69. }
  70. cb = func() {
  71. _, _ = ReleaseDistributedLock(balanceLockKey, requestId)
  72. }
  73. return cb, nil
  74. }
  75. func HandleLimiterDistributedLock(masterId, ip, requestIdPrefix string) (cb func(), err error) {
  76. balanceLockKey := fmt.Sprintf(md.AppLimiterLock, masterId, ip)
  77. requestId := GetDistributedLockRequestId(requestIdPrefix)
  78. balanceLockOk := TryGetDistributedLock(balanceLockKey, requestId, true)
  79. if !balanceLockOk {
  80. return nil, errors.New("系统繁忙,请稍后再试")
  81. }
  82. cb = func() {
  83. _, _ = ReleaseDistributedLock(balanceLockKey, requestId)
  84. }
  85. return cb, nil
  86. }