golang 的 rabbitmq 消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

327 lines
9.9 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/svc"
  6. "applet/app/utils"
  7. "applet/app/utils/logx"
  8. "applet/consume/md"
  9. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  10. md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md"
  11. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule"
  12. "encoding/json"
  13. "errors"
  14. "fmt"
  15. "github.com/streadway/amqp"
  16. "xorm.io/xorm"
  17. )
  18. func ZhiosAppreciation(queue md.MqQueue) {
  19. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  20. ch, err := rabbit.Cfg.Pool.GetChannel()
  21. if err != nil {
  22. logx.Error(err)
  23. return
  24. }
  25. defer ch.Release()
  26. //1、将自己绑定到交换机上
  27. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  28. //2、取出数据进行消费
  29. ch.Qos(1)
  30. delivery := ch.Consume(queue.Name, false)
  31. var res amqp.Delivery
  32. var ok bool
  33. for {
  34. res, ok = <-delivery
  35. if ok == true {
  36. //fmt.Println(string(res.Body))
  37. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  38. err = handleZhiosAppreciation(res.Body)
  39. //_ = res.Reject(false)
  40. if err == nil {
  41. _ = res.Ack(true)
  42. }
  43. } else {
  44. panic(errors.New("error getting message"))
  45. }
  46. }
  47. fmt.Println("get msg done")
  48. }
  49. func handleZhiosAppreciation(msg []byte) error {
  50. //1、解析canal采集至mq中queue的数据结构体
  51. var canalMsg *md.ZhiosAppreciation
  52. fmt.Println(string(msg))
  53. var tmpString string
  54. err := json.Unmarshal(msg, &tmpString)
  55. if err != nil {
  56. fmt.Println("===with", err.Error())
  57. return err
  58. }
  59. fmt.Println(tmpString)
  60. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  61. if err != nil {
  62. fmt.Println("===with", err.Error())
  63. return err
  64. }
  65. mid := canalMsg.Mid
  66. eg := db.DBs[mid]
  67. if eg == nil {
  68. return nil
  69. }
  70. //类型 转入 exchange
  71. if canalMsg.Type == "exchange" {
  72. err := exchange(eg, canalMsg)
  73. if err != nil {
  74. return err
  75. }
  76. }
  77. //类型 提现 withdraw 到余额
  78. if canalMsg.Type == "withdraw" {
  79. err := withdraw(eg, canalMsg)
  80. if err != nil {
  81. return err
  82. }
  83. }
  84. //类型 购物销毁
  85. if canalMsg.Type == "destroy" {
  86. err := destroy(eg, canalMsg)
  87. if err != nil {
  88. return err
  89. }
  90. }
  91. //类型 购物退回
  92. if canalMsg.Type == "buy_refund" {
  93. err := buyRefund(eg, canalMsg)
  94. if err != nil {
  95. return err
  96. }
  97. }
  98. return nil
  99. }
  100. //转入 操作加入资金池和加入积分
  101. func exchange(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  102. sess := eg.NewSession()
  103. defer sess.Close()
  104. sess.Begin()
  105. //计算出当前的价值
  106. args := make(map[string]string)
  107. json.Unmarshal([]byte(msg.Ext), &args)
  108. biliMap := caleBili(eg, sess, msg.Mid, args)
  109. ordId := utils.OrderUUID(utils.StrToInt(msg.Uid))
  110. coinMapInUse, _ := db.VirtualCoinMapInUse(eg, msg.Mid, "")
  111. //积分加入
  112. title := coinMapInUse[args["id"]].Name + "-转入"
  113. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  114. _, err := svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess,
  115. utils.StrToFloat64(biliMap["in_coin"]), title, "0", 1, 109, utils.StrToInt(msg.Uid), utils.StrToInt(appreciationCoinId), 0, utils.StrToInt64(ordId), "", 0, 0)
  116. if err != nil {
  117. sess.Rollback()
  118. return err
  119. }
  120. base := db.GetAppreciationBase(sess)
  121. beforeValue := "0"
  122. beforeFlowValue := "0"
  123. if base != nil {
  124. beforeValue = base.Sum
  125. beforeFlowValue = base.FlowSum
  126. }
  127. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "0", "0", msg.Oid, biliMap["coin"], biliMap["in_coin"], beforeValue, beforeFlowValue)
  128. if err != nil {
  129. sess.Rollback()
  130. return err
  131. }
  132. //加入资金池
  133. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  134. _, err = sess.Exec(sql, utils.StrToFloat64(biliMap["coin"]), utils.StrToFloat64(biliMap["in_coin"]))
  135. if err != nil {
  136. sess.Rollback()
  137. return err
  138. }
  139. sess.Commit()
  140. return nil
  141. }
  142. //提现
  143. func withdraw(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  144. sess := eg.NewSession()
  145. defer sess.Close()
  146. sess.Begin()
  147. args := make(map[string]string)
  148. json.Unmarshal([]byte(msg.Ext), &args)
  149. //资产价值
  150. price := ""
  151. appreciationWithdrawFee := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_fee")
  152. appreciationWithdrawBack := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_back")
  153. feeMap := md2.DealWithdrawalFeeResp{
  154. WithdrawalCommissionFee: utils.StrToFloat64(appreciationWithdrawFee) / 100,
  155. WithdrawalDestroyFee: (utils.StrToFloat64(appreciationWithdrawFee) - utils.StrToFloat64(appreciationWithdrawBack)) / 100,
  156. WithdrawalRefluxFee: utils.StrToFloat64(appreciationWithdrawBack) / 100,
  157. }
  158. _, resp := rule.DealWithdrawalAndDestroy(sess, feeMap, utils.StrToFloat64(args["amount"]))
  159. //这是到手的
  160. newAmount := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOutValue-resp.AmountOut, 5), "4")
  161. //扣的
  162. coinSum := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOutValue-resp.RefluxValue, 20), "4")
  163. price = utils.GetPrec(utils.Float64ToStrByPrec(resp.Price, 20), "4")
  164. err := svc.UpdateUserFinValidAndInterFlowSess(sess,
  165. newAmount, args["amount"]+"个数字资产转余额,价值"+price+"/个", "appreciation", 0, 56, utils.StrToInt(msg.Uid), utils.StrToInt(msg.Oid), utils.StrToInt64(msg.Oid), utils.StrToInt64(msg.Oid))
  166. if err != nil {
  167. sess.Rollback()
  168. return err
  169. }
  170. base := db.GetAppreciationBase(sess)
  171. beforeValue := "0"
  172. beforeFlowValue := "0"
  173. if base != nil {
  174. beforeValue = base.Sum
  175. beforeFlowValue = base.FlowSum
  176. }
  177. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  178. //转出
  179. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "1", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  180. if err != nil {
  181. sess.Rollback()
  182. return err
  183. }
  184. ////销毁的
  185. beforeValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeValue)-resp.TransferOutValue, 5), "4")
  186. beforeFlowValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeFlowValue)-utils.StrToFloat64(args["amount"]), 5), "4")
  187. //DestroyValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.DestroyValue, 5), "4")
  188. //err = db.InsertAppreciation(sess, appreciationCoinId, args["uid"], "2", "1", DestroyValue, "0", beforeValue, beforeFlowValue)
  189. //if err != nil {
  190. // sess.Rollback()
  191. // return err
  192. //}
  193. //回流的
  194. RefluxValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.RefluxValue, 5), "4")
  195. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "3", "0", msg.Oid, RefluxValue, "0", beforeValue, beforeFlowValue)
  196. if err != nil {
  197. sess.Rollback()
  198. return err
  199. }
  200. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  201. _, err = eg.Exec(sql, utils.StrToFloat64(coinSum), utils.StrToFloat64(args["amount"]))
  202. if err != nil {
  203. sess.Rollback()
  204. return err
  205. }
  206. sess.Where("ord_id=?", msg.Oid).Cols("price").Update(&model.UserVirtualCoinFlow{Price: price})
  207. sess.Commit()
  208. return nil
  209. }
  210. //购物销毁
  211. func destroy(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  212. args := make(map[string]string)
  213. json.Unmarshal([]byte(msg.Ext), &args)
  214. sess := eg.NewSession()
  215. defer sess.Close()
  216. sess.Begin()
  217. base := db.GetAppreciationBase(sess)
  218. beforeValue := "0"
  219. beforeFlowValue := "0"
  220. if base != nil {
  221. beforeValue = base.Sum
  222. beforeFlowValue = base.FlowSum
  223. }
  224. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  225. //转出
  226. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "4", "1", msg.Oid, args["coinPrice"], args["amount"], beforeValue, beforeFlowValue)
  227. if err != nil {
  228. sess.Rollback()
  229. return err
  230. }
  231. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  232. _, err = eg.Exec(sql, utils.StrToFloat64(args["coinPrice"]), utils.StrToFloat64(args["amount"]))
  233. if err != nil {
  234. sess.Rollback()
  235. return err
  236. }
  237. sess.Commit()
  238. return nil
  239. }
  240. //购物退回
  241. func buyRefund(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  242. args := make(map[string]string)
  243. json.Unmarshal([]byte(msg.Ext), &args)
  244. sess := eg.NewSession()
  245. defer sess.Close()
  246. sess.Begin()
  247. base := db.GetAppreciationBase(sess)
  248. beforeValue := "0"
  249. beforeFlowValue := "0"
  250. if base != nil {
  251. beforeValue = base.Sum
  252. beforeFlowValue = base.FlowSum
  253. }
  254. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  255. //转出
  256. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "5", "0", msg.Oid, args["coinPrice"], args["amount"], beforeValue, beforeFlowValue)
  257. if err != nil {
  258. sess.Rollback()
  259. return err
  260. }
  261. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  262. _, err = eg.Exec(sql, utils.StrToFloat64(args["coinPrice"]), utils.StrToFloat64(args["amount"]))
  263. if err != nil {
  264. sess.Rollback()
  265. return err
  266. }
  267. sess.Commit()
  268. return nil
  269. }
  270. func caleBili(eg *xorm.Engine, sess *xorm.Session, dbName string, args map[string]string) map[string]string {
  271. appreciationCoinId := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_id")
  272. bCoinStr := ""
  273. bcoin := ""
  274. if args["id"] == "cny" {
  275. bCoinStr = args["amount"]
  276. } else {
  277. ids := []string{args["id"], appreciationCoinId}
  278. coin := db.VirtualCoinByIds(eg, ids)
  279. aCoinBili := coin[args["id"]].ExchangeRatio
  280. //1:5=X:money X= 1:5*money
  281. amoney := (1 / utils.StrToFloat64(aCoinBili)) * utils.StrToFloat64(args["amount"])
  282. bcoin = utils.GetPrec(utils.Float64ToStrByPrec(amoney, 5), "4")
  283. //这是只返70%
  284. appreciationCoinFee := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_fee")
  285. bCoins := amoney * (utils.StrToFloat64(appreciationCoinFee) / 100)
  286. //除以当前的资产价值
  287. _, value := rule.DealTransferIn(sess, bCoins)
  288. bCoins = value
  289. bCoinStr = utils.GetPrec(utils.Float64ToStrByPrec(bCoins, 20), "4")
  290. }
  291. res := map[string]string{
  292. "in_coin": bCoinStr,
  293. "coin": bcoin,
  294. }
  295. return res
  296. }
  297. func coinPriceEg(eg *xorm.Engine) map[string]string {
  298. base := db.GetAppreciationBaseEg(eg)
  299. sum := "0"
  300. flowSum := "0"
  301. price := "1"
  302. if base != nil {
  303. sum = base.Sum
  304. flowSum = base.FlowSum
  305. }
  306. price = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(sum)/utils.StrToFloat64(flowSum), 5), "4")
  307. res := map[string]string{
  308. "price": price,
  309. "sum": sum,
  310. "flow_sum": flowSum,
  311. }
  312. return res
  313. }