diff --git a/consume/canal_user_relate_consume.go b/consume/canal_user_relate_consume.go new file mode 100644 index 0000000..5d2d69f --- /dev/null +++ b/consume/canal_user_relate_consume.go @@ -0,0 +1,92 @@ +package consume + +import ( + "applet/app/db" + "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "strings" + "time" +) + +func CancalUserRelateConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>CancalUserRelateConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CancalUserRelateConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleCancalUserRelateConsume(res.Body) + if err != nil { + fmt.Println("CancalUserRelateConsume_ERR:::::", err.Error()) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleCancalUserRelateConsume(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalUserRelateMessage[md.CanalUserRelate] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return err + } + + masterId := strings.Split(canalMsg.Database, "_")[1] + if masterId != "15763466" { + return nil + } + engine := db.DBs["123456"] + // + + if canalMsg.Type == md.CanalMsgInsertSqlType { + var data1 model.UserExtendTotal + date := time.Unix(utils.TimeStdParseUnix(canalMsg.Data[0].InviteTime), 0).Format("20060102") + month := time.Unix(utils.TimeStdParseUnix(canalMsg.Data[0].InviteTime), 0).Format("200601") + engine.Where("uid=? and date=?", canalMsg.Data[0].ParentUid, date).Get(&data1) + if data1.Id == 0 { + data1 = model.UserExtendTotal{ + Uid: utils.StrToInt(canalMsg.Data[0].ParentUid), + Date: utils.StrToInt(date), + Month: utils.StrToInt(month), + Count: 0, + TeamCount: 0, + } + engine.Insert(&data1) + } + if utils.StrToInt(canalMsg.Data[0].Level) == 1 { + data1.Count++ + } else { + data1.TeamCount++ + } + engine.Where("id=?", data1).AllCols().Update(&data1) + } + + return nil +} diff --git a/consume/init.go b/consume/init.go index 89c251c..ff0f7be 100644 --- a/consume/init.go +++ b/consume/init.go @@ -89,6 +89,7 @@ func initConsumes() { //一个橘子 jobs[consumeMd.CancalUserMoneyConsumeFunName] = CancalUserMoneyConsume //余额 + jobs[consumeMd.CancalUserRelateConsumeFunName] = CancalUserRelateConsume //推荐人数 jobs[consumeMd.CancalUserIntegralExchangeConsumeFunName] = CancalUserIntegralExchange //兑换 } diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 7e51032..2cd9f09 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -452,6 +452,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "CancalUserMoneyConsume", }, + { + ExchangeName: "canal.topic", // + Name: "canal_user_relate", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "canal_user_relate", + BindKey: "", + ConsumeFunName: "CancalUserRelateConsume", + }, } const ( @@ -505,5 +514,6 @@ const ( WithdrawConsumeFunName = "WithdrawConsume" CancalUserMoneyConsumeFunName = "CancalUserMoneyConsume" + CancalUserRelateConsumeFunName = "CancalUserRelateConsume" CancalUserIntegralExchangeConsumeFunName = "CancalUserIntegralExchange" ) diff --git a/consume/md/md_canal_user_relate.go b/consume/md/md_canal_user_relate.go new file mode 100644 index 0000000..701e842 --- /dev/null +++ b/consume/md/md_canal_user_relate.go @@ -0,0 +1,23 @@ +package md + +type CanalUserRelate struct { + Id string `json:"id" xorm:"pk autoincr comment('主键') BIGINT(10)"` + ParentUid string `json:"parent_uid" xorm:"not null default 0 comment('上级会员ID') unique(idx_union_u_p_id) INT(20)"` + Uid string `json:"uid" xorm:"not null default 0 comment('关联UserID') unique(idx_union_u_p_id) INT(20)"` + Level string `json:"level" xorm:"not null default 1 comment('推广等级(1直属,大于1非直属)') INT(10)"` + InviteTime string `json:"invite_time" xorm:"not null default CURRENT_TIMESTAMP comment('邀请时间') TIMESTAMP"` +} + +// +type CanalUserRelateMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []T `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +}