golang 的 rabbitmq 消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

229 lines
6.3 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. model2 "code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git/db/model"
  9. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  10. "encoding/json"
  11. "errors"
  12. "fmt"
  13. "github.com/streadway/amqp"
  14. "xorm.io/xorm"
  15. )
  16. func ZhiosValidUser(queue md.MqQueue) {
  17. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  18. ch, err := rabbit.Cfg.Pool.GetChannel()
  19. if err != nil {
  20. logx.Error(err)
  21. return
  22. }
  23. defer ch.Release()
  24. //1、将自己绑定到交换机上
  25. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  26. //2、取出数据进行消费
  27. ch.Qos(300)
  28. delivery := ch.Consume(queue.Name, false)
  29. var res amqp.Delivery
  30. var ok bool
  31. for {
  32. res, ok = <-delivery
  33. if ok == true {
  34. //fmt.Println(string(res.Body))
  35. fmt.Println(">>>>>>>>>>>>>>>>ZhiosAcquisitionCondition<<<<<<<<<<<<<<<<<<<<<<<<<")
  36. err = handleValidUser(res.Body)
  37. //_ = res.Reject(false)
  38. fmt.Println(err)
  39. _ = res.Ack(true)
  40. } else {
  41. panic(errors.New("error getting message"))
  42. }
  43. }
  44. fmt.Println("get msg done")
  45. }
  46. func handleValidUser(msg []byte) error {
  47. //1、解析canal采集至mq中queue的数据结构体
  48. var canalMsg *md.ZhiosAcquisition
  49. fmt.Println(string(msg))
  50. var tmpString string
  51. err := json.Unmarshal(msg, &tmpString)
  52. if err != nil {
  53. fmt.Println(err.Error())
  54. return err
  55. }
  56. fmt.Println(tmpString)
  57. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  58. if err != nil {
  59. return err
  60. }
  61. mid := canalMsg.Mid
  62. eg := db.DBs[mid]
  63. if eg == nil {
  64. return nil
  65. }
  66. if canalMsg.Uid == "" {
  67. return nil
  68. }
  69. RoutineUpdateUserComm(eg, mid, canalMsg.Uid)
  70. return nil
  71. }
  72. func RoutineUpdateUserComm(eg *xorm.Engine, dbName, uid string) {
  73. if utils.StrToInt(uid) == 0 {
  74. return
  75. }
  76. user, _ := db.UserFindByID(eg, uid)
  77. if user == nil {
  78. return
  79. }
  80. userProfile, _ := db.UserProfileFindByID(eg, uid)
  81. if userProfile == nil {
  82. return
  83. }
  84. if userProfile.IsVerify == 1 {
  85. return
  86. }
  87. validConditionMap := map[string]interface{}{}
  88. checkMap := map[string]interface{}{}
  89. todone := 0
  90. //有效会员 校验
  91. vdata := db.SysCfgGetWithDb(eg, dbName, "valid_member_condition")
  92. if err := json.Unmarshal([]byte(vdata), &validConditionMap); err != nil {
  93. return
  94. logx.Warn(err)
  95. }
  96. for k, v := range validConditionMap {
  97. if v != "" && utils.AnyToFloat64(v) > 0 {
  98. todone++
  99. checkMap[k] = v
  100. }
  101. }
  102. for k, v := range checkMap {
  103. switch k {
  104. case "taskVideo":
  105. sum, _ := eg.Where("uid=? and task_type=? and task_id>0", user.Uid, 3).Sum(&model.TaskVideoNum{}, "count")
  106. if sum >= utils.AnyToFloat64(v) {
  107. todone--
  108. }
  109. case "realCheck":
  110. // 检查实名认证
  111. one, _ := db.GetRealNameAuthByUidWithState(eg, user.Uid, 1)
  112. if one != nil && one.Uid == user.Uid {
  113. todone--
  114. }
  115. case "bindPhone":
  116. // 检查绑定手机号
  117. if user.Phone != "" {
  118. todone--
  119. }
  120. case "goodsCommission":
  121. sqlTpl := `SELECT SUM(olr.amount) AS amount
  122. FROM ord_list_relate olr
  123. LEFT JOIN ord_list ol ON olr.oid = ol.ord_id
  124. LEFT JOIN privilege_card_ord pco ON olr.oid =pco.ord_id
  125. LEFT JOIN duoyou_ord_list dol ON olr.oid =dol.oid
  126. LEFT JOIN recharge_order ro ON olr.oid =ro.oid
  127. LEFT JOIN playlet_sale_order pso ON olr.oid =pso.custom_oid
  128. WHERE olr.uid = ? AND (ol.state<>4 or pco.state=1 or dol.id>0 or ro.status<>'已退款' or pso.status<>'订单退款');
  129. `
  130. todayResult, err := db.QueryNativeString(eg, sqlTpl, uid)
  131. var sum float64 = 0
  132. if err != nil {
  133. _ = logx.Warn(err)
  134. sum = 0
  135. } else {
  136. sum = utils.StrToFloat64(todayResult[0]["amount"])
  137. }
  138. sqlTpl = `SELECT SUM(olr.amount) AS amount
  139. FROM o2o_ord_list_relate olr
  140. LEFT JOIN o2o_ord ol ON olr.oid = ol.ord_id
  141. LEFT JOIN o2o_pay_to_merchant optm ON optm.pay_id = olr.oid
  142. WHERE olr.uid = ? AND (ol.state IN (1,2,3,4) or optm.state=1);
  143. `
  144. todayResult, err = db.QueryNativeString(eg, sqlTpl, uid)
  145. if err != nil {
  146. _ = logx.Warn(err)
  147. sum += 0
  148. } else {
  149. sum += utils.StrToFloat64(todayResult[0]["amount"])
  150. }
  151. sqlTpl = `SELECT SUM(olr.amount) AS amount
  152. FROM b2c_ord_list_relate olr
  153. JOIN b2c_ord ol ON olr.oid = ol.ord_id
  154. WHERE olr.uid = ? AND ol.state IN (1,2,3,4);
  155. `
  156. todayResult, err = db.QueryNativeString(eg, sqlTpl, uid)
  157. if err != nil {
  158. _ = logx.Warn(err)
  159. sum += 0
  160. } else {
  161. sum += utils.StrToFloat64(todayResult[0]["amount"])
  162. }
  163. sqlTpl = `SELECT SUM(olr.amount) AS amount
  164. FROM mall_ord_list_relate olr
  165. JOIN mall_ord ol ON olr.oid = ol.ord_id
  166. WHERE olr.uid = ? AND ol.state IN (1,2,3);
  167. `
  168. todayResult, err = db.QueryNativeString(eg, sqlTpl, uid)
  169. if err != nil {
  170. _ = logx.Warn(err)
  171. sum += 0
  172. } else {
  173. sum += utils.StrToFloat64(todayResult[0]["amount"])
  174. }
  175. // 累计佣金
  176. if sum >= utils.AnyToFloat64(v) {
  177. todone--
  178. }
  179. case "orderPay":
  180. count, _ := eg.Where("uid=?", user.Uid).In("state", []string{"1", "2", "3", "5"}).Sum(&model.OrdList{}, "paid_price")
  181. count1, _ := eg.Where("uid=?", user.Uid).In("state", []string{"1", "2", "3"}).Sum(&model.MallOrd{}, "cost_price")
  182. count2, _ := eg.Where("uid=?", user.Uid).In("state", []string{"1", "2", "3"}).Sum(&model2.O2oOrd{}, "cost_price")
  183. count3, _ := eg.Where("uid=?", user.Uid).In("state", []string{"1", "2", "3"}).Sum(&model2.B2cOrd{}, "cost_price")
  184. if count+count3+count2+count1 >= utils.AnyToFloat64(v) {
  185. todone--
  186. }
  187. case "receive":
  188. // 已收货
  189. count, _ := eg.Where("uid=?", user.Uid).In("state", []string{"1", "2", "3", "5"}).Count(&model.OrdList{})
  190. count1, _ := eg.Where("uid=?", user.Uid).In("state", []string{"3"}).Count(&model.MallOrd{})
  191. count2, _ := eg.Where("uid=?", user.Uid).In("state", []string{"3"}).Count(&model2.O2oOrd{})
  192. count3, _ := eg.Where("uid=?", user.Uid).In("state", []string{"3"}).Count(&model2.B2cOrd{})
  193. if count+count3+count2+count1 > 0 {
  194. todone--
  195. }
  196. case "tbAuth":
  197. // 是否淘宝授权
  198. if userProfile.AccTaobaoAuthTime != 0 {
  199. todone--
  200. }
  201. case "withdraw":
  202. // 提现
  203. sum, err := db.UserWithDrawApplySumByState(eg, user.Uid, "1", "2")
  204. if err != nil {
  205. logx.Warn(err)
  206. }
  207. if sum > 0 {
  208. todone--
  209. }
  210. }
  211. }
  212. // 满足条件则将改用户打为有效用户
  213. if todone == 0 {
  214. userProfile.IsVerify = 1
  215. }
  216. _, err := db.UserProfileUpdate(eg, userProfile.Uid, userProfile)
  217. if err != nil {
  218. logx.Warn(err)
  219. }
  220. return
  221. }