golang 的 rabbitmq 消费项目
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 

103 行
2.3 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/utils"
  5. "applet/app/utils/logx"
  6. "applet/consume/md"
  7. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "github.com/streadway/amqp"
  12. "time"
  13. )
  14. func ZhiosTaskTotal(queue md.MqQueue) {
  15. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  16. ch, err := rabbit.Cfg.Pool.GetChannel()
  17. if err != nil {
  18. logx.Error(err)
  19. return
  20. }
  21. defer ch.Release()
  22. //1、将自己绑定到交换机上
  23. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  24. //2、取出数据进行消费
  25. ch.Qos(1000)
  26. delivery := ch.Consume(queue.Name, false)
  27. var res amqp.Delivery
  28. var ok bool
  29. for {
  30. res, ok = <-delivery
  31. if ok == true {
  32. //fmt.Println(string(res.Body))
  33. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  34. err = handleZhiosTaskTotal(res.Body)
  35. //_ = res.Reject(false)
  36. if err == nil {
  37. _ = res.Ack(true)
  38. }
  39. } else {
  40. panic(errors.New("error getting message"))
  41. }
  42. }
  43. fmt.Println("get msg done")
  44. }
  45. func handleZhiosTaskTotal(msg []byte) error {
  46. //1、解析canal采集至mq中queue的数据结构体
  47. var canalMsg *md.ZhiosAcquisition
  48. fmt.Println(string(msg))
  49. var tmpString string
  50. err := json.Unmarshal(msg, &tmpString)
  51. if err != nil {
  52. fmt.Println(err.Error())
  53. return err
  54. }
  55. fmt.Println(tmpString)
  56. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  57. if err != nil {
  58. return err
  59. }
  60. mid := canalMsg.Mid
  61. eg := db.DBs[mid]
  62. if eg == nil {
  63. return nil
  64. }
  65. sess := eg.NewSession()
  66. defer sess.Close()
  67. sess.Begin()
  68. list := db.GetTaskCenterRewardList(sess, canalMsg.Id)
  69. if list == nil {
  70. sess.Rollback()
  71. return nil
  72. }
  73. if list.IsSend == 1 {
  74. sess.Rollback()
  75. return nil
  76. }
  77. var tmp = make(map[string]string)
  78. json.Unmarshal([]byte(list.Reward), &tmp)
  79. for k, v := range tmp {
  80. if utils.StrToFloat64(v) == 0 {
  81. continue
  82. }
  83. err := db.GetTaskTotal(sess, utils.IntToStr(list.Uid), time.Unix(int64(list.CreateTime), 0).Format("20060102"), time.Unix(int64(list.CreateTime), 0).Format("200601"), k, list.Type, list.TaskType, list.Title, v)
  84. if err != nil {
  85. sess.Rollback()
  86. return err
  87. }
  88. }
  89. list.IsSend = 1
  90. update, err := sess.Where("id=?", list.Id).Cols("is_send").Update(list)
  91. if update == 0 || err != nil {
  92. sess.Rollback()
  93. return errors.New("失败")
  94. }
  95. sess.Commit()
  96. return nil
  97. }