package consume

import (
	"encoding/json"
	"errors"
	"fmt"

	"applet/app/db"
	"applet/app/svc"
	"applet/app/utils"
	"applet/app/utils/logx"
	"applet/consume/md"

	"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
	md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md"
	"code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule"
	"github.com/streadway/amqp"
	"xorm.io/xorm"
)

// ZhiosAppreciation binds queue to its exchange and then consumes deliveries
// forever, passing each message body to handleZhiosAppreciation.
//
// It returns only when no channel can be obtained from the pool, and panics
// when the delivery channel is closed.
func ZhiosAppreciation(queue md.MqQueue) {
	fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
	ch, err := rabbit.Cfg.Pool.GetChannel()
	if err != nil {
		logx.Error(err)
		return
	}
	defer ch.Release()

	// 1. Bind this queue to the exchange.
	ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
	// 2. Fetch messages and consume them.
	ch.Qos(1)
	delivery := ch.Consume(queue.Name, false)

	var res amqp.Delivery
	var ok bool
	for {
		res, ok = <-delivery
		if !ok {
			panic(errors.New("error getting message"))
		}
		fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
		err = handleZhiosAppreciation(res.Body)
		if err != nil {
			// NOTE(review): a failed message is neither acked nor rejected.
			// With Qos(1) this presumably stops further deliveries on this
			// channel until it is closed - confirm whether Reject/Nack is
			// wanted here. The error is at least logged instead of dropped.
			logx.Error(err)
			continue
		}
		_ = res.Ack(true)
	}
}

// handleZhiosAppreciation decodes one queue message and dispatches it by its
// Type field to exchange, withdraw, destroy or buyRefund.
//
// The body is a JSON string whose content is itself the JSON encoding of
// md.ZhiosAppreciation, so it is decoded twice. A decode failure is returned
// to the caller. A "null" payload, an unknown Mid (no engine in db.DBs) and an
// unrecognised Type are all treated as nothing to do and return nil.
func handleZhiosAppreciation(msg []byte) error {
	// 1. Parse the message structure taken from the queue.
	var canalMsg *md.ZhiosAppreciation
	fmt.Println(string(msg))
	var tmpString string
	if err := json.Unmarshal(msg, &tmpString); err != nil {
		fmt.Println("===with", err.Error())
		return err
	}
	fmt.Println(tmpString)
	if err := json.Unmarshal([]byte(tmpString), &canalMsg); err != nil {
		fmt.Println("===with", err.Error())
		return err
	}
	// A JSON "null" decodes successfully but leaves the pointer nil;
	// dereferencing it below would panic.
	if canalMsg == nil {
		fmt.Println("===with", "empty message")
		return nil
	}

	eg := db.DBs[canalMsg.Mid]
	if eg == nil {
		return nil
	}

	switch canalMsg.Type {
	case "exchange": // transfer in
		return exchange(eg, canalMsg)
	case "withdraw": // withdraw to balance
		return withdraw(eg, canalMsg)
	case "destroy": // destroyed by a purchase
		return destroy(eg, canalMsg)
	case "buy_refund": // returned by a purchase refund
		return buyRefund(eg, canalMsg)
	}
	return nil
}

// appreciationBaseSnapshot returns the Sum and FlowSum of the record returned
// by db.GetAppreciationBase, or "0" for both when there is no such record.
func appreciationBaseSnapshot(sess *xorm.Session) (sum, flowSum string) {
	base := db.GetAppreciationBase(sess)
	if base == nil {
		return "0", "0"
	}
	return base.Sum, base.FlowSum
}

// exchange handles a transfer-in: it credits the user's appreciation coin,
// records an appreciation entry and adds the amounts to appreciation_base,
// all inside one transaction.
//
// msg.Ext is decoded into a string map; the keys read are "id" and "amount"
// (the latter inside caleBili). Any failure rolls the transaction back and is
// returned, including a failure to begin or commit.
func exchange(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
	sess := eg.NewSession()
	defer sess.Close()
	if err := sess.Begin(); err != nil {
		return err
	}

	// Best-effort decode: a type error on one key still fills the others, so
	// the error is intentionally ignored, as the original code did.
	args := make(map[string]string)
	_ = json.Unmarshal([]byte(msg.Ext), &args)

	// Work out the amounts at the current value.
	biliMap := caleBili(eg, sess, msg.Mid, args)
	ordId := utils.OrderUUID(utils.StrToInt(msg.Uid))
	coinMapInUse, _ := db.VirtualCoinMapInUse(eg, msg.Mid, "")

	// Credit the coins to the user.
	title := coinMapInUse[args["id"]].Name + "-转入"
	appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")
	_, err := svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess, utils.StrToFloat64(biliMap["in_coin"]), title, "0", 1, 109, utils.StrToInt(msg.Uid), utils.StrToInt(appreciationCoinId), 0, utils.StrToInt64(ordId), "", 0, 0)
	if err != nil {
		_ = sess.Rollback()
		return err
	}

	beforeValue, beforeFlowValue := appreciationBaseSnapshot(sess)
	err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "0", "0", msg.Oid, biliMap["coin"], biliMap["in_coin"], beforeValue, beforeFlowValue)
	if err != nil {
		_ = sess.Rollback()
		return err
	}

	// Add to the fund pool.
	query := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
	_, err = sess.Exec(query, utils.StrToFloat64(biliMap["coin"]), utils.StrToFloat64(biliMap["in_coin"]))
	if err != nil {
		_ = sess.Rollback()
		return err
	}
	return sess.Commit()
}

