golang 的 rabbitmq 消费项目
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 

186 satır
5.7 KiB

  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. db2 "applet/app/flexible_employment/db"
  6. "applet/app/flexible_employment/enum"
  7. "applet/app/flexible_employment/svc"
  8. "applet/app/flexible_employment/utils/aes"
  9. "applet/app/lib/flexible_employment"
  10. "applet/app/utils"
  11. "applet/app/utils/logx"
  12. "applet/consume/md"
  13. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  14. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/streadway/amqp"
  19. "time"
  20. )
  21. func FlexibleEmploymentWithdrawForPupiaoConsume(queue md.MqQueue) {
  22. fmt.Println(">>>>>>>>>>>>FlexibleEmploymentWithdrawForPupiaoConsume>>>>>>>>>>>>")
  23. ch, err := rabbit.Cfg.Pool.GetChannel()
  24. if err != nil {
  25. logx.Error(err)
  26. return
  27. }
  28. defer ch.Release()
  29. //1、将自己绑定到交换机上
  30. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  31. //2、取出数据进行消费
  32. ch.Qos(1)
  33. delivery := ch.Consume(queue.Name, false)
  34. one_circles.Init(cfg.RedisAddr)
  35. var res amqp.Delivery
  36. var ok bool
  37. for {
  38. res, ok = <-delivery
  39. if ok == true {
  40. err = handleFlexibleEmploymentWithdrawForPupiaoConsume(res.Body)
  41. fmt.Println("err ::: ", err)
  42. if err != nil {
  43. fmt.Println("FlexibleEmploymentWithdrawForPupiaoConsume_ERR:::::", err.Error())
  44. //_ = res.Reject(true)
  45. _ = res.Reject(false)
  46. var msg interface{}
  47. json.Unmarshal(res.Body, &msg)
  48. if err.Error() == "Connection timed out" {
  49. //TODO::重新推回队列末尾,避免造成队列堵塞
  50. ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
  51. } else {
  52. //TODO::推入新的队列中备份
  53. utils.FilePutContents("FlexibleEmploymentWithdrawForPupiaoConsume_ERR", utils.SerializeStr(err.Error()))
  54. ch.Publish("zhios.app.user.withdraw.apply.exception.exchange", msg, "pupiao")
  55. }
  56. } else {
  57. err = res.Ack(true)
  58. }
  59. } else {
  60. panic(errors.New("error getting message"))
  61. }
  62. }
  63. fmt.Println("get msg done")
  64. }
  65. func handleFlexibleEmploymentWithdrawForPupiaoConsume(msgData []byte) error {
  66. var ms string
  67. err := json.Unmarshal(msgData, &ms)
  68. if err != nil {
  69. return err
  70. }
  71. time.Sleep(time.Microsecond * 200) // 等待200毫秒
  72. //1、解析mq中queue的数据结构体
  73. var msg struct {
  74. Uid string `json:"uid"`
  75. Nickname string `json:"nickname"`
  76. MasterId string `json:"master_id"`
  77. AppName string `json:"app_name"`
  78. ApplyOrder string `json:"apply_order"`
  79. ActualAmount string `json:"actual_amount"`
  80. Oid string `json:"oid"`
  81. MobCfg interface{} `json:"mob_cfg"`
  82. }
  83. err = json.Unmarshal([]byte(ms), &msg)
  84. if err != nil {
  85. return err
  86. }
  87. fmt.Println("pupiao_message:::::::::::>>>>>>>>>")
  88. fmt.Println(msg)
  89. if db.DBs[msg.MasterId] == nil {
  90. return nil
  91. }
  92. engine := db.DBs[msg.MasterId]
  93. //1、查找对应记录
  94. flexibleEmploymentPupiaoOrdDb := db2.FlexibleEmploymentPupiaoOrdDb{}
  95. flexibleEmploymentPupiaoOrdDb.Set(msg.MasterId)
  96. flexibleEmploymentPupiaoOrd, err := flexibleEmploymentPupiaoOrdDb.Get(msg.Oid)
  97. if err != nil {
  98. return err
  99. }
  100. if flexibleEmploymentPupiaoOrd == nil {
  101. return errors.New("未查询到对应订单记录")
  102. }
  103. flexibleEmploymentPuiaoBasicDb := db2.FlexibleEmploymentPuiaoBasicDb{}
  104. flexibleEmploymentPuiaoBasicDb.Set()
  105. basic, err := flexibleEmploymentPuiaoBasicDb.GetBasic(msg.MasterId)
  106. if err != nil {
  107. return err
  108. }
  109. //2、发起制单
  110. puPiao := flexible_employment.NewPuPiao(basic.AppId, basic.AppSecret)
  111. var userAESData = []map[string]string{
  112. {
  113. "outTradeNo": "o_" + utils.Int64ToStr(flexibleEmploymentPupiaoOrd.WithdrawApplyId),
  114. "settleType": flexibleEmploymentPupiaoOrd.SettleType,
  115. "payeeName": flexibleEmploymentPupiaoOrd.PayeeName,
  116. "payeeIdCard": flexibleEmploymentPupiaoOrd.PayeeIdCard,
  117. "payeePhone": flexibleEmploymentPupiaoOrd.PayeePhone,
  118. "payeeNo": flexibleEmploymentPupiaoOrd.PayeeNo,
  119. "orderAmount": flexibleEmploymentPupiaoOrd.TotalAmount,
  120. },
  121. }
  122. str, _ := json.Marshal(userAESData)
  123. itemAESContent, _ := aes.AesEncryptByECB([]byte(basic.AppSecret), string(str))
  124. settleAccountId := basic.SettleAccountId
  125. if flexibleEmploymentPupiaoOrd.SettleType == "ALIPAY" {
  126. settleAccountId = basic.SettleAccountIdForAli
  127. }
  128. result, err := puPiao.Curl(enum.OpenApiPaymentReceiveOrder, map[string]interface{}{
  129. "outBatchNo": flexibleEmploymentPupiaoOrd.OutBatchNo,
  130. "hrcompanyId": basic.HrCompanyId,
  131. "settleAccountId": settleAccountId,
  132. "totalCount": "1",
  133. "totalAmount": flexibleEmploymentPupiaoOrd.TotalAmount,
  134. "itemAESContent": itemAESContent,
  135. })
  136. if err != nil {
  137. return err
  138. }
  139. var response struct {
  140. IsSuccess string `json:"isSuccess"`
  141. Charset string `json:"charset"`
  142. ErrorCode string `json:"errorCode"`
  143. ErrorMsg string `json:"errorMsg"`
  144. Data struct {
  145. PlatformBatchNo string `json:"platformBatchNo"`
  146. OutBatchNo string `json:"outBatchNo"`
  147. } `json:"data"`
  148. }
  149. if err = json.Unmarshal(utils.Serialize(result), &response); err != nil {
  150. return err
  151. }
  152. if response.IsSuccess == "F" {
  153. flexibleEmploymentPupiaoOrd.BatchStatus = 1
  154. updateAck, err := flexibleEmploymentPupiaoOrdDb.Update(flexibleEmploymentPupiaoOrd.Id, flexibleEmploymentPupiaoOrd, "batch_status")
  155. if err != nil {
  156. return err
  157. }
  158. if updateAck <= 0 {
  159. return errors.New("更新 flexible_employment_pupiao_ord 状态失败")
  160. }
  161. //TODO::制单失败,将处理提现失败状态
  162. finWithdrawApply, err := db.UserWithDrawApplyByUIDById(engine, utils.Int64ToStr(flexibleEmploymentPupiaoOrd.WithdrawApplyId))
  163. if err != nil {
  164. return err
  165. }
  166. session := engine.NewSession()
  167. defer session.Close()
  168. session.Begin()
  169. err = svc.DealFailResultForPuPiao(session, finWithdrawApply, msg.MasterId, response.ErrorMsg)
  170. if err != nil {
  171. _ = session.Rollback()
  172. return err
  173. }
  174. return session.Commit()
  175. }
  176. return nil
  177. }