From 7dcd20a4cfb3613dc69af52a43d39ecad3d21242 Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Sat, 14 Dec 2024 14:27:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=87=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consume/egg_canal_user_consume.go | 79 ++++++++++++++++++ consume/egg_energy_user_activity_consume.go | 83 +++++++++++++++++++ consume/egg_slow_auto_up_lv_consume.go | 60 ++++++++++++++ consume/init.go | 3 + consume/md/consume_key.go | 32 ++++++- .../md/md_egg_canal_user_activity_consume.go | 19 +++++ consume/md/md_egg_canal_user_consume.go | 20 +++++ go.mod | 4 +- 8 files changed, 296 insertions(+), 4 deletions(-) create mode 100644 consume/egg_canal_user_consume.go create mode 100644 consume/egg_energy_user_activity_consume.go create mode 100644 consume/egg_slow_auto_up_lv_consume.go create mode 100644 consume/md/md_egg_canal_user_activity_consume.go create mode 100644 consume/md/md_egg_canal_user_consume.go diff --git a/consume/egg_canal_user_consume.go b/consume/egg_canal_user_consume.go new file mode 100644 index 0000000..da88db6 --- /dev/null +++ b/consume/egg_canal_user_consume.go @@ -0,0 +1,79 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + md2 "applet/es/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func EggCanalUserConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggCanalUserConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggCanalUserConsume(res.Body, ch) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleEggCanalUserConsume(msg []byte, ch *rabbit.Channel) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + var canalMsg *md.CanalUserMessage[md.CanalUser] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + fmt.Println("EggCanalInviteUserNumsConsumeFaliedUnMarshal_ERR:::::", err.Error()) + return nil + } + if canalMsg.Type == md2.CanalMsgInsertSqlType || canalMsg.Type == md2.CanalMsgUpdateSqlType { + oldUser := make(map[string]md.CanalUser) + for _, item := range canalMsg.Old { + oldUser[item.Id] = item + } + for _, item := range canalMsg.Data { + if utils.StrToInt(item.ParentUid) == 0 { + continue + } + count := rule.ExtendUserCount(db.Db, utils.StrToInt(item.ParentUid)) + if count > 1000 { + msg1 := md.CommUserId{ + Uid: item.ParentUid, + } + ch.Publish("egg.app", msg1, "egg_slow_auto_up_lv") + continue + } + rule.UserUpgradeInsert(db.Db, utils.StrToInt(item.ParentUid)) + } + } + return nil +} diff --git a/consume/egg_energy_user_activity_consume.go b/consume/egg_energy_user_activity_consume.go new file mode 100644 index 0000000..2eb4439 --- /dev/null +++ b/consume/egg_energy_user_activity_consume.go @@ -0,0 +1,83 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + "applet/app/utils" + md2 "applet/es/md" + "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" + + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func EggEnergyUserActivityConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggEnergyUserActivityConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggEnergyUserActivityConsume(res.Body, ch) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleEggEnergyUserActivityConsume(msg []byte, ch *rabbit.Channel) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + var canalMsg *md.CanalEggEnergyUserActivityMessage[md.CanalEggEnergyUserActivity] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + fmt.Println("EggCanalInviteUserNumsConsumeFaliedUnMarshal_ERR:::::", err.Error()) + return nil + } + if canalMsg.Type == md2.CanalMsgInsertSqlType { + for _, item := range canalMsg.Data { + userDb := implement.NewUserDb(db.Db) + user, _ := userDb.GetUser(utils.StrToInt64(item.Uid)) + if user == nil { + continue + } + if user.ParentUid == 0 { + continue + } + count := rule.ExtendUserCount(db.Db, int(user.ParentUid)) + if count > 1000 { + msg1 := md.CommUserId{ + Uid: utils.Int64ToStr(user.ParentUid), + } + ch.Publish("egg.app", msg1, "egg_slow_auto_up_lv") + continue + } + rule.UserUpgradeInsert(db.Db, int(user.ParentUid)) + } + } + + return nil +} diff --git a/consume/egg_slow_auto_up_lv_consume.go b/consume/egg_slow_auto_up_lv_consume.go new file mode 100644 index 0000000..df8ed48 --- /dev/null +++ b/consume/egg_slow_auto_up_lv_consume.go @@ -0,0 +1,60 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + "applet/app/utils" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" + + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func EggSlowAutoUpLvConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggSlowAutoUpLvConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggSlowAutoUpLvConsume(res.Body) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleEggSlowAutoUpLvConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md.CommUserId + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + rule.UserUpgradeInsert(db.Db, utils.StrToInt(msg.Uid)) + return nil +} diff --git a/consume/init.go b/consume/init.go index 1203907..5eecd05 100644 --- a/consume/init.go +++ b/consume/init.go @@ -17,6 +17,9 @@ func Init() { // 增加消费任务队列 func initConsumes() { + jobs[consumeMd.EggSlowAutoUpLvConsume] = EggSlowAutoUpLvConsume //缓慢 自动升级 + jobs[consumeMd.EggEnergyUserActivityConsume] = EggEnergyUserActivityConsume //监听 自动升级 + jobs[consumeMd.EggCanalUserConsume] = EggCanalUserConsume //监听 自动升级 jobs[consumeMd.AliyunSmsRecordFunName] = AliyunSmsRecordConsume //阿里云短信 jobs[consumeMd.JpushRecordFunName] = JpushRecordConsume //极光推送 diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 5a8e402..6b12f6c 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -245,11 +245,39 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "EggEnergyAutoScoreConsume", }, + { + ExchangeName: "egg.app", + Name: "egg_slow_auto_up_lv", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_slow_auto_up_lv", + BindKey: "", + ConsumeFunName: "EggSlowAutoUpLvConsume", + }, + { + ExchangeName: "egg.canal.topic", + Name: "egg_energy_user_activity", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_energy_user_activity", + BindKey: "", + ConsumeFunName: "EggEnergyUserActivityConsume", + }, + { + ExchangeName: "egg.canal.topic", + Name: "egg_canal_user", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_canal_user", + BindKey: "", + ConsumeFunName: "EggCanalUserConsume", + }, } const ( - AdvertisingSignConsume = "AdvertisingSignConsume" - AdvertisingSmashConsume = "AdvertisingSmashConsume" + EggSlowAutoUpLvConsume = "EggSlowAutoUpLvConsume" + EggEnergyUserActivityConsume = "EggEnergyUserActivityConsume" + EggCanalUserConsume = "EggCanalUserConsume" JpushRecordFunName = "JpushRecordConsume" AliyunSmsRecordFunName = "AliyunSmsRecordConsume" EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" diff --git a/consume/md/md_egg_canal_user_activity_consume.go b/consume/md/md_egg_canal_user_activity_consume.go new file mode 100644 index 0000000..8b8e249 --- /dev/null +++ b/consume/md/md_egg_canal_user_activity_consume.go @@ -0,0 +1,19 @@ +package md + +type CanalEggEnergyUserActivity struct { + Id string `json:"id"` + Uid string `json:"uid"` +} + +type CanalEggEnergyUserActivityMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []T `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +} diff --git a/consume/md/md_egg_canal_user_consume.go b/consume/md/md_egg_canal_user_consume.go new file mode 100644 index 0000000..7d0e974 --- /dev/null +++ b/consume/md/md_egg_canal_user_consume.go @@ -0,0 +1,20 @@ +package md + +type CanalUser struct { + Id string `json:"id"` + Level string `json:"level"` + ParentUid string `json:"parent_uid"` +} + +type CanalUserMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []T `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +} diff --git a/go.mod b/go.mod index 025080a..84f2b8a 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,8 @@ go 1.19 //replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules require ( - code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241212120727-3681308aeb14 - code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241213073654-f37e71ad92ef + code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241214062221-cde2ce240fa8 + code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241214062309-291e3233ae28 code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 github.com/boombuler/barcode v1.0.1