// withdraw handles a withdrawal to balance: it credits the user's balance,
// records the transfer-out and reflux appreciation entries and subtracts the
// amounts from appreciation_base, all inside one transaction.
//
// msg.Ext is decoded into a string map; the key read is "amount". Any failure
// rolls the transaction back and is returned, including a failure to begin or
// commit.
func withdraw(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
	sess := eg.NewSession()
	defer sess.Close()
	if err := sess.Begin(); err != nil {
		return err
	}

	// Best-effort decode; see exchange for why the error is ignored.
	args := make(map[string]string)
	_ = json.Unmarshal([]byte(msg.Ext), &args)

	appreciationWithdrawFee := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_fee")
	appreciationWithdrawBack := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_withdraw_back")
	feeMap := md2.DealWithdrawalFeeResp{
		WithdrawalCommissionFee: utils.StrToFloat64(appreciationWithdrawFee) / 100,
		WithdrawalDestroyFee:    (utils.StrToFloat64(appreciationWithdrawFee) - utils.StrToFloat64(appreciationWithdrawBack)) / 100,
		WithdrawalRefluxFee:     utils.StrToFloat64(appreciationWithdrawBack) / 100,
	}
	// NOTE(review): the first return value is discarded, as in the original;
	// confirm whether it is an error that should be checked.
	_, resp := rule.DealWithdrawalAndDestroy(sess, feeMap, utils.StrToFloat64(args["amount"]))

	// Amount the user actually receives.
	newAmount := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOutValue-resp.AmountOut, 5), "4")
	// Amount deducted from the pool.
	coinSum := utils.GetPrec(utils.Float64ToStrByPrec(resp.TransferOut-resp.RefluxValue, 20), "4")
	// Asset value per unit.
	price := utils.GetPrec(utils.Float64ToStrByPrec(resp.Price, 20), "4")

	err := svc.UpdateUserFinValidAndInterFlowSess(sess, newAmount, args["amount"]+"个数字资产转余额,价值"+price+"/个", "appreciation", 0, 56, utils.StrToInt(msg.Uid), utils.StrToInt(msg.Oid), utils.StrToInt64(msg.Oid), utils.StrToInt64(msg.Oid))
	if err != nil {
		_ = sess.Rollback()
		return err
	}

	beforeValue, beforeFlowValue := appreciationBaseSnapshot(sess)
	appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")

	// Transfer-out record.
	err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "1", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
	if err != nil {
		_ = sess.Rollback()
		return err
	}

	// The next record is written against the values left after the
	// transfer-out above. (A separate "destroyed" record used to be inserted
	// at this point; it is disabled.)
	beforeValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeValue)-utils.StrToFloat64(args["amount"]), 5), "4")
	beforeFlowValue = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(beforeFlowValue)-utils.StrToFloat64(args["amount"]), 5), "4")

	// Reflux record.
	RefluxValue := utils.GetPrec(utils.Float64ToStrByPrec(resp.RefluxValue, 5), "4")
	err = db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "3", "0", msg.Oid, RefluxValue, "0", beforeValue, beforeFlowValue)
	if err != nil {
		_ = sess.Rollback()
		return err
	}

	// Run the pool update on the session so it commits or rolls back together
	// with the records above; executing it on the engine would apply it
	// outside the transaction.
	query := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
	_, err = sess.Exec(query, utils.StrToFloat64(coinSum), utils.StrToFloat64(args["amount"]))
	if err != nil {
		_ = sess.Rollback()
		return err
	}
	return sess.Commit()
}

// destroy handles coins destroyed by a purchase: it records an appreciation
// entry and subtracts the amount from both sum and flow_sum of
// appreciation_base, inside one transaction.
//
// msg.Ext is decoded into a string map; the key read is "amount". Any failure
// rolls the transaction back and is returned, including a failure to begin or
// commit.
func destroy(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
	// Best-effort decode; a type error on one key still fills the others.
	args := make(map[string]string)
	_ = json.Unmarshal([]byte(msg.Ext), &args)

	sess := eg.NewSession()
	defer sess.Close()
	if err := sess.Begin(); err != nil {
		return err
	}

	beforeValue, beforeFlowValue := appreciationBaseSnapshot(sess)
	appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")

	// Transfer-out record.
	err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "4", "1", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
	if err != nil {
		_ = sess.Rollback()
		return err
	}

	// Executed on the session so the update belongs to the transaction.
	query := `UPDATE appreciation_base SET sum=sum-?,flow_sum=flow_sum-? WHERE is_use=1;`
	_, err = sess.Exec(query, utils.StrToFloat64(args["amount"]), utils.StrToFloat64(args["amount"]))
	if err != nil {
		_ = sess.Rollback()
		return err
	}
	return sess.Commit()
}

