From b9285d612e0586ae3e0e013e15cf91e7c8c2df23 Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Wed, 10 Apr 2024 14:01:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=80=E4=B8=AA=E6=A9=98=E5=AD=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/db/db_money_reward.go | 27 ++++++++++ app/db/model/money_reward.go | 14 ++++++ app/db/model/user_extend_total.go | 10 ++++ consume/canal_user_money_consume.go | 77 +++++++++++++++++++++++++++++ consume/init.go | 3 ++ consume/md/consume_key.go | 11 +++++ consume/md/md_canal_user_money.go | 38 ++++++++++++++ 7 files changed, 180 insertions(+) create mode 100644 app/db/db_money_reward.go create mode 100644 app/db/model/money_reward.go create mode 100644 app/db/model/user_extend_total.go create mode 100644 consume/canal_user_money_consume.go create mode 100644 consume/md/md_canal_user_money.go diff --git a/app/db/db_money_reward.go b/app/db/db_money_reward.go new file mode 100644 index 0000000..1008198 --- /dev/null +++ b/app/db/db_money_reward.go @@ -0,0 +1,27 @@ +package db + +import ( + "applet/app/db/model" + "applet/app/utils" + "time" + "xorm.io/xorm" +) + +func GetMoneyReward(eg *xorm.Engine, uid, date, month string) *model.MoneyReward { + var data model.MoneyReward + get, err := eg.Where("uid=? and date=?", uid, date).Get(&data) + if err != nil { + return nil + } + if get == false { + data = model.MoneyReward{ + Uid: utils.StrToInt(uid), + Date: utils.StrToInt(date), + Time: time.Now(), + Month: utils.StrToInt(month), + Amount: "", + } + eg.Insert(&data) + } + return &data +} diff --git a/app/db/model/money_reward.go b/app/db/model/money_reward.go new file mode 100644 index 0000000..e9414f4 --- /dev/null +++ b/app/db/model/money_reward.go @@ -0,0 +1,14 @@ +package model + +import ( + "time" +) + +type MoneyReward struct { + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + Uid int `json:"uid" xorm:"INT(11)"` + Date int `json:"date" xorm:"default 0 INT(11)"` + Time time.Time `json:"time" xorm:"DATETIME"` + Month int `json:"month" xorm:"default 0 INT(11)"` + Amount string `json:"amount" xorm:"default 0.00 DECIMAL(20,2)"` +} diff --git a/app/db/model/user_extend_total.go b/app/db/model/user_extend_total.go new file mode 100644 index 0000000..cb15913 --- /dev/null +++ b/app/db/model/user_extend_total.go @@ -0,0 +1,10 @@ +package model + +type UserExtendTotal struct { + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + Uid int `json:"uid" xorm:"default 0 INT(11)"` + Date int `json:"date" xorm:"default 0 INT(11)"` + Month int `json:"month" xorm:"default 0 INT(11)"` + Count int `json:"count" xorm:"default 0 comment('直推的') INT(11)"` + TeamCount int `json:"team_count" xorm:"default 0 comment('团队的') INT(11)"` +} diff --git a/consume/canal_user_money_consume.go b/consume/canal_user_money_consume.go new file mode 100644 index 0000000..f0175a7 --- /dev/null +++ b/consume/canal_user_money_consume.go @@ -0,0 +1,77 @@ +package consume + +import ( + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "strings" +) + +func CancalUserMoneyConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>handleCancalUserMoneyConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>handleCancalUserMoneyConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleCancalUserMoneyConsume(res.Body) + if err != nil { + fmt.Println("handleCancalUserMoneyConsume_ERR:::::", err.Error()) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleCancalUserMoneyConsume(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalUserMoneyMessage[md.CanalUserMoney] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return err + } + + masterId := strings.Split(canalMsg.Database, "_")[1] + if masterId != "15763466" { + return nil + } + engine := db.DBs[masterId] + + if canalMsg.Type == md.CanalMsgInsertSqlType { + if utils.StrToFloat64(canalMsg.Data[0].Amount) > 0 && canalMsg.Data[0].Type == "0" { + ex := strings.Split(canalMsg.Data[0].CreateAt, " ") + date := strings.ReplaceAll(ex[0], "-", "") + monthEx := strings.Split(ex[0], "-") + reward := db.GetMoneyReward(engine, canalMsg.Data[0].Uid, date, monthEx[0]+monthEx[1]) + reward.Amount = utils.Float64ToStrByPrec(utils.StrToFloat64(reward.Amount)+utils.StrToFloat64(canalMsg.Data[0].Amount), 6) + engine.Where("id=?", reward.Id).Update(reward) + } + } + + return nil +} diff --git a/consume/init.go b/consume/init.go index 212db74..dfe7d40 100644 --- a/consume/init.go +++ b/consume/init.go @@ -86,6 +86,9 @@ func initConsumes() { //jobs[consumeMd.WithdrawConsumeFunName] = WithdrawConsume //jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume //绿色双链积分 + + //一个橘子 + jobs[consumeMd.CancalUserMoneyConsumeFunName] = CancalUserMoneyConsume //余额 } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index e44e3e9..299ce10 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -434,6 +434,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "WithdrawConsume", }, + { + ExchangeName: "canal.topic", + Name: "canal_fin_user_flow", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "canal_fin_user_flow", + BindKey: "", + ConsumeFunName: "CancalUserMoneyConsume", + }, } const ( @@ -485,4 +494,6 @@ const ( OneCirclesStartLevelDividendFunName = "OneCirclesStartLevelDividendConsume" OneCirclesSignInCopyGreenEnergyFunName = "OneCirclesSignInCopyGreenEnergyConsume" WithdrawConsumeFunName = "WithdrawConsume" + + CancalUserMoneyConsumeFunName = "CancalUserMoneyConsume" ) diff --git a/consume/md/md_canal_user_money.go b/consume/md/md_canal_user_money.go new file mode 100644 index 0000000..a1de585 --- /dev/null +++ b/consume/md/md_canal_user_money.go @@ -0,0 +1,38 @@ +package md + +type CanalUserMoney struct { + Id string `json:"id" xorm:"pk autoincr comment('流水编号') BIGINT(20)"` + Uid string `json:"uid" xorm:"not null default 0 comment('用户id') INT(11)"` + Type string `json:"type" xorm:"not null default 0 comment('0收入,1支出') TINYINT(1)"` + Amount string `json:"amount" xorm:"not null default 0.0000 comment('变动金额') DECIMAL(11,4)"` + BeforeAmount string `json:"before_amount" xorm:"not null default 0.0000 comment('变动前金额') DECIMAL(11,4)"` + AfterAmount string `json:"after_amount" xorm:"not null default 0.0000 comment('变动后金额') DECIMAL(11,4)"` + SysFee string `json:"sys_fee" xorm:"not null default 0.0000 comment('手续费') DECIMAL(11,4)"` + PaymentType string `json:"payment_type" xorm:"not null default 1 comment('1支付宝,2微信.3手动转账') TINYINT(1)"` + OrdType string `json:"ord_type" xorm:"not null default '' comment('订单类型taobao,jd,pdd,vip,suning,kaola,own自营,withdraw提现') VARCHAR(20)"` + OrdId string `json:"ord_id" xorm:"not null default '' comment('对应订单编号') VARCHAR(50)"` + OrdTitle string `json:"ord_title" xorm:"not null default '' comment('订单标题') VARCHAR(50)"` + OrdAction string `json:"ord_action" xorm:"not null default 0 comment('10自购,11推广,12团队,20提现,21消费') TINYINT(2)"` + OrdTime string `json:"ord_time" xorm:"not null default 0 comment('下单时间or提现时间') INT(11)"` + OrdDetail string `json:"ord_detail" xorm:"not null default '' comment('记录商品ID或提现账号') VARCHAR(50)"` + ExpectedTime string `json:"expected_time" xorm:"not null default '0' comment('预期到账时间,字符串用于直接显示,结算后清除内容') VARCHAR(30)"` + State string `json:"state" xorm:"not null default 1 comment('1未到账,2已到账') TINYINT(1)"` + Memo string `json:"memo" xorm:"not null default '' comment('备注') VARCHAR(2000)"` + OtherId string `json:"other_id" xorm:"not null default 0 comment('其他关联订单,具体根据订单类型判断') BIGINT(20)"` + AliOrdId string `json:"ali_ord_id" xorm:"default '' comment('支付宝订单号') VARCHAR(128)"` + CreateAt string `json:"create_at" xorm:"created not null default CURRENT_TIMESTAMP comment('创建时间') TIMESTAMP"` + UpdateAt string `json:"update_at" xorm:"updated not null default CURRENT_TIMESTAMP comment('更新时间') TIMESTAMP"` +} + +type CanalUserMoneyMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []T `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +}