golang 的 rabbitmq 消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

withdraw_consume_pupiao.go 5.6 KiB

7 months ago
6 months ago
7 months ago
6 months ago
7 months ago
6 months ago
7 months ago
6 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. db2 "applet/app/flexible_employment/db"
  6. "applet/app/flexible_employment/enum"
  7. "applet/app/flexible_employment/svc"
  8. "applet/app/flexible_employment/utils/aes"
  9. "applet/app/lib/flexible_employment"
  10. "applet/app/utils"
  11. "applet/app/utils/logx"
  12. "applet/consume/md"
  13. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  14. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/streadway/amqp"
  19. "time"
  20. )
  21. func FlexibleEmploymentWithdrawForPupiaoConsume(queue md.MqQueue) {
  22. fmt.Println(">>>>>>>>>>>>FlexibleEmploymentWithdrawForPupiaoConsume>>>>>>>>>>>>")
  23. ch, err := rabbit.Cfg.Pool.GetChannel()
  24. if err != nil {
  25. logx.Error(err)
  26. return
  27. }
  28. defer ch.Release()
  29. //1、将自己绑定到交换机上
  30. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  31. //2、取出数据进行消费
  32. ch.Qos(1)
  33. delivery := ch.Consume(queue.Name, false)
  34. one_circles.Init(cfg.RedisAddr)
  35. var res amqp.Delivery
  36. var ok bool
  37. for {
  38. res, ok = <-delivery
  39. if ok == true {
  40. err = handleFlexibleEmploymentWithdrawForPupiaoConsume(res.Body)
  41. fmt.Println("err ::: ", err)
  42. if err != nil {
  43. fmt.Println("FlexibleEmploymentWithdrawForPupiaoConsume_ERR:::::", err.Error())
  44. //_ = res.Reject(true)
  45. _ = res.Reject(false)
  46. var msg interface{}
  47. json.Unmarshal(res.Body, &msg)
  48. if err.Error() == "Connection timed out" {
  49. //TODO::重新推回队列末尾,避免造成队列堵塞
  50. ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
  51. } else {
  52. //TODO::推入新的队列中备份
  53. utils.FilePutContents("FlexibleEmploymentWithdrawForPupiaoConsume_ERR", utils.SerializeStr(err.Error()))
  54. ch.Publish("zhios.app.user.withdraw.apply.exception.exchange", msg, "pupiao")
  55. }
  56. } else {
  57. err = res.Ack(true)
  58. }
  59. } else {
  60. panic(errors.New("error getting message"))
  61. }
  62. }
  63. fmt.Println("get msg done")
  64. }
  65. func handleFlexibleEmploymentWithdrawForPupiaoConsume(msgData []byte) error {
  66. var ms string
  67. err := json.Unmarshal(msgData, &ms)
  68. if err != nil {
  69. return err
  70. }
  71. time.Sleep(time.Microsecond * 200) // 等待200毫秒
  72. //1、解析mq中queue的数据结构体
  73. var msg struct {
  74. Uid string `json:"uid"`
  75. Nickname string `json:"nickname"`
  76. MasterId string `json:"master_id"`
  77. AppName string `json:"app_name"`
  78. ApplyOrder string `json:"apply_order"`
  79. ActualAmount string `json:"actual_amount"`
  80. Oid string `json:"oid"`
  81. MobCfg interface{} `json:"mob_cfg"`
  82. }
  83. err = json.Unmarshal([]byte(ms), &msg)
  84. if err != nil {
  85. return err
  86. }
  87. fmt.Println("pupiao_message:::::::::::>>>>>>>>>")
  88. fmt.Println(msg)
  89. if db.DBs[msg.MasterId] == nil {
  90. return nil
  91. }
  92. engine := db.DBs[msg.MasterId]
  93. //1、查找对应记录
  94. flexibleEmploymentPupiaoOrdDb := db2.FlexibleEmploymentPupiaoOrdDb{}
  95. flexibleEmploymentPupiaoOrdDb.Set(msg.MasterId)
  96. flexibleEmploymentPupiaoOrd, err := flexibleEmploymentPupiaoOrdDb.Get(msg.Oid)
  97. if err != nil {
  98. return err
  99. }
  100. if flexibleEmploymentPupiaoOrd == nil {
  101. return errors.New("未查询到对应订单记录")
  102. }
  103. flexibleEmploymentPuiaoBasicDb := db2.FlexibleEmploymentPuiaoBasicDb{}
  104. flexibleEmploymentPuiaoBasicDb.Set()
  105. basic, err := flexibleEmploymentPuiaoBasicDb.GetBasic(msg.MasterId)
  106. if err != nil {
  107. return err
  108. }
  109. //2、发起制单
  110. puPiao := flexible_employment.NewPuPiao(basic.AppId, basic.AppSecret)
  111. var userAESData = []map[string]string{
  112. {
  113. "outTradeNo": "o_" + utils.Int64ToStr(flexibleEmploymentPupiaoOrd.WithdrawApplyId),
  114. "settleType": flexibleEmploymentPupiaoOrd.SettleType,
  115. "payeeName": flexibleEmploymentPupiaoOrd.PayeeName,
  116. "payeeIdCard": flexibleEmploymentPupiaoOrd.PayeeIdCard,
  117. "payeePhone": flexibleEmploymentPupiaoOrd.PayeePhone,
  118. "payeeNo": flexibleEmploymentPupiaoOrd.PayeeNo,
  119. "orderAmount": flexibleEmploymentPupiaoOrd.TotalAmount,
  120. },
  121. }
  122. str, _ := json.Marshal(userAESData)
  123. itemAESContent, _ := aes.AesEncryptByECB([]byte(basic.AppSecret), string(str))
  124. result, err := puPiao.Curl(enum.OpenApiPaymentReceiveOrder, map[string]interface{}{
  125. "outBatchNo": flexibleEmploymentPupiaoOrd.OutBatchNo,
  126. "hrcompanyId": basic.HrCompanyId,
  127. "settleAccountId": basic.SettleAccountId,
  128. "totalCount": "1",
  129. "totalAmount": flexibleEmploymentPupiaoOrd.TotalAmount,
  130. "itemAESContent": itemAESContent,
  131. })
  132. if err != nil {
  133. return err
  134. }
  135. var response struct {
  136. IsSuccess string `json:"isSuccess"`
  137. Charset string `json:"charset"`
  138. ErrorCode string `json:"errorCode"`
  139. ErrorMsg string `json:"errorMsg"`
  140. Data struct {
  141. PlatformBatchNo string `json:"platformBatchNo"`
  142. OutBatchNo string `json:"outBatchNo"`
  143. } `json:"data"`
  144. }
  145. if err = json.Unmarshal(utils.Serialize(result), &response); err != nil {
  146. return err
  147. }
  148. if response.IsSuccess == "F" {
  149. flexibleEmploymentPupiaoOrd.BatchStatus = 1
  150. updateAck, err := flexibleEmploymentPupiaoOrdDb.Update(flexibleEmploymentPupiaoOrd.Id, flexibleEmploymentPupiaoOrd, "batch_status")
  151. if err != nil {
  152. return err
  153. }
  154. if updateAck <= 0 {
  155. return errors.New("更新 flexible_employment_pupiao_ord 状态失败")
  156. }
  157. //TODO::制单失败,将处理提现失败状态
  158. finWithdrawApply, err := db.UserWithDrawApplyByUIDById(engine, utils.Int64ToStr(flexibleEmploymentPupiaoOrd.WithdrawApplyId))
  159. if err != nil {
  160. return err
  161. }
  162. session := engine.NewSession()
  163. defer session.Close()
  164. session.Begin()
  165. err = svc.DealFailResultForPuPiao(session, finWithdrawApply, msg.MasterId, response.ErrorMsg)
  166. if err != nil {
  167. _ = session.Rollback()
  168. return err
  169. }
  170. return session.Commit()
  171. }
  172. return nil
  173. }