From 14ee4b84e4cc155896a47395481ad3f4e99453d4 Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Wed, 3 Jan 2024 18:10:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...canal_guide_order_by_user_up_lv_consume.go | 89 +++++++++++++++++++ consume/init.go | 2 + consume/md/consume_key.go | 20 +++++ consume/zhios_user_up_lv.go | 83 +++++++++++++++++ 4 files changed, 194 insertions(+) create mode 100644 consume/canal_guide_order_by_user_up_lv_consume.go create mode 100644 consume/zhios_user_up_lv.go diff --git a/consume/canal_guide_order_by_user_up_lv_consume.go b/consume/canal_guide_order_by_user_up_lv_consume.go new file mode 100644 index 0000000..9ec0a6b --- /dev/null +++ b/consume/canal_guide_order_by_user_up_lv_consume.go @@ -0,0 +1,89 @@ +package consume + +import ( + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/cc14514/go-geoip2" + geoip2db "github.com/cc14514/go-geoip2-db" + "github.com/streadway/amqp" + "strings" +) + +func CanalGuideOrderByUserUpLvConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1000) + delivery := ch.Consume(queue.Name, true) //设置自动应答 + + geoIp2db, _ := geoip2db.NewGeoipDbByStatik() + defer geoIp2db.Close() + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>>>CanalGuideOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleCanalGuideOrderByUserUpLvConsume(res.Body, geoIp2db) + //_ = res.Reject(false) + //_ = res.Ack(true) + _ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleCanalGuideOrderByUserUpLvConsume(msg []byte, geoIp2db *geoip2.DBReader) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalOrderMessage[md.CanalGuideOrder] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return err + } + + masterId := strings.Split(canalMsg.Database, "_")[1] + uids := make([]string, 0) + for _, data := range canalMsg.Data { + uids = append(uids, data.Uid) + } + eg := db.DBs[masterId] + if eg == nil { + return nil + } + levelList, _ := db.UserLevlEgAll(eg) + isAuto := 0 + for _, v := range levelList { + if v.AutoUpdate == 1 { + isAuto = 1 + } + } + if isAuto == 0 { + return nil + } + for _, v := range uids { + FindUser(eg, v, masterId, levelList) + oneUser, _ := db.UserProfileFindByID(eg, v) + if oneUser == nil || (oneUser != nil && oneUser.ParentUid == 0) { + continue + } + FindUser(eg, utils.IntToStr(oneUser.ParentUid), masterId, levelList) + } + return nil +} diff --git a/consume/init.go b/consume/init.go index b1a54ad..b6905d6 100644 --- a/consume/init.go +++ b/consume/init.go @@ -17,6 +17,8 @@ func Init() { // 增加消费任务队列 func initConsumes() { + jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv + jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index b598bec..c013194 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -47,6 +47,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "CanalGuideOrderConsume", }, + { + ExchangeName: "canal.topic", + Name: "canal_guide_order_by_user_up_lv", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "canal_order_list", + BindKey: "", + ConsumeFunName: "CanalGuideOrderByUserUpLvConsume", + }, { ExchangeName: "zhios.app.user.visit.ip.address.exchange", Name: "zhios_user_visit_ip_address_queue", @@ -335,6 +344,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "ZhiosOrderFree", }, + { + ExchangeName: "zhios.user_up_lv.exchange", + Name: "zhios_user_up_lv", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "user_up_lv", + BindKey: "", + ConsumeFunName: "ZhiosUserUpLv", + }, //{ // ExchangeName: "zhios.order_buckle.exchange", // Name: "zhios_order_buckle_dev", @@ -347,6 +365,8 @@ var RabbitMqQueueKeyList = []*MqQueue{ } const ( + ZhiosUserUpLvFunName = "ZhiosUserUpLv" + CanalGuideOrderByUserUpLvConsume = "CanalGuideOrderByUserUpLvConsume" ZhiosOrderFreeFunName = "ZhiosOrderFree" ZhiosOrderSettleTotalFunName = "ZhiosOrderSettleTotal" ZhiosOrderTotalFunName = "ZhiosOrderTotal" diff --git a/consume/zhios_user_up_lv.go b/consume/zhios_user_up_lv.go new file mode 100644 index 0000000..12516eb --- /dev/null +++ b/consume/zhios_user_up_lv.go @@ -0,0 +1,83 @@ +package consume + +import ( + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" +) + +//自动升级 +func ZhiosUserUpLv(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1000) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleUserUpLv(res.Body) + //_ = res.Reject(false) + fmt.Println(err) + _ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleUserUpLv(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosAcquisition + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println(err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + uid := canalMsg.Uid + levelList, _ := db.UserLevlEgAll(eg) + isAuto := 0 + for _, v := range levelList { + if v.AutoUpdate == 1 { + isAuto = 1 + } + } + if isAuto == 0 { + return nil + } + FindUser(eg, uid, mid, levelList) + oneUser, _ := db.UserProfileFindByID(eg, uid) + if oneUser == nil || (oneUser != nil && oneUser.ParentUid == 0) { + return nil + } + FindUser(eg, utils.IntToStr(oneUser.ParentUid), mid, levelList) + return nil +}