golang 的 rabbitmq 消费项目
Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 

166 rindas
4.9 KiB

  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. db2 "applet/app/flexible_employment/db"
  6. "applet/app/flexible_employment/enum"
  7. "applet/app/flexible_employment/svc"
  8. "applet/app/lib/flexible_employment"
  9. "applet/app/utils"
  10. "applet/app/utils/logx"
  11. "applet/consume/md"
  12. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  13. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles"
  14. "encoding/json"
  15. "errors"
  16. "fmt"
  17. "github.com/streadway/amqp"
  18. "time"
  19. )
  20. func FlexibleEmploymentWithdrawForGongMaoConsume(queue md.MqQueue) {
  21. fmt.Println(">>>>>>>>>>>>FlexibleEmploymentWithdrawForGongMaoConsume>>>>>>>>>>>>")
  22. ch, err := rabbit.Cfg.Pool.GetChannel()
  23. if err != nil {
  24. logx.Error(err)
  25. return
  26. }
  27. defer ch.Release()
  28. //1、将自己绑定到交换机上
  29. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  30. //2、取出数据进行消费
  31. ch.Qos(1)
  32. delivery := ch.Consume(queue.Name, false)
  33. one_circles.Init(cfg.RedisAddr)
  34. var res amqp.Delivery
  35. var ok bool
  36. for {
  37. res, ok = <-delivery
  38. if ok == true {
  39. err = handleFlexibleEmploymentWithdrawForGongMaoConsume(res.Body)
  40. fmt.Println("err ::: ", err)
  41. if err != nil {
  42. fmt.Println("FlexibleEmploymentWithdrawForGongMaoConsume_ERR:::::", err.Error())
  43. _ = res.Reject(true) //TODO::拒绝 Ack
  44. //_ = res.Reject(false)
  45. var msg interface{}
  46. json.Unmarshal(res.Body, &msg)
  47. if err.Error() == "Connection timed out" {
  48. //TODO::重新推回队列末尾,避免造成队列堵塞
  49. ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
  50. } else {
  51. //TODO::推入新的队列中备份
  52. utils.FilePutContents("FlexibleEmploymentWithdrawForGongMaoConsume_ERR", utils.SerializeStr(err.Error()))
  53. ch.Publish("zhios.app.user.withdraw.apply.exception.exchange", msg, "gongmao")
  54. }
  55. } else {
  56. err = res.Ack(true)
  57. }
  58. } else {
  59. panic(errors.New("error getting message"))
  60. }
  61. }
  62. fmt.Println("get msg done")
  63. }
  64. func handleFlexibleEmploymentWithdrawForGongMaoConsume(msgData []byte) error {
  65. var ms string
  66. err := json.Unmarshal(msgData, &ms)
  67. if err != nil {
  68. return err
  69. }
  70. time.Sleep(time.Microsecond * 200) // 等待200毫秒
  71. //1、解析mq中queue的数据结构体
  72. var msg struct {
  73. Uid string `json:"uid"`
  74. Nickname string `json:"nickname"`
  75. MasterId string `json:"master_id"`
  76. AppName string `json:"app_name"`
  77. ApplyOrder string `json:"apply_order"`
  78. ActualAmount string `json:"actual_amount"`
  79. MobCfg interface{} `json:"mob_cfg"`
  80. }
  81. err = json.Unmarshal([]byte(ms), &msg)
  82. if err != nil {
  83. return err
  84. }
  85. fmt.Println("gongmao_message:::::::::::>>>>>>>>>")
  86. fmt.Println(msg)
  87. if db.DBs[msg.MasterId] == nil {
  88. return nil
  89. }
  90. engine := db.DBs[msg.MasterId]
  91. //1、查找对应记录
  92. flexibleEmploymentOrdDb := db2.FlexibleEmploymentOrdDb{}
  93. flexibleEmploymentOrdDb.Set(msg.MasterId)
  94. flexibleEmploymentOrd, err := flexibleEmploymentOrdDb.Get(msg.ApplyOrder)
  95. if err != nil {
  96. return err
  97. }
  98. if flexibleEmploymentOrd == nil {
  99. return errors.New("未查询到对应订单记录")
  100. }
  101. flexibleEmploymentBasicDb := db2.FlexibleEmploymentBasicDb{}
  102. flexibleEmploymentBasicDb.Set()
  103. basic, err := flexibleEmploymentBasicDb.Get(msg.MasterId)
  104. if err != nil {
  105. return err
  106. }
  107. gongMao := flexible_employment.New(basic.AppKey, basic.AppSecret, basic.SecretId)
  108. result, err := gongMao.Curl(enum.MerchantDoSinglePayment, map[string]interface{}{
  109. "requestId": flexibleEmploymentOrd.RequestId,
  110. "mobile": flexibleEmploymentOrd.Mobile,
  111. "name": flexibleEmploymentOrd.Name,
  112. "amount": flexibleEmploymentOrd.Amount,
  113. "identity": flexibleEmploymentOrd.Identity,
  114. "bankAccount": flexibleEmploymentOrd.BankAccount,
  115. "dateTime": time.Now().Format("20060102150405"),
  116. "salaryType": flexibleEmploymentOrd.SettleType,
  117. })
  118. if err != nil {
  119. return err
  120. }
  121. var response struct {
  122. Success bool `json:"success"`
  123. ErrorCode string `json:"errorCode"`
  124. ErrorMsg string `json:"errorMsg"`
  125. Data struct {
  126. RequestId string `json:"requestId"`
  127. AppmentTime string `json:"appmentTime"`
  128. } `json:"data"`
  129. }
  130. if err = json.Unmarshal(utils.Serialize(result), &response); err != nil {
  131. return err
  132. }
  133. if !response.Success {
  134. //TODO::发起提现失败,将处理提现失败状态
  135. finWithdrawApply, err := db.UserWithDrawApplyByUIDById(engine, flexibleEmploymentOrd.RequestId)
  136. if err != nil {
  137. return err
  138. }
  139. session := engine.NewSession()
  140. defer session.Close()
  141. session.Begin()
  142. err = svc.DealFailResult(session, finWithdrawApply, msg.MasterId, response.ErrorMsg)
  143. if err != nil {
  144. _ = session.Rollback()
  145. return err
  146. }
  147. return session.Commit()
  148. }
  149. flexibleEmploymentOrd.State = 1
  150. updateAck, err := flexibleEmploymentOrdDb.Update(flexibleEmploymentOrd.Id, flexibleEmploymentOrd, "state")
  151. if err != nil {
  152. return err
  153. }
  154. if updateAck <= 0 {
  155. return errors.New("更新 flexible_employment_ord 状态失败")
  156. }
  157. return nil
  158. }