@@ -0,0 +1,116 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
utils2 "applet/app/utils" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
md2 "applet/es/md" | |||||
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement" | |||||
"code.fnuoos.com/EggPlanet/egg_models.git/src/model" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/shopspring/decimal" | |||||
"github.com/streadway/amqp" | |||||
"time" | |||||
) | |||||
func EggCanalUserVirtualCoinFlowAggregationConsume(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>EggCanalUserVirtualCoinFlowAggregationConsume>>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(50) | |||||
delivery := ch.Consume(queue.Name, true) //设置自动应答 | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
fmt.Println(">>>>>>>>>>>>>>>>>>EggCanalUserVirtualCoinFlowAggregationConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleEggCanalUserVirtualCoinFlowAggregationConsume(res.Body) | |||||
if err != nil { | |||||
fmt.Println("EggCanalUserVirtualCoinFlowAggregationConsume_ERR:::::", err.Error()) | |||||
utils2.FilePutContents("EggCanalUserVirtualCoinFlowAggregationConsume_ERR", utils2.SerializeStr(map[string]interface{}{ | |||||
"body": res.Body, | |||||
"err": err.Error(), | |||||
})) | |||||
} | |||||
//_ = res.Reject(false) | |||||
//_ = res.Ack(true) | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
} | |||||
func handleEggCanalUserVirtualCoinFlowAggregationConsume(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.CanalUserVirtualCoinFlowAggregationMessage[md.CanalUserVirtualCoinFlowAggregation] | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
return nil | |||||
} | |||||
// 2. 监听插入信息 | |||||
if canalMsg.Type == md2.CanalMsgInsertSqlType && canalMsg.Data[0].Direction == utils2.IntToStr(1) { | |||||
//2、查找基础设置 | |||||
settingDb := implement.NewEggEnergyBasicSettingDb(db.Db) | |||||
basicSetting, err := settingDb.EggEnergyBasicSettingGetOne() | |||||
if err != nil { | |||||
return err | |||||
} | |||||
aggregationDb := implement.NewUserVirtualCoinFlowAggregationDb(db.Db) | |||||
now := time.Now() | |||||
if canalMsg.Data[0].CoinId == utils2.IntToStr(basicSetting.PersonEggEnergyCoinId) { | |||||
userVirtualCoinFlowAggregation, err1 := aggregationDb.UserVirtualCoinFlowAggregationGetOneByParams(map[string]interface{}{ | |||||
"key": "uid", | |||||
"value": canalMsg.Data[0].Uid, | |||||
}) | |||||
if err1 != nil { | |||||
return err1 | |||||
} | |||||
if userVirtualCoinFlowAggregation == nil { | |||||
//新增记录 | |||||
_, err3 := aggregationDb.UserVirtualCoinFlowAggregationInsert(&model.UserVirtualCoinFlowAggregation{ | |||||
Uid: utils2.StrToInt(canalMsg.Data[0].Uid), | |||||
CoinId: utils2.StrToInt(canalMsg.Data[0].CoinId), | |||||
TodayData: canalMsg.Data[0].Amount, | |||||
ThisWeekData: canalMsg.Data[0].Amount, | |||||
ThisMonthData: canalMsg.Data[0].Amount, | |||||
NowData: canalMsg.Data[0].Amount, | |||||
CreateAt: now.Format("2006-01-02 15:04:05"), | |||||
UpdateAt: now.Format("2006-01-02 15:04:05"), | |||||
}) | |||||
if err3 != nil { | |||||
return err3 | |||||
} | |||||
} else { | |||||
//更新记录 | |||||
amount, _ := decimal.NewFromString(canalMsg.Data[0].Amount) | |||||
todayData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.TodayData) | |||||
thisWeekData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisWeekData) | |||||
thisMonthData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisWeekData) | |||||
userVirtualCoinFlowAggregation.TodayData = todayData.Add(amount).String() | |||||
userVirtualCoinFlowAggregation.ThisWeekData = thisWeekData.Add(amount).String() | |||||
userVirtualCoinFlowAggregation.ThisMonthData = thisMonthData.Add(amount).String() | |||||
userVirtualCoinFlowAggregation.NowData = canalMsg.Data[0].AfterAmount | |||||
_, err2 := aggregationDb.UserVirtualCoinFlowAggregationUpdate(userVirtualCoinFlowAggregation.Id, userVirtualCoinFlowAggregation, "today_data", "this_week_data", "this_month_data", "now_data") | |||||
if err2 != nil { | |||||
return err2 | |||||
} | |||||
} | |||||
} | |||||
} | |||||
return nil | |||||
} |
@@ -0,0 +1,49 @@ | |||||
package consume | |||||
import ( | |||||
"applet/consume/md" | |||||
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"testing" | |||||
"time" | |||||
) | |||||
func TestSendMessageToUserVirtualCoinFlowAggregationConsume(t *testing.T) { | |||||
data := md.CanalUserVirtualCoinFlowAggregation{ | |||||
Id: "1", | |||||
Uid: "19", | |||||
CoinId: "1", | |||||
Direction: "1", | |||||
Title: "兑换到个人蛋蛋能量", | |||||
Amount: "40", | |||||
BeforeAmount: "0", | |||||
AfterAmount: "80", | |||||
SysFee: "0", | |||||
TransferType: "6", | |||||
} | |||||
message := md.CanalUserVirtualCoinFlowAggregationMessage[md.CanalUserVirtualCoinFlowAggregation]{ | |||||
Data: []md.CanalUserVirtualCoinFlowAggregation{data}, | |||||
Database: "test_db", | |||||
ES: time.Now().UnixNano(), | |||||
ID: 1, | |||||
IsDdl: false, | |||||
Old: nil, | |||||
PkNames: []string{"id"}, | |||||
Table: "user_virtual_coin_flow", | |||||
TS: time.Now().Unix(), | |||||
Type: "INSERT", | |||||
} | |||||
err := rabbit.Init("120.77.153.180", "5672", "guest", "guest") | |||||
if err != nil { | |||||
return | |||||
} | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
ch.Publish(md2.EggCanalExchange, message, "egg_canal_user_virtual_coin_flow") | |||||
} |
@@ -49,9 +49,9 @@ func initConsumes() { | |||||
jobs[consumeMd.EggEnergySettlementPublicGiveActivityCoinFunName] = EggEnergySettlementPublicGiveActivityCoinConsume | jobs[consumeMd.EggEnergySettlementPublicGiveActivityCoinFunName] = EggEnergySettlementPublicGiveActivityCoinConsume | ||||
jobs[consumeMd.EggEnergyStartExchangeGreenEnergyFunName] = EggEnergyStartExchangeGreenEnergyConsume | jobs[consumeMd.EggEnergyStartExchangeGreenEnergyFunName] = EggEnergyStartExchangeGreenEnergyConsume | ||||
jobs[consumeMd.EggEnergyAutoExchangeGreenEnergyFunName] = EggEnergyAutoExchangeGreenEnergyConsume | jobs[consumeMd.EggEnergyAutoExchangeGreenEnergyFunName] = EggEnergyAutoExchangeGreenEnergyConsume | ||||
jobs[consumeMd.EggEnergyAutoScoreDataFunName] = EggEnergyAutoScoreConsume // 自动打分 | |||||
jobs[consumeMd.EggEnergyTeamAssistanceConsumeFunName] = EggEnergyTeamAssistanceConsume // 更新团队助力数据 | |||||
jobs[consumeMd.EggEnergyAutoScoreDataFunName] = EggEnergyAutoScoreConsume // 自动打分 | |||||
jobs[consumeMd.EggEnergyTeamAssistanceConsumeFunName] = EggEnergyTeamAssistanceConsume // 更新团队助力数据 | |||||
jobs[consumeMd.EggCanalUserVirtualCoinFlowAggregationConsumeFunName] = EggCanalUserVirtualCoinFlowAggregationConsume // 聚合流水数据 | |||||
} | } | ||||
func Run() { | func Run() { | ||||
@@ -281,35 +281,45 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
BindKey: "", | BindKey: "", | ||||
ConsumeFunName: "EggEnergyTeamAssistanceConsume", | ConsumeFunName: "EggEnergyTeamAssistanceConsume", | ||||
}, | }, | ||||
{ | |||||
ExchangeName: "egg.canal.topic", | |||||
Name: "egg_user_virtual_coin_flow_aggregation", | |||||
Type: DirectQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "egg_canal_user_virtual_coin_flow", | |||||
BindKey: "", | |||||
ConsumeFunName: "EggCanalUserVirtualCoinFlowAggregationConsume", | |||||
}, | |||||
} | } | ||||
const ( | const ( | ||||
EggSlowAutoUpLvConsume = "EggSlowAutoUpLvConsume" | |||||
EggEnergyUserActivityConsume = "EggEnergyUserActivityConsume" | |||||
EggCanalUserConsume = "EggCanalUserConsume" | |||||
JpushRecordFunName = "JpushRecordConsume" | |||||
AliyunSmsRecordFunName = "AliyunSmsRecordConsume" | |||||
EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" | |||||
EggEnergyDealFundDataFunName = "EggEnergyDealFundDataConsume" | |||||
EggEnergyDealPlatformRevenueDataFunName = "EggEnergyDealPlatformRevenueDataConsume" | |||||
EggEnergyDealUserVirtualCoinDataFunName = "EggEnergyDealUserVirtualCoinDataConsume" | |||||
IMEggEnergyBatchSendMessageDataFunName = "IMEggEnergyBatchSendMessageDataConsume" | |||||
IMEggEnergyDelFriendCircleDataFunName = "IMEggEnergyDelFriendCircleDataConsume" | |||||
EggFinWithdrawApplyDataConsumeFunName = "EggFinWithdrawApplyDataConsume" | |||||
EggEnergySettlementPublicGiveActivityCoinFunName = "EggEnergySettlementPublicGiveActivityCoinConsume" | |||||
EggEnergyStartExchangeGreenEnergyFunName = "EggEnergyStartExchangeGreenEnergyConsume" | |||||
EggEnergyAutoExchangeGreenEnergyFunName = "EggEnergyAutoExchangeGreenEnergyConsume" | |||||
EggEnergyNewUserRegisterDataFunName = "EggEnergyNewUserRegisterDataConsume" | |||||
EggEnergyDealUserECPMFunName = "EggEnergyDealUserECPMConsume" | |||||
EggCanalInviteUserNumsFunName = "EggCanalInviteUserNumsConsume" | |||||
EggCanalViolateNumsFunName = "EggCanalViolateNumsConsume" | |||||
IMEggEnergySendRedPackageFunName = "IMEggEnergySendRedPackageConsume" | |||||
EggRecordActiveDataFunName = "EggRecordActiveDataConsume" | |||||
EggCanalPersonAddActivityValueFunName = "EggCanalPersonAddActivityValueConsume" | |||||
VideoRewardFunName = "VideoRewardFunName" | |||||
PlayletRewardFunName = "PlayletRewardFunName" | |||||
UserDeleteFunName = "UserDeleteConsume" | |||||
EggEnergyAutoScoreDataFunName = "EggEnergyAutoScoreConsume" | |||||
PublicPlatoonUserRelationCommissionConsumeFunName = "AddPublicPlatoonUserRelationCommissionConsume" | |||||
EggEnergyTeamAssistanceConsumeFunName = "EggEnergyTeamAssistanceConsume" | |||||
EggSlowAutoUpLvConsume = "EggSlowAutoUpLvConsume" | |||||
EggEnergyUserActivityConsume = "EggEnergyUserActivityConsume" | |||||
EggCanalUserConsume = "EggCanalUserConsume" | |||||
JpushRecordFunName = "JpushRecordConsume" | |||||
AliyunSmsRecordFunName = "AliyunSmsRecordConsume" | |||||
EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" | |||||
EggEnergyDealFundDataFunName = "EggEnergyDealFundDataConsume" | |||||
EggEnergyDealPlatformRevenueDataFunName = "EggEnergyDealPlatformRevenueDataConsume" | |||||
EggEnergyDealUserVirtualCoinDataFunName = "EggEnergyDealUserVirtualCoinDataConsume" | |||||
IMEggEnergyBatchSendMessageDataFunName = "IMEggEnergyBatchSendMessageDataConsume" | |||||
IMEggEnergyDelFriendCircleDataFunName = "IMEggEnergyDelFriendCircleDataConsume" | |||||
EggFinWithdrawApplyDataConsumeFunName = "EggFinWithdrawApplyDataConsume" | |||||
EggEnergySettlementPublicGiveActivityCoinFunName = "EggEnergySettlementPublicGiveActivityCoinConsume" | |||||
EggEnergyStartExchangeGreenEnergyFunName = "EggEnergyStartExchangeGreenEnergyConsume" | |||||
EggEnergyAutoExchangeGreenEnergyFunName = "EggEnergyAutoExchangeGreenEnergyConsume" | |||||
EggEnergyNewUserRegisterDataFunName = "EggEnergyNewUserRegisterDataConsume" | |||||
EggEnergyDealUserECPMFunName = "EggEnergyDealUserECPMConsume" | |||||
EggCanalInviteUserNumsFunName = "EggCanalInviteUserNumsConsume" | |||||
EggCanalViolateNumsFunName = "EggCanalViolateNumsConsume" | |||||
IMEggEnergySendRedPackageFunName = "IMEggEnergySendRedPackageConsume" | |||||
EggRecordActiveDataFunName = "EggRecordActiveDataConsume" | |||||
EggCanalPersonAddActivityValueFunName = "EggCanalPersonAddActivityValueConsume" | |||||
VideoRewardFunName = "VideoRewardFunName" | |||||
PlayletRewardFunName = "PlayletRewardFunName" | |||||
UserDeleteFunName = "UserDeleteConsume" | |||||
EggEnergyAutoScoreDataFunName = "EggEnergyAutoScoreConsume" | |||||
PublicPlatoonUserRelationCommissionConsumeFunName = "AddPublicPlatoonUserRelationCommissionConsume" | |||||
EggEnergyTeamAssistanceConsumeFunName = "EggEnergyTeamAssistanceConsume" | |||||
EggCanalUserVirtualCoinFlowAggregationConsumeFunName = "EggCanalUserVirtualCoinFlowAggregationConsume" | |||||
) | ) |
@@ -0,0 +1,27 @@ | |||||
package md | |||||
type CanalUserVirtualCoinFlowAggregation struct { | |||||
Id string `json:"id"` | |||||
Uid string `json:"uid"` | |||||
CoinId string `json:"coin_id"` | |||||
Direction string `json:"direction"` | |||||
Title string `json:"title"` | |||||
Amount string `json:"amount"` | |||||
BeforeAmount string `json:"before_amount"` | |||||
AfterAmount string `json:"after_amount"` | |||||
SysFee string `json:"sys_fee"` | |||||
TransferType string `json:"transfer_type"` | |||||
} | |||||
type CanalUserVirtualCoinFlowAggregationMessage[T any] struct { | |||||
Data []T `json:"data"` | |||||
Database string `json:"database"` | |||||
ES int64 `json:"es"` | |||||
ID int64 `json:"id"` | |||||
IsDdl bool `json:"isDdl"` | |||||
Old []T `json:"old"` | |||||
PkNames []string `json:"pkNames"` | |||||
Table string `json:"table"` | |||||
TS int64 `json:"ts"` | |||||
Type string `json:"type"` | |||||
} |
@@ -7,7 +7,7 @@ go 1.19 | |||||
// replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules | // replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules | ||||
require ( | require ( | ||||
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241223102439-60ec86c6e27d | |||||
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241224090637-89a57f7fbb1e | |||||
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241222141934-5562d8e0231c | code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241222141934-5562d8e0231c | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be | code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 | code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 | ||||