golang 的 rabbitmq 消费项目
No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 

325 líneas
9.8 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/svc"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md"
  10. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "github.com/streadway/amqp"
  15. "xorm.io/xorm"
  16. )
  17. func ZhiosAppreciation(queue md.MqQueue) {
  18. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  19. ch, err := rabbit.Cfg.Pool.GetChannel()
  20. if err != nil {
  21. logx.Error(err)
  22. return
  23. }
  24. defer ch.Release()
  25. //1、将自己绑定到交换机上
  26. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  27. //2、取出数据进行消费
  28. ch.Qos(1)
  29. delivery := ch.Consume(queue.Name, false)
  30. var res amqp.Delivery
  31. var ok bool
  32. for {
  33. res, ok = <-delivery
  34. if ok == true {
  35. //fmt.Println(string(res.Body))
  36. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  37. err = handleZhiosAppreciation(res.Body)
  38. //_ = res.Reject(false)
  39. if err == nil {
  40. _ = res.Ack(true)
  41. }
  42. } else {
  43. panic(errors.New("error getting message"))
  44. }
  45. }
  46. fmt.Println("get msg done")
  47. }
  48. func handleZhiosAppreciation(msg []byte) error {
  49. //1、解析canal采集至mq中queue的数据结构体
  50. var canalMsg *md.ZhiosAppreciation
  51. fmt.Println(string(msg))
  52. var tmpString string
  53. err := json.Unmarshal(msg, &tmpString)
  54. if err != nil {
  55. fmt.Println("===with", err.Error())
  56. return err
  57. }
  58. fmt.Println(tmpString)
  59. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  60. if err != nil {
  61. fmt.Println("===with", err.Error())
  62. return err
  63. }
  64. mid := canalMsg.Mid
  65. eg := db.DBs[mid]
  66. if eg == nil {
  67. return nil
  68. }
  69. //类型 转入 exchange
  70. if canalMsg.Type == "exchange" {
  71. err := exchange(eg, canalMsg)
  72. if err != nil {
  73. return err
  74. }
  75. }
  76. //类型 提现 withdraw 到余额
  77. if canalMsg.Type == "withdraw" {
  78. err := withdraw(eg, canalMsg)
  79. if err != nil {
  80. return err
  81. }
  82. }
  83. //类型 购物销毁
  84. if canalMsg.Type == "destroy" {
  85. err := destroy(eg, canalMsg)
  86. if err != nil {
  87. return err
  88. }
  89. }
  90. //类型 购物退回
  91. if canalMsg.Type == "buy_refund" {
  92. err := buyRefund(eg, canalMsg)
  93. if err != nil {
  94. return err
  95. }
  96. }
  97. return nil
  98. }
  99. //转入 操作加入资金池和加入积分
  100. func exchange(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  101. sess := eg.NewSession()
  102. defer sess.Close()
  103. sess.Begin()
  104. //计算出当前的价值
  105. args := make(map[string]string)
  106. json.Unmarshal([]byte(msg.Ext), &args)
  107. biliMap := caleBili(eg, sess, msg.Mid, args)
  108. ordId := utils.OrderUUID(utils.StrToInt(msg.Uid))
  109. coinMapInUse, _ := db.VirtualCoinMapInUse(eg, msg.Mid, "")
  110. //积分加入
  111. title := coinMapInUse[args["id"]].Name + "-转入"
  112. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  113. _, err := svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess,
  114. utils.StrToFloat64(biliMap["in_coin"]), title, "0", 1, 109, utils.StrToInt(msg.Uid), utils.StrToInt(appreciationCoinId), 0, utils.StrToInt64(ordId), "", 0, 0)
  115. if err != nil {
  116. sess.Rollback()
  117. return err
  118. }
  119. base := db.GetAppreciationBase(sess)
  120. beforeValue := "0"
  121. beforeFlowValue := "0"
  122. if base != nil {
  123. beforeValue = base.Sum
  124. beforeFlowValue = base.FlowSum
  125. }
  126. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "0", "0", msg.Oid, biliMap["coin"], biliMap["in_coin"], beforeValue, beforeFlowValue)
  127. if err != nil {
  128. sess.Rollback()
  129. return err
  130. }
  131. //加入资金池
  132. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  133. _, err = sess.Exec(sql, utils.StrToFloat64(biliMap["coin"]), utils.StrToFloat64(biliMap["in_coin"]))
  134. if err != nil {
  135. sess.Rollback()
  136. return err
  137. }
  138. sess.Commit()
  139. return nil
  140. }
  141. //提现
  142. func withdraw(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  143. sess := eg.NewSession()
  144. defer sess.Close()
  145. sess.Begin()
  146. args := make(map[string]string)
  147. json.Unmarshal([]byte(msg.Ext), &args)
  148. //资产价值
  149. price := ""
  150. appreciationWithdrawFee := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_fee")
  151. appreciationWithdrawBack := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_back")
  152. feeMap := md2.DealWithdrawalFeeResp{
  153. WithdrawalCommissionFee: utils.StrToFloat64(appreciationWithdrawFee) / 100,
  154. WithdrawalDestroyFee: (utils.StrToFloat64(appreciationWithdrawFee) - utils.StrToFloat64(appreciationWithdrawBack)) / 100,
  155. WithdrawalRefluxFee: utils.StrToFloat64(appreciationWithdrawBack) / 100,
  156. }
  157. _, resp := rule.DealWithdrawalAndDestroy(sess, feeMap, utils.StrToFloat64(args["amount"]))
  158. //这是到手的
  159. newAmount := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOutValue-resp.AmountOut, 5), "4")
  160. //扣的
  161. coinSum := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOut-resp.RefluxValue, 20), "4")
  162. price = utils.GetPrec(utils.Float64ToStrByPrec(resp.Price, 20), "4")
  163. err := svc.UpdateUserFinValidAndInterFlowSess(sess,
  164. newAmount, args["amount"]+"个数字资产转余额,价值"+price+"/个", "appreciation", 0, 56, utils.StrToInt(msg.Uid), utils.StrToInt(msg.Oid), utils.StrToInt64(msg.Oid), utils.StrToInt64(msg.Oid))
  165. if err != nil {
  166. sess.Rollback()
  167. return err
  168. }
  169. base := db.GetAppreciationBase(sess)
  170. beforeValue := "0"
  171. beforeFlowValue := "0"
  172. if base != nil {
  173. beforeValue = base.Sum
  174. beforeFlowValue = base.FlowSum
  175. }
  176. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  177. //转出
  178. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "1", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  179. if err != nil {
  180. sess.Rollback()
  181. return err
  182. }
  183. ////销毁的
  184. beforeValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeValue)-utils.StrToFloat64(args["amount"]), 5), "4")
  185. beforeFlowValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeFlowValue)-utils.StrToFloat64(args["amount"]), 5), "4")
  186. //DestroyValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.DestroyValue, 5), "4")
  187. //err = db.InsertAppreciation(sess, appreciationCoinId, args["uid"], "2", "1", DestroyValue, "0", beforeValue, beforeFlowValue)
  188. //if err != nil {
  189. // sess.Rollback()
  190. // return err
  191. //}
  192. //回流的
  193. RefluxValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.RefluxValue, 5), "4")
  194. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "3", "0", msg.Oid, RefluxValue, "0", beforeValue, beforeFlowValue)
  195. if err != nil {
  196. sess.Rollback()
  197. return err
  198. }
  199. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  200. _, err = eg.Exec(sql, utils.StrToFloat64(coinSum), utils.StrToFloat64(args["amount"]))
  201. if err != nil {
  202. sess.Rollback()
  203. return err
  204. }
  205. sess.Commit()
  206. return nil
  207. }
  208. //购物销毁
  209. func destroy(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  210. args := make(map[string]string)
  211. json.Unmarshal([]byte(msg.Ext), &args)
  212. sess := eg.NewSession()
  213. defer sess.Close()
  214. sess.Begin()
  215. base := db.GetAppreciationBase(sess)
  216. beforeValue := "0"
  217. beforeFlowValue := "0"
  218. if base != nil {
  219. beforeValue = base.Sum
  220. beforeFlowValue = base.FlowSum
  221. }
  222. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  223. //转出
  224. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "4", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  225. if err != nil {
  226. sess.Rollback()
  227. return err
  228. }
  229. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  230. _, err = eg.Exec(sql, utils.StrToFloat64(args["amount"]), utils.StrToFloat64(args["amount"]))
  231. if err != nil {
  232. sess.Rollback()
  233. return err
  234. }
  235. sess.Commit()
  236. return nil
  237. }
  238. //购物退回
  239. func buyRefund(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  240. args := make(map[string]string)
  241. json.Unmarshal([]byte(msg.Ext), &args)
  242. sess := eg.NewSession()
  243. defer sess.Close()
  244. sess.Begin()
  245. base := db.GetAppreciationBase(sess)
  246. beforeValue := "0"
  247. beforeFlowValue := "0"
  248. if base != nil {
  249. beforeValue = base.Sum
  250. beforeFlowValue = base.FlowSum
  251. }
  252. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  253. //转出
  254. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "5", "0", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  255. if err != nil {
  256. sess.Rollback()
  257. return err
  258. }
  259. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  260. _, err = eg.Exec(sql, utils.StrToFloat64(args["amount"]), utils.StrToFloat64(args["amount"]))
  261. if err != nil {
  262. sess.Rollback()
  263. return err
  264. }
  265. sess.Commit()
  266. return nil
  267. }
  268. func caleBili(eg *xorm.Engine, sess *xorm.Session, dbName string, args map[string]string) map[string]string {
  269. appreciationCoinId := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_id")
  270. bCoinStr := ""
  271. bcoin := ""
  272. if args["id"] == "cny" {
  273. bCoinStr = args["amount"]
  274. } else {
  275. ids := []string{args["id"], appreciationCoinId}
  276. coin := db.VirtualCoinByIds(eg, ids)
  277. aCoinBili := coin[args["id"]].ExchangeRatio
  278. //1:5=X:money X= 1:5*money
  279. amoney := (1 / utils.StrToFloat64(aCoinBili)) * utils.StrToFloat64(args["amount"])
  280. bcoin = utils.GetPrec(utils.Float64ToStrByPrec(amoney, 5), "4")
  281. //这是只返70%
  282. appreciationCoinFee := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_fee")
  283. bCoins := amoney * (utils.StrToFloat64(appreciationCoinFee) / 100)
  284. //除以当前的资产价值
  285. _, value := rule.DealTransferIn(sess, bCoins)
  286. bCoins = value
  287. bCoinStr = utils.GetPrec(utils.Float64ToStrByPrec(bCoins, 20), "4")
  288. }
  289. res := map[string]string{
  290. "in_coin": bCoinStr,
  291. "coin": bcoin,
  292. }
  293. return res
  294. }
  295. func coinPriceEg(eg *xorm.Engine) map[string]string {
  296. base := db.GetAppreciationBaseEg(eg)
  297. sum := "0"
  298. flowSum := "0"
  299. price := "1"
  300. if base != nil {
  301. sum = base.Sum
  302. flowSum = base.FlowSum
  303. }
  304. price = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(sum)/utils.StrToFloat64(flowSum), 5), "4")
  305. res := map[string]string{
  306. "price": price,
  307. "sum": sum,
  308. "flow_sum": flowSum,
  309. }
  310. return res
  311. }