golang 的 rabbitmq 消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

285 lines
8.6 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/svc"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md"
  10. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "github.com/streadway/amqp"
  15. "xorm.io/xorm"
  16. )
  17. func ZhiosAppreciation(queue md.MqQueue) {
  18. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  19. ch, err := rabbit.Cfg.Pool.GetChannel()
  20. if err != nil {
  21. logx.Error(err)
  22. return
  23. }
  24. defer ch.Release()
  25. //1、将自己绑定到交换机上
  26. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  27. //2、取出数据进行消费
  28. ch.Qos(1)
  29. delivery := ch.Consume(queue.Name, false)
  30. var res amqp.Delivery
  31. var ok bool
  32. for {
  33. res, ok = <-delivery
  34. if ok == true {
  35. //fmt.Println(string(res.Body))
  36. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  37. err = handleZhiosAppreciation(res.Body)
  38. //_ = res.Reject(false)
  39. if err == nil {
  40. _ = res.Ack(true)
  41. }
  42. } else {
  43. panic(errors.New("error getting message"))
  44. }
  45. }
  46. fmt.Println("get msg done")
  47. }
  48. func handleZhiosAppreciation(msg []byte) error {
  49. //1、解析canal采集至mq中queue的数据结构体
  50. var canalMsg *md.ZhiosAppreciation
  51. fmt.Println(string(msg))
  52. var tmpString string
  53. err := json.Unmarshal(msg, &tmpString)
  54. if err != nil {
  55. fmt.Println("===with", err.Error())
  56. return err
  57. }
  58. fmt.Println(tmpString)
  59. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  60. if err != nil {
  61. fmt.Println("===with", err.Error())
  62. return err
  63. }
  64. mid := canalMsg.Mid
  65. eg := db.DBs[mid]
  66. if eg == nil {
  67. return nil
  68. }
  69. //类型 转入 exchange
  70. if canalMsg.Type == "exchange" {
  71. err := exchange(eg, canalMsg)
  72. if err != nil {
  73. return err
  74. }
  75. }
  76. //类型 提现 withdraw 到余额
  77. if canalMsg.Type == "withdraw" {
  78. err := withdraw(eg, canalMsg)
  79. if err != nil {
  80. return err
  81. }
  82. }
  83. //类型 购物销毁
  84. if canalMsg.Type == "destroy" {
  85. err := destroy(eg, canalMsg)
  86. if err != nil {
  87. return err
  88. }
  89. }
  90. return nil
  91. }
  92. //转入 操作加入资金池和加入积分
  93. func exchange(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  94. sess := eg.NewSession()
  95. defer sess.Close()
  96. sess.Begin()
  97. //计算出当前的价值
  98. args := make(map[string]string)
  99. json.Unmarshal([]byte(msg.Ext), &args)
  100. biliMap := caleBili(eg, sess, msg.Mid, args)
  101. ordId := utils.OrderUUID(utils.StrToInt(msg.Uid))
  102. coinMapInUse, _ := db.VirtualCoinMapInUse(eg, msg.Mid, "")
  103. //积分加入
  104. title := coinMapInUse[args["id"]].Name + "-转入"
  105. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  106. _, err := svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess,
  107. utils.StrToFloat64(biliMap["in_coin"]), title, "0", 1, 109, utils.StrToInt(msg.Uid), utils.StrToInt(appreciationCoinId), 0, utils.StrToInt64(ordId), "", 0, 0)
  108. if err != nil {
  109. sess.Rollback()
  110. return err
  111. }
  112. base := db.GetAppreciationBase(sess)
  113. beforeValue := "0"
  114. beforeFlowValue := "0"
  115. if base != nil {
  116. beforeValue = base.Sum
  117. beforeFlowValue = base.FlowSum
  118. }
  119. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "0", "0", msg.Oid, biliMap["coin"], biliMap["in_coin"], beforeValue, beforeFlowValue)
  120. if err != nil {
  121. sess.Rollback()
  122. return err
  123. }
  124. //加入资金池
  125. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  126. _, err = sess.Exec(sql, utils.StrToFloat64(biliMap["coin"]), utils.StrToFloat64(biliMap["in_coin"]))
  127. if err != nil {
  128. sess.Rollback()
  129. return err
  130. }
  131. sess.Commit()
  132. return nil
  133. }
  134. //提现
  135. func withdraw(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  136. sess := eg.NewSession()
  137. defer sess.Close()
  138. sess.Begin()
  139. args := make(map[string]string)
  140. json.Unmarshal([]byte(msg.Ext), &args)
  141. //资产价值
  142. appreciationWithdrawFee := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_fee")
  143. appreciationWithdrawBack := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_back")
  144. feeMap := md2.DealWithdrawalFeeResp{
  145. WithdrawalCommissionFee: utils.StrToFloat64(appreciationWithdrawFee) / 100,
  146. WithdrawalDestroyFee: (utils.StrToFloat64(appreciationWithdrawFee) - utils.StrToFloat64(appreciationWithdrawBack)) / 100,
  147. WithdrawalRefluxFee: utils.StrToFloat64(appreciationWithdrawBack) / 100,
  148. }
  149. _, resp := rule.DealWithdrawalAndDestroy(sess, feeMap, utils.StrToFloat64(args["amount"]))
  150. //这是到手的
  151. newAmount := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOutValue-resp.AmountOut, 5), "4")
  152. //扣的
  153. coinSum := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOut-resp.RefluxValue, 20), "4")
  154. err := svc.UpdateUserFinValidAndInterFlowSess(sess,
  155. newAmount, args["amount"]+"个积分转余额", "appreciation", 0, 56, utils.StrToInt(msg.Uid), utils.StrToInt(msg.Oid), utils.StrToInt64(msg.Oid), utils.StrToInt64(msg.Oid))
  156. if err != nil {
  157. sess.Rollback()
  158. return err
  159. }
  160. base := db.GetAppreciationBase(sess)
  161. beforeValue := "0"
  162. beforeFlowValue := "0"
  163. if base != nil {
  164. beforeValue = base.Sum
  165. beforeFlowValue = base.FlowSum
  166. }
  167. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  168. //转出
  169. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "1", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  170. if err != nil {
  171. sess.Rollback()
  172. return err
  173. }
  174. ////销毁的
  175. beforeValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeValue)-utils.StrToFloat64(args["amount"]), 5), "4")
  176. beforeFlowValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeFlowValue)-utils.StrToFloat64(args["amount"]), 5), "4")
  177. //DestroyValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.DestroyValue, 5), "4")
  178. //err = db.InsertAppreciation(sess, appreciationCoinId, args["uid"], "2", "1", DestroyValue, "0", beforeValue, beforeFlowValue)
  179. //if err != nil {
  180. // sess.Rollback()
  181. // return err
  182. //}
  183. //回流的
  184. RefluxValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.RefluxValue, 5), "4")
  185. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "3", "0", msg.Oid, RefluxValue, "0", beforeValue, beforeFlowValue)
  186. if err != nil {
  187. sess.Rollback()
  188. return err
  189. }
  190. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  191. _, err = eg.Exec(sql, utils.StrToFloat64(coinSum), utils.StrToFloat64(args["amount"]))
  192. if err != nil {
  193. sess.Rollback()
  194. return err
  195. }
  196. sess.Commit()
  197. return nil
  198. }
  199. //购物销毁
  200. func destroy(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  201. args := make(map[string]string)
  202. json.Unmarshal([]byte(msg.Ext), &args)
  203. sess := eg.NewSession()
  204. defer sess.Close()
  205. sess.Begin()
  206. base := db.GetAppreciationBase(sess)
  207. beforeValue := "0"
  208. beforeFlowValue := "0"
  209. if base != nil {
  210. beforeValue = base.Sum
  211. beforeFlowValue = base.FlowSum
  212. }
  213. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  214. //转出
  215. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "4", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  216. if err != nil {
  217. sess.Rollback()
  218. return err
  219. }
  220. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  221. _, err = eg.Exec(sql, utils.StrToFloat64(args["amount"]), utils.StrToFloat64(args["amount"]))
  222. if err != nil {
  223. sess.Rollback()
  224. return err
  225. }
  226. sess.Commit()
  227. return nil
  228. }
  229. func caleBili(eg *xorm.Engine, sess *xorm.Session, dbName string, args map[string]string) map[string]string {
  230. appreciationCoinId := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_id")
  231. bCoinStr := ""
  232. bcoin := ""
  233. if args["id"] == "cny" {
  234. bCoinStr = args["amount"]
  235. } else {
  236. ids := []string{args["id"], appreciationCoinId}
  237. coin := db.VirtualCoinByIds(eg, ids)
  238. aCoinBili := coin[args["id"]].ExchangeRatio
  239. //1:5=X:money X= 1:5*money
  240. amoney := (1 / utils.StrToFloat64(aCoinBili)) * utils.StrToFloat64(args["amount"])
  241. bcoin = utils.GetPrec(utils.Float64ToStrByPrec(amoney, 5), "4")
  242. //这是只返70%
  243. appreciationCoinFee := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_fee")
  244. bCoins := amoney * (utils.StrToFloat64(appreciationCoinFee) / 100)
  245. //除以当前的资产价值
  246. _, value := rule.DealTransferIn(sess, bCoins)
  247. bCoins = value
  248. bCoinStr = utils.GetPrec(utils.Float64ToStrByPrec(bCoins, 20), "4")
  249. }
  250. res := map[string]string{
  251. "in_coin": bCoinStr,
  252. "coin": bcoin,
  253. }
  254. return res
  255. }
  256. func coinPriceEg(eg *xorm.Engine) map[string]string {
  257. base := db.GetAppreciationBaseEg(eg)
  258. sum := "0"
  259. flowSum := "0"
  260. price := "1"
  261. if base != nil {
  262. sum = base.Sum
  263. flowSum = base.FlowSum
  264. }
  265. price = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(sum)/utils.StrToFloat64(flowSum), 5), "4")
  266. res := map[string]string{
  267. "price": price,
  268. "sum": sum,
  269. "flow_sum": flowSum,
  270. }
  271. return res
  272. }