golang 的 rabbitmq 消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

235 lines
6.5 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/svc"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "github.com/streadway/amqp"
  13. "xorm.io/xorm"
  14. )
  15. func ZhiosAppreciation(queue md.MqQueue) {
  16. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  17. ch, err := rabbit.Cfg.Pool.GetChannel()
  18. if err != nil {
  19. logx.Error(err)
  20. return
  21. }
  22. defer ch.Release()
  23. //1、将自己绑定到交换机上
  24. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  25. //2、取出数据进行消费
  26. ch.Qos(1)
  27. delivery := ch.Consume(queue.Name, false)
  28. var res amqp.Delivery
  29. var ok bool
  30. for {
  31. res, ok = <-delivery
  32. if ok == true {
  33. //fmt.Println(string(res.Body))
  34. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  35. err = handleZhiosAppreciation(res.Body)
  36. //_ = res.Reject(false)
  37. if err == nil {
  38. _ = res.Ack(true)
  39. }
  40. } else {
  41. panic(errors.New("error getting message"))
  42. }
  43. }
  44. fmt.Println("get msg done")
  45. }
  46. func handleZhiosAppreciation(msg []byte) error {
  47. //1、解析canal采集至mq中queue的数据结构体
  48. var canalMsg *md.ZhiosAppreciation
  49. fmt.Println(string(msg))
  50. var tmpString string
  51. err := json.Unmarshal(msg, &tmpString)
  52. if err != nil {
  53. fmt.Println("===with", err.Error())
  54. return err
  55. }
  56. fmt.Println(tmpString)
  57. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  58. if err != nil {
  59. fmt.Println("===with", err.Error())
  60. return err
  61. }
  62. mid := canalMsg.Mid
  63. eg := db.DBs[mid]
  64. if eg == nil {
  65. return nil
  66. }
  67. //类型 转入 exchange
  68. if canalMsg.Type == "exchange" {
  69. err := exchange(eg, canalMsg)
  70. if err != nil {
  71. return err
  72. }
  73. }
  74. //类型 提现 withdraw 到余额
  75. if canalMsg.Type == "withdraw" {
  76. err := withdraw(eg, canalMsg)
  77. if err != nil {
  78. return err
  79. }
  80. }
  81. //类型 购物销毁
  82. if canalMsg.Type == "destroy" {
  83. err := destroy(eg, canalMsg)
  84. if err != nil {
  85. return err
  86. }
  87. }
  88. return nil
  89. }
  90. //转入 操作加入资金池和加入积分
  91. func exchange(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  92. sess := eg.NewSession()
  93. defer sess.Close()
  94. sess.Begin()
  95. //计算出当前的价值
  96. args := make(map[string]string)
  97. json.Unmarshal([]byte(msg.Ext), &args)
  98. biliMap := caleBili(eg, sess, msg.Mid, args)
  99. ordId := utils.OrderUUID(utils.StrToInt(msg.Uid))
  100. coinMapInUse, _ := db.VirtualCoinMapInUse(eg, msg.Mid, "")
  101. //积分加入
  102. title := coinMapInUse[args["id"]].Name + "-转入"
  103. appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
  104. _, err := svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess,
  105. utils.StrToFloat64(biliMap["in_coin"]), title, "0", 1, 109, utils.StrToInt(msg.Uid), utils.StrToInt(appreciationCoinId), 0, utils.StrToInt64(ordId), "", 0, 0)
  106. if err != nil {
  107. sess.Rollback()
  108. return err
  109. }
  110. //加入资金池
  111. sql := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
  112. _, err = sess.Exec(sql, biliMap["coin"], biliMap["in_coin"])
  113. if err != nil {
  114. sess.Rollback()
  115. return err
  116. }
  117. sess.Commit()
  118. return nil
  119. }
  120. //提现
  121. func withdraw(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  122. sess := eg.NewSession()
  123. defer sess.Close()
  124. sess.Begin()
  125. args := make(map[string]string)
  126. json.Unmarshal([]byte(msg.Ext), &args)
  127. coinMap := coinPriceEg(eg)
  128. appreciationWithdrawFee := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_fee")
  129. appreciationWithdrawBack := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_back")
  130. //实际到账的
  131. amount := utils.StrToFloat64(args["amount"]) * (1 - (utils.StrToFloat64(appreciationWithdrawFee) / 100))
  132. newAmount := utils.GetPrec(utils.Float64ToStrByPrec(amount*utils.StrToFloat64(coinMap["price"]), 5), "4")
  133. //扣的
  134. coinSum := utils.StrToFloat64(args["amount"]) * (1 - (utils.StrToFloat64(appreciationWithdrawFee) / 100) - (utils.StrToFloat64(appreciationWithdrawBack) / 100))
  135. err := svc.UpdateUserFinValidAndInterFlowSess(sess,
  136. newAmount, args["amount"]+"个积分转余额", "appreciation", 0, 56, utils.StrToInt(msg.Uid), utils.StrToInt(msg.Oid), utils.StrToInt64(msg.Oid), utils.StrToInt64(msg.Oid))
  137. if err != nil {
  138. sess.Rollback()
  139. return err
  140. }
  141. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  142. _, err = eg.Exec(sql, coinSum, args["amount"])
  143. if err != nil {
  144. sess.Rollback()
  145. return err
  146. }
  147. sess.Commit()
  148. return err
  149. }
  150. //购物销毁
  151. func destroy(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
  152. args := make(map[string]string)
  153. json.Unmarshal([]byte(msg.Ext), &args)
  154. sql := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
  155. _, err := eg.Exec(sql, args["amount"], args["amount"])
  156. if err != nil {
  157. return err
  158. }
  159. return err
  160. }
  161. func caleBili(eg *xorm.Engine, sess *xorm.Session, dbName string, args map[string]string) map[string]string {
  162. appreciationCoinId := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_id")
  163. bCoinStr := ""
  164. bcoin := ""
  165. if args["id"] == "cny" {
  166. bCoinStr = args["amount"]
  167. } else {
  168. ids := []string{args["id"], appreciationCoinId}
  169. coin := db.VirtualCoinByIds(eg, ids)
  170. aCoinBili := coin[args["id"]].ExchangeRatio
  171. //1:5=X:money X= 1:5*money
  172. amoney := (1 / utils.StrToFloat64(aCoinBili)) * utils.StrToFloat64(args["amount"])
  173. bcoin = utils.GetPrec(utils.Float64ToStrByPrec(amoney, 5), "4")
  174. //这是只返70%
  175. appreciationCoinFee := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_fee")
  176. bCoins := amoney * (utils.StrToFloat64(appreciationCoinFee) / 100)
  177. coinPriceMap := coinPrice(sess)
  178. //除以当前的资产价值
  179. bCoins = bCoins / utils.StrToFloat64(coinPriceMap["price"])
  180. bCoinStr = utils.GetPrec(utils.Float64ToStrByPrec(bCoins, 5), "4")
  181. }
  182. res := map[string]string{
  183. "in_coin": bCoinStr,
  184. "coin": bcoin,
  185. }
  186. return res
  187. }
  188. func coinPrice(sess *xorm.Session) map[string]string {
  189. base := db.GetAppreciationBase(sess)
  190. sum := "0"
  191. flowSum := "0"
  192. price := "1"
  193. if base != nil {
  194. sum = base.Sum
  195. flowSum = base.FlowSum
  196. }
  197. if utils.StrToFloat64(base.Sum) > 0 {
  198. price = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(sum)/utils.StrToFloat64(flowSum), 5), "4")
  199. }
  200. res := map[string]string{
  201. "price": price,
  202. "sum": sum,
  203. "flow_sum": flowSum,
  204. }
  205. return res
  206. }
  207. func coinPriceEg(eg *xorm.Engine) map[string]string {
  208. base := db.GetAppreciationBaseEg(eg)
  209. sum := "0"
  210. flowSum := "0"
  211. price := "1"
  212. if base != nil {
  213. sum = base.Sum
  214. flowSum = base.FlowSum
  215. }
  216. price = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(sum)/utils.StrToFloat64(flowSum), 5), "4")
  217. res := map[string]string{
  218. "price": price,
  219. "sum": sum,
  220. "flow_sum": flowSum,
  221. }
  222. return res
  223. }