golang 的 rabbitmq 消费项目
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
 
 
 

161 行
3.7 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "github.com/streadway/amqp"
  13. "strings"
  14. "time"
  15. "xorm.io/xorm"
  16. )
  17. func ZhiosUserRelate(queue md.MqQueue) {
  18. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  19. ch, err := rabbit.Cfg.Pool.GetChannel()
  20. if err != nil {
  21. logx.Error(err)
  22. return
  23. }
  24. defer ch.Release()
  25. //1、将自己绑定到交换机上
  26. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  27. //2、取出数据进行消费
  28. ch.Qos(1)
  29. delivery := ch.Consume(queue.Name, false)
  30. var res amqp.Delivery
  31. var ok bool
  32. for {
  33. res, ok = <-delivery
  34. if ok == true {
  35. //fmt.Println(string(res.Body))
  36. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  37. err = handleZhiosUserRelate(res.Body)
  38. //_ = res.Reject(false)
  39. if err != nil {
  40. _ = res.Reject(false)
  41. //TODO::重新推回队列末尾,避免造成队列堵塞
  42. var msg *md.ZhiosOrderBuckle
  43. var tmpString string
  44. err := json.Unmarshal(res.Body, &tmpString)
  45. if err != nil {
  46. return
  47. }
  48. fmt.Println(tmpString)
  49. err = json.Unmarshal([]byte(tmpString), &msg)
  50. if err != nil {
  51. return
  52. }
  53. ch.Publish(queue.ExchangeName, utils.SerializeStr(msg), queue.RoutKey)
  54. } else {
  55. _ = res.Ack(true)
  56. }
  57. } else {
  58. panic(errors.New("error getting message"))
  59. }
  60. }
  61. fmt.Println("get msg done")
  62. }
  63. func handleZhiosUserRelate(msg []byte) error {
  64. time.Sleep(time.Microsecond * 20) // 等待500毫秒
  65. //1、解析canal采集至mq中queue的数据结构体
  66. var canalMsg *md.ZhiosOrderBuckle
  67. fmt.Println(string(msg))
  68. var tmpString string
  69. err := json.Unmarshal(msg, &tmpString)
  70. if err != nil {
  71. fmt.Println("===with", err.Error())
  72. return err
  73. }
  74. fmt.Println(tmpString)
  75. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  76. if err != nil {
  77. fmt.Println("===with", err.Error())
  78. return err
  79. }
  80. mid := canalMsg.Mid
  81. eg := db.DBs[mid]
  82. if eg == nil {
  83. return nil
  84. }
  85. profile, err := db.UserProfileFindByID(eg, canalMsg.Uid)
  86. if err != nil || profile == nil {
  87. return nil
  88. }
  89. if profile.ParentUid > 0 {
  90. ur := new(model.UserRelate)
  91. //如果有上级要加入关系链
  92. initLV := 1
  93. ur.ParentUid = profile.ParentUid
  94. ur.Uid = profile.Uid
  95. ur.Level = initLV
  96. ur.InviteTime = time.Now()
  97. _, err = db.UserRelateInsert(eg, ur)
  98. if err != nil && strings.Contains(err.Error(), "Duplicate") == false {
  99. return err
  100. }
  101. // 插入多级关联
  102. RoutineMultiRelate1(eg, ur.ParentUid, ur.Uid, initLV)
  103. }
  104. return nil
  105. }
  106. //RoutineMultiRelate is 多级关联
  107. func RoutineMultiRelate1(eg *xorm.Engine, pid int, uid int, lv int) {
  108. for {
  109. if pid == 0 {
  110. break
  111. }
  112. m, err := db.UserProfileFindByID(eg, pid)
  113. if err != nil {
  114. logx.Warn(err)
  115. break
  116. }
  117. if m != nil {
  118. if m.ParentUid == 0 {
  119. break
  120. }
  121. lv++
  122. ur := new(model.UserRelate)
  123. ur.ParentUid = m.ParentUid
  124. ur.Uid = uid
  125. ur.Level = lv
  126. ur.InviteTime = time.Now()
  127. _, err := db.UserRelateInsert(eg, ur)
  128. if err != nil && strings.Contains(err.Error(), "Duplicate") == false {
  129. logx.Warn(err)
  130. break
  131. }
  132. if err != nil && strings.Contains(err.Error(), "Duplicate") {
  133. tmp, _, _ := db.UserRelateByUIDAndPUID(eg, ur.Uid, ur.ParentUid)
  134. if tmp != nil && tmp.Level != ur.Level {
  135. db.UserRelateUpdate(eg, ur)
  136. }
  137. }
  138. // 还要关联当前的用户的所有下级,注意关联等级
  139. //go RoutineInsertUserRelate(c, m.ParentUid, uid, lv)
  140. // 下级关联上上级
  141. // 继续查询
  142. logx.Info(fmt.Sprintf("关联pid(%v) -> uid(%v),lv:%v", ur.ParentUid, ur.Uid, lv))
  143. logx.Info("继续查询")
  144. pid = m.ParentUid
  145. }
  146. if m == nil {
  147. logx.Info("查询结束,退出")
  148. break
  149. }
  150. }
  151. }