From 6c77504406498eda2048fbfaec4968a6e101c261 Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Sun, 5 Jan 2025 05:25:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=87=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consume/egg_canal_user_down_consume.go | 17 ++--- consume/egg_canal_user_up_consume.go | 101 +++++++++++++++++++++++++ consume/init.go | 4 +- consume/md/consume_key.go | 10 +++ 4 files changed, 120 insertions(+), 12 deletions(-) create mode 100644 consume/egg_canal_user_up_consume.go diff --git a/consume/egg_canal_user_down_consume.go b/consume/egg_canal_user_down_consume.go index 618754a..3f34762 100644 --- a/consume/egg_canal_user_down_consume.go +++ b/consume/egg_canal_user_down_consume.go @@ -72,21 +72,16 @@ func handleEggCanalUserDownConsume(msgData []byte, ch *rabbit.Channel) error { return nil } sort.Slice(levelAll, func(i, j int) bool { - return levelAll[i].LevelWeight > levelAll[j].LevelWeight + return levelAll[i].LevelWeight <= levelAll[j].LevelWeight }) - isHas := 0 task := make([]map[string]string, 0) for _, v := range levelAll { - if v.Id == user.Level { - isHas = 1 - } - if isHas == 1 { - task, err = rule.UserUpgradeTask(engine, int(user.Id), v.Id) - if err != nil { - continue - } - level = v.Id + task1, err := rule.UserUpgradeTask(engine, int(user.Id), v.Id) + if err != nil { + continue } + level = v.Id + task = task1 } //升级 oldLevel := user.Level diff --git a/consume/egg_canal_user_up_consume.go b/consume/egg_canal_user_up_consume.go new file mode 100644 index 0000000..ce4dc06 --- /dev/null +++ b/consume/egg_canal_user_up_consume.go @@ -0,0 +1,101 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" + "code.fnuoos.com/EggPlanet/egg_models.git/src/model" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "sort" + "time" +) + +func EggCanalUserUpConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggCanalUserUpConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(100) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggCanalUserUpConsume(res.Body, ch) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleEggCanalUserUpConsume(msgData []byte, ch *rabbit.Channel) error { + var msg *md.CommUserId + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + engine := db.Db + NewUserDb := implement.NewUserDb(engine) + user, _ := NewUserDb.GetUser(utils.StrToInt64(msg.Uid)) + if user == nil { + return nil + } + levelDb := implement.NewUserLevelDb(engine) + levelAll, _ := levelDb.UserLevelAllByAsc() + level := 0 + if len(levelAll) == 0 { + return nil + } + + sort.Slice(levelAll, func(i, j int) bool { + return levelAll[i].LevelWeight > levelAll[j].LevelWeight + }) + task := make([]map[string]string, 0) + for _, v := range levelAll { + if level > 0 { + continue + } + task, err = rule.UserUpgradeTask(engine, int(user.Id), v.Id) + if err != nil { + continue + } + level = v.Id + } + //升级 + oldLevel := user.Level + user.Level = level + _, err = engine.Where("id=?", user.Id).Cols("level").Update(user) + if err != nil { + return err + } + var tmp = model.UserUpgradeLevel{ + Uid: int(user.Id), + Level: level, + OldLv: oldLevel, + CreateTime: time.Now(), + Task: utils.SerializeStr(task), + } + engine.Insert(&tmp) + return nil +} diff --git a/consume/init.go b/consume/init.go index b346a23..bf24832 100644 --- a/consume/init.go +++ b/consume/init.go @@ -17,7 +17,9 @@ func Init() { // 增加消费任务队列 func initConsumes() { - jobs[consumeMd.EggCanalUserDownConsume] = EggCanalUserDownConsume //监听 自动降级 + jobs[consumeMd.EggCanalUserUpConsume] = EggCanalUserUpConsume // 自动升级 + + jobs[consumeMd.EggCanalUserDownConsume] = EggCanalUserDownConsume // 自动降级 jobs[consumeMd.EggSlowAutoUpLvConsume] = EggSlowAutoUpLvConsume //缓慢 自动升级 jobs[consumeMd.EggEnergyUserActivityConsume] = EggEnergyUserActivityConsume //监听 自动升级 diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 869a22b..cc3dbbb 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -293,6 +293,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "EggCanalUserDownConsume", }, + { + ExchangeName: "egg.user", + Name: "egg_canal_user_up", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_canal_user_up", + BindKey: "", + ConsumeFunName: "EggCanalUserUpConsume", + }, { ExchangeName: "egg.energy", Name: "egg_energy_team_assistance_back", @@ -354,6 +363,7 @@ const ( EggEnergyUserActivityConsume = "EggEnergyUserActivityConsume" EggCanalUserConsume = "EggCanalUserConsume" EggCanalUserDownConsume = "EggCanalUserDownConsume" + EggCanalUserUpConsume = "EggCanalUserUpConsume" JpushRecordFunName = "JpushRecordConsume" AliyunSmsRecordFunName = "AliyunSmsRecordConsume" EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume"