From f3e984d1198fcee8b9365bfe0c8371ecad8b3116 Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Thu, 14 Nov 2024 17:26:37 +0800 Subject: [PATCH] add consume fund data and platform revenue data --- consume/egg_energy_fund_data_consume.go | 171 ++++++++++++++++++++ consume/egg_energy_platform_revenue_data.go | 171 ++++++++++++++++++++ consume/md/consume_key.go | 22 ++- go.mod | 8 +- 4 files changed, 367 insertions(+), 5 deletions(-) create mode 100644 consume/egg_energy_fund_data_consume.go create mode 100644 consume/egg_energy_platform_revenue_data.go diff --git a/consume/egg_energy_fund_data_consume.go b/consume/egg_energy_fund_data_consume.go new file mode 100644 index 0000000..b257b5e --- /dev/null +++ b/consume/egg_energy_fund_data_consume.go @@ -0,0 +1,171 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" + "code.fnuoos.com/EggPlanet/egg_models.git/src/model" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" + md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/svc" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/shopspring/decimal" + "github.com/streadway/amqp" + "time" +) + +func EggEnergyDealFundDataConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggEnergyStartLevelDividendConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggEnergyStartLevelDividendConsume(res.Body) + if err != nil { + fmt.Println("EggEnergyDealFundDataConsume_ERR:::::", err.Error()) + utils2.FilePutContents("EggEnergyDealFundDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleEggEnergyDealFundDataConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md2.EggEnergyStructForEggEnergyFundData + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + now := time.Now() + + engine := db.Db + session := engine.NewSession() + session.Begin() + + // 2. 获取 fund_data 数据 + fundDataDb := implement.NewEggEnergyFundDataDb(engine) + fundDataRecordsDb := implement.NewEggEnergyFundDataRecordsDb(engine) + data, err := fundDataDb.EggEnergyFundDataGetOneByParams(map[string]interface{}{ + "key": "id", + "value": msg.ID, + }) + if err != nil { + return err + } + + // 3. 获取核心数据 + eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(engine) + if err2 != nil { + return err2 + } + if cb != nil { + defer cb() // 释放锁 + } + // 4. 计算涨价公式 + err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils2.AnyToString(msg.Amount), eggEnergyCoreData) + if err3 != nil { + _ = session.Rollback() + session.Close() + return err3 + } + + // 5. 处理可用能量 + dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{ + Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums, + AmountFee: "", + BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, + AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, + BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, + AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, + BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums, + AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums, + } + err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq) + if err4 != nil { + fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4) + _ = session.Rollback() + session.Close() + return err4 + } + + // 6. 计算余额 + balanceAmount := decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount)). + Sub(decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount))). + String() + + // 7. 插入资金数据详细数据 + record := model.EggEnergyFundDataRecords{ + RecordsId: data.Id, + TotalAmount: data.TotalAmount, + BalanceAmount: balanceAmount, + BalanceTimes: data.BalanceTimes - 1, + BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, + AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, + BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, + AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, + CreateAt: now.Format("2006-01-02 15:04:05"), + UpdateAt: now.Format("2006-01-02 15:04:05"), + } + _, err5 := fundDataRecordsDb.EggEnergyFundDataRecordsInsertBySession(session, record) + if err5 != nil { + _ = session.Rollback() + return err5 + } + + // 8. 更新当前数据 + fundData := model.EggEnergyFundData{ + Id: data.Id, + Kind: data.Kind, + TotalAmount: data.TotalAmount, + BalanceAmount: balanceAmount, + Hours: data.Hours, + BalanceTimes: data.BalanceTimes - 1, + Frequency: data.Frequency, + Memo: data.Memo, + CreateAt: data.CreateAt, + UpdateAt: now.Format("2006-01-02 15:04:05"), + } + + forceColumns := []string{ + "balance_times", + "balance_amount", + } + _, err6 := fundDataDb.EggEnergyFundDataUpdateBySession(session, fundData, forceColumns...) + if err6 != nil { + _ = session.Rollback() + return err6 + } + return session.Commit() +} diff --git a/consume/egg_energy_platform_revenue_data.go b/consume/egg_energy_platform_revenue_data.go new file mode 100644 index 0000000..c98855b --- /dev/null +++ b/consume/egg_energy_platform_revenue_data.go @@ -0,0 +1,171 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" + "code.fnuoos.com/EggPlanet/egg_models.git/src/model" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" + md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/svc" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/shopspring/decimal" + "github.com/streadway/amqp" + "time" +) + +func EggEnergyDealPlatformRevenueDataConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggEnergyStartLevelDividendConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggEnergyDealPlatformRevenueDataConsume(res.Body) + if err != nil { + fmt.Println("EggEnergyDealFundDataConsume_ERR:::::", err.Error()) + utils2.FilePutContents("EggEnergyDealFundDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleEggEnergyDealPlatformRevenueDataConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md2.EggEnergyStructForPlatformRevenueData + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + now := time.Now() + + engine := db.Db + session := engine.NewSession() + session.Begin() + + // 2. 获取 platform_revenue_data 数据 + revenueDataDb := implement.NewPlatformRevenueDataDb(engine) + dataRecordsDb := implement.NewPlatformRevenueDataRecordsDb(engine) + data, err := revenueDataDb.PlatformRevenueDataGetOneByParams(map[string]interface{}{ + "key": "id", + "value": msg.ID, + }) + if err != nil { + return err + } + + // 3. 获取核心数据 + eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(engine) + if err2 != nil { + return err2 + } + if cb != nil { + defer cb() // 释放锁 + } + // 4. 计算涨价公式 + err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils2.AnyToString(msg.Amount), eggEnergyCoreData) + if err3 != nil { + _ = session.Rollback() + session.Close() + return err3 + } + + // 5. 处理可用能量 + dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{ + Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums, + AmountFee: "", + BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, + AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, + BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, + AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, + BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums, + AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums, + } + err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq) + if err4 != nil { + fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4) + _ = session.Rollback() + session.Close() + return err4 + } + + // 6. 计算余额 + balanceAmount := decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount)). + Sub(decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount))). + String() + + // 7. 插入资金数据详细数据 + record := model.PlatformRevenueDataRecords{ + RecordsId: data.Id, + TotalAmount: data.TotalAmount, + BalanceAmount: balanceAmount, + BalanceTimes: data.BalanceTimes - 1, + BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, + AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, + BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, + AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, + CreateAt: now.Format("2006-01-02 15:04:05"), + UpdateAt: now.Format("2006-01-02 15:04:05"), + } + _, err5 := dataRecordsDb.PlatformRevenueDataRecordsInsertBySession(session, record) + if err5 != nil { + _ = session.Rollback() + return err5 + } + + // 8. 更新当前数据 + PlatformRevenueData := model.PlatformRevenueData{ + Id: data.Id, + Kind: data.Kind, + TotalAmount: data.TotalAmount, + BalanceAmount: balanceAmount, + Hours: data.Hours, + BalanceTimes: data.BalanceTimes - 1, + Frequency: data.Frequency, + Memo: data.Memo, + CreateAt: data.CreateAt, + UpdateAt: now.Format("2006-01-02 15:04:05"), + } + + forceColumns := []string{ + "balance_times", + "balance_amount", + } + _, err6 := revenueDataDb.PlatformRevenueDataUpdateBySession(session, PlatformRevenueData, forceColumns...) + if err6 != nil { + _ = session.Rollback() + return err6 + } + return session.Commit() +} diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 64e2475..facd11e 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -20,8 +20,28 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "EggEnergyStartLevelDividendConsume", }, + { + ExchangeName: "egg.energy", + Name: "egg_energy_fund_data_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "fund_data", + BindKey: "", + ConsumeFunName: "EggEnergyDealFundDataConsume", + }, + { + ExchangeName: "egg.energy", + Name: "egg_energy_platform_revenue_data_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "platform_revenue_data", + BindKey: "", + ConsumeFunName: "EggEnergyDealPlatformRevenueDataConsume", + }, } const ( - EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" + EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" + EggEnergyDealFundDataFunName = "EggEnergyDealFundDataConsume" + EggEnergyDealPlatformRevenueDataFunName = "EggEnergyDealPlatformRevenueDataConsume" ) diff --git a/go.mod b/go.mod index 4b6e5a9..d769288 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ replace code.fnuoos.com/EggPlanet/egg_models.git => E:/company/Egg/egg_models replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules require ( + code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241114063419-cb68a0ed34ee + code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.2 code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 github.com/boombuler/barcode v1.0.1 github.com/forgoer/openssl v1.2.1 @@ -15,6 +17,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 github.com/go-playground/validator/v10 v10.19.0 github.com/go-redis/redis v6.15.9+incompatible + github.com/go-sql-driver/mysql v1.8.1 github.com/gomodule/redigo v2.0.0+incompatible github.com/makiuchi-d/gozxing v0.1.1 github.com/qiniu/api.v7/v7 v7.8.2 @@ -27,18 +30,16 @@ require ( google.golang.org/protobuf v1.33.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 + xorm.io/xorm v1.3.1 ) require ( - code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241114063419-cb68a0ed34ee // indirect - code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.2 // indirect filippo.io/edwards25519 v1.1.0 // indirect github.com/bytedance/sonic v1.11.3 // indirect github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.1 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/go-sql-driver/mysql v1.8.1 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect @@ -74,5 +75,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect honnef.co/go/tools v0.0.1-2020.1.4 // indirect xorm.io/builder v0.3.13 // indirect - xorm.io/xorm v1.3.1 // indirect )