golang 的 rabbitmq 消费项目
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 

323 lignes
9.7 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/svc"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md"
  10. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "github.com/streadway/amqp"
  15. "xorm.io/xorm"
  16. )
  17. func ZhiosAppreciation(queue md.MqQueue) {
  18. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  19. ch, err := rabbit.Cfg.Pool.GetChannel()
  20. if err != nil {
  21. logx.Error(err)
  22. return
  23. }
  24. defer ch.Release()
  25. //1、将自己绑定到交换机上
  26. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  27. //2、取出数据进行消费
  28. ch.Qos(1)
  29. delivery := ch.Consume(queue.Name, false)
  30. var res amqp.Delivery
  31. var ok bool
  32. for {
  33. res, ok = <-delivery
  34. if ok == true {
  35. //fmt.Println(string(res.Body))
  36. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  37. err = handleZhiosAppreciation(res.Body)
  38. //_ = res.Reject(false)
  39. if err == nil {
  40. _ = res.Ack(true)
  41. }
  42. } else {
  43. panic(errors.New("error getting message"))
  44. }
  45. }
  46. fmt.Println("get msg done")
  47. }
  48. func handleZhiosAppreciation(msg []byte) error {
  49. //1、解析canal采集至mq中queue的数据结构体
  50. var canalMsg *md.ZhiosAppreciation
  51. fmt.Println(string(msg))
  52. var tmpString string
  53. err := json.Unmarshal(msg, &tmpString)
  54. if err != nil {
  55. fmt.Println("===with", err.Error())
  56. return err
  57. }
  58. fmt.Println(tmpString)
  59. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  60. if err != nil {
  61. fmt.Println("===with", err.Error())
  62. return err
  63. }
  64. mid := canalMsg.Mid
  65. eg := db.DBs[mid]
  66. if eg == nil {
  67. return nil
  68. }
  69. //类型 转入 exchange
  70. if canalMsg.Type == "exchange" {
  71. err := exchange(eg, canalMsg)
  72. if err != nil {
  73. return err
  74. }
  75. }
  76. //类型 提现 withdraw 到余额
  77. if canalMsg.Type == "withdraw" {
  78. err := withdraw(eg, canalMsg)
  79. if err != nil {
  80. return err
  81. }
  82. }
  83. //类型 购物销毁
  84. if canalMsg.Type == "destroy" {
  85. err := destroy(eg, canalMsg)
  86. if err != nil {
  87. return err
  88. }
  89. }
  90. //类型 购物退回
  91. if canalMsg.Type == "buy_refund" {
  92. err := buyRefund(eg, canalMsg)
  93. if err != nil {
  94. return err
  95. }
  96. }
  97. return nil
  98. }
  99. //转入 操作加入资金池和加入积分
  100. func exchange(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  101. sess := eg.NewSession()
  102. defer sess.Close()
  103. sess.Begin()
  104. //计算出当前的价值
  105. args := make(map[string]string)
  106. json.Unmarshal([]byte(msg.Ext), &args)
  107. biliMap := caleBili(eg, sess, msg.Mid, args)
  108. ordId := utils.OrderUUID(utils.StrToInt(msg.Uid))
  109. coinMapInUse, _ := db.VirtualCoinMapInUse(eg, msg.Mid, "")
  110. //积分加入
  111. title := coinMapInUse[args["id"]].Name + "-转入"
  112. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  113. _, err := svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess,
  114. utils.StrToFloat64(biliMap["in_coin"]), title, "0", 1, 109, utils.StrToInt(msg.Uid), utils.StrToInt(appreciationCoinId), 0, utils.StrToInt64(ordId), "", 0, 0)
  115. if err != nil {
  116. sess.Rollback()
  117. return err
  118. }
  119. base := db.GetAppreciationBase(sess)
  120. beforeValue := "0"
  121. beforeFlowValue := "0"
  122. if base != nil {
  123. beforeValue = base.Sum
  124. beforeFlowValue = base.FlowSum
  125. }
  126. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "0", "0", msg.Oid, biliMap["coin"], biliMap["in_coin"], beforeValue, beforeFlowValue)
  127. if err != nil {
  128. sess.Rollback()
  129. return err
  130. }
  131. //加入资金池
  132. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  133. _, err = sess.Exec(sql, utils.StrToFloat64(biliMap["coin"]), utils.StrToFloat64(biliMap["in_coin"]))
  134. if err != nil {
  135. sess.Rollback()
  136. return err
  137. }
  138. sess.Commit()
  139. return nil
  140. }
  141. //提现
  142. func withdraw(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  143. sess := eg.NewSession()
  144. defer sess.Close()
  145. sess.Begin()
  146. args := make(map[string]string)
  147. json.Unmarshal([]byte(msg.Ext), &args)
  148. //资产价值
  149. appreciationWithdrawFee := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_fee")
  150. appreciationWithdrawBack := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_back")
  151. feeMap := md2.DealWithdrawalFeeResp{
  152. WithdrawalCommissionFee: utils.StrToFloat64(appreciationWithdrawFee) / 100,
  153. WithdrawalDestroyFee: (utils.StrToFloat64(appreciationWithdrawFee) - utils.StrToFloat64(appreciationWithdrawBack)) / 100,
  154. WithdrawalRefluxFee: utils.StrToFloat64(appreciationWithdrawBack) / 100,
  155. }
  156. _, resp := rule.DealWithdrawalAndDestroy(sess, feeMap, utils.StrToFloat64(args["amount"]))
  157. //这是到手的
  158. newAmount := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOutValue-resp.AmountOut, 5), "4")
  159. //扣的
  160. coinSum := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOut-resp.RefluxValue, 20), "4")
  161. err := svc.UpdateUserFinValidAndInterFlowSess(sess,
  162. newAmount, args["amount"]+"个积分转余额", "appreciation", 0, 56, utils.StrToInt(msg.Uid), utils.StrToInt(msg.Oid), utils.StrToInt64(msg.Oid), utils.StrToInt64(msg.Oid))
  163. if err != nil {
  164. sess.Rollback()
  165. return err
  166. }
  167. base := db.GetAppreciationBase(sess)
  168. beforeValue := "0"
  169. beforeFlowValue := "0"
  170. if base != nil {
  171. beforeValue = base.Sum
  172. beforeFlowValue = base.FlowSum
  173. }
  174. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  175. //转出
  176. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "1", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  177. if err != nil {
  178. sess.Rollback()
  179. return err
  180. }
  181. ////销毁的
  182. beforeValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeValue)-utils.StrToFloat64(args["amount"]), 5), "4")
  183. beforeFlowValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeFlowValue)-utils.StrToFloat64(args["amount"]), 5), "4")
  184. //DestroyValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.DestroyValue, 5), "4")
  185. //err = db.InsertAppreciation(sess, appreciationCoinId, args["uid"], "2", "1", DestroyValue, "0", beforeValue, beforeFlowValue)
  186. //if err != nil {
  187. // sess.Rollback()
  188. // return err
  189. //}
  190. //回流的
  191. RefluxValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.RefluxValue, 5), "4")
  192. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "3", "0", msg.Oid, RefluxValue, "0", beforeValue, beforeFlowValue)
  193. if err != nil {
  194. sess.Rollback()
  195. return err
  196. }
  197. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  198. _, err = eg.Exec(sql, utils.StrToFloat64(coinSum), utils.StrToFloat64(args["amount"]))
  199. if err != nil {
  200. sess.Rollback()
  201. return err
  202. }
  203. sess.Commit()
  204. return nil
  205. }
  206. //购物销毁
  207. func destroy(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  208. args := make(map[string]string)
  209. json.Unmarshal([]byte(msg.Ext), &args)
  210. sess := eg.NewSession()
  211. defer sess.Close()
  212. sess.Begin()
  213. base := db.GetAppreciationBase(sess)
  214. beforeValue := "0"
  215. beforeFlowValue := "0"
  216. if base != nil {
  217. beforeValue = base.Sum
  218. beforeFlowValue = base.FlowSum
  219. }
  220. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  221. //转出
  222. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "4", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  223. if err != nil {
  224. sess.Rollback()
  225. return err
  226. }
  227. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  228. _, err = eg.Exec(sql, utils.StrToFloat64(args["amount"]), utils.StrToFloat64(args["amount"]))
  229. if err != nil {
  230. sess.Rollback()
  231. return err
  232. }
  233. sess.Commit()
  234. return nil
  235. }
  236. //购物退回
  237. func buyRefund(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  238. args := make(map[string]string)
  239. json.Unmarshal([]byte(msg.Ext), &args)
  240. sess := eg.NewSession()
  241. defer sess.Close()
  242. sess.Begin()
  243. base := db.GetAppreciationBase(sess)
  244. beforeValue := "0"
  245. beforeFlowValue := "0"
  246. if base != nil {
  247. beforeValue = base.Sum
  248. beforeFlowValue = base.FlowSum
  249. }
  250. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  251. //转出
  252. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "5", "0", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  253. if err != nil {
  254. sess.Rollback()
  255. return err
  256. }
  257. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  258. _, err = eg.Exec(sql, utils.StrToFloat64(args["amount"]), utils.StrToFloat64(args["amount"]))
  259. if err != nil {
  260. sess.Rollback()
  261. return err
  262. }
  263. sess.Commit()
  264. return nil
  265. }
  266. func caleBili(eg *xorm.Engine, sess *xorm.Session, dbName string, args map[string]string) map[string]string {
  267. appreciationCoinId := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_id")
  268. bCoinStr := ""
  269. bcoin := ""
  270. if args["id"] == "cny" {
  271. bCoinStr = args["amount"]
  272. } else {
  273. ids := []string{args["id"], appreciationCoinId}
  274. coin := db.VirtualCoinByIds(eg, ids)
  275. aCoinBili := coin[args["id"]].ExchangeRatio
  276. //1:5=X:money X= 1:5*money
  277. amoney := (1 / utils.StrToFloat64(aCoinBili)) * utils.StrToFloat64(args["amount"])
  278. bcoin = utils.GetPrec(utils.Float64ToStrByPrec(amoney, 5), "4")
  279. //这是只返70%
  280. appreciationCoinFee := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_fee")
  281. bCoins := amoney * (utils.StrToFloat64(appreciationCoinFee) / 100)
  282. //除以当前的资产价值
  283. _, value := rule.DealTransferIn(sess, bCoins)
  284. bCoins = value
  285. bCoinStr = utils.GetPrec(utils.Float64ToStrByPrec(bCoins, 20), "4")
  286. }
  287. res := map[string]string{
  288. "in_coin": bCoinStr,
  289. "coin": bcoin,
  290. }
  291. return res
  292. }
  293. func coinPriceEg(eg *xorm.Engine) map[string]string {
  294. base := db.GetAppreciationBaseEg(eg)
  295. sum := "0"
  296. flowSum := "0"
  297. price := "1"
  298. if base != nil {
  299. sum = base.Sum
  300. flowSum = base.FlowSum
  301. }
  302. price = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(sum)/utils.StrToFloat64(flowSum), 5), "4")
  303. res := map[string]string{
  304. "price": price,
  305. "sum": sum,
  306. "flow_sum": flowSum,
  307. }
  308. return res
  309. }