@@ -0,0 +1,79 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/cfg" | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
md2 "applet/es/md" | |||||
"code.fnuoos.com/EggPlanet/egg_system_rules.git" | |||||
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/streadway/amqp" | |||||
"time" | |||||
) | |||||
func EggCanalUserConsume(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>>EggCanalUserConsume>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
egg_system_rules.Init(cfg.RedisAddr) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
err = handleEggCanalUserConsume(res.Body, ch) | |||||
err = res.Ack(true) | |||||
fmt.Println("err ::: ", err) | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleEggCanalUserConsume(msg []byte, ch *rabbit.Channel) error { | |||||
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 | |||||
var canalMsg *md.CanalUserMessage[md.CanalUser] | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
fmt.Println("EggCanalInviteUserNumsConsumeFaliedUnMarshal_ERR:::::", err.Error()) | |||||
return nil | |||||
} | |||||
if canalMsg.Type == md2.CanalMsgInsertSqlType || canalMsg.Type == md2.CanalMsgUpdateSqlType { | |||||
oldUser := make(map[string]md.CanalUser) | |||||
for _, item := range canalMsg.Old { | |||||
oldUser[item.Id] = item | |||||
} | |||||
for _, item := range canalMsg.Data { | |||||
if utils.StrToInt(item.ParentUid) == 0 { | |||||
continue | |||||
} | |||||
count := rule.ExtendUserCount(db.Db, utils.StrToInt(item.ParentUid)) | |||||
if count > 1000 { | |||||
msg1 := md.CommUserId{ | |||||
Uid: item.ParentUid, | |||||
} | |||||
ch.Publish("egg.app", msg1, "egg_slow_auto_up_lv") | |||||
continue | |||||
} | |||||
rule.UserUpgradeInsert(db.Db, utils.StrToInt(item.ParentUid)) | |||||
} | |||||
} | |||||
return nil | |||||
} |
@@ -0,0 +1,83 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/cfg" | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
md2 "applet/es/md" | |||||
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement" | |||||
"code.fnuoos.com/EggPlanet/egg_system_rules.git" | |||||
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/streadway/amqp" | |||||
"time" | |||||
) | |||||
func EggEnergyUserActivityConsume(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>>EggEnergyUserActivityConsume>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
egg_system_rules.Init(cfg.RedisAddr) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
err = handleEggEnergyUserActivityConsume(res.Body, ch) | |||||
err = res.Ack(true) | |||||
fmt.Println("err ::: ", err) | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleEggEnergyUserActivityConsume(msg []byte, ch *rabbit.Channel) error { | |||||
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 | |||||
var canalMsg *md.CanalEggEnergyUserActivityMessage[md.CanalEggEnergyUserActivity] | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
fmt.Println("EggCanalInviteUserNumsConsumeFaliedUnMarshal_ERR:::::", err.Error()) | |||||
return nil | |||||
} | |||||
if canalMsg.Type == md2.CanalMsgInsertSqlType { | |||||
for _, item := range canalMsg.Data { | |||||
userDb := implement.NewUserDb(db.Db) | |||||
user, _ := userDb.GetUser(utils.StrToInt64(item.Uid)) | |||||
if user == nil { | |||||
continue | |||||
} | |||||
if user.ParentUid == 0 { | |||||
continue | |||||
} | |||||
count := rule.ExtendUserCount(db.Db, int(user.ParentUid)) | |||||
if count > 1000 { | |||||
msg1 := md.CommUserId{ | |||||
Uid: utils.Int64ToStr(user.ParentUid), | |||||
} | |||||
ch.Publish("egg.app", msg1, "egg_slow_auto_up_lv") | |||||
continue | |||||
} | |||||
rule.UserUpgradeInsert(db.Db, int(user.ParentUid)) | |||||
} | |||||
} | |||||
return nil | |||||
} |
@@ -0,0 +1,60 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/cfg" | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"code.fnuoos.com/EggPlanet/egg_system_rules.git" | |||||
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/streadway/amqp" | |||||
"time" | |||||
) | |||||
func EggSlowAutoUpLvConsume(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>>EggSlowAutoUpLvConsume>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
egg_system_rules.Init(cfg.RedisAddr) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
err = handleEggSlowAutoUpLvConsume(res.Body) | |||||
err = res.Ack(true) | |||||
fmt.Println("err ::: ", err) | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleEggSlowAutoUpLvConsume(msgData []byte) error { | |||||
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 | |||||
// 1.解析mq中queue的数据结构体 | |||||
var msg *md.CommUserId | |||||
err := json.Unmarshal(msgData, &msg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
rule.UserUpgradeInsert(db.Db, utils.StrToInt(msg.Uid)) | |||||
return nil | |||||
} |
@@ -17,6 +17,9 @@ func Init() { | |||||
// 增加消费任务队列 | // 增加消费任务队列 | ||||
func initConsumes() { | func initConsumes() { | ||||
jobs[consumeMd.EggSlowAutoUpLvConsume] = EggSlowAutoUpLvConsume //缓慢 自动升级 | |||||
jobs[consumeMd.EggEnergyUserActivityConsume] = EggEnergyUserActivityConsume //监听 自动升级 | |||||
jobs[consumeMd.EggCanalUserConsume] = EggCanalUserConsume //监听 自动升级 | |||||
jobs[consumeMd.AliyunSmsRecordFunName] = AliyunSmsRecordConsume //阿里云短信 | jobs[consumeMd.AliyunSmsRecordFunName] = AliyunSmsRecordConsume //阿里云短信 | ||||
jobs[consumeMd.JpushRecordFunName] = JpushRecordConsume //极光推送 | jobs[consumeMd.JpushRecordFunName] = JpushRecordConsume //极光推送 | ||||
@@ -245,11 +245,39 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
BindKey: "", | BindKey: "", | ||||
ConsumeFunName: "EggEnergyAutoScoreConsume", | ConsumeFunName: "EggEnergyAutoScoreConsume", | ||||
}, | }, | ||||
{ | |||||
ExchangeName: "egg.app", | |||||
Name: "egg_slow_auto_up_lv", | |||||
Type: DirectQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "egg_slow_auto_up_lv", | |||||
BindKey: "", | |||||
ConsumeFunName: "EggSlowAutoUpLvConsume", | |||||
}, | |||||
{ | |||||
ExchangeName: "egg.canal.topic", | |||||
Name: "egg_energy_user_activity", | |||||
Type: DirectQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "egg_energy_user_activity", | |||||
BindKey: "", | |||||
ConsumeFunName: "EggEnergyUserActivityConsume", | |||||
}, | |||||
{ | |||||
ExchangeName: "egg.canal.topic", | |||||
Name: "egg_canal_user", | |||||
Type: DirectQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "egg_canal_user", | |||||
BindKey: "", | |||||
ConsumeFunName: "EggCanalUserConsume", | |||||
}, | |||||
} | } | ||||
const ( | const ( | ||||
AdvertisingSignConsume = "AdvertisingSignConsume" | |||||
AdvertisingSmashConsume = "AdvertisingSmashConsume" | |||||
EggSlowAutoUpLvConsume = "EggSlowAutoUpLvConsume" | |||||
EggEnergyUserActivityConsume = "EggEnergyUserActivityConsume" | |||||
EggCanalUserConsume = "EggCanalUserConsume" | |||||
JpushRecordFunName = "JpushRecordConsume" | JpushRecordFunName = "JpushRecordConsume" | ||||
AliyunSmsRecordFunName = "AliyunSmsRecordConsume" | AliyunSmsRecordFunName = "AliyunSmsRecordConsume" | ||||
EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" | EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" | ||||
@@ -0,0 +1,19 @@ | |||||
package md | |||||
type CanalEggEnergyUserActivity struct { | |||||
Id string `json:"id"` | |||||
Uid string `json:"uid"` | |||||
} | |||||
type CanalEggEnergyUserActivityMessage[T any] struct { | |||||
Data []T `json:"data"` | |||||
Database string `json:"database"` | |||||
ES int64 `json:"es"` | |||||
ID int64 `json:"id"` | |||||
IsDdl bool `json:"isDdl"` | |||||
Old []T `json:"old"` | |||||
PkNames []string `json:"pkNames"` | |||||
Table string `json:"table"` | |||||
TS int64 `json:"ts"` | |||||
Type string `json:"type"` | |||||
} |
@@ -0,0 +1,20 @@ | |||||
package md | |||||
type CanalUser struct { | |||||
Id string `json:"id"` | |||||
Level string `json:"level"` | |||||
ParentUid string `json:"parent_uid"` | |||||
} | |||||
type CanalUserMessage[T any] struct { | |||||
Data []T `json:"data"` | |||||
Database string `json:"database"` | |||||
ES int64 `json:"es"` | |||||
ID int64 `json:"id"` | |||||
IsDdl bool `json:"isDdl"` | |||||
Old []T `json:"old"` | |||||
PkNames []string `json:"pkNames"` | |||||
Table string `json:"table"` | |||||
TS int64 `json:"ts"` | |||||
Type string `json:"type"` | |||||
} |
@@ -7,8 +7,8 @@ go 1.19 | |||||
//replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules | //replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules | ||||
require ( | require ( | ||||
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241212120727-3681308aeb14 | |||||
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241213073654-f37e71ad92ef | |||||
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241214062221-cde2ce240fa8 | |||||
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241214062309-291e3233ae28 | |||||
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be | code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 | code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 | ||||
github.com/boombuler/barcode v1.0.1 | github.com/boombuler/barcode v1.0.1 | ||||