From 1943da0d83c758ebd2f68c024e06dd4df85496da Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Fri, 26 Apr 2024 11:21:01 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/db/db_coin_amount_date_total.go | 43 ++++++ app/db/model/coin_amount_date_total.go | 8 ++ app/db/model/coin_amount_user_total.go | 9 ++ ...e_orenge_user_virtual_coin_flow_consume.go | 130 ++++++++++++++++++ .../canal_user_virtual_coin_flow_consume.go | 100 +++++--------- consume/init.go | 1 + consume/md/consume_key.go | 18 ++- 7 files changed, 240 insertions(+), 69 deletions(-) create mode 100644 app/db/db_coin_amount_date_total.go create mode 100644 app/db/model/coin_amount_date_total.go create mode 100644 app/db/model/coin_amount_user_total.go create mode 100644 consume/canal_one_orenge_user_virtual_coin_flow_consume.go diff --git a/app/db/db_coin_amount_date_total.go b/app/db/db_coin_amount_date_total.go new file mode 100644 index 0000000..90ba4a5 --- /dev/null +++ b/app/db/db_coin_amount_date_total.go @@ -0,0 +1,43 @@ +package db + +import ( + "applet/app/db/model" + "applet/app/utils" + "xorm.io/xorm" +) + +func GetCoinAmountDate(sess *xorm.Session, coinId, date string) *model.CoinAmountDateTotal { + var data model.CoinAmountDateTotal + get, _ := sess.Where("coin_id=? and date=?", coinId, date).Get(&data) + if get == false { + data = model.CoinAmountDateTotal{ + Date: utils.StrToInt(date), + Amount: "0", + CoinId: utils.StrToInt(coinId), + } + one, err := sess.InsertOne(&data) + if one == 0 || err != nil { + return nil + } + } + return &data + +} +func GetCoinAmountDateForUse(sess *xorm.Session, coinId, date, uid string) *model.CoinAmountUserTotal { + var data model.CoinAmountUserTotal + get, _ := sess.Where("coin_id=? and date=? and uid=?", coinId, date, uid).Get(&data) + if get == false { + data = model.CoinAmountUserTotal{ + Date: utils.StrToInt(date), + Uid: utils.StrToInt(uid), + Amount: "0", + CoinId: utils.StrToInt(coinId), + } + one, err := sess.InsertOne(&data) + if one == 0 || err != nil { + return nil + } + } + return &data + +} diff --git a/app/db/model/coin_amount_date_total.go b/app/db/model/coin_amount_date_total.go new file mode 100644 index 0000000..e04fd7b --- /dev/null +++ b/app/db/model/coin_amount_date_total.go @@ -0,0 +1,8 @@ +package model + +type CoinAmountDateTotal struct { + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + Date int `json:"date" xorm:"default 0 INT(11)"` + Amount string `json:"amount" xorm:"default 0.000000 DECIMAL(20,6)"` + CoinId int `json:"coin_id" xorm:"default 0 INT(11)"` +} diff --git a/app/db/model/coin_amount_user_total.go b/app/db/model/coin_amount_user_total.go new file mode 100644 index 0000000..ed07561 --- /dev/null +++ b/app/db/model/coin_amount_user_total.go @@ -0,0 +1,9 @@ +package model + +type CoinAmountUserTotal struct { + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + Date int `json:"date" xorm:"default 0 INT(11)"` + Amount string `json:"amount" xorm:"default 0.000000 DECIMAL(20,6)"` + CoinId int `json:"coin_id" xorm:"default 0 INT(11)"` + Uid int `json:"uid" xorm:"default 0 INT(11)"` +} diff --git a/consume/canal_one_orenge_user_virtual_coin_flow_consume.go b/consume/canal_one_orenge_user_virtual_coin_flow_consume.go new file mode 100644 index 0000000..9513de2 --- /dev/null +++ b/consume/canal_one_orenge_user_virtual_coin_flow_consume.go @@ -0,0 +1,130 @@ +package consume + +import ( + "applet/app/db" + "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/shopspring/decimal" + "github.com/streadway/amqp" + "strings" + "time" +) + +func CanalUserVirtualCoinFlowConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>CanalUserVirtualCoinFlowConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalUserVirtualCoinFlowConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleCanalUserVirtualCoinFlow(res.Body) + if err != nil { + fmt.Println("handleCanalUserVirtualCoinFlow_ERR:::::", err.Error()) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleCanalUserVirtualCoinFlow(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalUserVirtualCoinFlowOrderMessage[md.CanalUserVirtualCoinFlowOrder] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return err + } + + masterId := strings.Split(canalMsg.Database, "_")[1] + if masterId != "32053480" { + return nil + } + engine := db.DBs[masterId] + now := time.Now() + + //2、查找 one_circles_green_energy_basic_setting 基础设置 + userPublicPlatoonDoubleNetworkSetting, err := db.UserPublicPlatoonDoubleNetworkSettingGetOneByParams(engine, map[string]interface{}{ + "key": "is_open", + "value": 1, + }) + if err != nil { + return err + } + if userPublicPlatoonDoubleNetworkSetting == nil { + return errors.New("公排双网未开启") + } + if canalMsg.Type == md.CanalMsgInsertSqlType { + if canalMsg.Data[0].CoinId == utils.IntToStr(userPublicPlatoonDoubleNetworkSetting.CoinId) { + //3、查找 user_public_platoon_double_network_user_coin_record + userPublicPlatoonDoubleNetworkUserCoinRecord, err1 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordGetOneByParams(engine, map[string]interface{}{ + "key": "uid", + "value": canalMsg.Data[0].Uid, + }) + if err1 != nil { + return err1 + } + if userPublicPlatoonDoubleNetworkUserCoinRecord == nil { + userProfile, err2 := db.UserProfileFindByIDSess(engine.NewSession(), canalMsg.Data[0].Uid) + if userProfile == nil { + return errors.New("用户不存在") + } + if err2 != nil { + return err2 + } + //新增记录 + _, err3 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordInsert(engine, &model.UserPublicPlatoonDoubleNetworkUserCoinRecord{ + Uid: utils.StrToInt(canalMsg.Data[0].Uid), + LastAmount: canalMsg.Data[0].AfterAmout, + Amount: canalMsg.Data[0].AfterAmout, + RecommendUid: userProfile.ParentUid, + CoinId: utils.StrToInt(canalMsg.Data[0].CoinId), + CreateAt: now.Format("2006-01-02 15:04:05"), + UpdateAt: now.Format("2006-01-02 15:04:05"), + }) + if err3 != nil { + return err3 + } + } else { + //更新记录 + afterAmount, _ := decimal.NewFromString(canalMsg.Data[0].AfterAmout) + amount, _ := decimal.NewFromString(userPublicPlatoonDoubleNetworkUserCoinRecord.Amount) + lastAmount, _ := decimal.NewFromString(userPublicPlatoonDoubleNetworkUserCoinRecord.LastAmount) + if canalMsg.Data[0].Direction == "1" || canalMsg.Data[0].Direction == "2" { + //收入 && 支出 + userPublicPlatoonDoubleNetworkUserCoinRecord.Amount = amount.Add(afterAmount.Sub(lastAmount)).String() + } + userPublicPlatoonDoubleNetworkUserCoinRecord.LastAmount = canalMsg.Data[0].AfterAmout + _, err2 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordUpdate(engine, userPublicPlatoonDoubleNetworkUserCoinRecord.Id, userPublicPlatoonDoubleNetworkUserCoinRecord, "amount", "last_amount") + if err2 != nil { + return err2 + } + } + } + } + + return nil +} diff --git a/consume/canal_user_virtual_coin_flow_consume.go b/consume/canal_user_virtual_coin_flow_consume.go index 9513de2..34801fe 100644 --- a/consume/canal_user_virtual_coin_flow_consume.go +++ b/consume/canal_user_virtual_coin_flow_consume.go @@ -2,7 +2,6 @@ package consume import ( "applet/app/db" - "applet/app/db/model" "applet/app/utils" "applet/app/utils/logx" "applet/consume/md" @@ -10,13 +9,11 @@ import ( "encoding/json" "errors" "fmt" - "github.com/shopspring/decimal" "github.com/streadway/amqp" "strings" - "time" ) -func CanalUserVirtualCoinFlowConsume(queue md.MqQueue) { +func CanalOneOrengeUserVirtualCoinFlowConsume(queue md.MqQueue) { fmt.Println(">>>>>>>>>>>>CanalUserVirtualCoinFlowConsume>>>>>>>>>>>>") ch, err := rabbit.Cfg.Pool.GetChannel() if err != nil { @@ -37,7 +34,7 @@ func CanalUserVirtualCoinFlowConsume(queue md.MqQueue) { if ok == true { //fmt.Println(string(res.Body)) fmt.Println(">>>>>>>>>>>>>>>>CanalUserVirtualCoinFlowConsume<<<<<<<<<<<<<<<<<<<<<<<<<") - err = handleCanalUserVirtualCoinFlow(res.Body) + err = handleCanalOneOrengeUserVirtualCoinFlow(res.Body) if err != nil { fmt.Println("handleCanalUserVirtualCoinFlow_ERR:::::", err.Error()) } @@ -51,7 +48,7 @@ func CanalUserVirtualCoinFlowConsume(queue md.MqQueue) { fmt.Println("get msg done") } -func handleCanalUserVirtualCoinFlow(msg []byte) error { +func handleCanalOneOrengeUserVirtualCoinFlow(msg []byte) error { //1、解析canal采集至mq中queue的数据结构体 var canalMsg *md.CanalUserVirtualCoinFlowOrderMessage[md.CanalUserVirtualCoinFlowOrder] err := json.Unmarshal(msg, &canalMsg) @@ -60,70 +57,43 @@ func handleCanalUserVirtualCoinFlow(msg []byte) error { } masterId := strings.Split(canalMsg.Database, "_")[1] - if masterId != "32053480" { + if masterId != "15763466" { return nil } engine := db.DBs[masterId] - now := time.Now() - - //2、查找 one_circles_green_energy_basic_setting 基础设置 - userPublicPlatoonDoubleNetworkSetting, err := db.UserPublicPlatoonDoubleNetworkSettingGetOneByParams(engine, map[string]interface{}{ - "key": "is_open", - "value": 1, - }) - if err != nil { - return err - } - if userPublicPlatoonDoubleNetworkSetting == nil { - return errors.New("公排双网未开启") - } if canalMsg.Type == md.CanalMsgInsertSqlType { - if canalMsg.Data[0].CoinId == utils.IntToStr(userPublicPlatoonDoubleNetworkSetting.CoinId) { - //3、查找 user_public_platoon_double_network_user_coin_record - userPublicPlatoonDoubleNetworkUserCoinRecord, err1 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordGetOneByParams(engine, map[string]interface{}{ - "key": "uid", - "value": canalMsg.Data[0].Uid, - }) - if err1 != nil { - return err1 - } - if userPublicPlatoonDoubleNetworkUserCoinRecord == nil { - userProfile, err2 := db.UserProfileFindByIDSess(engine.NewSession(), canalMsg.Data[0].Uid) - if userProfile == nil { - return errors.New("用户不存在") - } - if err2 != nil { - return err2 - } - //新增记录 - _, err3 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordInsert(engine, &model.UserPublicPlatoonDoubleNetworkUserCoinRecord{ - Uid: utils.StrToInt(canalMsg.Data[0].Uid), - LastAmount: canalMsg.Data[0].AfterAmout, - Amount: canalMsg.Data[0].AfterAmout, - RecommendUid: userProfile.ParentUid, - CoinId: utils.StrToInt(canalMsg.Data[0].CoinId), - CreateAt: now.Format("2006-01-02 15:04:05"), - UpdateAt: now.Format("2006-01-02 15:04:05"), - }) - if err3 != nil { - return err3 - } - } else { - //更新记录 - afterAmount, _ := decimal.NewFromString(canalMsg.Data[0].AfterAmout) - amount, _ := decimal.NewFromString(userPublicPlatoonDoubleNetworkUserCoinRecord.Amount) - lastAmount, _ := decimal.NewFromString(userPublicPlatoonDoubleNetworkUserCoinRecord.LastAmount) - if canalMsg.Data[0].Direction == "1" || canalMsg.Data[0].Direction == "2" { - //收入 && 支出 - userPublicPlatoonDoubleNetworkUserCoinRecord.Amount = amount.Add(afterAmount.Sub(lastAmount)).String() - } - userPublicPlatoonDoubleNetworkUserCoinRecord.LastAmount = canalMsg.Data[0].AfterAmout - _, err2 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordUpdate(engine, userPublicPlatoonDoubleNetworkUserCoinRecord.Id, userPublicPlatoonDoubleNetworkUserCoinRecord, "amount", "last_amount") - if err2 != nil { - return err2 - } - } + if canalMsg.Data[0].TransferType == "9" || canalMsg.Data[0].Direction != "1" || utils.StrToFloat64(canalMsg.Data[0].Amout) <= 0 { + //转赠不记录 + return nil + } + sess := engine.NewSession() + defer sess.Close() + sess.Begin() + date := utils.TimeParseStd(canalMsg.Data[0].CreateTime).Format("20060102") + amountDate := db.GetCoinAmountDate(sess, canalMsg.Data[0].CoinId, date) + if amountDate == nil { + sess.Rollback() + return errors.New("失败") + } + amountDate.Amount = utils.Float64ToStrByPrec(utils.StrToFloat64(canalMsg.Data[0].Amout)+utils.StrToFloat64(amountDate.Amount), 8) + update, err := sess.Where("id=?", amountDate.Id).Cols("amount").Update(amountDate) + if update == 0 || err != nil { + sess.Rollback() + return errors.New("失败") + } + + amountUser := db.GetCoinAmountDateForUse(sess, canalMsg.Data[0].CoinId, date, canalMsg.Data[0].Uid) + if amountUser == nil { + sess.Rollback() + return errors.New("失败") + } + amountUser.Amount = utils.Float64ToStrByPrec(utils.StrToFloat64(canalMsg.Data[0].Amout)+utils.StrToFloat64(amountUser.Amount), 8) + update, err = sess.Where("id=?", amountDate.Id).Cols("amount").Update(amountUser) + if update == 0 || err != nil { + sess.Rollback() + return errors.New("失败") } + sess.Commit() } return nil diff --git a/consume/init.go b/consume/init.go index 3bab926..55fbf99 100644 --- a/consume/init.go +++ b/consume/init.go @@ -92,6 +92,7 @@ func initConsumes() { jobs[consumeMd.CancalUserRelateConsumeFunName] = CancalUserRelateConsume //推荐人数 jobs[consumeMd.CancalUserIntegralExchangeConsumeFunName] = CancalUserIntegralExchange //兑换 jobs[consumeMd.ZhiosTaskRewardConsumeFunName] = ZhiosTaskRewardExchange //兑换 + jobs[consumeMd.CanalOneOrengeUserVirtualCcoinFlowFunName] = CanalOneOrengeUserVirtualCoinFlowConsume } diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 39fd511..25c981e 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -398,6 +398,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "CanalUserVirtualCoinFlowConsume", }, + { + ExchangeName: "canal.topic", + Name: "canal_user_virtual_coin_flow_15763466", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "canal_user_virtual_coin_flow_15763466", + BindKey: "", + ConsumeFunName: "CanalOneOrengeUserVirtualCoinFlowConsume", + }, { ExchangeName: "one.circles", Name: "one_circles_sign_in_green_energy", @@ -522,8 +531,9 @@ const ( OneCirclesSignInCopyGreenEnergyFunName = "OneCirclesSignInCopyGreenEnergyConsume" WithdrawConsumeFunName = "WithdrawConsume" - CancalUserMoneyConsumeFunName = "CancalUserMoneyConsume" - CancalUserRelateConsumeFunName = "CancalUserRelateConsume" - CancalUserIntegralExchangeConsumeFunName = "CancalUserIntegralExchange" - ZhiosTaskRewardConsumeFunName = "ZhiosTaskRewardExchange" + CancalUserMoneyConsumeFunName = "CancalUserMoneyConsume" + CancalUserRelateConsumeFunName = "CancalUserRelateConsume" + CancalUserIntegralExchangeConsumeFunName = "CancalUserIntegralExchange" + ZhiosTaskRewardConsumeFunName = "ZhiosTaskRewardExchange" + CanalOneOrengeUserVirtualCcoinFlowFunName = "CanalOneOrengeUserVirtualCoinFlowConsume" )