// buyRefund handles coins returned by a purchase refund: it records an
// appreciation entry and adds the amount back to both sum and flow_sum of
// appreciation_base, inside one transaction.
//
// msg.Ext is decoded into a string map; the key read is "amount". Any failure
// rolls the transaction back and is returned, including a failure to begin or
// commit.
func buyRefund(eg *xorm.Engine, msg *md.ZhiosAppreciation) error {
	// Best-effort decode; a type error on one key still fills the others.
	args := make(map[string]string)
	_ = json.Unmarshal([]byte(msg.Ext), &args)

	sess := eg.NewSession()
	defer sess.Close()
	if err := sess.Begin(); err != nil {
		return err
	}

	beforeValue, beforeFlowValue := appreciationBaseSnapshot(sess)
	appreciationCoinId := db.SysCfgGetWithDb(eg, msg.Mid, "appreciation_coin_id")

	// Refund record.
	err := db.InsertAppreciation(sess, appreciationCoinId, msg.Uid, "5", "0", msg.Oid, args["amount"], args["amount"], beforeValue, beforeFlowValue)
	if err != nil {
		_ = sess.Rollback()
		return err
	}

	// Executed on the session so the update belongs to the transaction.
	query := `UPDATE appreciation_base SET sum=sum+?,flow_sum=flow_sum+? WHERE is_use=1;`
	_, err = sess.Exec(query, utils.StrToFloat64(args["amount"]), utils.StrToFloat64(args["amount"]))
	if err != nil {
		_ = sess.Rollback()
		return err
	}
	return sess.Commit()
}

// caleBili works out the amounts for a transfer-in from args["id"] and
// args["amount"].
//
// The result has two keys:
//   - "in_coin": when id is "cny" this is args["amount"] unchanged; otherwise
//     it is the amount converted by the coin's ExchangeRatio, scaled by the
//     "appreciation_coin_fee" percentage and passed through
//     rule.DealTransferIn.
//   - "coin": the converted amount before the fee; empty when id is "cny".
//
// NOTE(review): an empty or zero ExchangeRatio makes the 1/ratio division
// yield an infinite value - confirm that callers never send such a coin.
func caleBili(eg *xorm.Engine, sess *xorm.Session, dbName string, args map[string]string) map[string]string {
	appreciationCoinId := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_id")
	bCoinStr := ""
	bcoin := ""
	if args["id"] == "cny" {
		bCoinStr = args["amount"]
	} else {
		ids := []string{args["id"], appreciationCoinId}
		coin := db.VirtualCoinByIds(eg, ids)
		aCoinBili := coin[args["id"]].ExchangeRatio
		// 1:5 = X:money, so X = (1/5) * money.
		amoney := (1 / utils.StrToFloat64(aCoinBili)) * utils.StrToFloat64(args["amount"])
		bcoin = utils.GetPrec(utils.Float64ToStrByPrec(amoney, 5), "4")
		// Only the configured percentage is credited.
		appreciationCoinFee := db.SysCfgGetWithDb(eg, dbName, "appreciation_coin_fee")
		bCoins := amoney * (utils.StrToFloat64(appreciationCoinFee) / 100)
		// Divide by the current asset value.
		_, value := rule.DealTransferIn(sess, bCoins)
		bCoins = value
		bCoinStr = utils.GetPrec(utils.Float64ToStrByPrec(bCoins, 20), "4")
	}
	return map[string]string{
		"in_coin": bCoinStr,
		"coin":    bcoin,
	}
}

// coinPriceEg returns the current "price", "sum" and "flow_sum" read through
// db.GetAppreciationBaseEg.
//
// sum and flow_sum default to "0" when there is no base record. price is
// sum/flow_sum, and stays at its default of "1" when flow_sum is zero so the
// division never produces NaN or an infinite value.
func coinPriceEg(eg *xorm.Engine) map[string]string {
	sum := "0"
	flowSum := "0"
	if base := db.GetAppreciationBaseEg(eg); base != nil {
		sum = base.Sum
		flowSum = base.FlowSum
	}

	price := "1"
	if flow := utils.StrToFloat64(flowSum); flow != 0 {
		price = utils.GetPrec(utils.Float64ToStrByPrec(utils.StrToFloat64(sum)/flow, 5), "4")
	}
	return map[string]string{
		"price":    price,
		"sum":      sum,
		"flow_sum": flowSum,
	}
}