golang 的 rabbitmq 消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

177 regels
5.5 KiB

  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. db2 "applet/app/flexible_employment/db"
  6. "applet/app/flexible_employment/enum"
  7. "applet/app/flexible_employment/svc"
  8. "applet/app/flexible_employment/utils/aes"
  9. "applet/app/lib/flexible_employment"
  10. "applet/app/utils"
  11. "applet/app/utils/logx"
  12. "applet/consume/md"
  13. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  14. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/streadway/amqp"
  19. "time"
  20. )
  21. func FlexibleEmploymentWithdrawForPupiaoConsume(queue md.MqQueue) {
  22. fmt.Println(">>>>>>>>>>>>FlexibleEmploymentWithdrawForPupiaoConsume>>>>>>>>>>>>")
  23. ch, err := rabbit.Cfg.Pool.GetChannel()
  24. if err != nil {
  25. logx.Error(err)
  26. return
  27. }
  28. defer ch.Release()
  29. //1、将自己绑定到交换机上
  30. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  31. //2、取出数据进行消费
  32. ch.Qos(1)
  33. delivery := ch.Consume(queue.Name, false)
  34. one_circles.Init(cfg.RedisAddr)
  35. var res amqp.Delivery
  36. var ok bool
  37. for {
  38. res, ok = <-delivery
  39. if ok == true {
  40. err = handleFlexibleEmploymentWithdrawForPupiaoConsume(res.Body)
  41. fmt.Println("err ::: ", err)
  42. if err != nil {
  43. fmt.Println("FlexibleEmploymentWithdrawForPupiaoConsume_ERR:::::", err.Error())
  44. //_ = res.Reject(true)
  45. _ = res.Reject(false)
  46. var msg interface{}
  47. json.Unmarshal(res.Body, &msg)
  48. if err.Error() == "Connection timed out" {
  49. //TODO::重新推回队列末尾,避免造成队列堵塞
  50. ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
  51. } else {
  52. //TODO::推入新的队列中备份
  53. utils.FilePutContents("FlexibleEmploymentWithdrawForPupiaoConsume_ERR", utils.SerializeStr(err.Error()))
  54. ch.Publish("zhios.app.user.withdraw.apply.exception.exchange", msg, "pupiao")
  55. }
  56. } else {
  57. err = res.Ack(true)
  58. }
  59. } else {
  60. panic(errors.New("error getting message"))
  61. }
  62. }
  63. fmt.Println("get msg done")
  64. }
  65. func handleFlexibleEmploymentWithdrawForPupiaoConsume(msgData []byte) error {
  66. time.Sleep(time.Microsecond * 200) // 等待200毫秒
  67. //1、解析mq中queue的数据结构体
  68. var msg struct {
  69. Uid string `json:"uid"`
  70. Nickname string `json:"nickname"`
  71. MasterId string `json:"master_id"`
  72. AppName string `json:"app_name"`
  73. ApplyOrder string `json:"apply_order"`
  74. ActualAmount string `json:"actual_amount"`
  75. Oid string `json:"oid"`
  76. MobCfg string `json:"mob_cfg"`
  77. }
  78. err := json.Unmarshal(msgData, &msg)
  79. if err != nil {
  80. return err
  81. }
  82. fmt.Println("pupiao_message:::::::::::>>>>>>>>>")
  83. fmt.Println(msg)
  84. if db.DBs[msg.MasterId] == nil {
  85. return nil
  86. }
  87. engine := db.DBs[msg.MasterId]
  88. //1、查找对应记录
  89. flexibleEmploymentPupiaoOrdDb := db2.FlexibleEmploymentPupiaoOrdDb{}
  90. flexibleEmploymentPupiaoOrdDb.Set(msg.MasterId)
  91. flexibleEmploymentPupiaoOrd, err := flexibleEmploymentPupiaoOrdDb.Get(msg.Oid)
  92. if err != nil {
  93. return err
  94. }
  95. if flexibleEmploymentPupiaoOrd == nil {
  96. return errors.New("未查询到对应订单记录")
  97. }
  98. flexibleEmploymentPuiaoBasicDb := db2.FlexibleEmploymentPuiaoBasicDb{}
  99. flexibleEmploymentPuiaoBasicDb.Set()
  100. basic, err := flexibleEmploymentPuiaoBasicDb.GetBasic(msg.MasterId)
  101. if err != nil {
  102. return err
  103. }
  104. //2、发起制单
  105. puPiao := flexible_employment.NewPuPiao(basic.AppId, basic.AppSecret)
  106. var userAESData = []map[string]string{
  107. {
  108. "outTradeNo": "o_" + utils.Int64ToStr(flexibleEmploymentPupiaoOrd.WithdrawApplyId),
  109. "settleType": flexibleEmploymentPupiaoOrd.SettleType,
  110. "payeeName": flexibleEmploymentPupiaoOrd.PayeeName,
  111. "payeeIdCard": flexibleEmploymentPupiaoOrd.PayeeIdCard,
  112. "payeePhone": flexibleEmploymentPupiaoOrd.PayeePhone,
  113. "payeeNo": flexibleEmploymentPupiaoOrd.PayeeNo,
  114. "orderAmount": flexibleEmploymentPupiaoOrd.TotalAmount,
  115. },
  116. }
  117. str, _ := json.Marshal(userAESData)
  118. itemAESContent, _ := aes.AesEncryptByECB([]byte(basic.AppSecret), string(str))
  119. result, err := puPiao.Curl(enum.OpenApiPaymentReceiveOrder, map[string]interface{}{
  120. "outBatchNo": flexibleEmploymentPupiaoOrd.OutBatchNo,
  121. "hrcompanyId": basic.HrCompanyId,
  122. "settleAccountId": basic.SettleAccountId,
  123. "totalCount": "1",
  124. "totalAmount": flexibleEmploymentPupiaoOrd.TotalAmount,
  125. "itemAESContent": itemAESContent,
  126. })
  127. if err != nil {
  128. return err
  129. }
  130. var response struct {
  131. IsSuccess string `json:"isSuccess"`
  132. Charset string `json:"charset"`
  133. ErrorCode string `json:"errorCode"`
  134. ErrorMsg string `json:"errorMsg"`
  135. Data struct {
  136. PlatformBatchNo string `json:"platformBatchNo"`
  137. OutBatchNo string `json:"outBatchNo"`
  138. } `json:"data"`
  139. }
  140. if err = json.Unmarshal(utils.Serialize(result), &response); err != nil {
  141. return err
  142. }
  143. if response.IsSuccess == "F" {
  144. flexibleEmploymentPupiaoOrd.BatchStatus = 1
  145. updateAck, err := flexibleEmploymentPupiaoOrdDb.Update(flexibleEmploymentPupiaoOrd.Id, flexibleEmploymentPupiaoOrd, "batch_status")
  146. if err != nil {
  147. return err
  148. }
  149. if updateAck <= 0 {
  150. return errors.New("更新 flexible_employment_pupiao_ord 状态失败")
  151. }
  152. //TODO::制单失败,将处理提现失败状态
  153. finWithdrawApply, err := db.UserWithDrawApplyByUIDById(engine, utils.Int64ToStr(flexibleEmploymentPupiaoOrd.WithdrawApplyId))
  154. if err != nil {
  155. return err
  156. }
  157. session := engine.NewSession()
  158. defer session.Close()
  159. session.Begin()
  160. err = svc.DealFailResult(session, finWithdrawApply, msg.MasterId, response.ErrorMsg)
  161. if err != nil {
  162. _ = session.Rollback()
  163. return err
  164. }
  165. return session.Commit()
  166. }
  167. return nil
  168. }