From 5f027c65710c8811f7b43b022929a197bfcaf3dc Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Sat, 7 Dec 2024 18:04:33 +0800 Subject: [PATCH] add data statistics method --- consume/egg_canal_invite_user_nums_consume.go | 88 +++++++++++++++ consume/egg_canal_violate_nums_consume.go | 105 ++++++++++++++++++ consume/egg_energy_deal_user_ecpm.go | 81 ++++++++++++++ .../im_egg_energy_send_red_package_consume.go | 84 ++++++++++++++ consume/init.go | 4 + consume/md/consume_key.go | 40 +++++++ consume/md/egg_canal_violate_nums_consume.go | 23 ++++ .../md/md_egg_canal_user_relate_consume.go | 22 ++++ es/md/canal.go | 7 ++ go.mod | 8 +- 10 files changed, 458 insertions(+), 4 deletions(-) create mode 100644 consume/egg_canal_invite_user_nums_consume.go create mode 100644 consume/egg_canal_violate_nums_consume.go create mode 100644 consume/egg_energy_deal_user_ecpm.go create mode 100644 consume/im_egg_energy_send_red_package_consume.go create mode 100644 consume/md/egg_canal_violate_nums_consume.go create mode 100644 consume/md/md_egg_canal_user_relate_consume.go create mode 100644 es/md/canal.go diff --git a/consume/egg_canal_invite_user_nums_consume.go b/consume/egg_canal_invite_user_nums_consume.go new file mode 100644 index 0000000..9c19110 --- /dev/null +++ b/consume/egg_canal_invite_user_nums_consume.go @@ -0,0 +1,88 @@ +package consume + +import ( + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + md2 "applet/es/md" + es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "context" + "encoding/json" + "errors" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/streadway/amqp" + "time" +) + +func EggCanalInviteUserNumsConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>EggCanalInviteUserNumsConsume>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1000) + delivery := ch.Consume(queue.Name, true) //设置自动应答 + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>>>EggCanalInviteUserNumsConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleEggCanalInviteUserNumsConsume(res.Body) + if err != nil { + fmt.Println("EggCanalInviteUserNumsConsume_ERR:::::", err.Error()) + utils2.FilePutContents("EggCanalInviteUserNumsConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + //_ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } +} + +func handleEggCanalInviteUserNumsConsume(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalUserRelateMessage[md.CanalUserRelate] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return nil + } + + year, week := time.Now().ISOWeek() + yearStr := utils2.IntToStr(year) + weekStr := utils2.IntToStr(week) + index := es2.GetAppointIndexFromAlias(yearStr, weekStr) + // 2. 监听插入信息 + if canalMsg.Type == md2.CanalMsgInsertSqlType { + for _, item := range canalMsg.Data { + parentUid := item.ParentUid + id := fmt.Sprintf("%d%d-%d", year, week, parentUid) + + // 新增拉新人数 + script := elastic.NewScript("ctx._source.invite_user_nums += params.inc").Param("inc", 1) + _, err = es.EsClient.Update(). + Index(index). + Id(id). + Script(script). + Do(context.Background()) + if err != nil { + return err + } + } + } + return nil +} diff --git a/consume/egg_canal_violate_nums_consume.go b/consume/egg_canal_violate_nums_consume.go new file mode 100644 index 0000000..243bffc --- /dev/null +++ b/consume/egg_canal_violate_nums_consume.go @@ -0,0 +1,105 @@ +package consume + +import ( + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + md2 "applet/es/md" + es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "context" + "encoding/json" + "errors" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/streadway/amqp" + "time" +) + +func EggCanalViolateNumsConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>EggCanalViolateNumsConsume>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1000) + delivery := ch.Consume(queue.Name, true) //设置自动应答 + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>>>EggCanalViolateNumsConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleEggCanalViolateNumsConsume(res.Body) + if err != nil { + fmt.Println("EggCanalViolateNumsConsume_ERR:::::", err.Error()) + utils2.FilePutContents("EggCanalViolateNumsConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + //_ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } +} + +func handleEggCanalViolateNumsConsume(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalUserRelateMessage[md.CanalUserRelate] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return nil + } + + year, week := time.Now().ISOWeek() + yearStr := utils2.IntToStr(year) + weekStr := utils2.IntToStr(week) + index := es2.GetAppointIndexFromAlias(yearStr, weekStr) + // 2. 监听插入信息 + if canalMsg.Type == md2.CanalMsgInsertSqlType { + for _, item := range canalMsg.Data { + parentUid := item.ParentUid + id := fmt.Sprintf("%d%d-%d", year, week, parentUid) + + // 增加违规次数记录 + script := elastic.NewScript("ctx._source.violate_nums += params.inc").Param("inc", 1) + _, err = es.EsClient.Update(). + Index(index). + Id(id). + Script(script). + Do(context.Background()) + if err != nil { + return err + } + } + } + if canalMsg.Type == md2.CanalMsgDeleteSqlType { + for _, item := range canalMsg.Data { + parentUid := item.ParentUid + id := fmt.Sprintf("%d%d-%d", year, week, parentUid) + + // 减少违规次数记录 + script := elastic.NewScript("ctx._source.violate_nums -= params.inc").Param("inc", 1) + _, err = es.EsClient.Update(). + Index(index). + Id(id). + Script(script). + Do(context.Background()) + if err != nil { + return err + } + } + } + return nil +} diff --git a/consume/egg_energy_deal_user_ecpm.go b/consume/egg_energy_deal_user_ecpm.go new file mode 100644 index 0000000..500d331 --- /dev/null +++ b/consume/egg_energy_deal_user_ecpm.go @@ -0,0 +1,81 @@ +package consume + +import ( + "applet/app/cfg" + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/md" + es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func EggEnergyDealUserECPMConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggEnergyDealUserECPMConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggEnergyDealUserECPMConsume(res.Body) + if err != nil { + fmt.Println("EggEnergyDealUserECPMConsume_ERR:::::", err.Error()) + utils2.FilePutContents("EggEnergyDealUserECPMConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } +} + +func handleEggEnergyDealUserECPMConsume(msgData []byte) error { + //1、解析mq中queue的数据结构体 + var msg *md2.DealUserEcpmReq + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + + //2、更新用户信息 + year, week := time.Now().ISOWeek() + yearStr := utils2.IntToStr(year) + weekStr := utils2.IntToStr(week) + index := es2.GetAppointIndexFromAlias(yearStr, weekStr) + id := fmt.Sprintf("%d%d-%d", year, week, msg.Uid) + + m := md2.EggEnergyUserEggScoreEs{ + Ecpm: utils2.StrToFloat64(msg.Ecpm), + } + updateDocRet, err := es.UpdateDoc(index, id, m) + if err != nil { + return err + } + fmt.Printf("UpdateCreateDoc ==> %+v \n\n", updateDocRet) + return err +} diff --git a/consume/im_egg_energy_send_red_package_consume.go b/consume/im_egg_energy_send_red_package_consume.go new file mode 100644 index 0000000..1f6ffbe --- /dev/null +++ b/consume/im_egg_energy_send_red_package_consume.go @@ -0,0 +1,84 @@ +package consume + +import ( + "applet/app/cfg" + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + md3 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" + es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "context" + "encoding/json" + "errors" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/streadway/amqp" + "time" +) + +func IMEggEnergySendRedPackageConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>IMEggEnergySendRedPackageConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleIMEggEnergySendRedPackageConsume(res.Body) + if err != nil { + fmt.Println("IMEggEnergySendRedPackageConsume_ERR:::::", err.Error()) + utils2.FilePutContents("IMEggEnergySendRedPackageConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } +} + +func handleIMEggEnergySendRedPackageConsume(msgData []byte) error { + //1、解析mq中queue的数据结构体 + var msg *md3.IMEggEnergyStructForSendRedPackageData + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + + //2、更新用户信息 + year, week := time.Now().ISOWeek() + yearStr := utils2.IntToStr(year) + weekStr := utils2.IntToStr(week) + index := es2.GetAppointIndexFromAlias(yearStr, weekStr) + id := fmt.Sprintf("%d%d-%d", year, week, msg.Uid) + // 新增拉新人数 + script := elastic.NewScript("ctx._source.send_red_package_nums += params.inc").Param("inc", 1) + _, err = es.EsClient.Update(). + Index(index). + Id(id). + Script(script). + Do(context.Background()) + if err != nil { + return err + } + return err +} diff --git a/consume/init.go b/consume/init.go index f2dd204..ff7a468 100644 --- a/consume/init.go +++ b/consume/init.go @@ -30,6 +30,10 @@ func initConsumes() { jobs[consumeMd.EggEnergyStartExchangeGreenEnergyFunName] = EggEnergyStartExchangeGreenEnergyConsume jobs[consumeMd.EggEnergyAutoExchangeGreenEnergyFunName] = EggEnergyAutoExchangeGreenEnergyConsume jobs[consumeMd.EggEnergyNewUserRegisterDataFunName] = EggEnergyNewUserRegisterDataConsume // 新用户注册 + jobs[consumeMd.EggEnergyDealUserECPMFunName] = EggEnergyDealUserECPMConsume // 处理给用户Ecpm值 + jobs[consumeMd.EggCanalInviteUserNumsFunName] = EggCanalInviteUserNumsConsume // 处理拉新人数 + jobs[consumeMd.EggCanalViolateNumsFunName] = EggCanalViolateNumsConsume // 处理违规次数 + jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数 } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 9bd0432..499765b 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -146,6 +146,42 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "EggEnergyNewUserRegisterDataConsume", }, + { + ExchangeName: "egg.app", + Name: "egg_energy_user_ecpm", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "user_ecpm", + BindKey: "", + ConsumeFunName: "EggEnergyDealUserECPMConsume", + }, + { + ExchangeName: "egg.canal.topic", + Name: "egg_canal_invite_user_nums_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_canal_invite_user_nums", + BindKey: "", + ConsumeFunName: "EggEnergyDealUserECPMConsume", + }, + { + ExchangeName: "egg.canal.topic", + Name: "egg_canal_violate_nums_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_canal_violate_nums", + BindKey: "", + ConsumeFunName: "EggCanalViolateNumsConsume", + }, + { + ExchangeName: "im.egg.energy", + Name: "im_egg_energy_send_red_package_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "send_red_package", + BindKey: "", + ConsumeFunName: "IMEggEnergySendRedPackageConsume", + }, } const ( @@ -164,4 +200,8 @@ const ( EggEnergyStartExchangeGreenEnergyFunName = "EggEnergyStartExchangeGreenEnergyConsume" EggEnergyAutoExchangeGreenEnergyFunName = "EggEnergyAutoExchangeGreenEnergyConsume" EggEnergyNewUserRegisterDataFunName = "EggEnergyNewUserRegisterDataConsume" + EggEnergyDealUserECPMFunName = "EggEnergyDealUserECPMConsume" + EggCanalInviteUserNumsFunName = "EggCanalInviteUserNumsConsume" + EggCanalViolateNumsFunName = "EggCanalViolateNumsConsume" + IMEggEnergySendRedPackageFunName = "IMEggEnergySendRedPackageConsume" ) diff --git a/consume/md/egg_canal_violate_nums_consume.go b/consume/md/egg_canal_violate_nums_consume.go new file mode 100644 index 0000000..297e8cd --- /dev/null +++ b/consume/md/egg_canal_violate_nums_consume.go @@ -0,0 +1,23 @@ +package md + +type CanalTagRecords struct { + Id int `json:"id" ` + TagId int `json:"tag_id" ` + Uid int64 `json:"uid" ` + Memo string `json:"memo" ` + CreateAt string `json:"create_at" ` + UpdateAt string `json:"update_at" ` +} + +type CanalTagRecordsMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []T `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +} diff --git a/consume/md/md_egg_canal_user_relate_consume.go b/consume/md/md_egg_canal_user_relate_consume.go new file mode 100644 index 0000000..7bb22d7 --- /dev/null +++ b/consume/md/md_egg_canal_user_relate_consume.go @@ -0,0 +1,22 @@ +package md + +type CanalUserRelate struct { + Id int64 `json:"id"` + ParentUid int64 `json:"parent_uid"` + Uid int64 `json:"uid"` + Level int `json:"level"` + InviteTime string `json:"invite_time"` +} + +type CanalUserRelateMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []T `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +} diff --git a/es/md/canal.go b/es/md/canal.go new file mode 100644 index 0000000..c3ef090 --- /dev/null +++ b/es/md/canal.go @@ -0,0 +1,7 @@ +package md + +const ( + CanalMsgInsertSqlType = "INSERT" + CanalMsgDeleteSqlType = "DELETE" + CanalMsgUpdateSqlType = "UPDATE" +) diff --git a/go.mod b/go.mod index 05b0a64..de08ee1 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,11 @@ go 1.19 //replace code.fnuoos.com/EggPlanet/egg_models.git => E:/company/Egg/egg_models -//replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules +// replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules require ( - code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241205041102-0e106357c399 - code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241205061938-91f42710d6cd + code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241206115326-8cbc93c7c64d + code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241207095445-64c8aa0b486e code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 github.com/boombuler/barcode v1.0.1 @@ -79,7 +79,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nxadm/tail v1.4.8 // indirect - github.com/olivere/elastic/v7 v7.0.32 // indirect + github.com/olivere/elastic/v7 v7.0.32 github.com/pelletier/go-toml/v2 v2.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect