From 0c46a09d70c364b9b7708bde5f124a6c8c0e89c9 Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Thu, 26 Dec 2024 17:29:19 +0800 Subject: [PATCH] feat: add es socre update when send friend circle and energy exchange account --- ...g_canal_energy_exchange_account_consume.go | 109 ++++++++++++++++++ ..._person_add_activity_value_consume_test.go | 67 ----------- consume/egg_send_friend_circle_consume.go | 97 ++++++++++++++++ consume/init.go | 2 + consume/md/consume_key.go | 20 ++++ .../md_egg_canal_energy_exchange_account.go | 31 +++++ go.mod | 2 +- 7 files changed, 260 insertions(+), 68 deletions(-) create mode 100644 consume/egg_canal_energy_exchange_account_consume.go delete mode 100644 consume/egg_canal_person_add_activity_value_consume_test.go create mode 100644 consume/egg_send_friend_circle_consume.go create mode 100644 consume/md/md_egg_canal_energy_exchange_account.go diff --git a/consume/egg_canal_energy_exchange_account_consume.go b/consume/egg_canal_energy_exchange_account_consume.go new file mode 100644 index 0000000..780c1f4 --- /dev/null +++ b/consume/egg_canal_energy_exchange_account_consume.go @@ -0,0 +1,109 @@ +package consume + +import ( + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + md2 "applet/es/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" + es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "context" + "encoding/json" + "errors" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/streadway/amqp" + "strings" + "time" +) + +func EggCanalEnergyExchangeAccountConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>EggCanalEnergyExchangeAccountConsume>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1000) + delivery := ch.Consume(queue.Name, true) //设置自动应答 + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>>>EggCanalEnergyExchangeAccountConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleEggCanalEnergyExchangeAccountConsume(res.Body) + if err != nil { + fmt.Println("EggCanalEnergyExchangeAccountConsume_ERR:::::", err.Error()) + utils2.FilePutContents("EggCanalEnergyExchangeAccountConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + //_ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } +} + +func handleEggCanalEnergyExchangeAccountConsume(msg []byte) error { + //1.解析canal采集至mq中queue的数据结构体 + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + var canalMsg *md.CanalEnergyExchangeAccountMessage[md.CanalEnergyExchangeAccount] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + fmt.Println("EggCanalEnergyExchangeAccountConsumeUnMarshalFailed_ERR:::::", err.Error()) + return nil + } + + year, week := time.Now().ISOWeek() + yearStr := utils2.IntToStr(year) + weekStr := utils2.IntToStr(week) + index := es2.GetAppointIndexFromAlias(yearStr, weekStr) + // 2. 监听插入信息 + if canalMsg.Type == md2.CanalMsgInsertSqlType { + for _, item := range canalMsg.Data { + uid := item.Uid + id := fmt.Sprintf("%d%d_%s", year, week, uid) + + if item.Title != enum.EggEnergyExchangeAccountBalance.String() { + continue + } + + // 3. 增加 蛋蛋能量兑换余额 数量 + amount := utils2.StrToFloat64(item.Amount) + script := elastic.NewScript("ctx._source.egg_energy_exchange_account_balance += params.inc").Param("inc", amount) + updateDoc, err := es.EsClient.Update(). + Index(index). + Id(id). + Script(script). + Do(context.Background()) + if err != nil { + if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") { + // 蛋蛋分数据还不存在,创建蛋蛋分数据 + now := time.Now().Format("2006-01-02 15:04:05") + err1 := egg_energy.CreateEsScoreAndAssignValuesDoc(index, id, utils2.StrToInt64(uid), enum.EsEggEnergyExchangeAccountBalance, utils2.Float64ToStr(amount), now) + if err1 != nil { + return err1 + } + return nil + } + fmt.Println("EggCanalPersonAddActivityValueConsumeUpdateDoc_ERR::::", err.Error()) + return err + } + fmt.Println("updateDoc==========>", updateDoc) + } + } + return nil +} diff --git a/consume/egg_canal_person_add_activity_value_consume_test.go b/consume/egg_canal_person_add_activity_value_consume_test.go deleted file mode 100644 index c34fb7b..0000000 --- a/consume/egg_canal_person_add_activity_value_consume_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package consume - -import ( - utils2 "applet/app/utils" - "code.fnuoos.com/EggPlanet/egg_system_rules.git/md" - es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" - "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" - "context" - "fmt" - "github.com/olivere/elastic/v7" - "strings" - "testing" - "time" -) - -func TestEs(t *testing.T) { - es.Init("http://123.57.140.192:9200", "elastic", "fnuo123") - year, week := time.Now().ISOWeek() - yearStr := utils2.IntToStr(year) - weekStr := utils2.IntToStr(week) - index := es2.GetAppointIndexFromAlias(yearStr, weekStr) - amount := utils2.StrToFloat64("100") - uid := "100" - id := fmt.Sprintf("%d%d_%s", year, week, uid) - script := elastic.NewScript("ctx._source.person_add_activity_value += params.inc").Param("inc", int(amount)) - updateDoc, err := es.EsClient.Update(). - Index(index). - Id(id). - Script(script). - Do(context.Background()) - now := time.Now().Format("2006-01-02 15:04:05") - if err != nil { - if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") { - // 如果记录不存在就创建记录 - m := md.EggEnergyUserEggScoreEs{ - Uid: utils2.StrToInt64(uid), - ScoreValue: 0, - ScoreValueKind: 0, - Ecpm: 0, - InviteUserNums: 0, - TeamActivityNums: 0, - SignInNums: 0, - ImActivityNums: 0, - SendRedPackageNums: 0, - EggEnergyExchangeAccountBalance: 0, - AccountBalanceExchangeEggEnergyNums: 0, - SendCircleOfFriendNums: 0, - ForumCommentsNums: 0, - CollegeLearningNums: 0, - ViolateNums: 0, - BrowseInterfaceNums: 0, - PersonAddActivityValue: 1, - CreatedAt: now, - UpdatedAt: now, - } - createDoc, err1 := es.CreateDoc(index, id, m) - if err1 != nil { - fmt.Println("EggCanalPersonAddActivityValueConsumeCreateDoc_ERR::::", err1.Error()) - return - } - fmt.Println("createDoc==========>", createDoc) - return - } - fmt.Println("EggCanalPersonAddActivityValueConsumeUpdateDoc_ERR::::", err.Error()) - } - fmt.Println("updateDoc==========>", updateDoc) -} diff --git a/consume/egg_send_friend_circle_consume.go b/consume/egg_send_friend_circle_consume.go new file mode 100644 index 0000000..e1c7eab --- /dev/null +++ b/consume/egg_send_friend_circle_consume.go @@ -0,0 +1,97 @@ +package consume + +import ( + "applet/app/cfg" + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" + md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" + es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "context" + "encoding/json" + "errors" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/streadway/amqp" + "strings" + "time" +) + +func EggSendFriendCircleDataConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggSendFriendCircleDataConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggSendFriendCircleDataConsume(res.Body) + if err != nil { + fmt.Println("EggSendFriendCircleDataConsume_ERR:::::", err.Error()) + utils2.FilePutContents("EggSendFriendCircleDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } +} + +func handleEggSendFriendCircleDataConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md2.EggSendFriendCircleData + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + + year, week := time.Now().ISOWeek() + yearStr := utils2.IntToStr(year) + weekStr := utils2.IntToStr(week) + index := es2.GetAppointIndexFromAlias(yearStr, weekStr) + id := fmt.Sprintf("%d%d_%d", year, week, msg.Uid) + script := elastic.NewScript("ctx._source.send_circle_of_friend_nums += params.inc").Param("inc", 1) + updateDoc, err := es.EsClient.Update(). + Index(index). + Id(id). + Script(script). + Do(context.Background()) + if err != nil { + if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") { + // 蛋蛋分数据还不存在,创建蛋蛋分数据 + now := time.Now().Format("2006-01-02 15:04:05") + err1 := egg_energy.CreateEsScoreAndAssignValuesDoc(index, id, msg.Uid, enum.SendCircleOfFriendNums, "1", now) + if err1 != nil { + return err1 + } + return nil + } + fmt.Println("EggSendFriendCircleDataConsumeUpdateDoc_ERR::::", err.Error()) + return err + } + fmt.Println("updateDoc==========>", updateDoc) + return nil +} diff --git a/consume/init.go b/consume/init.go index 7bd5b06..3dac567 100644 --- a/consume/init.go +++ b/consume/init.go @@ -37,6 +37,8 @@ func initConsumes() { jobs[consumeMd.EggCanalPersonAddActivityValueFunName] = EggCanalPersonAddActivityValueConsume // 用户活跃积分变更时更新es jobs[consumeMd.EggRecordActiveDataFunName] = EggRecordActiveDataConsume // 用户签到后更新活跃数据 jobs[consumeMd.EggEnergyAutoScoreDataFunName] = EggEnergyAutoScoreConsume // 自动打分 + jobs[consumeMd.EggSendFriendCircleDataFunName] = EggSendFriendCircleDataConsume // 用户发送朋友圈后更新es + jobs[consumeMd.EggCanalEnergyExchangeAccountFunName] = EggCanalEnergyExchangeAccountConsume // 蛋蛋能量兑换为余额的时候更新es jobs[consumeMd.VideoRewardFunName] = VideoRewardConsume // 短视频奖励 jobs[consumeMd.PlayletRewardFunName] = PlayletRewardConsume // 短剧奖励 diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 800c02b..cc41e55 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -290,6 +290,24 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "EggCanalUserVirtualCoinFlowAggregationConsume", }, + { + ExchangeName: "egg.app", + Name: "egg_send_friend_circle_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_send_friend_circle", + BindKey: "", + ConsumeFunName: "EggSendFriendCircleDataConsume", + }, + { + ExchangeName: "egg.canal.topic", + Name: "egg_canal_energy_exchange_account_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_canal_user_wallet_flow", + BindKey: "", + ConsumeFunName: "EggCanalEnergyExchangeAccountConsume", + }, } const ( @@ -322,4 +340,6 @@ const ( PublicPlatoonUserRelationCommissionConsumeFunName = "AddPublicPlatoonUserRelationCommissionConsume" EggEnergyTeamAssistanceConsumeFunName = "EggEnergyTeamAssistanceConsume" EggCanalUserVirtualCoinFlowAggregationConsumeFunName = "EggCanalUserVirtualCoinFlowAggregationConsume" + EggSendFriendCircleDataFunName = "EggSendFriendCircleDataConsume" + EggCanalEnergyExchangeAccountFunName = "EggCanalEnergyExchangeAccountConsume" ) diff --git a/consume/md/md_egg_canal_energy_exchange_account.go b/consume/md/md_egg_canal_energy_exchange_account.go new file mode 100644 index 0000000..e95ed61 --- /dev/null +++ b/consume/md/md_egg_canal_energy_exchange_account.go @@ -0,0 +1,31 @@ +package md + +type CanalEnergyExchangeAccount struct { + Id string `json:"id"` + Uid string `json:"uid"` + Direction string `json:"direction"` + Amount string `json:"amount"` + BeforeAmount string `json:"before_amount"` + AfterAmount string `json:"after_amount"` + SysFee string `json:"sys_fee"` + OrdId string `json:"ord_id"` + Title string `json:"title"` + Kind string `json:"kind"` + State string `json:"state"` + Memo string `json:"memo"` + CreateAt string `json:"create_at"` + UpdateAt string `json:"update_at"` +} + +type CanalEnergyExchangeAccountMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []T `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +} diff --git a/go.mod b/go.mod index 6f46651..0db839c 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ go 1.19 require ( code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241224090637-89a57f7fbb1e - code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241226020214-a56eb16f6264 + code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241226091556-c909dd302df7 code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 github.com/boombuler/barcode v1.0.1