golang 的 rabbitmq 消费项目
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 

182 lignes
4.2 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/utils"
  5. "applet/app/utils/logx"
  6. "applet/consume/md"
  7. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "github.com/streadway/amqp"
  12. "xorm.io/xorm"
  13. )
  14. func ZhiosValidUser(queue md.MqQueue) {
  15. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  16. ch, err := rabbit.Cfg.Pool.GetChannel()
  17. if err != nil {
  18. logx.Error(err)
  19. return
  20. }
  21. defer ch.Release()
  22. //1、将自己绑定到交换机上
  23. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  24. //2、取出数据进行消费
  25. ch.Qos(300)
  26. delivery := ch.Consume(queue.Name, false)
  27. var res amqp.Delivery
  28. var ok bool
  29. for {
  30. res, ok = <-delivery
  31. if ok == true {
  32. //fmt.Println(string(res.Body))
  33. fmt.Println(">>>>>>>>>>>>>>>>ZhiosAcquisitionCondition<<<<<<<<<<<<<<<<<<<<<<<<<")
  34. err = handleValidUser(res.Body)
  35. //_ = res.Reject(false)
  36. fmt.Println(err)
  37. _ = res.Ack(true)
  38. } else {
  39. panic(errors.New("error getting message"))
  40. }
  41. }
  42. fmt.Println("get msg done")
  43. }
  44. func handleValidUser(msg []byte) error {
  45. //1、解析canal采集至mq中queue的数据结构体
  46. var canalMsg *md.ZhiosAcquisition
  47. fmt.Println(string(msg))
  48. var tmpString string
  49. err := json.Unmarshal(msg, &tmpString)
  50. if err != nil {
  51. fmt.Println(err.Error())
  52. return err
  53. }
  54. fmt.Println(tmpString)
  55. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  56. if err != nil {
  57. return err
  58. }
  59. mid := canalMsg.Mid
  60. eg := db.DBs[mid]
  61. if eg == nil {
  62. return nil
  63. }
  64. if canalMsg.Uid == "" {
  65. return nil
  66. }
  67. RoutineUpdateUserComm(eg, mid, canalMsg.Uid)
  68. return nil
  69. }
  70. func RoutineUpdateUserComm(eg *xorm.Engine, dbName, uid string) {
  71. if utils.StrToInt(uid) == 0 {
  72. return
  73. }
  74. user, _ := db.UserFindByID(eg, uid)
  75. if user == nil {
  76. return
  77. }
  78. userProfile, _ := db.UserProfileFindByID(eg, uid)
  79. if userProfile == nil {
  80. return
  81. }
  82. if userProfile.IsVerify == 1 {
  83. return
  84. }
  85. validConditionMap := map[string]interface{}{}
  86. checkMap := map[string]interface{}{}
  87. todone := 0
  88. //有效会员 校验
  89. vdata := db.SysCfgGetWithDb(eg, dbName, "valid_member_condition")
  90. if err := json.Unmarshal([]byte(vdata), &validConditionMap); err != nil {
  91. return
  92. logx.Warn(err)
  93. }
  94. for k, v := range validConditionMap {
  95. if v != "" && utils.AnyToFloat64(v) > 0 {
  96. todone++
  97. checkMap[k] = v
  98. }
  99. }
  100. for k, v := range checkMap {
  101. switch k {
  102. case "realCheck":
  103. // 检查实名认证
  104. one, _ := db.GetRealNameAuthByUidWithState(eg, user.Uid, 1)
  105. if one != nil && one.Uid == user.Uid {
  106. todone--
  107. }
  108. case "bindPhone":
  109. // 检查绑定手机号
  110. if user.Phone != "" {
  111. todone--
  112. }
  113. case "goodsCommission":
  114. sqlTpl := `SELECT cast(SUM(LEFT(olr.amount,LENGTH(olr.amount)-2)) as decimal(50,4)) AS amount
  115. FROM ord_list_relate olr
  116. LEFT JOIN ord_list ol ON olr.oid = ol.ord_id
  117. LEFT JOIN privilege_card_ord pco ON olr.oid =pco.ord_id
  118. LEFT JOIN duoyou_ord_list dol ON olr.oid =dol.oid
  119. LEFT JOIN recharge_order ro ON olr.oid =ro.oid
  120. LEFT JOIN playlet_sale_order pso ON olr.oid =pso.custom_oid
  121. WHERE olr.uid = ? AND (ol.state<>4 or pco.state=1 or dol.id>0 or ro.status<>'已退款' or pso.status<>'订单退款');
  122. `
  123. todayResult, err := db.QueryNativeString(eg, sqlTpl, uid)
  124. today := "0"
  125. if err == nil {
  126. today = todayResult[0]["amount"]
  127. }
  128. // 累计佣金
  129. if utils.StrToFloat64(today) >= utils.AnyToFloat64(v) {
  130. todone--
  131. }
  132. case "orderPay":
  133. // 付款订单满足v元
  134. ms, err := db.OrderListByUIDByPaidPrice(eg, user.Uid, v, []string{"0", "1", "2", "3", "5"})
  135. if err != nil {
  136. logx.Warn(err)
  137. }
  138. mss := *ms
  139. if len(mss) > 0 {
  140. todone--
  141. }
  142. case "receive":
  143. // 已收货
  144. ms, err := db.OrderListByUIDByState(eg, user.Uid, []string{"1", "2", "3", "5"})
  145. if err != nil {
  146. logx.Warn(err)
  147. }
  148. mss := *ms
  149. if len(mss) > 0 {
  150. todone--
  151. }
  152. case "tbAuth":
  153. // 是否淘宝授权
  154. if userProfile.AccTaobaoAuthTime != 0 {
  155. todone--
  156. }
  157. case "withdraw":
  158. // 提现
  159. sum, err := db.UserWithDrawApplySumByState(eg, user.Uid, "1", "2")
  160. if err != nil {
  161. logx.Warn(err)
  162. }
  163. if sum > 0 {
  164. todone--
  165. }
  166. }
  167. }
  168. // 满足条件则将改用户打为有效用户
  169. if todone == 0 {
  170. userProfile.IsVerify = 1
  171. }
  172. _, err := db.UserProfileUpdate(eg, userProfile.Uid, userProfile)
  173. if err != nil {
  174. logx.Warn(err)
  175. }
  176. return
  177. }