golang 的 rabbitmq 消费项目
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

zhios_appreciation.go 9.9 KiB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/svc"
  6. "applet/app/utils"
  7. "applet/app/utils/logx"
  8. "applet/consume/md"
  9. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  10. md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md"
  11. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule"
  12. "encoding/json"
  13. "errors"
  14. "fmt"
  15. "github.com/streadway/amqp"
  16. "xorm.io/xorm"
  17. )
  18. func ZhiosAppreciation(queue md.MqQueue) {
  19. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  20. ch, err := rabbit.Cfg.Pool.GetChannel()
  21. if err != nil {
  22. logx.Error(err)
  23. return
  24. }
  25. defer ch.Release()
  26. //1、将自己绑定到交换机上
  27. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  28. //2、取出数据进行消费
  29. ch.Qos(1)
  30. delivery := ch.Consume(queue.Name, false)
  31. var res amqp.Delivery
  32. var ok bool
  33. for {
  34. res, ok = <-delivery
  35. if ok == true {
  36. //fmt.Println(string(res.Body))
  37. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  38. err = handleZhiosAppreciation(res.Body)
  39. //_ = res.Reject(false)
  40. if err == nil {
  41. _ = res.Ack(true)
  42. }
  43. } else {
  44. panic(errors.New("error getting message"))
  45. }
  46. }
  47. fmt.Println("get msg done")
  48. }
  49. func handleZhiosAppreciation(msg []byte) error {
  50. //1、解析canal采集至mq中queue的数据结构体
  51. var canalMsg *md.ZhiosAppreciation
  52. fmt.Println(string(msg))
  53. var tmpString string
  54. err := json.Unmarshal(msg, &tmpString)
  55. if err != nil {
  56. fmt.Println("===with", err.Error())
  57. return err
  58. }
  59. fmt.Println(tmpString)
  60. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  61. if err != nil {
  62. fmt.Println("===with", err.Error())
  63. return err
  64. }
  65. mid := canalMsg.Mid
  66. eg := db.DBs[mid]
  67. if eg == nil {
  68. return nil
  69. }
  70. //类型 转入 exchange
  71. if canalMsg.Type == "exchange" {
  72. err := exchange(eg, canalMsg)
  73. if err != nil {
  74. return err
  75. }
  76. }
  77. //类型 提现 withdraw 到余额
  78. if canalMsg.Type == "withdraw" {
  79. err := withdraw(eg, canalMsg)
  80. if err != nil {
  81. return err
  82. }
  83. }
  84. //类型 购物销毁
  85. if canalMsg.Type == "destroy" {
  86. err := destroy(eg, canalMsg)
  87. if err != nil {
  88. return err
  89. }
  90. }
  91. //类型 购物退回
  92. if canalMsg.Type == "buy_refund" {
  93. err := buyRefund(eg, canalMsg)
  94. if err != nil {
  95. return err
  96. }
  97. }
  98. return nil
  99. }
  100. //转入 操作加入资金池和加入积分
  101. func exchange(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  102. sess := eg.NewSession()
  103. defer sess.Close()
  104. sess.Begin()
  105. //计算出当前的价值
  106. args := make(map[string]string)
  107. json.Unmarshal([]byte(msg.Ext), &args)
  108. biliMap := caleBili(eg, sess, msg.Mid, args)
  109. ordId := utils.OrderUUID(utils.StrToInt(msg.Uid))
  110. coinMapInUse, _ := db.VirtualCoinMapInUse(eg, msg.Mid, "")
  111. //积分加入
  112. title := coinMapInUse[args["id"]].Name + "-转入"
  113. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  114. _, err := svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess,
  115. utils.StrToFloat64(biliMap["in_coin"]), title, "0", 1, 109, utils.StrToInt(msg.Uid), utils.StrToInt(appreciationCoinId), 0, utils.StrToInt64(ordId), "", 0, 0)
  116. if err != nil {
  117. sess.Rollback()
  118. return err
  119. }
  120. base := db.GetAppreciationBase(sess)
  121. beforeValue := "0"
  122. beforeFlowValue := "0"
  123. if base != nil {
  124. beforeValue = base.Sum
  125. beforeFlowValue = base.FlowSum
  126. }
  127. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "0", "0", msg.Oid, biliMap["coin"], biliMap["in_coin"], beforeValue, beforeFlowValue)
  128. if err != nil {
  129. sess.Rollback()
  130. return err
  131. }
  132. //加入资金池
  133. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  134. _, err = sess.Exec(sql, utils.StrToFloat64(biliMap["coin"]), utils.StrToFloat64(biliMap["in_coin"]))
  135. if err != nil {
  136. sess.Rollback()
  137. return err
  138. }
  139. sess.Commit()
  140. return nil
  141. }
  142. //提现
  143. func withdraw(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  144. sess := eg.NewSession()
  145. defer sess.Close()
  146. sess.Begin()
  147. args := make(map[string]string)
  148. json.Unmarshal([]byte(msg.Ext), &args)
  149. //资产价值
  150. price := ""
  151. appreciationWithdrawFee := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_fee")
  152. appreciationWithdrawBack := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_back")
  153. feeMap := md2.DealWithdrawalFeeResp{
  154. WithdrawalCommissionFee: utils.StrToFloat64(appreciationWithdrawFee) / 100,
  155. WithdrawalDestroyFee: (utils.StrToFloat64(appreciationWithdrawFee) - utils.StrToFloat64(appreciationWithdrawBack)) / 100,
  156. WithdrawalRefluxFee: utils.StrToFloat64(appreciationWithdrawBack) / 100,
  157. }
  158. _, resp := rule.DealWithdrawalAndDestroy(sess, feeMap, utils.StrToFloat64(args["amount"]))
  159. //这是到手的
  160. newAmount := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOutValue-resp.AmountOut, 5), "4")
  161. //扣的
  162. coinSum := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOutValue-resp.RefluxValue, 20), "4")
  163. price = utils.GetPrec(utils.Float64ToStrByPrec(resp.Price, 20), "4")
  164. err := svc.UpdateUserFinValidAndInterFlowSess(sess,
  165. newAmount, args["amount"]+"个数字资产转余额,价值"+price+"/个", "appreciation", 0, 56, utils.StrToInt(msg.Uid), utils.StrToInt(msg.Oid), utils.StrToInt64(msg.Oid), utils.StrToInt64(msg.Oid))
  166. if err != nil {
  167. sess.Rollback()
  168. return err
  169. }
  170. base := db.GetAppreciationBase(sess)
  171. beforeValue := "0"
  172. beforeFlowValue := "0"
  173. if base != nil {
  174. beforeValue = base.Sum
  175. beforeFlowValue = base.FlowSum
  176. }
  177. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  178. //转出
  179. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "1", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
  180. if err != nil {
  181. sess.Rollback()
  182. return err
  183. }
  184. ////销毁的
  185. beforeValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeValue)-resp.TransferOutValue, 5), "4")
  186. beforeFlowValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeFlowValue)-utils.StrToFloat64(args["amount"]), 5), "4")
  187. //DestroyValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.DestroyValue, 5), "4")
  188. //err = db.InsertAppreciation(sess, appreciationCoinId, args["uid"], "2", "1", DestroyValue, "0", beforeValue, beforeFlowValue)
  189. //if err != nil {
  190. // sess.Rollback()
  191. // return err
  192. //}
  193. //回流的
  194. RefluxValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.RefluxValue, 5), "4")
  195. err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "3", "0", msg.Oid, RefluxValue, "0", beforeValue, beforeFlowValue)
  196. if err != nil {
  197. sess.Rollback()
  198. return err
  199. }
  200. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  201. _, err = eg.Exec(sql, utils.StrToFloat64(coinSum), utils.StrToFloat64(args["amount"]))
  202. if err != nil {
  203. sess.Rollback()
  204. return err
  205. }
  206. sess.Where("ord_id=?", msg.Oid).Cols("price").Update(&model.UserVirtualCoinFlow{Price: price})
  207. sess.Commit()
  208. return nil
  209. }
  210. //购物销毁
  211. func destroy(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  212. args := make(map[string]string)
  213. json.Unmarshal([]byte(msg.Ext), &args)
  214. sess := eg.NewSession()
  215. defer sess.Close()
  216. sess.Begin()
  217. base := db.GetAppreciationBase(sess)
  218. beforeValue := "0"
  219. beforeFlowValue := "0"
  220. if base != nil {
  221. beforeValue = base.Sum
  222. beforeFlowValue = base.FlowSum
  223. }
  224. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  225. //转出
  226. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "4", "1", msg.Oid, args["coinPrice"], args["amount"], beforeValue, beforeFlowValue)
  227. if err != nil {
  228. sess.Rollback()
  229. return err
  230. }
  231. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  232. _, err = eg.Exec(sql, utils.StrToFloat64(args["coinPrice"]), utils.StrToFloat64(args["amount"]))
  233. if err != nil {
  234. sess.Rollback()
  235. return err
  236. }
  237. sess.Commit()
  238. return nil
  239. }
  240. //购物退回
  241. func buyRefund(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  242. args := make(map[string]string)
  243. json.Unmarshal([]byte(msg.Ext), &args)
  244. sess := eg.NewSession()
  245. defer sess.Close()
  246. sess.Begin()
  247. base := db.GetAppreciationBase(sess)
  248. beforeValue := "0"
  249. beforeFlowValue := "0"
  250. if base != nil {
  251. beforeValue = base.Sum
  252. beforeFlowValue = base.FlowSum
  253. }
  254. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  255. //转出
  256. err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "5", "0", msg.Oid, args["coinPrice"], args["amount"], beforeValue, beforeFlowValue)
  257. if err != nil {
  258. sess.Rollback()
  259. return err
  260. }
  261. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  262. _, err = eg.Exec(sql, utils.StrToFloat64(args["coinPrice"]), utils.StrToFloat64(args["amount"]))
  263. if err != nil {
  264. sess.Rollback()
  265. return err
  266. }
  267. sess.Commit()
  268. return nil
  269. }
  270. func caleBili(eg *xorm.Engine, sess *xorm.Session, dbName string, args map[string]string) map[string]string {
  271. appreciationCoinId := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_id")
  272. bCoinStr := ""
  273. bcoin := ""
  274. if args["id"] == "cny" {
  275. bCoinStr = args["amount"]
  276. } else {
  277. ids := []string{args["id"], appreciationCoinId}
  278. coin := db.VirtualCoinByIds(eg, ids)
  279. aCoinBili := coin[args["id"]].ExchangeRatio
  280. //1:5=X:money X= 1:5*money
  281. amoney := (1 / utils.StrToFloat64(aCoinBili)) * utils.StrToFloat64(args["amount"])
  282. bcoin = utils.GetPrec(utils.Float64ToStrByPrec(amoney, 5), "4")
  283. //这是只返70%
  284. appreciationCoinFee := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_fee")
  285. bCoins := amoney * (utils.StrToFloat64(appreciationCoinFee) / 100)
  286. //除以当前的资产价值
  287. _, value := rule.DealTransferIn(sess, bCoins)
  288. bCoins = value
  289. bCoinStr = utils.GetPrec(utils.Float64ToStrByPrec(bCoins, 20), "4")
  290. }
  291. res := map[string]string{
  292. "in_coin": bCoinStr,
  293. "coin": bcoin,
  294. }
  295. return res
  296. }
  297. func coinPriceEg(eg *xorm.Engine) map[string]string {
  298. base := db.GetAppreciationBaseEg(eg)
  299. sum := "0"
  300. flowSum := "0"
  301. price := "1"
  302. if base != nil {
  303. sum = base.Sum
  304. flowSum = base.FlowSum
  305. }
  306. price = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(sum)/utils.StrToFloat64(flowSum), 5), "4")
  307. res := map[string]string{
  308. "price": price,
  309. "sum": sum,
  310. "flow_sum": flowSum,
  311. }
  312. return res
  313. }