Browse Source

feat:add user egg energy aggregation when user virtual flow change

master
shenjiachi 4 days ago
parent
commit
beae09fdd8
4 changed files with 90 additions and 14 deletions
  1. +70
    -2
      consume/egg_cannal_user_virtual_ coin_flow_aggregation_consume.go
  2. +18
    -10
      consume/egg_cannal_user_virtual_ coin_flow_aggregation_consume_test.go
  3. +1
    -1
      consume/egg_energy_deal_user_ecpm_test.go
  4. +1
    -1
      go.mod

+ 70
- 2
consume/egg_cannal_user_virtual_ coin_flow_aggregation_consume.go View File

@@ -70,7 +70,7 @@ func handleEggCanalUserVirtualCoinFlowAggregationConsume(msg []byte) error {
return err
}
aggregationDb := implement.NewUserVirtualCoinFlowAggregationDb(db.Db)
now := time.Now()
now := time.Now().AddDate(0, 0, 1)

if canalMsg.Data[0].CoinId == utils2.IntToStr(basicSetting.PersonEggEnergyCoinId) {
userVirtualCoinFlowAggregation, err1 := aggregationDb.UserVirtualCoinFlowAggregationGetOneByParams(map[string]interface{}{
@@ -100,7 +100,7 @@ func handleEggCanalUserVirtualCoinFlowAggregationConsume(msg []byte) error {
amount, _ := decimal.NewFromString(canalMsg.Data[0].Amount)
todayData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.TodayData)
thisWeekData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisWeekData)
thisMonthData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisWeekData)
thisMonthData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisMonthData)
userVirtualCoinFlowAggregation.TodayData = todayData.Add(amount).String()
userVirtualCoinFlowAggregation.ThisWeekData = thisWeekData.Add(amount).String()
userVirtualCoinFlowAggregation.ThisMonthData = thisMonthData.Add(amount).String()
@@ -110,6 +110,74 @@ func handleEggCanalUserVirtualCoinFlowAggregationConsume(msg []byte) error {
return err2
}
}

// 更新 user_egg_energy_aggregation
uid := utils2.StrToInt64(canalMsg.Data[0].Uid)
year, week := now.ISOWeek()
_, month, day := now.Date()
energyAggregationDb := implement.NewUserEggEnergyAggregationDb(db.Db)
energyAggregation, err3 := energyAggregationDb.UserEggEnergyAggregationGetOneByTime(day, week, int(month), year, uid)
if err3 != nil {
return err3
}
amount, _ := decimal.NewFromString(canalMsg.Data[0].Amount)
if energyAggregation == nil {
// 不存在当天数据 创建新数据
nowStr := now.Format("2006-01-02 15:04:05")
m := model.UserEggEnergyAggregation{
Uid: uid,
TodayData: canalMsg.Data[0].Amount,
Day: day,
Week: week,
Month: int(month),
Year: year,
CreateAt: nowStr,
UpdateAt: nowStr,
}

// 读取最新数据
lastEnergyAggregation, err4 := energyAggregationDb.UserEggEnergyAggregationGetLastOneByUid(uid)
if err4 != nil {
return err4
}
// 判断是否上条数据是否在本周
if lastEnergyAggregation != nil {
if lastEnergyAggregation.Week == week && lastEnergyAggregation.Year == year {
// 周数相同 累加上周数据
thisWeekData, _ := decimal.NewFromString(lastEnergyAggregation.ThisWeekData)
m.ThisWeekData = thisWeekData.Add(amount).String()
} else {
m.ThisWeekData = amount.String()
}
if lastEnergyAggregation.Month == int(month) && lastEnergyAggregation.Year == year {
// 月份相同 累加上月数据
thisMonthData, _ := decimal.NewFromString(lastEnergyAggregation.ThisMonthData)
m.ThisMonthData = thisMonthData.Add(amount).String()
} else {
m.ThisMonthData = amount.String()
}
} else {
m.ThisWeekData = amount.String()
m.ThisMonthData = amount.String()
}

_, err5 := energyAggregationDb.UserEggEnergyAggregationInsert(&m)
if err5 != nil {
return err5
}
} else {
// 存在当天数据累加数据
todayData, _ := decimal.NewFromString(energyAggregation.TodayData)
thisWeekData, _ := decimal.NewFromString(energyAggregation.ThisWeekData)
thisMonthData, _ := decimal.NewFromString(energyAggregation.ThisMonthData)
energyAggregation.TodayData = todayData.Add(amount).String()
energyAggregation.ThisWeekData = thisWeekData.Add(amount).String()
energyAggregation.ThisMonthData = thisMonthData.Add(amount).String()
_, err5 := energyAggregationDb.UserEggEnergyAggregationUpdate(energyAggregation.Id, energyAggregation, "today_data", "this_week_data", "this_month_data")
if err5 != nil {
return err5
}
}
}
}
return nil


+ 18
- 10
consume/egg_cannal_user_virtual_ coin_flow_aggregation_consume_test.go View File

@@ -1,21 +1,34 @@
package consume

import (
"applet/app/cfg"
"applet/app/db"
"applet/consume/md"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"testing"
"time"
)

func TestSendMessageToUserVirtualCoinFlowAggregationConsume(t *testing.T) {
dbcfg := cfg.DBCfg{
Host: "119.23.182.117:3306",
Name: "egg",
User: "root",
Psw: "Fnuo123com@",
ShowLog: true,
MaxLifetime: 30,
MaxOpenConns: 100,
MaxIdleConns: 100,
Path: "tmp/%s.log",
}
db.InitDB(&dbcfg)
data := md.CanalUserVirtualCoinFlowAggregation{
Id: "1",
Uid: "19",
CoinId: "1",
Direction: "1",
Title: "兑换到个人蛋蛋能量",
Amount: "40",
Amount: "20",
BeforeAmount: "0",
AfterAmount: "80",
SysFee: "0",
@@ -35,15 +48,10 @@ func TestSendMessageToUserVirtualCoinFlowAggregationConsume(t *testing.T) {
Type: "INSERT",
}

err := rabbit.Init("120.77.153.180", "5672", "guest", "guest")
if err != nil {
return
}
ch, err := rabbit.Cfg.Pool.GetChannel()
m, _ := json.Marshal(message)
err := handleEggCanalUserVirtualCoinFlowAggregationConsume(m)
if err != nil {
return
}
defer ch.Release()
ch.Publish(md2.EggCanalExchange, message, "egg_canal_user_virtual_coin_flow")

}

+ 1
- 1
consume/egg_energy_deal_user_ecpm_test.go View File

@@ -16,7 +16,7 @@ func TestHandleEggEnergyDealUserECPMConsume(t *testing.T) {
Ecpm: "101.4",
}
bytes, err := json.Marshal(m)
err = handleTemporaryEggEnergyDealUserECPMConsume(bytes)
err = handleTemporaryEggEnergyDealUserECPMConsume(bytes, nil)
if err != nil {
fmt.Println(err)
return


+ 1
- 1
go.mod View File

@@ -2,7 +2,7 @@ module applet

go 1.19

// replace code.fnuoos.com/EggPlanet/egg_models.git => E:/company/Egg/egg_models
replace code.fnuoos.com/EggPlanet/egg_models.git => E:/company/Egg/egg_models

// replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules



Loading…
Cancel
Save