dengbiao 6 days ago
parent
commit
a64569ef59
5 changed files with 117 additions and 6 deletions
  1. +1
    -0
      consume/init.go
  2. +17
    -4
      consume/md/consume_key.go
  3. +96
    -0
      consume/temporary_egg_energy_deal_user_ecpm.go
  4. +2
    -1
      consume/user_delete_consume.go
  5. +1
    -1
      go.mod

+ 1
- 0
consume/init.go View File

@@ -31,6 +31,7 @@ func initConsumes() {
jobs[consumeMd.EggFinWithdrawApplyDataConsumeFunName] = EggFinWithdrawApplyDataConsume // 提现

jobs[consumeMd.EggEnergyDealUserECPMFunName] = EggEnergyDealUserECPMConsume // 处理给用户Ecpm值
jobs[consumeMd.EggEnergyTemporaryDealUserECPMFunName] = TemporaryEggEnergyDealUserECPMConsume // 临时处理给用户Ecpm值
jobs[consumeMd.EggCanalInviteUserNumsFunName] = EggCanalInviteUserNumsConsume // 处理拉新人数
jobs[consumeMd.EggCanalViolateNumsFunName] = EggCanalViolateNumsConsume // 处理违规次数
jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数


+ 17
- 4
consume/md/consume_key.go View File

@@ -164,6 +164,15 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "EggEnergyDealUserECPMConsume",
},
{
ExchangeName: "egg.energy",
Name: "egg_energy_user_ecpm_temporary",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "user_ecpm_temporary",
BindKey: "",
ConsumeFunName: "TemporaryEggEnergyDealUserECPMConsume",
},
{
ExchangeName: "egg.canal.topic",
Name: "egg_canal_invite_user_nums_queue",
@@ -237,10 +246,13 @@ var RabbitMqQueueKeyList = []*MqQueue{
ConsumeFunName: "UserDeleteConsume",
},
{
ExchangeName: "egg.app",
Name: "egg_auto_score_queue",
Type: DirectQueueType,
IsPersistent: false,

ExchangeName: "egg.app",
//Name: "egg_auto_score_test_queue",
Name: "egg_auto_score_queue",
Type: DirectQueueType,
IsPersistent: false,
//RoutKey: "egg_auto_score_test",
RoutKey: "egg_auto_score",
BindKey: "",
ConsumeFunName: "EggEnergyAutoScoreConsume",
@@ -337,6 +349,7 @@ const (
EggEnergyAutoExchangeGreenEnergyFunName = "EggEnergyAutoExchangeGreenEnergyConsume"
EggEnergyNewUserRegisterDataFunName = "EggEnergyNewUserRegisterDataConsume"
EggEnergyDealUserECPMFunName = "EggEnergyDealUserECPMConsume"
EggEnergyTemporaryDealUserECPMFunName = "TemporaryEggEnergyDealUserECPMConsume"
EggCanalInviteUserNumsFunName = "EggCanalInviteUserNumsConsume"
EggCanalViolateNumsFunName = "EggCanalViolateNumsConsume"
IMEggEnergySendRedPackageFunName = "IMEggEnergySendRedPackageConsume"


+ 96
- 0
consume/temporary_egg_energy_deal_user_ecpm.go View File

@@ -0,0 +1,96 @@
package consume

import (
"applet/app/cfg"
utils2 "applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/md"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum"
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es"
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/olivere/elastic/v7"
"github.com/streadway/amqp"
"strings"
"time"
)

func TemporaryEggEnergyDealUserECPMConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>EggEnergyDealUserECPMConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleTemporaryEggEnergyDealUserECPMConsume(res.Body)
if err != nil {
fmt.Println("TemporaryEggEnergyDealUserECPMConsume_ERR:::::", err.Error())
utils2.FilePutContents("TemporaryEggEnergyDealUserECPMConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
}
//_ = res.Reject(false)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
}

func handleTemporaryEggEnergyDealUserECPMConsume(msgData []byte) error {
//1、解析mq中queue的数据结构体
var msg *md2.DealUserEcpmReq
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}

//2、更新用户信息
year, week := time.Now().AddDate(0, 0, -7).ISOWeek()
yearStr := utils2.IntToStr(year)
weekStr := utils2.IntToStr(week)
index := es2.GetAppointIndexFromAlias(yearStr, weekStr)
id := fmt.Sprintf("%d%d_%d", year, week, msg.Uid)

script := elastic.NewScript("ctx._source.ecpm += params.inc").Param("inc", utils2.StrToFloat64(msg.Ecpm))
_, err = es.EsClient.Update().
Index(index).
Id(id).
Script(script).
Do(context.Background())
if err != nil {
if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") {
// 蛋蛋分数据还不存在,创建蛋蛋分数据
now := time.Now().AddDate(0, 0, -7).Format("2006-01-02 15:04:05")
err1 := egg_energy.CreateEsScoreAndAssignValuesDoc(index, id, msg.Uid, enum.Ecpm, msg.Ecpm, now)
if err1 != nil {
return err1
}
return nil
}
return err
}
return err
}

+ 2
- 1
consume/user_delete_consume.go View File

@@ -92,8 +92,9 @@ func handleUserDeleteConsume(backEg *xorm.Engine, ch *rabbit.Channel, msgData []
backEg.Insert(&user)
user.Phone = ""
user.State = 2
user.ParentUid = 0
user.Nickname = "注销用户"
db.Db.Where("id=?", user.Id).Cols("state,phone,nickname").Update(&user)
db.Db.Where("id=?", user.Id).Cols("state,phone,nickname,parent_uid").Update(&user)
}
//2.用户关系链
//查出所有下级


+ 1
- 1
go.mod View File

@@ -7,7 +7,7 @@ go 1.19
// replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules

require (
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241224090637-89a57f7fbb1e
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241229040905-b840e6fb411a
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241227092843-802cf07ae61c
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5


Loading…
Cancel
Save