# Conflicts: # consume/init.go # consume/md/consume_key.go # go.modmaster
@@ -0,0 +1,102 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
utils2 "applet/app/utils" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
md2 "applet/es/md" | |||||
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement" | |||||
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"context" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/olivere/elastic/v7" | |||||
"github.com/streadway/amqp" | |||||
"time" | |||||
) | |||||
// EggCanalPersonAddActivityValueConsume 用户获得活跃积分时更新到 es | |||||
func EggCanalPersonAddActivityValueConsume(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>EggCanalPersonAddActivityValueConsume>>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1000) | |||||
delivery := ch.Consume(queue.Name, true) //设置自动应答 | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>>>EggCanalPersonAddActivityValueConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleEggCanalPersonAddActivityValueConsume(res.Body) | |||||
if err != nil { | |||||
fmt.Println("EggCanalPersonAddActivityValueConsume_ERR:::::", err.Error()) | |||||
utils2.FilePutContents("EggCanalPersonAddActivityValueConsume_ERR", utils2.SerializeStr(map[string]interface{}{ | |||||
"body": res.Body, | |||||
"err": err.Error(), | |||||
})) | |||||
} | |||||
//_ = res.Reject(false) | |||||
//_ = res.Ack(true) | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
} | |||||
func handleEggCanalPersonAddActivityValueConsume(msg []byte) error { | |||||
//1.解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.CanalUserVirtualCoinFlowMessage[md.CanalUserVirtualCoinFlow] | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
return nil | |||||
} | |||||
year, week := time.Now().ISOWeek() | |||||
yearStr := utils2.IntToStr(year) | |||||
weekStr := utils2.IntToStr(week) | |||||
index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | |||||
settingDb := implement.NewEggEnergyBasicSettingDb(db.Db) | |||||
setting, err := settingDb.EggEnergyBasicSettingGetOne() | |||||
if err != nil { | |||||
return err | |||||
} | |||||
personEggPointsCoinId := setting.PersonEggPointsCoinId | |||||
// 2. 监听插入信息 | |||||
if canalMsg.Type == md2.CanalMsgInsertSqlType { | |||||
for _, item := range canalMsg.Data { | |||||
uid := item.Uid | |||||
id := fmt.Sprintf("%d%d-%d", year, week, uid) | |||||
if item.CoinId != personEggPointsCoinId && item.Direction != 1 { | |||||
continue | |||||
} | |||||
// 3. 增加个人活跃积分 | |||||
script := elastic.NewScript("ctx._source.person_add_activity_value += params.inc").Param("inc", item.Amount) | |||||
_, err = es.EsClient.Update(). | |||||
Index(index). | |||||
Id(id). | |||||
Script(script). | |||||
Do(context.Background()) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
} | |||||
return nil | |||||
} |
@@ -1,10 +1,12 @@ | |||||
package consume | package consume | ||||
import ( | import ( | ||||
"applet/app/db" | |||||
utils2 "applet/app/utils" | utils2 "applet/app/utils" | ||||
"applet/app/utils/logx" | "applet/app/utils/logx" | ||||
"applet/consume/md" | "applet/consume/md" | ||||
md2 "applet/es/md" | md2 "applet/es/md" | ||||
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement" | |||||
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" | es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" | ||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" | "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" | ||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | ||||
@@ -17,6 +19,7 @@ import ( | |||||
"time" | "time" | ||||
) | ) | ||||
// EggCanalViolateNumsConsume 更新违规次数 | |||||
func EggCanalViolateNumsConsume(queue md.MqQueue) { | func EggCanalViolateNumsConsume(queue md.MqQueue) { | ||||
fmt.Println(">>>>>>>>>>>EggCanalViolateNumsConsume>>>>>>>>>>>>>") | fmt.Println(">>>>>>>>>>>EggCanalViolateNumsConsume>>>>>>>>>>>>>") | ||||
ch, err := rabbit.Cfg.Pool.GetChannel() | ch, err := rabbit.Cfg.Pool.GetChannel() | ||||
@@ -56,7 +59,7 @@ func EggCanalViolateNumsConsume(queue md.MqQueue) { | |||||
func handleEggCanalViolateNumsConsume(msg []byte) error { | func handleEggCanalViolateNumsConsume(msg []byte) error { | ||||
//1、解析canal采集至mq中queue的数据结构体 | //1、解析canal采集至mq中queue的数据结构体 | ||||
var canalMsg *md.CanalUserRelateMessage[md.CanalUserRelate] | |||||
var canalMsg *md.CanalTagRecordsMessage[md.CanalTagRecords] | |||||
err := json.Unmarshal(msg, &canalMsg) | err := json.Unmarshal(msg, &canalMsg) | ||||
if err != nil { | if err != nil { | ||||
return nil | return nil | ||||
@@ -68,11 +71,23 @@ func handleEggCanalViolateNumsConsume(msg []byte) error { | |||||
index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | ||||
// 2. 监听插入信息 | // 2. 监听插入信息 | ||||
if canalMsg.Type == md2.CanalMsgInsertSqlType { | if canalMsg.Type == md2.CanalMsgInsertSqlType { | ||||
tagDb := implement.NewUserTagDb(db.Db) | |||||
for _, item := range canalMsg.Data { | for _, item := range canalMsg.Data { | ||||
parentUid := item.ParentUid | |||||
id := fmt.Sprintf("%d%d-%d", year, week, parentUid) | |||||
tag, err1 := tagDb.UserTagGetOneByParams(map[string]interface{}{ | |||||
"key": "id", | |||||
"value": item.TagId, | |||||
}) | |||||
if err1 != nil { | |||||
return err1 | |||||
} | |||||
// 2.1. 判断是否为处罚标签 | |||||
if tag.IsPunish == 0 { | |||||
continue | |||||
} | |||||
uid := item.Uid | |||||
id := fmt.Sprintf("%d%d-%d", year, week, uid) | |||||
// 增加违规次数记录 | |||||
// 2.2. 增加违规次数记录 | |||||
script := elastic.NewScript("ctx._source.violate_nums += params.inc").Param("inc", 1) | script := elastic.NewScript("ctx._source.violate_nums += params.inc").Param("inc", 1) | ||||
_, err = es.EsClient.Update(). | _, err = es.EsClient.Update(). | ||||
Index(index). | Index(index). | ||||
@@ -84,13 +99,26 @@ func handleEggCanalViolateNumsConsume(msg []byte) error { | |||||
} | } | ||||
} | } | ||||
} | } | ||||
// 3. 监听删除信息 | |||||
if canalMsg.Type == md2.CanalMsgDeleteSqlType { | if canalMsg.Type == md2.CanalMsgDeleteSqlType { | ||||
tagDb := implement.NewUserTagDb(db.Db) | |||||
for _, item := range canalMsg.Data { | for _, item := range canalMsg.Data { | ||||
parentUid := item.ParentUid | |||||
id := fmt.Sprintf("%d%d-%d", year, week, parentUid) | |||||
tag, err1 := tagDb.UserTagGetOneByParams(map[string]interface{}{ | |||||
"key": "id", | |||||
"value": item.TagId, | |||||
}) | |||||
if err1 != nil { | |||||
return err1 | |||||
} | |||||
// 3.1 判断是否为处罚标签 | |||||
if tag.IsPunish == 0 { | |||||
continue | |||||
} | |||||
uid := item.Uid | |||||
id := fmt.Sprintf("%d%d-%d", year, week, uid) | |||||
// 减少违规次数记录 | |||||
script := elastic.NewScript("ctx._source.violate_nums -= params.inc").Param("inc", 1) | |||||
// 3.2 减少违规次数记录 | |||||
script := elastic.NewScript("ctx._source.violate_nums -= params.dec").Param("dec", 1) | |||||
_, err = es.EsClient.Update(). | _, err = es.EsClient.Update(). | ||||
Index(index). | Index(index). | ||||
Id(id). | Id(id). | ||||
@@ -0,0 +1,114 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/cfg" | |||||
"applet/app/db" | |||||
utils2 "applet/app/utils" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement" | |||||
"code.fnuoos.com/EggPlanet/egg_system_rules.git" | |||||
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" | |||||
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"context" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/olivere/elastic/v7" | |||||
"github.com/streadway/amqp" | |||||
"time" | |||||
) | |||||
// EggRecordActiveDataConsume 更新团队活跃次数和签到次数 | |||||
func EggRecordActiveDataConsume(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>>EggRecordActiveDataConsume>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
egg_system_rules.Init(cfg.RedisAddr) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
err = handleEggRecordActiveDataConsume(res.Body) | |||||
if err != nil { | |||||
fmt.Println("EggRecordActiveDataConsume_ERR:::::", err.Error()) | |||||
utils2.FilePutContents("EggRecordActiveDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{ | |||||
"body": res.Body, | |||||
"err": err.Error(), | |||||
})) | |||||
} | |||||
//_ = res.Reject(false) | |||||
err = res.Ack(true) | |||||
fmt.Println("err ::: ", err) | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
} | |||||
func handleEggRecordActiveDataConsume(msgData []byte) error { | |||||
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 | |||||
// 1.解析mq中queue的数据结构体 | |||||
var msg *md2.EggStructForRecordActiveData | |||||
err := json.Unmarshal(msgData, &msg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
year, week := time.Now().ISOWeek() | |||||
yearStr := utils2.IntToStr(year) | |||||
weekStr := utils2.IntToStr(week) | |||||
index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | |||||
// 2.获取用户直推上级 | |||||
userRelateDb := implement.NewUserRelateDb(db.Db) | |||||
userRelate, err := userRelateDb.GetUserParentUserRelate(msg.Uid) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
// 3.添加团队活跃次数 | |||||
if userRelate != nil { | |||||
parentUid := userRelate.ParentUid | |||||
parentEsId := fmt.Sprintf("%d%d-%d", year, week, parentUid) | |||||
// 增加团队活跃次数记录 | |||||
script := elastic.NewScript("ctx._source.team_activity_nums += params.inc").Param("inc", 1) | |||||
_, err = es.EsClient.Update(). | |||||
Index(index). | |||||
Id(parentEsId). | |||||
Script(script). | |||||
Do(context.Background()) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
// 4.更新签到次数 | |||||
esId := fmt.Sprintf("%d%d-%d", year, week, msg.Uid) | |||||
script := elastic.NewScript(` | |||||
ctx._source.sign_in_nums += params.signInInc; | |||||
`).Param("signInInc", 1) | |||||
_, err = es.EsClient.Update(). | |||||
Index(index). | |||||
Id(esId). | |||||
Script(script). | |||||
Do(context.Background()) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
return nil | |||||
} |
@@ -70,7 +70,7 @@ func handleIMEggEnergySendRedPackageConsume(msgData []byte) error { | |||||
weekStr := utils2.IntToStr(week) | weekStr := utils2.IntToStr(week) | ||||
index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | ||||
id := fmt.Sprintf("%d%d-%d", year, week, msg.Uid) | id := fmt.Sprintf("%d%d-%d", year, week, msg.Uid) | ||||
// 新增拉新人数 | |||||
// 3.新增发红包次数 | |||||
script := elastic.NewScript("ctx._source.send_red_package_nums += params.inc").Param("inc", 1) | script := elastic.NewScript("ctx._source.send_red_package_nums += params.inc").Param("inc", 1) | ||||
_, err = es.EsClient.Update(). | _, err = es.EsClient.Update(). | ||||
Index(index). | Index(index). | ||||
@@ -36,6 +36,13 @@ func initConsumes() { | |||||
jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数 | jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数 | ||||
jobs[consumeMd.VideoRewardFunName] = VideoRewardConsume // 短视频奖励 | jobs[consumeMd.VideoRewardFunName] = VideoRewardConsume // 短视频奖励 | ||||
jobs[consumeMd.PlayletRewardFunName] = PlayletRewardConsume // 短剧奖励 | jobs[consumeMd.PlayletRewardFunName] = PlayletRewardConsume // 短剧奖励 | ||||
jobs[consumeMd.EggEnergyNewUserRegisterDataFunName] = EggEnergyNewUserRegisterDataConsume // 新用户注册 | |||||
jobs[consumeMd.EggEnergyDealUserECPMFunName] = EggEnergyDealUserECPMConsume // 处理给用户Ecpm值 | |||||
jobs[consumeMd.EggCanalInviteUserNumsFunName] = EggCanalInviteUserNumsConsume // 处理拉新人数 | |||||
jobs[consumeMd.EggCanalViolateNumsFunName] = EggCanalViolateNumsConsume // 处理违规次数 | |||||
jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数 | |||||
jobs[consumeMd.EggRecordActiveDataFunName] = EggRecordActiveDataConsume // 用户签到后更新活跃数据 | |||||
jobs[consumeMd.EggCanalPersonAddActivityValueFunName] = EggCanalPersonAddActivityValueConsume // 用户活跃积分变更时更新es | |||||
} | } | ||||
func Run() { | func Run() { | ||||
@@ -67,7 +67,7 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
}, | }, | ||||
{ | { | ||||
ExchangeName: "egg.app", | ExchangeName: "egg.app", | ||||
Name: "egg_fin_withdraw_apply_error_queue", | |||||
Name: "egg_fin_withdraw_apply", | |||||
Type: DirectQueueType, | Type: DirectQueueType, | ||||
IsPersistent: false, | IsPersistent: false, | ||||
RoutKey: "egg_fin_withdraw_apply", | RoutKey: "egg_fin_withdraw_apply", | ||||
@@ -182,6 +182,24 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
BindKey: "", | BindKey: "", | ||||
ConsumeFunName: "IMEggEnergySendRedPackageConsume", | ConsumeFunName: "IMEggEnergySendRedPackageConsume", | ||||
}, | }, | ||||
{ | |||||
ExchangeName: "egg.app", | |||||
Name: "egg_record_active_queue", | |||||
Type: DirectQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "egg_record_active", | |||||
BindKey: "", | |||||
ConsumeFunName: "EggRecordActiveDataConsume", | |||||
}, | |||||
{ | |||||
ExchangeName: "egg.canal.topic", | |||||
Name: "egg_canal_person_add_activity_value_queue", | |||||
Type: DirectQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "egg_canal_person_add_activity_value", | |||||
BindKey: "", | |||||
ConsumeFunName: "EggCanalPersonAddActivityValueConsume", | |||||
}, | |||||
{ | { | ||||
ExchangeName: "egg.video_playlet", | ExchangeName: "egg.video_playlet", | ||||
Name: "egg_video_reward", | Name: "egg_video_reward", | ||||
@@ -222,6 +240,8 @@ const ( | |||||
EggCanalInviteUserNumsFunName = "EggCanalInviteUserNumsConsume" | EggCanalInviteUserNumsFunName = "EggCanalInviteUserNumsConsume" | ||||
EggCanalViolateNumsFunName = "EggCanalViolateNumsConsume" | EggCanalViolateNumsFunName = "EggCanalViolateNumsConsume" | ||||
IMEggEnergySendRedPackageFunName = "IMEggEnergySendRedPackageConsume" | IMEggEnergySendRedPackageFunName = "IMEggEnergySendRedPackageConsume" | ||||
EggRecordActiveDataFunName = "EggRecordActiveDataConsume" | |||||
EggCanalPersonAddActivityValueFunName = "EggCanalPersonAddActivityValueConsume" | |||||
VideoRewardFunName = "VideoRewardFunName" | VideoRewardFunName = "VideoRewardFunName" | ||||
PlayletRewardFunName = "PlayletRewardFunName" | PlayletRewardFunName = "PlayletRewardFunName" | ||||
) | ) |
@@ -0,0 +1,27 @@ | |||||
package md | |||||
type CanalUserVirtualCoinFlow struct { | |||||
Id int64 `json:"id"` | |||||
Uid int64 `json:"uid"` | |||||
CoinId int `json:"coin_id"` | |||||
Direction int `json:"direction"` | |||||
Title string `json:"title"` | |||||
Amount string `json:"amount"` | |||||
BeforeAmount string `json:"before_amount"` | |||||
AfterAmount string `json:"after_amount"` | |||||
SysFee string `json:"sys_fee"` | |||||
TransferType int `json:"transfer_type"` | |||||
} | |||||
type CanalUserVirtualCoinFlowMessage[T any] struct { | |||||
Data []T `json:"data"` | |||||
Database string `json:"database"` | |||||
ES int64 `json:"es"` | |||||
ID int64 `json:"id"` | |||||
IsDdl bool `json:"isDdl"` | |||||
Old []T `json:"old"` | |||||
PkNames []string `json:"pkNames"` | |||||
Table string `json:"table"` | |||||
TS int64 `json:"ts"` | |||||
Type string `json:"type"` | |||||
} |
@@ -7,6 +7,8 @@ go 1.19 | |||||
// replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules | // replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules | ||||
require ( | require ( | ||||
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241209061118-ae1be0db6a70 | |||||
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241209020046-5e439665e263 | |||||
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241206115326-8cbc93c7c64d | code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241206115326-8cbc93c7c64d | ||||
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241209064215-2bf33ead99e1 | code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241209064215-2bf33ead99e1 | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be | code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be | ||||