Parcourir la source

add consume fund data and platform revenue data

master
shenjiachi il y a 1 mois
Parent
révision
f3e984d119
4 fichiers modifiés avec 367 ajouts et 5 suppressions
  1. +171
    -0
      consume/egg_energy_fund_data_consume.go
  2. +171
    -0
      consume/egg_energy_platform_revenue_data.go
  3. +21
    -1
      consume/md/consume_key.go
  4. +4
    -4
      go.mod

+ 171
- 0
consume/egg_energy_fund_data_consume.go Voir le fichier

@@ -0,0 +1,171 @@
package consume

import (
"applet/app/cfg"
"applet/app/db"
utils2 "applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/svc"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/shopspring/decimal"
"github.com/streadway/amqp"
"time"
)

func EggEnergyDealFundDataConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>EggEnergyStartLevelDividendConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleEggEnergyStartLevelDividendConsume(res.Body)
if err != nil {
fmt.Println("EggEnergyDealFundDataConsume_ERR:::::", err.Error())
utils2.FilePutContents("EggEnergyDealFundDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
}
//_ = res.Reject(false)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleEggEnergyDealFundDataConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md2.EggEnergyStructForEggEnergyFundData
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
now := time.Now()

engine := db.Db
session := engine.NewSession()
session.Begin()

// 2. 获取 fund_data 数据
fundDataDb := implement.NewEggEnergyFundDataDb(engine)
fundDataRecordsDb := implement.NewEggEnergyFundDataRecordsDb(engine)
data, err := fundDataDb.EggEnergyFundDataGetOneByParams(map[string]interface{}{
"key": "id",
"value": msg.ID,
})
if err != nil {
return err
}

// 3. 获取核心数据
eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(engine)
if err2 != nil {
return err2
}
if cb != nil {
defer cb() // 释放锁
}
// 4. 计算涨价公式
err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils2.AnyToString(msg.Amount), eggEnergyCoreData)
if err3 != nil {
_ = session.Rollback()
session.Close()
return err3
}

// 5. 处理可用能量
dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{
Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums,
AmountFee: "",
BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice,
AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice,
BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue,
AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue,
BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums,
AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums,
}
err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq)
if err4 != nil {
fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4)
_ = session.Rollback()
session.Close()
return err4
}

// 6. 计算余额
balanceAmount := decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount)).
Sub(decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount))).
String()

// 7. 插入资金数据详细数据
record := model.EggEnergyFundDataRecords{
RecordsId: data.Id,
TotalAmount: data.TotalAmount,
BalanceAmount: balanceAmount,
BalanceTimes: data.BalanceTimes - 1,
BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice,
AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice,
BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue,
AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue,
CreateAt: now.Format("2006-01-02 15:04:05"),
UpdateAt: now.Format("2006-01-02 15:04:05"),
}
_, err5 := fundDataRecordsDb.EggEnergyFundDataRecordsInsertBySession(session, record)
if err5 != nil {
_ = session.Rollback()
return err5
}

// 8. 更新当前数据
fundData := model.EggEnergyFundData{
Id: data.Id,
Kind: data.Kind,
TotalAmount: data.TotalAmount,
BalanceAmount: balanceAmount,
Hours: data.Hours,
BalanceTimes: data.BalanceTimes - 1,
Frequency: data.Frequency,
Memo: data.Memo,
CreateAt: data.CreateAt,
UpdateAt: now.Format("2006-01-02 15:04:05"),
}

forceColumns := []string{
"balance_times",
"balance_amount",
}
_, err6 := fundDataDb.EggEnergyFundDataUpdateBySession(session, fundData, forceColumns...)
if err6 != nil {
_ = session.Rollback()
return err6
}
return session.Commit()
}

+ 171
- 0
consume/egg_energy_platform_revenue_data.go Voir le fichier

@@ -0,0 +1,171 @@
package consume

import (
"applet/app/cfg"
"applet/app/db"
utils2 "applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/svc"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/shopspring/decimal"
"github.com/streadway/amqp"
"time"
)

func EggEnergyDealPlatformRevenueDataConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>EggEnergyStartLevelDividendConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleEggEnergyDealPlatformRevenueDataConsume(res.Body)
if err != nil {
fmt.Println("EggEnergyDealFundDataConsume_ERR:::::", err.Error())
utils2.FilePutContents("EggEnergyDealFundDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
}
//_ = res.Reject(false)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleEggEnergyDealPlatformRevenueDataConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md2.EggEnergyStructForPlatformRevenueData
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
now := time.Now()

engine := db.Db
session := engine.NewSession()
session.Begin()

// 2. 获取 platform_revenue_data 数据
revenueDataDb := implement.NewPlatformRevenueDataDb(engine)
dataRecordsDb := implement.NewPlatformRevenueDataRecordsDb(engine)
data, err := revenueDataDb.PlatformRevenueDataGetOneByParams(map[string]interface{}{
"key": "id",
"value": msg.ID,
})
if err != nil {
return err
}

// 3. 获取核心数据
eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(engine)
if err2 != nil {
return err2
}
if cb != nil {
defer cb() // 释放锁
}
// 4. 计算涨价公式
err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils2.AnyToString(msg.Amount), eggEnergyCoreData)
if err3 != nil {
_ = session.Rollback()
session.Close()
return err3
}

// 5. 处理可用能量
dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{
Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums,
AmountFee: "",
BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice,
AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice,
BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue,
AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue,
BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums,
AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums,
}
err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq)
if err4 != nil {
fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4)
_ = session.Rollback()
session.Close()
return err4
}

// 6. 计算余额
balanceAmount := decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount)).
Sub(decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount))).
String()

// 7. 插入资金数据详细数据
record := model.PlatformRevenueDataRecords{
RecordsId: data.Id,
TotalAmount: data.TotalAmount,
BalanceAmount: balanceAmount,
BalanceTimes: data.BalanceTimes - 1,
BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice,
AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice,
BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue,
AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue,
CreateAt: now.Format("2006-01-02 15:04:05"),
UpdateAt: now.Format("2006-01-02 15:04:05"),
}
_, err5 := dataRecordsDb.PlatformRevenueDataRecordsInsertBySession(session, record)
if err5 != nil {
_ = session.Rollback()
return err5
}

// 8. 更新当前数据
PlatformRevenueData := model.PlatformRevenueData{
Id: data.Id,
Kind: data.Kind,
TotalAmount: data.TotalAmount,
BalanceAmount: balanceAmount,
Hours: data.Hours,
BalanceTimes: data.BalanceTimes - 1,
Frequency: data.Frequency,
Memo: data.Memo,
CreateAt: data.CreateAt,
UpdateAt: now.Format("2006-01-02 15:04:05"),
}

forceColumns := []string{
"balance_times",
"balance_amount",
}
_, err6 := revenueDataDb.PlatformRevenueDataUpdateBySession(session, PlatformRevenueData, forceColumns...)
if err6 != nil {
_ = session.Rollback()
return err6
}
return session.Commit()
}

+ 21
- 1
consume/md/consume_key.go Voir le fichier

@@ -20,8 +20,28 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "EggEnergyStartLevelDividendConsume",
},
{
ExchangeName: "egg.energy",
Name: "egg_energy_fund_data_queue",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "fund_data",
BindKey: "",
ConsumeFunName: "EggEnergyDealFundDataConsume",
},
{
ExchangeName: "egg.energy",
Name: "egg_energy_platform_revenue_data_queue",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "platform_revenue_data",
BindKey: "",
ConsumeFunName: "EggEnergyDealPlatformRevenueDataConsume",
},
}

const (
EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume"
EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume"
EggEnergyDealFundDataFunName = "EggEnergyDealFundDataConsume"
EggEnergyDealPlatformRevenueDataFunName = "EggEnergyDealPlatformRevenueDataConsume"
)

+ 4
- 4
go.mod Voir le fichier

@@ -7,6 +7,8 @@ replace code.fnuoos.com/EggPlanet/egg_models.git => E:/company/Egg/egg_models
replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules

require (
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241114063419-cb68a0ed34ee
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.2
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5
github.com/boombuler/barcode v1.0.1
github.com/forgoer/openssl v1.2.1
@@ -15,6 +17,7 @@ require (
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.19.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.8.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/makiuchi-d/gozxing v0.1.1
github.com/qiniu/api.v7/v7 v7.8.2
@@ -27,18 +30,16 @@ require (
google.golang.org/protobuf v1.33.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
xorm.io/xorm v1.3.1
)

require (
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241114063419-cb68a0ed34ee // indirect
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.2 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/bytedance/sonic v1.11.3 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
@@ -74,5 +75,4 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
xorm.io/builder v0.3.13 // indirect
xorm.io/xorm v1.3.1 // indirect
)

Chargement…
Annuler
Enregistrer