From 2a4b4a7f7545d3fdbdf400f80503b61efab04c2c Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Thu, 11 Apr 2024 09:00:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=80=E4=B8=AA=E6=A9=98=E5=AD=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/db/db_task_center.go | 16 +++ app/db/db_virtual_coin.go | 20 +++ app/db/model/one_orenge_task_base.go | 15 +++ app/svc/svc_user_virtual_coin_flow.go | 66 ++++++++++ consume/init.go | 3 +- consume/md/consume_key.go | 12 +- consume/zhios_user_integral_exchange.go | 155 ++++++++++++++++++++++++ 7 files changed, 285 insertions(+), 2 deletions(-) create mode 100644 app/db/db_task_center.go create mode 100644 app/db/model/one_orenge_task_base.go create mode 100644 consume/zhios_user_integral_exchange.go diff --git a/app/db/db_task_center.go b/app/db/db_task_center.go new file mode 100644 index 0000000..9184e37 --- /dev/null +++ b/app/db/db_task_center.go @@ -0,0 +1,16 @@ +package db + +import ( + "applet/app/db/model" + "xorm.io/xorm" +) + +// +func TaskCenterBase(Db *xorm.Engine) (*model.OneOrengeTaskBase, error) { + var PineappleTaskBase model.OneOrengeTaskBase + has, err := Db.Get(&PineappleTaskBase) + if has == false || err != nil { + return nil, err + } + return &PineappleTaskBase, nil +} diff --git a/app/db/db_virtual_coin.go b/app/db/db_virtual_coin.go index 61278b7..b31d2ea 100644 --- a/app/db/db_virtual_coin.go +++ b/app/db/db_virtual_coin.go @@ -119,6 +119,15 @@ func VirtualCoinListInUse(Db *xorm.Engine, masterId, isFreeze string) ([]*model. return m, nil } +func VirtualCoinListInUseSess(sess *xorm.Session, masterId, isFreeze string) ([]*model.VirtualCoin, error) { + var m []*model.VirtualCoin + err := sess.Where("is_use=1").Asc("id").Find(&m) + if err != nil { + return nil, err + } + + return m, nil +} func VirtualCoinMapInUse(Db *xorm.Engine, masterId, isFreeze string) (map[string]model.VirtualCoin, error) { virtualCoinMap := make(map[string]model.VirtualCoin) @@ -131,6 +140,17 @@ func VirtualCoinMapInUse(Db *xorm.Engine, masterId, isFreeze string) (map[string } return virtualCoinMap, nil } +func VirtualCoinMapInUseSess(sess *xorm.Session, masterId, isFreeze string) (map[string]model.VirtualCoin, error) { + virtualCoinMap := make(map[string]model.VirtualCoin) + listInUse, err := VirtualCoinListInUseSess(sess, masterId, isFreeze) + if err != nil { + return nil, err + } + for _, coin := range listInUse { + virtualCoinMap[utils.AnyToString(coin.Id)] = *coin + } + return virtualCoinMap, nil +} func VirtualCoinByIds(eg *xorm.Engine, ids []string) map[string]model.VirtualCoin { var data []model.VirtualCoin diff --git a/app/db/model/one_orenge_task_base.go b/app/db/model/one_orenge_task_base.go new file mode 100644 index 0000000..fb8b77c --- /dev/null +++ b/app/db/model/one_orenge_task_base.go @@ -0,0 +1,15 @@ +package model + +type OneOrengeTaskBase struct { + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + CoinId int `json:"coin_id" xorm:"default 0 INT(11)"` + SignCondition int `json:"sign_condition" xorm:"default 0 comment('0无条件 1看完视频') INT(1)"` + SignWayType int `json:"sign_way_type" xorm:"default 0 comment('签到玩法 0连续签到 1累计签到') INT(1)"` + SignWay string `json:"sign_way" xorm:"default '' comment('签到玩法') VARCHAR(255)"` + MustSign int `json:"must_sign" xorm:"default 0 comment('强制签到') INT(1)"` + WithdrawClearDay int `json:"withdraw_clear_day" xorm:"default 0 comment('收益清空 未进入app天数') INT(11)"` + BaseSignReward string `json:"base_sign_reward" xorm:"default 0.00 comment('签到基础奖励') DECIMAL(20,2)"` + AdvId int `json:"adv_id" xorm:"default 0 INT(11)"` + VideoTotal int `json:"video_total" xorm:"default 0 INT(11)"` + VideoReward string `json:"video_reward" xorm:"default 0.00 comment('签到基础奖励') DECIMAL(20,2)"` +} diff --git a/app/svc/svc_user_virtual_coin_flow.go b/app/svc/svc_user_virtual_coin_flow.go index bd9ddaa..1e7810d 100644 --- a/app/svc/svc_user_virtual_coin_flow.go +++ b/app/svc/svc_user_virtual_coin_flow.go @@ -74,3 +74,69 @@ func virtualCoinFlowInsert(session *xorm.Session, uid, coinId, coinIdTo int, mon } return data.Id, nil } + +func UpdateUserFinValidAndInterFlowWithSession(session *xorm.Session, money, Title, ordType string, types, orderAction, uid, id int, ordId, otherId int64) error { + if utils.StrToFloat64(money) <= 0 { + return nil + } + userProfile, err := db.UserProfileFindByIdWithSession(session, uid) + if err != nil || userProfile == nil { + _ = session.Rollback() + if err == nil { + err = errors.New("获取用户余额信息失败") + } + return err + } + beforeAmount := userProfile.FinValid + if types == 0 { + userProfile.FinValid = utils.AnyToString(utils.AnyToFloat64(userProfile.FinValid) + utils.StrToFloat64(money)) + } else if types == 1 { + userProfile.FinValid = utils.AnyToString(utils.AnyToFloat64(userProfile.FinValid) - utils.StrToFloat64(money)) + } + afterAmount := userProfile.FinValid + userProfile.FinTotal = userProfile.FinTotal + utils.StrToFloat32(money) + affected, err := db.UserProfileUpdateWithSession(session, uid, userProfile, "fin_valid,fin_total") + if err != nil || affected == 0 { + if err == nil { + err = errors.New("更新用户余额信息失败") + } + return err + } + err = flowInsert(session, uid, money, orderAction, ordId, otherId, id, Title, ordType, types, beforeAmount, afterAmount) + if err != nil { + return err + } + return nil +} + +// 开始写入流水 +//uid:用户id,paidPrice金额,ordId订单id,id其他关联订单,具体根据订单类型判断,goodsId(OrdDetail记录商品ID或提现账号),ItemTitle订单标题 +//type:0收入,1支出,beforeAmount变更前金额,afterAmount变更后 +//orderAction:10自购,11推广,12团队,13免单,20提现,21消费,22退款,23拼团返佣,24资金池分红 +//ordType:订单类型taobao,jd,pdd,vip,suning,kaola,own自营,withdraw提现,vip_refund会员升级退款,vip_order会员升级余额支付流水,group_buy拼团 +func flowInsert(session *xorm.Session, uid int, paidPrice string, orderAction int, ordId int64, id int64, goodsId int, ItemTitle string, ordType string, types int, beforeAmount string, afterAmount string) error { + now := time.Now() + if err := db.FinUserFlowInsertOneWithSession( + session, + &model.FinUserFlow{ + Type: types, + Uid: uid, + Amount: paidPrice, + BeforeAmount: beforeAmount, + AfterAmount: afterAmount, + OrdType: ordType, + OrdId: utils.Int64ToStr(ordId), + OrdAction: orderAction, + OrdDetail: utils.IntToStr(goodsId), + State: 2, + OtherId: id, + OrdTitle: ItemTitle, + OrdTime: int(now.Unix()), + CreateAt: now, + UpdateAt: now, + }); err != nil { + _ = logx.Warn(err) + return err + } + return nil +} diff --git a/consume/init.go b/consume/init.go index dfe7d40..89c251c 100644 --- a/consume/init.go +++ b/consume/init.go @@ -88,7 +88,8 @@ func initConsumes() { //jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume //绿色双链积分 //一个橘子 - jobs[consumeMd.CancalUserMoneyConsumeFunName] = CancalUserMoneyConsume //余额 + jobs[consumeMd.CancalUserMoneyConsumeFunName] = CancalUserMoneyConsume //余额 + jobs[consumeMd.CancalUserIntegralExchangeConsumeFunName] = CancalUserIntegralExchange //兑换 } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index e175bb2..7e51032 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -434,6 +434,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "WithdrawConsume", }, + { + ExchangeName: "zhios.one.orenge.exchange", + Name: "zhios_one_orenge_exchange", + Type: FanOutQueueType, + IsPersistent: false, + RoutKey: "integral_exchange", + BindKey: "", + ConsumeFunName: "CancalUserIntegralExchange", + }, { ExchangeName: "canal.topic", // Name: "canal_fin_user_flow", @@ -495,5 +504,6 @@ const ( OneCirclesSignInCopyGreenEnergyFunName = "OneCirclesSignInCopyGreenEnergyConsume" WithdrawConsumeFunName = "WithdrawConsume" - CancalUserMoneyConsumeFunName = "CancalUserMoneyConsume" + CancalUserMoneyConsumeFunName = "CancalUserMoneyConsume" + CancalUserIntegralExchangeConsumeFunName = "CancalUserIntegralExchange" ) diff --git a/consume/zhios_user_integral_exchange.go b/consume/zhios_user_integral_exchange.go new file mode 100644 index 0000000..a3ce130 --- /dev/null +++ b/consume/zhios_user_integral_exchange.go @@ -0,0 +1,155 @@ +package consume + +import ( + "applet/app/db" + "applet/app/e" + "applet/app/svc" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + db2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/db" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "xorm.io/xorm" +) + +// +func CancalUserIntegralExchange(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleCancalUserIntegralExchange(res.Body) + //_ = res.Reject(false) + fmt.Println(err) + if err == nil { + _ = res.Ack(true) + } else { + var canalMsg *md.ZhiosAcquisition + var tmpString string + err := json.Unmarshal(res.Body, &tmpString) + if err == nil { + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err == nil { + ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey) + } + } + + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleCancalUserIntegralExchange(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosAcquisition + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println(err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + if eg == nil { + return nil + } + uid := canalMsg.Uid + base, _ := db.TaskCenterBase(eg) + if base == nil { + return nil + } + sess := eg.NewSession() + defer sess.Close() + sess.Begin() + amount, _ := db.UserVirtualAmountFindById(sess, utils.StrToInt(uid), base.CoinId) + if amount != nil && utils.StrToFloat64(amount.Amount) > 0 { + err := CoinExchange(eg, sess, utils.StrToInt(uid), utils.IntToStr(base.CoinId), "cny", amount.Amount, mid) + if err != nil { + sess.Rollback() + return err + } + } + sess.Commit() + return nil +} +func CoinExchange(eg *xorm.Engine, sess *xorm.Session, uid int, coinId, coinIdTo, amount, masterId string) error { + //虚拟币转换 + //获取目前可以兑换的虚拟币 + coinMapInUse, err := db.VirtualCoinMapInUseSess(sess, masterId, "") + if err != nil { + return nil + } + fromCoin, ok := coinMapInUse[coinId] + toCoin, ok2 := coinMapInUse[coinIdTo] + //判断兑换的虚拟币是否可以兑换 + if !ok || (!ok2 && coinIdTo != "cny") { + return nil + } + //被兑换虚拟币金额比例,兑换虚拟币金额比例,手续费比例 + var ( + fromRate, toRate float64 + ) + //获取用户两个虚拟币的余额数据 + fromWallet, err := db2.GetUserVirtualWalletWithSession(sess, uid, utils.StrToInt(coinId)) + if err != nil { + return nil + } + amount = fromWallet.Amount + fromRate = utils.AnyToFloat64(fromCoin.ExchangeRatio) + toRate = utils.AnyToFloat64(toCoin.ExchangeRatio) + if coinIdTo == "cny" { + toRate = 1 + } + //兑换比例 + rate := fromRate / toRate + //计算兑换的总值 + toValue := utils.AnyToFloat64(amount) / rate + toAmount := toValue + //先扣 + title := coinMapInUse[coinId].Name + "兑换" + coinMapInUse[coinIdTo].Name + + _, err = svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess, + utils.StrToFloat64(amount), title, "0", 2, 5, uid, utils.StrToInt(coinId), utils.StrToInt(coinIdTo), -1, "", 0, 0) + + if err != nil { + return e.NewErrCode(e.ERR_BAD_REQUEST) + } + //再给 + err = svc.UpdateUserFinValidAndInterFlowWithSession(sess, + utils.Float64ToStr(toAmount), title, "money_exchange", 0, 35, uid, 0, utils.StrToInt64(coinId), 0) + if err != nil { + return err + + } + return nil +}