From 9d8d04dfbd079dfc8aee19cca5d2204e9c5ac404 Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Sun, 29 Dec 2024 12:16:47 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E6=B3=A8=E9=94=80=E5=A4=84=E7=BD=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consume/user_delete_consume.go | 3 ++- go.mod | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/consume/user_delete_consume.go b/consume/user_delete_consume.go index 627a1ce..42fb3fe 100644 --- a/consume/user_delete_consume.go +++ b/consume/user_delete_consume.go @@ -91,8 +91,9 @@ func handleUserDeleteConsume(backEg *xorm.Engine, ch *rabbit.Channel, msgData [] backEg.Insert(&user) user.Phone = "" user.State = 2 + user.ParentUid = 0 user.Nickname = "注销用户" - db.Db.Where("id=?", user.Id).Cols("state,phone,nickname").Update(&user) + db.Db.Where("id=?", user.Id).Cols("state,phone,nickname,parent_uid").Update(&user) } //2.用户关系链 //查出所有下级 diff --git a/go.mod b/go.mod index 25aeb87..2ac258a 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.19 // replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules require ( - code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241224090637-89a57f7fbb1e + code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241229040905-b840e6fb411a code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241227092843-802cf07ae61c code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 From 73c4e730f684254f7862ffd4b8c3c23755fce145 Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Mon, 30 Dec 2024 20:33:29 +0800 Subject: [PATCH 2/6] =?UTF-8?q?update:=20=E6=B6=88=E8=B4=B9=E4=B8=B4?= =?UTF-8?q?=E6=97=B6=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consume/md/consume_key.go | 184 ++------------------------------------ etc/cfg.yml | 30 +++---- 2 files changed, 23 insertions(+), 191 deletions(-) diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index ce97056..a502122 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -66,182 +66,14 @@ var RabbitMqQueueKeyList = []*MqQueue{ ConsumeFunName: "IMEggEnergyDelFriendCircleDataConsume", }, { - ExchangeName: "egg.app", - Name: "egg_fin_withdraw_apply", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "egg_fin_withdraw_apply", - BindKey: "", - ConsumeFunName: "EggFinWithdrawApplyDataConsume", - }, - { - ExchangeName: "egg.energy", - Name: "egg_energy_settlement_public_give_activity_coin", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "settlement_public_give_activity_coin", - BindKey: "", - ConsumeFunName: "EggEnergySettlementPublicGiveActivityCoinConsume", - }, - { - ExchangeName: "egg.energy", - Name: "egg_energy_start_exchange_egg_energy", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "start_exchange_egg_energy", - BindKey: "", - ConsumeFunName: "EggEnergyStartExchangeGreenEnergyConsume", - }, - { - ExchangeName: "egg.energy", - Name: "egg_energy_auto_exchange_egg_energy", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "auto_exchange_egg_energy", - BindKey: "", - ConsumeFunName: "EggEnergyAutoExchangeGreenEnergyConsume", - }, - { - ExchangeName: "egg.app", - Name: "add_public_platoon_user_relation_commission", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "add_public_platoon_user_relation_commission", - BindKey: "", - ConsumeFunName: "AddPublicPlatoonUserRelationCommissionConsume", - }, - { - ExchangeName: "egg.jpush", - Name: "egg_jpush_record_queue", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "jpush_record", - BindKey: "", - ConsumeFunName: "JpushRecordConsume", - }, - { - ExchangeName: "egg.aliyun_sms", - Name: "egg_aliyun_sms_record_queue_new", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "aliyun_sms_record", - BindKey: "", - ConsumeFunName: "AliyunSmsRecordConsume", - }, - { - ExchangeName: "egg.advertising", - Name: "egg_advertising_smash", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "advertising_smash", - BindKey: "", - ConsumeFunName: "AdvertisingSmashConsume", - }, - { - ExchangeName: "egg.advertising", - Name: "egg_advertising_sign", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "advertising_sign", - BindKey: "", - ConsumeFunName: "AdvertisingSignConsume", - }, - { - ExchangeName: "egg.app", - Name: "egg_new_user_register_queue", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "egg_new_user_register", - BindKey: "", - ConsumeFunName: "EggEnergyNewUserRegisterDataConsume", - }, - { - ExchangeName: "egg.energy", - Name: "egg_energy_user_ecpm", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "user_ecpm", - BindKey: "", - ConsumeFunName: "EggEnergyDealUserECPMConsume", - }, - { - ExchangeName: "egg.canal.topic", - Name: "egg_canal_invite_user_nums_queue", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "user_relate", - BindKey: "", - ConsumeFunName: "EggCanalInviteUserNumsConsume", - }, - { - ExchangeName: "egg.canal.topic", - Name: "egg_canal_violate_nums_queue", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "user_tag_records", - BindKey: "", - ConsumeFunName: "EggCanalViolateNumsConsume", - }, - { - ExchangeName: "im.egg.energy", - Name: "im_egg_energy_send_red_package_queue", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "send_red_package", - BindKey: "", - ConsumeFunName: "IMEggEnergySendRedPackageConsume", - }, - { - ExchangeName: "egg.app", - Name: "egg_record_active_queue", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "egg_record_active", - BindKey: "", - ConsumeFunName: "EggRecordActiveDataConsume", - }, - { - ExchangeName: "egg.canal.topic", - Name: "egg_canal_person_add_activity_value_queue", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "egg_canal_user_virtual_coin_flow", - BindKey: "", - ConsumeFunName: "EggCanalPersonAddActivityValueConsume", - }, - { - ExchangeName: "egg.video_playlet", - Name: "egg_video_reward", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "video", - BindKey: "", - ConsumeFunName: "VideoRewardFunName", - }, - { - ExchangeName: "egg.video_playlet", - Name: "egg_playlet_reward", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "playlet", - BindKey: "", - ConsumeFunName: "PlayletRewardFunName", - }, - { - ExchangeName: "egg.user", - Name: "egg_user_delete", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "user_delete", - BindKey: "", - ConsumeFunName: "UserDeleteConsume", - }, - { - ExchangeName: "egg.app", - Name: "egg_auto_score_queue", - Type: DirectQueueType, - IsPersistent: false, - RoutKey: "egg_auto_score", + // 消费临时队列 + ExchangeName: "egg.app", + Name: "egg_auto_score_test_queue", + //Name: "egg_auto_score_queue", + Type: DirectQueueType, + IsPersistent: false, + //RoutKey: "egg_auto_score", + RoutKey: "egg_auto_score_test", BindKey: "", ConsumeFunName: "EggEnergyAutoScoreConsume", }, diff --git a/etc/cfg.yml b/etc/cfg.yml index cae636e..e08bbf2 100644 --- a/etc/cfg.yml +++ b/etc/cfg.yml @@ -81,26 +81,26 @@ log: file_name: 'debug.log' # 连接RabbitMq -mq: - host: '120.77.153.180' - port: '5672' - user: 'guest' - pwd: 'guest' #mq: -# host: '116.62.62.35' +# host: '120.77.153.180' # port: '5672' -# user: 'zhios' -# pwd: 'ZHIoscnfnuo123' - -#es: -# url: 'http://120.55.48.175:9200' -# user: 'elastic' -# pwd: 'fnuo123' +# user: 'guest' +# pwd: 'guest' +mq: + host: '116.62.62.35' + port: '5672' + user: 'zhios' + pwd: 'ZHIoscnfnuo123' es: - url: 'http://123.57.140.192:9200' + url: 'http://120.55.48.175:9200' user: 'elastic' - pwd: 'se4BxqYyGhHyPWPA3w8Q' + pwd: 'fnuo123' + +#es: +# url: 'http://123.57.140.192:9200' +# user: 'elastic' +# pwd: 'se4BxqYyGhHyPWPA3w8Q' im_business_rpc: url: im-rpc-business.izhim.com From 3f559bdbd7037a88f7479aee6d6faf30666ec4d2 Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Mon, 30 Dec 2024 20:42:02 +0800 Subject: [PATCH 3/6] update --- consume/md/consume_key.go | 173 +++++++++++++++++++++++++++++++++++++- etc/cfg.yml | 30 +++---- 2 files changed, 187 insertions(+), 16 deletions(-) diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index a502122..2867d76 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -65,6 +65,177 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "IMEggEnergyDelFriendCircleDataConsume", }, + { + ExchangeName: "egg.app", + Name: "egg_fin_withdraw_apply", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_fin_withdraw_apply", + BindKey: "", + ConsumeFunName: "EggFinWithdrawApplyDataConsume", + }, + { + ExchangeName: "egg.energy", + Name: "egg_energy_settlement_public_give_activity_coin", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "settlement_public_give_activity_coin", + BindKey: "", + ConsumeFunName: "EggEnergySettlementPublicGiveActivityCoinConsume", + }, + { + ExchangeName: "egg.energy", + Name: "egg_energy_start_exchange_egg_energy", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "start_exchange_egg_energy", + BindKey: "", + ConsumeFunName: "EggEnergyStartExchangeGreenEnergyConsume", + }, + { + ExchangeName: "egg.energy", + Name: "egg_energy_auto_exchange_egg_energy", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "auto_exchange_egg_energy", + BindKey: "", + ConsumeFunName: "EggEnergyAutoExchangeGreenEnergyConsume", + }, + { + ExchangeName: "egg.app", + Name: "add_public_platoon_user_relation_commission", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "add_public_platoon_user_relation_commission", + BindKey: "", + ConsumeFunName: "AddPublicPlatoonUserRelationCommissionConsume", + }, + { + ExchangeName: "egg.jpush", + Name: "egg_jpush_record_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "jpush_record", + BindKey: "", + ConsumeFunName: "JpushRecordConsume", + }, + { + ExchangeName: "egg.aliyun_sms", + Name: "egg_aliyun_sms_record_queue_new", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "aliyun_sms_record", + BindKey: "", + ConsumeFunName: "AliyunSmsRecordConsume", + }, + { + ExchangeName: "egg.advertising", + Name: "egg_advertising_smash", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "advertising_smash", + BindKey: "", + ConsumeFunName: "AdvertisingSmashConsume", + }, + { + ExchangeName: "egg.advertising", + Name: "egg_advertising_sign", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "advertising_sign", + BindKey: "", + ConsumeFunName: "AdvertisingSignConsume", + }, + { + ExchangeName: "egg.app", + Name: "egg_new_user_register_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_new_user_register", + BindKey: "", + ConsumeFunName: "EggEnergyNewUserRegisterDataConsume", + }, + { + ExchangeName: "egg.energy", + Name: "egg_energy_user_ecpm", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "user_ecpm", + BindKey: "", + ConsumeFunName: "EggEnergyDealUserECPMConsume", + }, + { + ExchangeName: "egg.canal.topic", + Name: "egg_canal_invite_user_nums_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "user_relate", + BindKey: "", + ConsumeFunName: "EggCanalInviteUserNumsConsume", + }, + { + ExchangeName: "egg.canal.topic", + Name: "egg_canal_violate_nums_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "user_tag_records", + BindKey: "", + ConsumeFunName: "EggCanalViolateNumsConsume", + }, + { + ExchangeName: "im.egg.energy", + Name: "im_egg_energy_send_red_package_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "send_red_package", + BindKey: "", + ConsumeFunName: "IMEggEnergySendRedPackageConsume", + }, + { + ExchangeName: "egg.app", + Name: "egg_record_active_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_record_active", + BindKey: "", + ConsumeFunName: "EggRecordActiveDataConsume", + }, + { + ExchangeName: "egg.canal.topic", + Name: "egg_canal_person_add_activity_value_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "egg_canal_user_virtual_coin_flow", + BindKey: "", + ConsumeFunName: "EggCanalPersonAddActivityValueConsume", + }, + { + ExchangeName: "egg.video_playlet", + Name: "egg_video_reward", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "video", + BindKey: "", + ConsumeFunName: "VideoRewardFunName", + }, + { + ExchangeName: "egg.video_playlet", + Name: "egg_playlet_reward", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "playlet", + BindKey: "", + ConsumeFunName: "PlayletRewardFunName", + }, + { + ExchangeName: "egg.user", + Name: "egg_user_delete", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "user_delete", + BindKey: "", + ConsumeFunName: "UserDeleteConsume", + }, { // 消费临时队列 ExchangeName: "egg.app", @@ -72,8 +243,8 @@ var RabbitMqQueueKeyList = []*MqQueue{ //Name: "egg_auto_score_queue", Type: DirectQueueType, IsPersistent: false, + RoutKey: "egg_auto_score_test", //RoutKey: "egg_auto_score", - RoutKey: "egg_auto_score_test", BindKey: "", ConsumeFunName: "EggEnergyAutoScoreConsume", }, diff --git a/etc/cfg.yml b/etc/cfg.yml index e08bbf2..cae636e 100644 --- a/etc/cfg.yml +++ b/etc/cfg.yml @@ -81,26 +81,26 @@ log: file_name: 'debug.log' # 连接RabbitMq -#mq: -# host: '120.77.153.180' -# port: '5672' -# user: 'guest' -# pwd: 'guest' mq: - host: '116.62.62.35' + host: '120.77.153.180' port: '5672' - user: 'zhios' - pwd: 'ZHIoscnfnuo123' - -es: - url: 'http://120.55.48.175:9200' - user: 'elastic' - pwd: 'fnuo123' + user: 'guest' + pwd: 'guest' +#mq: +# host: '116.62.62.35' +# port: '5672' +# user: 'zhios' +# pwd: 'ZHIoscnfnuo123' #es: -# url: 'http://123.57.140.192:9200' +# url: 'http://120.55.48.175:9200' # user: 'elastic' -# pwd: 'se4BxqYyGhHyPWPA3w8Q' +# pwd: 'fnuo123' + +es: + url: 'http://123.57.140.192:9200' + user: 'elastic' + pwd: 'se4BxqYyGhHyPWPA3w8Q' im_business_rpc: url: im-rpc-business.izhim.com From 2f4c07ad6b1bc4e0fb667ebbe1bddc002e29222a Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Mon, 30 Dec 2024 21:55:10 +0800 Subject: [PATCH 4/6] update --- consume/md/consume_key.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 2867d76..5d185c3 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -237,14 +237,14 @@ var RabbitMqQueueKeyList = []*MqQueue{ ConsumeFunName: "UserDeleteConsume", }, { - // 消费临时队列 + ExchangeName: "egg.app", - Name: "egg_auto_score_test_queue", - //Name: "egg_auto_score_queue", + //Name: "egg_auto_score_test_queue", + Name: "egg_auto_score_queue", Type: DirectQueueType, IsPersistent: false, - RoutKey: "egg_auto_score_test", - //RoutKey: "egg_auto_score", + //RoutKey: "egg_auto_score_test", + RoutKey: "egg_auto_score", BindKey: "", ConsumeFunName: "EggEnergyAutoScoreConsume", }, From 22aca7a3a274c95ed7801c11da38bf577377b3f5 Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Tue, 31 Dec 2024 17:54:00 +0800 Subject: [PATCH 5/6] update --- consume/init.go | 1 + consume/md/consume_key.go | 10 ++ .../temporary_egg_energy_deal_user_ecpm.go | 96 +++++++++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 consume/temporary_egg_energy_deal_user_ecpm.go diff --git a/consume/init.go b/consume/init.go index 5a1bdbb..86ff65d 100644 --- a/consume/init.go +++ b/consume/init.go @@ -31,6 +31,7 @@ func initConsumes() { jobs[consumeMd.EggFinWithdrawApplyDataConsumeFunName] = EggFinWithdrawApplyDataConsume // 提现 jobs[consumeMd.EggEnergyDealUserECPMFunName] = EggEnergyDealUserECPMConsume // 处理给用户Ecpm值 + jobs[consumeMd.EggEnergyTemporaryDealUserECPMFunName] = TemporaryEggEnergyDealUserECPMConsume // 临时处理给用户Ecpm值 jobs[consumeMd.EggCanalInviteUserNumsFunName] = EggCanalInviteUserNumsConsume // 处理拉新人数 jobs[consumeMd.EggCanalViolateNumsFunName] = EggCanalViolateNumsConsume // 处理违规次数 jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数 diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 5d185c3..354c899 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -164,6 +164,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "EggEnergyDealUserECPMConsume", }, + { + ExchangeName: "egg.energy", + Name: "egg_energy_user_ecpm_temporary", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "user_ecpm_temporary", + BindKey: "", + ConsumeFunName: "EggEnergyTemporaryDealUserECPMFunName", + }, { ExchangeName: "egg.canal.topic", Name: "egg_canal_invite_user_nums_queue", @@ -340,6 +349,7 @@ const ( EggEnergyAutoExchangeGreenEnergyFunName = "EggEnergyAutoExchangeGreenEnergyConsume" EggEnergyNewUserRegisterDataFunName = "EggEnergyNewUserRegisterDataConsume" EggEnergyDealUserECPMFunName = "EggEnergyDealUserECPMConsume" + EggEnergyTemporaryDealUserECPMFunName = "TemporaryEggEnergyDealUserECPMConsume" EggCanalInviteUserNumsFunName = "EggCanalInviteUserNumsConsume" EggCanalViolateNumsFunName = "EggCanalViolateNumsConsume" IMEggEnergySendRedPackageFunName = "IMEggEnergySendRedPackageConsume" diff --git a/consume/temporary_egg_energy_deal_user_ecpm.go b/consume/temporary_egg_energy_deal_user_ecpm.go new file mode 100644 index 0000000..3e3d6b4 --- /dev/null +++ b/consume/temporary_egg_energy_deal_user_ecpm.go @@ -0,0 +1,96 @@ +package consume + +import ( + "applet/app/cfg" + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" + es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "context" + "encoding/json" + "errors" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/streadway/amqp" + "strings" + "time" +) + +func TemporaryEggEnergyDealUserECPMConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggEnergyDealUserECPMConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleTemporaryEggEnergyDealUserECPMConsume(res.Body) + if err != nil { + fmt.Println("TemporaryEggEnergyDealUserECPMConsume_ERR:::::", err.Error()) + utils2.FilePutContents("TemporaryEggEnergyDealUserECPMConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } +} + +func handleTemporaryEggEnergyDealUserECPMConsume(msgData []byte) error { + //1、解析mq中queue的数据结构体 + var msg *md2.DealUserEcpmReq + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + + //2、更新用户信息 + year, week := time.Now().AddDate(0, 0, -7).ISOWeek() + yearStr := utils2.IntToStr(year) + weekStr := utils2.IntToStr(week) + index := es2.GetAppointIndexFromAlias(yearStr, weekStr) + id := fmt.Sprintf("%d%d_%d", year, week, msg.Uid) + + script := elastic.NewScript("ctx._source.ecpm += params.inc").Param("inc", utils2.StrToFloat64(msg.Ecpm)) + _, err = es.EsClient.Update(). + Index(index). + Id(id). + Script(script). + Do(context.Background()) + if err != nil { + if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") { + // 蛋蛋分数据还不存在,创建蛋蛋分数据 + now := time.Now().AddDate(0, 0, -7).Format("2006-01-02 15:04:05") + err1 := egg_energy.CreateEsScoreAndAssignValuesDoc(index, id, msg.Uid, enum.Ecpm, msg.Ecpm, now) + if err1 != nil { + return err1 + } + return nil + } + return err + } + return err +} From d4510c7451224d5b49730c7e9bfc2aab3d26fddc Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Tue, 31 Dec 2024 17:55:53 +0800 Subject: [PATCH 6/6] update --- consume/md/consume_key.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 354c899..b5feaa4 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -171,7 +171,7 @@ var RabbitMqQueueKeyList = []*MqQueue{ IsPersistent: false, RoutKey: "user_ecpm_temporary", BindKey: "", - ConsumeFunName: "EggEnergyTemporaryDealUserECPMFunName", + ConsumeFunName: "TemporaryEggEnergyDealUserECPMConsume", }, { ExchangeName: "egg.canal.topic",