golang 的 rabbitmq 消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

143 lines
4.0 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule"
  10. "encoding/json"
  11. "errors"
  12. "fmt"
  13. "github.com/streadway/amqp"
  14. "github.com/tidwall/gjson"
  15. "time"
  16. "xorm.io/xorm"
  17. )
  18. func ZhiosWithdrawReward(queue md.MqQueue) {
  19. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  20. ch, err := rabbit.Cfg.Pool.GetChannel()
  21. if err != nil {
  22. logx.Error(err)
  23. return
  24. }
  25. defer ch.Release()
  26. //1、将自己绑定到交换机上
  27. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  28. //2、取出数据进行消费
  29. ch.Qos(20)
  30. delivery := ch.Consume(queue.Name, false)
  31. var res amqp.Delivery
  32. var ok bool
  33. for {
  34. res, ok = <-delivery
  35. if ok == true {
  36. //fmt.Println(string(res.Body))
  37. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  38. err = handleZhiosWithdrawReward(res.Body)
  39. //_ = res.Reject(false)
  40. if err == nil {
  41. _ = res.Ack(true)
  42. }
  43. } else {
  44. panic(errors.New("error getting message"))
  45. }
  46. }
  47. fmt.Println("get msg done")
  48. }
  49. func handleZhiosWithdrawReward(msg []byte) error {
  50. //1、解析canal采集至mq中queue的数据结构体
  51. var canalMsg *md.ZhiosWithdraw
  52. fmt.Println(string(msg))
  53. var tmpString string
  54. err := json.Unmarshal(msg, &tmpString)
  55. if err != nil {
  56. fmt.Println(err.Error())
  57. return err
  58. }
  59. fmt.Println(tmpString)
  60. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  61. if err != nil {
  62. return err
  63. }
  64. mid := canalMsg.Mid
  65. eg := db.DBs[mid]
  66. if eg == nil {
  67. return nil
  68. }
  69. //判断用户是什么等级
  70. var apply model.FinWithdrawApply
  71. get, err := eg.Where("id=?", canalMsg.Id).Get(&apply)
  72. if get == false || err != nil {
  73. return nil
  74. }
  75. if apply.Uid == 0 {
  76. return nil
  77. }
  78. //
  79. withdrawSetting := db.SysCfgGetWithDb(eg, mid, "withdraw_setting")
  80. withdrawFirstBili := gjson.Get(withdrawSetting, "withdrawFirstBili").String()
  81. publicWithdrawTeamBili := db.SysCfgGetWithDb(eg, mid, "public_withdraw_team_bili")
  82. publicWithdrawTeamCount := db.SysCfgGetWithDb(eg, mid, "public_withdraw_team_count")
  83. if utils.StrToFloat64(withdrawFirstBili) > 0 {
  84. level := 0
  85. all, _ := db.UserLevlEgAll(eg)
  86. for k, v := range all {
  87. if k == 0 {
  88. level = v.Id
  89. }
  90. }
  91. //分给粉丝
  92. levelUser := db.UserFindByLevel(eg, level)
  93. if len(levelUser) == 0 {
  94. return nil
  95. }
  96. ids := make([]int64, 0)
  97. for _, v := range levelUser {
  98. ids = append(ids, int64(v.Uid))
  99. }
  100. money := utils.FloatFormat(utils.StrToFloat64(apply.Amount)*(utils.StrToFloat64(withdrawFirstBili)/100)/float64(len(levelUser)), 6)
  101. comm(eg, canalMsg.Id, ids, money, "全网提现分红", "withdraw_reward", "92")
  102. }
  103. if utils.StrToFloat64(publicWithdrawTeamBili) > 0 && utils.StrToFloat64(publicWithdrawTeamCount) > 0 {
  104. //分给粉丝
  105. user, _ := rule.FindRandUser(eg, utils.StrToInt(publicWithdrawTeamCount))
  106. money := utils.FloatFormat(utils.StrToFloat64(apply.Amount)*(utils.StrToFloat64(publicWithdrawTeamBili)/100)/float64(len(user)), 6)
  107. comm(eg, canalMsg.Id, user, money, "公排团队提现分红", "withdraw_team_reward", "93")
  108. }
  109. return nil
  110. }
  111. func comm(eg *xorm.Engine, id string, levelUser []int64, money float64, title, types, ordAction string) {
  112. for _, v := range levelUser {
  113. profile, err := db.UserProfileFindByID(eg, v)
  114. if err != nil || profile == nil {
  115. continue
  116. }
  117. oldAmount := profile.FinValid
  118. profile.FinValid = utils.Float64ToStrByPrec(utils.StrToFloat64(profile.FinValid)+money, 6)
  119. eg.Where("uid=?", profile.Uid).Update(profile)
  120. var flow = model.FinUserFlow{
  121. Uid: int(v),
  122. Type: 0,
  123. Amount: utils.Float64ToStrByPrec(money, 6),
  124. BeforeAmount: oldAmount,
  125. AfterAmount: profile.FinValid,
  126. OrdType: types,
  127. OrdId: id,
  128. OrdTitle: title,
  129. OrdAction: utils.StrToInt(ordAction),
  130. OrdTime: int(time.Now().Unix()),
  131. State: 2,
  132. CreateAt: time.Now(),
  133. UpdateAt: time.Now(),
  134. }
  135. eg.Insert(&flow)
  136. }
  137. }