golang 的 rabbitmq 消费项目
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

10 місяці тому
10 місяці тому
10 місяці тому
10 місяці тому
10 місяці тому
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/utils"
  5. "applet/app/utils/logx"
  6. "applet/consume/md"
  7. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "github.com/streadway/amqp"
  13. "time"
  14. )
  15. func WithdrawConsume(queue md.MqQueue) {
  16. fmt.Println(">>>>>>>>>>>>WithdrawConsume>>>>>>>>>>>>")
  17. ch, err := rabbit.Cfg.Pool.GetChannel()
  18. if err != nil {
  19. logx.Error(err)
  20. return
  21. }
  22. defer ch.Release()
  23. //1、将自己绑定到交换机上
  24. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  25. //2、取出数据进行消费
  26. ch.Qos(1)
  27. delivery := ch.Consume(queue.Name, false)
  28. one_circles.Init(cfg.RedisAddr)
  29. var res amqp.Delivery
  30. var ok bool
  31. for {
  32. res, ok = <-delivery
  33. if ok == true {
  34. err = handleWithdrawConsume(res.Body)
  35. fmt.Println("err ::: ", err)
  36. if err != nil {
  37. fmt.Println("WithdrawConsume_ERR:::::", err.Error())
  38. //_ = res.Reject(true)
  39. _ = res.Reject(false)
  40. var msg interface{}
  41. json.Unmarshal(res.Body, &msg)
  42. if err.Error() == "Connection timed out" {
  43. //TODO::重新推回队列末尾,避免造成队列堵塞
  44. ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
  45. } else {
  46. //TODO::推入新的队列中备份
  47. utils.FilePutContents("WithdrawConsume_ERR", utils.SerializeStr(err.Error()))
  48. ch.Publish("zhios.app.user.withdraw.apply.exception.exchange", msg, "queues_one")
  49. }
  50. } else {
  51. err = res.Ack(true)
  52. }
  53. } else {
  54. panic(errors.New("error getting message"))
  55. }
  56. }
  57. fmt.Println("get msg done")
  58. }
  59. func handleWithdrawConsume(msgData []byte) error {
  60. time.Sleep(time.Microsecond * 200) // 等待200毫秒
  61. //1、解析mq中queue的数据结构体
  62. var msg interface{}
  63. err := json.Unmarshal(msgData, &msg)
  64. if err != nil {
  65. return err
  66. }
  67. fmt.Println("message:::::::::::>>>>>>>>>")
  68. fmt.Println(msg)
  69. var url = "http://admin.99813608.zhiyingos.com/index/transfer"
  70. if cfg.Prd {
  71. url = "http://zhios-admin/index/transfer"
  72. }
  73. post, err := utils.CurlPost(url, msg, nil)
  74. if err != nil {
  75. return err
  76. }
  77. fmt.Println("transfer:::::::::::<<<<<<<<<")
  78. fmt.Println(string(post), "\n========================================\n\n")
  79. var postResult struct {
  80. Code int `json:"code"`
  81. Msg string `json:"msg"`
  82. Data struct {
  83. IsOk bool `json:"isOk"`
  84. Msg string `json:"msg"`
  85. } `json:"data"`
  86. }
  87. err = json.Unmarshal(post, &postResult)
  88. if err != nil {
  89. return err
  90. }
  91. if !postResult.Data.IsOk {
  92. return errors.New(postResult.Data.Msg)
  93. }
  94. return nil
  95. }