@@ -0,0 +1,109 @@ | |||
package consume | |||
import ( | |||
utils2 "applet/app/utils" | |||
"applet/app/utils/logx" | |||
"applet/consume/md" | |||
md2 "applet/es/md" | |||
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" | |||
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" | |||
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" | |||
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" | |||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||
"context" | |||
"encoding/json" | |||
"errors" | |||
"fmt" | |||
"github.com/olivere/elastic/v7" | |||
"github.com/streadway/amqp" | |||
"strings" | |||
"time" | |||
) | |||
func EggCanalEnergyExchangeAccountConsume(queue md.MqQueue) { | |||
fmt.Println(">>>>>>>>>>>EggCanalEnergyExchangeAccountConsume>>>>>>>>>>>>>") | |||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||
if err != nil { | |||
logx.Error(err) | |||
return | |||
} | |||
defer ch.Release() | |||
//1、将自己绑定到交换机上 | |||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||
//2、取出数据进行消费 | |||
ch.Qos(1000) | |||
delivery := ch.Consume(queue.Name, true) //设置自动应答 | |||
var res amqp.Delivery | |||
var ok bool | |||
for { | |||
res, ok = <-delivery | |||
if ok == true { | |||
//fmt.Println(string(res.Body)) | |||
fmt.Println(">>>>>>>>>>>>>>>>>>EggCanalEnergyExchangeAccountConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||
err = handleEggCanalEnergyExchangeAccountConsume(res.Body) | |||
if err != nil { | |||
fmt.Println("EggCanalEnergyExchangeAccountConsume_ERR:::::", err.Error()) | |||
utils2.FilePutContents("EggCanalEnergyExchangeAccountConsume_ERR", utils2.SerializeStr(map[string]interface{}{ | |||
"body": res.Body, | |||
"err": err.Error(), | |||
})) | |||
} | |||
//_ = res.Reject(false) | |||
//_ = res.Ack(true) | |||
} else { | |||
panic(errors.New("error getting message")) | |||
} | |||
} | |||
} | |||
func handleEggCanalEnergyExchangeAccountConsume(msg []byte) error { | |||
//1.解析canal采集至mq中queue的数据结构体 | |||
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 | |||
var canalMsg *md.CanalEnergyExchangeAccountMessage[md.CanalEnergyExchangeAccount] | |||
err := json.Unmarshal(msg, &canalMsg) | |||
if err != nil { | |||
fmt.Println("EggCanalEnergyExchangeAccountConsumeUnMarshalFailed_ERR:::::", err.Error()) | |||
return nil | |||
} | |||
year, week := time.Now().ISOWeek() | |||
yearStr := utils2.IntToStr(year) | |||
weekStr := utils2.IntToStr(week) | |||
index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | |||
// 2. 监听插入信息 | |||
if canalMsg.Type == md2.CanalMsgInsertSqlType { | |||
for _, item := range canalMsg.Data { | |||
uid := item.Uid | |||
id := fmt.Sprintf("%d%d_%s", year, week, uid) | |||
if item.Title != enum.EggEnergyExchangeAccountBalance.String() { | |||
continue | |||
} | |||
// 3. 增加 蛋蛋能量兑换余额 数量 | |||
amount := utils2.StrToFloat64(item.Amount) | |||
script := elastic.NewScript("ctx._source.egg_energy_exchange_account_balance += params.inc").Param("inc", amount) | |||
updateDoc, err := es.EsClient.Update(). | |||
Index(index). | |||
Id(id). | |||
Script(script). | |||
Do(context.Background()) | |||
if err != nil { | |||
if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") { | |||
// 蛋蛋分数据还不存在,创建蛋蛋分数据 | |||
now := time.Now().Format("2006-01-02 15:04:05") | |||
err1 := egg_energy.CreateEsScoreAndAssignValuesDoc(index, id, utils2.StrToInt64(uid), enum.EsEggEnergyExchangeAccountBalance, utils2.Float64ToStr(amount), now) | |||
if err1 != nil { | |||
return err1 | |||
} | |||
return nil | |||
} | |||
fmt.Println("EggCanalPersonAddActivityValueConsumeUpdateDoc_ERR::::", err.Error()) | |||
return err | |||
} | |||
fmt.Println("updateDoc==========>", updateDoc) | |||
} | |||
} | |||
return nil | |||
} |
@@ -1,67 +0,0 @@ | |||
package consume | |||
import ( | |||
utils2 "applet/app/utils" | |||
"code.fnuoos.com/EggPlanet/egg_system_rules.git/md" | |||
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" | |||
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" | |||
"context" | |||
"fmt" | |||
"github.com/olivere/elastic/v7" | |||
"strings" | |||
"testing" | |||
"time" | |||
) | |||
func TestEs(t *testing.T) { | |||
es.Init("http://123.57.140.192:9200", "elastic", "fnuo123") | |||
year, week := time.Now().ISOWeek() | |||
yearStr := utils2.IntToStr(year) | |||
weekStr := utils2.IntToStr(week) | |||
index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | |||
amount := utils2.StrToFloat64("100") | |||
uid := "100" | |||
id := fmt.Sprintf("%d%d_%s", year, week, uid) | |||
script := elastic.NewScript("ctx._source.person_add_activity_value += params.inc").Param("inc", int(amount)) | |||
updateDoc, err := es.EsClient.Update(). | |||
Index(index). | |||
Id(id). | |||
Script(script). | |||
Do(context.Background()) | |||
now := time.Now().Format("2006-01-02 15:04:05") | |||
if err != nil { | |||
if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") { | |||
// 如果记录不存在就创建记录 | |||
m := md.EggEnergyUserEggScoreEs{ | |||
Uid: utils2.StrToInt64(uid), | |||
ScoreValue: 0, | |||
ScoreValueKind: 0, | |||
Ecpm: 0, | |||
InviteUserNums: 0, | |||
TeamActivityNums: 0, | |||
SignInNums: 0, | |||
ImActivityNums: 0, | |||
SendRedPackageNums: 0, | |||
EggEnergyExchangeAccountBalance: 0, | |||
AccountBalanceExchangeEggEnergyNums: 0, | |||
SendCircleOfFriendNums: 0, | |||
ForumCommentsNums: 0, | |||
CollegeLearningNums: 0, | |||
ViolateNums: 0, | |||
BrowseInterfaceNums: 0, | |||
PersonAddActivityValue: 1, | |||
CreatedAt: now, | |||
UpdatedAt: now, | |||
} | |||
createDoc, err1 := es.CreateDoc(index, id, m) | |||
if err1 != nil { | |||
fmt.Println("EggCanalPersonAddActivityValueConsumeCreateDoc_ERR::::", err1.Error()) | |||
return | |||
} | |||
fmt.Println("createDoc==========>", createDoc) | |||
return | |||
} | |||
fmt.Println("EggCanalPersonAddActivityValueConsumeUpdateDoc_ERR::::", err.Error()) | |||
} | |||
fmt.Println("updateDoc==========>", updateDoc) | |||
} |
@@ -0,0 +1,97 @@ | |||
package consume | |||
import ( | |||
"applet/app/cfg" | |||
utils2 "applet/app/utils" | |||
"applet/app/utils/logx" | |||
"applet/consume/md" | |||
"code.fnuoos.com/EggPlanet/egg_system_rules.git" | |||
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" | |||
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" | |||
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" | |||
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es" | |||
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" | |||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||
"context" | |||
"encoding/json" | |||
"errors" | |||
"fmt" | |||
"github.com/olivere/elastic/v7" | |||
"github.com/streadway/amqp" | |||
"strings" | |||
"time" | |||
) | |||
func EggSendFriendCircleDataConsume(queue md.MqQueue) { | |||
fmt.Println(">>>>>>>>>>>>EggSendFriendCircleDataConsume>>>>>>>>>>>>") | |||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||
if err != nil { | |||
logx.Error(err) | |||
return | |||
} | |||
defer ch.Release() | |||
//1、将自己绑定到交换机上 | |||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||
//2、取出数据进行消费 | |||
ch.Qos(1) | |||
delivery := ch.Consume(queue.Name, false) | |||
egg_system_rules.Init(cfg.RedisAddr) | |||
var res amqp.Delivery | |||
var ok bool | |||
for { | |||
res, ok = <-delivery | |||
if ok == true { | |||
err = handleEggSendFriendCircleDataConsume(res.Body) | |||
if err != nil { | |||
fmt.Println("EggSendFriendCircleDataConsume_ERR:::::", err.Error()) | |||
utils2.FilePutContents("EggSendFriendCircleDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{ | |||
"body": res.Body, | |||
"err": err.Error(), | |||
})) | |||
} | |||
//_ = res.Reject(false) | |||
err = res.Ack(true) | |||
fmt.Println("err ::: ", err) | |||
} else { | |||
panic(errors.New("error getting message")) | |||
} | |||
} | |||
} | |||
func handleEggSendFriendCircleDataConsume(msgData []byte) error { | |||
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 | |||
// 1.解析mq中queue的数据结构体 | |||
var msg *md2.EggSendFriendCircleData | |||
err := json.Unmarshal(msgData, &msg) | |||
if err != nil { | |||
return err | |||
} | |||
year, week := time.Now().ISOWeek() | |||
yearStr := utils2.IntToStr(year) | |||
weekStr := utils2.IntToStr(week) | |||
index := es2.GetAppointIndexFromAlias(yearStr, weekStr) | |||
id := fmt.Sprintf("%d%d_%d", year, week, msg.Uid) | |||
script := elastic.NewScript("ctx._source.send_circle_of_friend_nums += params.inc").Param("inc", 1) | |||
updateDoc, err := es.EsClient.Update(). | |||
Index(index). | |||
Id(id). | |||
Script(script). | |||
Do(context.Background()) | |||
if err != nil { | |||
if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") { | |||
// 蛋蛋分数据还不存在,创建蛋蛋分数据 | |||
now := time.Now().Format("2006-01-02 15:04:05") | |||
err1 := egg_energy.CreateEsScoreAndAssignValuesDoc(index, id, msg.Uid, enum.SendCircleOfFriendNums, "1", now) | |||
if err1 != nil { | |||
return err1 | |||
} | |||
return nil | |||
} | |||
fmt.Println("EggSendFriendCircleDataConsumeUpdateDoc_ERR::::", err.Error()) | |||
return err | |||
} | |||
fmt.Println("updateDoc==========>", updateDoc) | |||
return nil | |||
} |
@@ -37,6 +37,8 @@ func initConsumes() { | |||
jobs[consumeMd.EggCanalPersonAddActivityValueFunName] = EggCanalPersonAddActivityValueConsume // 用户活跃积分变更时更新es | |||
jobs[consumeMd.EggRecordActiveDataFunName] = EggRecordActiveDataConsume // 用户签到后更新活跃数据 | |||
jobs[consumeMd.EggEnergyAutoScoreDataFunName] = EggEnergyAutoScoreConsume // 自动打分 | |||
jobs[consumeMd.EggSendFriendCircleDataFunName] = EggSendFriendCircleDataConsume // 用户发送朋友圈后更新es | |||
jobs[consumeMd.EggCanalEnergyExchangeAccountFunName] = EggCanalEnergyExchangeAccountConsume // 蛋蛋能量兑换为余额的时候更新es | |||
jobs[consumeMd.VideoRewardFunName] = VideoRewardConsume // 短视频奖励 | |||
jobs[consumeMd.PlayletRewardFunName] = PlayletRewardConsume // 短剧奖励 | |||
@@ -290,6 +290,24 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||
BindKey: "", | |||
ConsumeFunName: "EggCanalUserVirtualCoinFlowAggregationConsume", | |||
}, | |||
{ | |||
ExchangeName: "egg.app", | |||
Name: "egg_send_friend_circle_queue", | |||
Type: DirectQueueType, | |||
IsPersistent: false, | |||
RoutKey: "egg_send_friend_circle", | |||
BindKey: "", | |||
ConsumeFunName: "EggSendFriendCircleDataConsume", | |||
}, | |||
{ | |||
ExchangeName: "egg.canal.topic", | |||
Name: "egg_canal_energy_exchange_account_queue", | |||
Type: DirectQueueType, | |||
IsPersistent: false, | |||
RoutKey: "egg_canal_user_wallet_flow", | |||
BindKey: "", | |||
ConsumeFunName: "EggCanalEnergyExchangeAccountConsume", | |||
}, | |||
} | |||
const ( | |||
@@ -322,4 +340,6 @@ const ( | |||
PublicPlatoonUserRelationCommissionConsumeFunName = "AddPublicPlatoonUserRelationCommissionConsume" | |||
EggEnergyTeamAssistanceConsumeFunName = "EggEnergyTeamAssistanceConsume" | |||
EggCanalUserVirtualCoinFlowAggregationConsumeFunName = "EggCanalUserVirtualCoinFlowAggregationConsume" | |||
EggSendFriendCircleDataFunName = "EggSendFriendCircleDataConsume" | |||
EggCanalEnergyExchangeAccountFunName = "EggCanalEnergyExchangeAccountConsume" | |||
) |
@@ -0,0 +1,31 @@ | |||
package md | |||
type CanalEnergyExchangeAccount struct { | |||
Id string `json:"id"` | |||
Uid string `json:"uid"` | |||
Direction string `json:"direction"` | |||
Amount string `json:"amount"` | |||
BeforeAmount string `json:"before_amount"` | |||
AfterAmount string `json:"after_amount"` | |||
SysFee string `json:"sys_fee"` | |||
OrdId string `json:"ord_id"` | |||
Title string `json:"title"` | |||
Kind string `json:"kind"` | |||
State string `json:"state"` | |||
Memo string `json:"memo"` | |||
CreateAt string `json:"create_at"` | |||
UpdateAt string `json:"update_at"` | |||
} | |||
type CanalEnergyExchangeAccountMessage[T any] struct { | |||
Data []T `json:"data"` | |||
Database string `json:"database"` | |||
ES int64 `json:"es"` | |||
ID int64 `json:"id"` | |||
IsDdl bool `json:"isDdl"` | |||
Old []T `json:"old"` | |||
PkNames []string `json:"pkNames"` | |||
Table string `json:"table"` | |||
TS int64 `json:"ts"` | |||
Type string `json:"type"` | |||
} |
@@ -8,7 +8,7 @@ go 1.19 | |||
require ( | |||
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241224090637-89a57f7fbb1e | |||
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241226020214-a56eb16f6264 | |||
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241226091556-c909dd302df7 | |||
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be | |||
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 | |||
github.com/boombuler/barcode v1.0.1 | |||