蛋蛋星球RabbitMq消费项目
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 

165 řádky
5.1 KiB

  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. utils2 "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
  9. "code.fnuoos.com/EggPlanet/egg_models.git/src/model"
  10. zhios_order_relate_utils "code.fnuoos.com/EggPlanet/egg_models.git/utils"
  11. "code.fnuoos.com/EggPlanet/egg_system_rules.git"
  12. "code.fnuoos.com/EggPlanet/egg_system_rules.git/enum"
  13. md3 "code.fnuoos.com/EggPlanet/egg_system_rules.git/md"
  14. "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule"
  15. md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
  16. "code.fnuoos.com/EggPlanet/egg_system_rules.git/svc"
  17. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  18. "encoding/json"
  19. "errors"
  20. "fmt"
  21. "github.com/streadway/amqp"
  22. "strconv"
  23. "time"
  24. )
  25. // EggPublishDataConsume 处理冻结用户流水
  26. func EggPublishDataConsume(queue md.MqQueue) {
  27. fmt.Println(">>>>>>>>>>>>EggPublishDataConsume>>>>>>>>>>>>")
  28. ch, err := rabbit.Cfg.Pool.GetChannel()
  29. if err != nil {
  30. logx.Error(err)
  31. return
  32. }
  33. defer ch.Release()
  34. //1、将自己绑定到交换机上
  35. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  36. //2、取出数据进行消费
  37. ch.Qos(1)
  38. delivery := ch.Consume(queue.Name, false)
  39. egg_system_rules.Init(cfg.RedisAddr)
  40. var res amqp.Delivery
  41. var ok bool
  42. for {
  43. res, ok = <-delivery
  44. if ok == true {
  45. err = handleEggPublishDataConsume(res.Body)
  46. if err != nil {
  47. fmt.Println("EggPublishDataConsume_ERR:::::", err.Error())
  48. utils2.FilePutContents("EggPublishDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{
  49. "body": res.Body,
  50. "err": err.Error(),
  51. }))
  52. }
  53. //_ = res.Reject(false)
  54. err = res.Ack(true)
  55. fmt.Println("err ::: ", err)
  56. } else {
  57. panic(errors.New("error getting message"))
  58. }
  59. }
  60. }
  61. func handleEggPublishDataConsume(msgData []byte) error {
  62. time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
  63. // 1.解析mq中queue的数据结构体
  64. var msg *md2.EggRoutKeyForPunishmentRecordData
  65. err := json.Unmarshal(msgData, &msg)
  66. if err != nil {
  67. return err
  68. }
  69. settingDb := implement.NewEggEnergyBasicSettingDb(db.Db)
  70. setting, err := settingDb.EggEnergyBasicSettingGetOne()
  71. if err != nil {
  72. return err
  73. }
  74. //1、分布式锁阻拦
  75. requestIdPrefix1 := fmt.Sprintf(md2.DealUserCoinRequestIdPrefix, setting.PersonEggEnergyCoinId, msg.Uid)
  76. cb1, err := svc.HandleDistributedLock(zhios_order_relate_utils.Int64ToStr(msg.Uid), strconv.Itoa(setting.PersonEggEnergyCoinId), requestIdPrefix1)
  77. if err != nil {
  78. return err
  79. }
  80. if cb1 != nil {
  81. defer cb1() // 释放锁
  82. }
  83. requestIdPrefix2 := fmt.Sprintf(md2.DealUserCoinRequestIdPrefix, setting.TeamEggEnergyCoinId, msg.Uid)
  84. cb2, err := svc.HandleDistributedLock(zhios_order_relate_utils.Int64ToStr(msg.Uid), strconv.Itoa(setting.TeamEggEnergyCoinId), requestIdPrefix2)
  85. if err != nil {
  86. return err
  87. }
  88. if cb2 != nil {
  89. defer cb2() // 释放锁
  90. }
  91. session := db.Db.NewSession()
  92. defer session.Close()
  93. session.Begin()
  94. var userVirtualWallet model.UserVirtualAmount
  95. get, err := session.Table("user_virtual_amount").
  96. Where("uid = ?", msg.Uid).
  97. And("coin_id = ?", setting.TeamEggEnergyCoinId).Get(&userVirtualWallet)
  98. if err != nil {
  99. fmt.Println(err)
  100. return err
  101. }
  102. if !get {
  103. fmt.Println("userVirtualWallet1 not exist ======>", msg.Uid)
  104. return err
  105. }
  106. if utils2.StrToFloat64(msg.Amount) <= utils2.StrToFloat64(userVirtualWallet.Amount) {
  107. // 待结够扣
  108. err = rule.DealUserVirtualCoinMinus(session, md3.DealUserVirtualCoinReq{
  109. Kind: "sub",
  110. Title: enum.EggEnergyMaliciousAccountFlashingDeductionTeam.String(),
  111. TransferType: int(enum.EggEnergyMaliciousAccountFlashingDeductionTeam),
  112. CoinId: setting.TeamEggEnergyCoinId,
  113. Uid: msg.Uid,
  114. Amount: utils2.StrToFloat64(msg.Amount),
  115. })
  116. if err != nil {
  117. return err
  118. }
  119. } else {
  120. // 待结不够扣
  121. // 扣完当前待结余额
  122. if utils2.StrToFloat64(userVirtualWallet.Amount) > 0 {
  123. err = rule.DealUserVirtualCoinMinus(session, md3.DealUserVirtualCoinReq{
  124. Kind: "sub",
  125. Title: enum.EggEnergyMaliciousAccountFlashingDeductionTeam.String(),
  126. TransferType: int(enum.EggEnergyMaliciousAccountFlashingDeductionTeam),
  127. CoinId: setting.TeamEggEnergyCoinId,
  128. Uid: msg.Uid,
  129. Amount: utils2.StrToFloat64(userVirtualWallet.Amount),
  130. })
  131. }
  132. remainingAmount := utils2.StrToFloat64(msg.Amount) - utils2.StrToFloat64(userVirtualWallet.Amount)
  133. err := rule.DealUserVirtualCoinMinus(session, md3.DealUserVirtualCoinReq{
  134. Kind: "sub",
  135. Title: enum.EggEnergyMaliciousAccountFlashingDeductionPersonal.String(),
  136. TransferType: int(enum.EggEnergyMaliciousAccountFlashingDeductionPersonal),
  137. CoinId: setting.PersonEggEnergyCoinId,
  138. Uid: msg.Uid,
  139. Amount: remainingAmount,
  140. })
  141. if err != nil {
  142. return err
  143. }
  144. }
  145. _, err = session.Table("punishment_record").
  146. Where("uid = ?", msg.Uid).
  147. Update(model.PunishmentRecord{
  148. Complete: 1,
  149. })
  150. if err != nil {
  151. return err
  152. }
  153. session.Commit()
  154. fmt.Println("handleEggPublishDataConsume_succeed ====>", msg.Uid)
  155. return nil
  156. }