diff --git a/app/task/svc/svc_deal_fund_data.go b/app/task/svc/svc_deal_fund_data.go index 2a1008e..0df5c18 100644 --- a/app/task/svc/svc_deal_fund_data.go +++ b/app/task/svc/svc_deal_fund_data.go @@ -2,27 +2,37 @@ package svc import ( "applet/app/utils" + "applet/app/utils/cache" "applet/app/utils/logx" "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" - "code.fnuoos.com/EggPlanet/egg_models.git/src/model" - zhios_order_relate_logx "code.fnuoos.com/EggPlanet/egg_models.git/utils/logx" - "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" - "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" - "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/svc" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" "fmt" + "github.com/shopspring/decimal" "time" "xorm.io/xorm" ) -func DealFundData(eg *xorm.Engine, dbName string) { +const DealFundDataKey = "deal_fund_data_key" + +func DealFundData(eg *xorm.Engine, dbName string, ch *rabbit.Channel) { fmt.Println("deal_fund_data...") defer func() { if err := recover(); err != nil { - _ = logx.Error(err) + fmt.Println(err) + return } }() + // 悲观锁防止串行 + getString, _ := cache.GetString(DealFundDataKey) + if getString != "" { + fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "上一次处理未执行完") + return + } + cache.SetEx(DealFundDataKey, "running", 60*30) //30分钟 + + // 查询所有未被执行完的数据 fundDataDb := implement.NewEggEnergyFundDataDb(eg) fundDataList, err := fundDataDb.EggEnergyFundDataFindNotFinish() if err != nil { @@ -33,113 +43,122 @@ func DealFundData(eg *xorm.Engine, dbName string) { now := time.Now() fundDataRecordsDb := implement.NewEggEnergyFundDataRecordsDb(eg) for _, data := range fundDataList { - // 判断是否是第一次执行 + // 1、判断是否是第一次执行 lastRecord, err1 := fundDataRecordsDb.EggEnergyFundDataRecordsGetLast(data.Id) if err1 != nil { _ = logx.Error(err1) continue } if lastRecord == nil { - // 首次执行 判断当前是否可以执行 + // 2.1 首次执行 判断当前是否可以执行 if utils.TimeParseStd(data.CreateAt).Add(time.Duration(data.Frequency) * time.Minute).After(now) { continue } } else { - // 已经被执行过 + // 2.2 已经被执行过 if utils.TimeParseStd(lastRecord.CreateAt).Add(time.Duration(data.Frequency) * time.Minute).After(now) { continue } } - // 总执行次数 按十分钟更新 - times := data.Hours * 60 / data.Frequency - // 每次更新金额 - amount := utils.AnyToFloat64(data.TotalAmount) / utils.AnyToFloat64(times) - - session := eg.NewSession() - defer func() { - session.Close() - if err := recover(); err != nil { - _ = zhios_order_relate_logx.Error(err) - } - }() - session.Begin() - - eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(eg) + // 3、计算总执行次数、每次更新金额 (每次更新金额 = 金额 / 总执行次数 总执行次数 = 时间 / 频次) + frequency := decimal.NewFromInt(int64(data.Frequency)) + times := decimal.NewFromInt(int64(data.Hours * 60)).Div(frequency).RoundFloor(8) + totalAmount, err2 := decimal.NewFromString(data.TotalAmount) if err2 != nil { - _ = logx.Error(err2) - } - if cb != nil { - defer cb() // 释放锁 - } - //计算涨价公式 - err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils.AnyToString(amount), eggEnergyCoreData) - if err3 != nil { - _ = logx.Error(err3) - _ = session.Rollback() + fmt.Println(err2) continue } + amount := totalAmount.Div(times) - dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{ - Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums, - AmountFee: "", - BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, - AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, - BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, - AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, - BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums, - AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums, - } + // 4、推入rabbitmq 异步处理 + ch.Publish(md2.EggEnergyExchange, md2.EggEnergyStructForEggEnergyFundData{ + ID: data.Id, + Amount: amount.String(), + }, md2.EggEnergyRoutKeyForEggEnergyFundData) - err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq) - if err4 != nil { - fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4) - _ = session.Rollback() - continue - } - - // 插入资金数据详细数据 - record := model.EggEnergyFundDataRecords{ - RecordsId: data.Id, - TotalAmount: data.TotalAmount, - BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount), - BalanceTimes: data.BalanceTimes - 1, - BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, - AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, - BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, - AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, - CreateAt: now.Format("2006-01-02 15:04:05"), - UpdateAt: now.Format("2006-01-02 15:04:05"), - } - _, err5 := fundDataRecordsDb.EggEnergyFundDataRecordsInsertBySession(session, record) - if err5 != nil { - _ = session.Rollback() - continue - } - - // 更新当前数据 - fundData := model.EggEnergyFundData{ - Id: data.Id, - Kind: data.Kind, - TotalAmount: data.TotalAmount, - BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount), - Hours: data.Hours, - BalanceTimes: data.BalanceTimes - 1, - Frequency: data.Frequency, - Memo: data.Memo, - CreateAt: data.CreateAt, - UpdateAt: now.Format("2006-01-02 15:04:05"), - } - - forceColumns := []string{ - "balance_times", - "balance_amount", - } - _, err6 := fundDataDb.EggEnergyFundDataUpdateBySession(session, fundData, forceColumns...) - if err6 != nil { - _ = logx.Error(err6) - _ = session.Rollback() - continue - } + //session := eg.NewSession() + //session.Begin() + // + //eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(eg) + //if err2 != nil { + // _ = logx.Error(err2) + //} + //if cb != nil { + // defer cb() // 释放锁 + //} + ////计算涨价公式 + //err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils.AnyToString(amount), eggEnergyCoreData) + //if err3 != nil { + // _ = logx.Error(err3) + // _ = session.Rollback() + // session.Close() + // continue + //} + // + //dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{ + // Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums, + // AmountFee: "", + // BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, + // AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, + // BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, + // AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, + // BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums, + // AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums, + //} + // + //err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq) + //if err4 != nil { + // fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4) + // _ = session.Rollback() + // session.Close() + // continue + //} + // + //// 插入资金数据详细数据 + //record := model.EggEnergyFundDataRecords{ + // RecordsId: data.Id, + // TotalAmount: data.TotalAmount, + // BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount), + // BalanceTimes: data.BalanceTimes - 1, + // BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, + // AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, + // BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, + // AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, + // CreateAt: now.Format("2006-01-02 15:04:05"), + // UpdateAt: now.Format("2006-01-02 15:04:05"), + //} + //_, err5 := fundDataRecordsDb.EggEnergyFundDataRecordsInsertBySession(session, record) + //if err5 != nil { + // _ = session.Rollback() + // continue + //} + // + //// 更新当前数据 + //fundData := model.EggEnergyFundData{ + // Id: data.Id, + // Kind: data.Kind, + // TotalAmount: data.TotalAmount, + // BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount), + // Hours: data.Hours, + // BalanceTimes: data.BalanceTimes - 1, + // Frequency: data.Frequency, + // Memo: data.Memo, + // CreateAt: data.CreateAt, + // UpdateAt: now.Format("2006-01-02 15:04:05"), + //} + // + //forceColumns := []string{ + // "balance_times", + // "balance_amount", + //} + //_, err6 := fundDataDb.EggEnergyFundDataUpdateBySession(session, fundData, forceColumns...) + //if err6 != nil { + // _ = logx.Error(err6) + // _ = session.Rollback() + // continue + //} } + + cache.Del(DealFundDataKey) } diff --git a/app/task/svc/svc_deal_platform_revenue_data.go b/app/task/svc/svc_deal_platform_revenue_data.go index 19cd3c8..18a9198 100644 --- a/app/task/svc/svc_deal_platform_revenue_data.go +++ b/app/task/svc/svc_deal_platform_revenue_data.go @@ -2,27 +2,37 @@ package svc import ( "applet/app/utils" + "applet/app/utils/cache" "applet/app/utils/logx" "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" - "code.fnuoos.com/EggPlanet/egg_models.git/src/model" - zhios_order_relate_logx "code.fnuoos.com/EggPlanet/egg_models.git/utils/logx" - "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" - "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum" md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" - "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/svc" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" "fmt" + "github.com/shopspring/decimal" "time" "xorm.io/xorm" ) -func DealPlatformRevenueData(eg *xorm.Engine, dbName string) { +const DealPlatformRevenueDataKey = "deal_platform_revenue_data" + +func DealPlatformRevenueData(eg *xorm.Engine, dbName string, ch *rabbit.Channel) { fmt.Println("deal_platform_revenue_data...") defer func() { if err := recover(); err != nil { - _ = logx.Error(err) + fmt.Println(err) + return } }() + // 悲观锁防止串行 + getString, _ := cache.GetString(DealFundDataKey) + if getString != "" { + fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "上一次处理未执行完") + return + } + cache.SetEx(DealFundDataKey, "running", 60*30) //30分钟 + + // 查询所有未被执行完的数据 platformRevenueDataDb := implement.NewPlatformRevenueDataDb(eg) platformRevenueDataList, err := platformRevenueDataDb.PlatformRevenueDataFindNotFinish() if err != nil { @@ -33,113 +43,126 @@ func DealPlatformRevenueData(eg *xorm.Engine, dbName string) { now := time.Now() recordsDb := implement.NewPlatformRevenueDataRecordsDb(eg) for _, data := range platformRevenueDataList { - // 判断是否是第一次执行 + // 1、判断是否是第一次执行 lastRecord, err1 := recordsDb.PlatformRevenueDataRecordsGetLast(data.Id) if err1 != nil { _ = logx.Error(err1) continue } if lastRecord == nil { - // 首次执行 判断当前是否可以执行 + // 2.1 首次执行 判断当前是否可以执行 if utils.TimeParseStd(data.CreateAt).Add(time.Duration(data.Frequency) * time.Minute).After(now) { continue } } else { - // 已经被执行过 + // 2.2 已经被执行过 if utils.TimeParseStd(lastRecord.CreateAt).Add(time.Duration(data.Frequency) * time.Minute).After(now) { continue } } - // 总执行次数 按十分钟更新 - times := data.Hours * 60 / data.Frequency - // 每次更新金额 - amount := utils.AnyToFloat64(data.TotalAmount) / utils.AnyToFloat64(times) - - session := eg.NewSession() - defer func() { - session.Close() - if err := recover(); err != nil { - _ = zhios_order_relate_logx.Error(err) - } - }() - session.Begin() - - eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(eg) + // 3、计算总执行次数、每次更新金额 (每次更新金额 = 金额 / 总执行次数 总执行次数 = 时间 / 频次) + frequency := decimal.NewFromInt(int64(data.Frequency)) + times := decimal.NewFromInt(int64(data.Hours * 60)).Div(frequency).RoundFloor(8) + totalAmount, err2 := decimal.NewFromString(data.TotalAmount) if err2 != nil { - _ = logx.Error(err2) - } - if cb != nil { - defer cb() // 释放锁 - } - //计算涨价公式 - err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils.AnyToString(amount), eggEnergyCoreData) - if err3 != nil { - _ = logx.Error(err3) - _ = session.Rollback() + fmt.Println(err2) continue } + amount := totalAmount.Div(times) - dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{ - Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums, - AmountFee: "", - BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, - AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, - BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, - AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, - BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums, - AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums, - } + // 4、推入rabbitmq 异步处理 + ch.Publish(md2.EggEnergyExchange, md2.EggEnergyStructForPlatformRevenueData{ + ID: data.Id, + Amount: amount.String(), + }, md2.EggEnergyRoutKeyForPlatformRevenueData) - err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq) - if err4 != nil { - fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4) - _ = session.Rollback() - continue - } - - // 插入平台营收详细数据 - record := model.PlatformRevenueDataRecords{ - RecordsId: data.Id, - TotalAmount: data.TotalAmount, - BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount), - BalanceTimes: data.BalanceTimes - 1, - BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, - AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, - BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, - AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, - CreateAt: now.Format("2006-01-02 15:04:05"), - UpdateAt: now.Format("2006-01-02 15:04:05"), - } - _, err5 := recordsDb.PlatformRevenueDataRecordsInsertBySession(session, record) - if err5 != nil { - _ = session.Rollback() - continue - } - - // 更新当前数据 - platformRevenueData := model.PlatformRevenueData{ - Id: data.Id, - Kind: data.Kind, - TotalAmount: data.TotalAmount, - BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount), - Hours: data.Hours, - BalanceTimes: data.BalanceTimes - 1, - Frequency: data.Frequency, - Memo: data.Memo, - CreateAt: data.CreateAt, - UpdateAt: now.Format("2006-01-02 15:04:05"), - } - - forceColumns := []string{ - "balance_times", - "balance_amount", - } - _, err6 := platformRevenueDataDb.PlatformRevenueDataUpdateBySession(session, platformRevenueData, forceColumns...) - if err6 != nil { - _ = logx.Error(err6) - _ = session.Rollback() - continue - } + //session := eg.NewSession() + //defer func() { + // session.Close() + // if err := recover(); err != nil { + // _ = zhios_order_relate_logx.Error(err) + // } + //}() + //session.Begin() + // + //eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(eg) + //if err2 != nil { + // _ = logx.Error(err2) + //} + //if cb != nil { + // defer cb() // 释放锁 + //} + ////计算涨价公式 + //err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils.AnyToString(amount), eggEnergyCoreData) + //if err3 != nil { + // _ = logx.Error(err3) + // _ = session.Rollback() + // continue + //} + // + //dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{ + // Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums, + // AmountFee: "", + // BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, + // AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, + // BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, + // AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, + // BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums, + // AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums, + //} + // + //err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq) + //if err4 != nil { + // fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4) + // _ = session.Rollback() + // continue + //} + // + //// 插入平台营收详细数据 + //record := model.PlatformRevenueDataRecords{ + // RecordsId: data.Id, + // TotalAmount: data.TotalAmount, + // BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount), + // BalanceTimes: data.BalanceTimes - 1, + // BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice, + // AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice, + // BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue, + // AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue, + // CreateAt: now.Format("2006-01-02 15:04:05"), + // UpdateAt: now.Format("2006-01-02 15:04:05"), + //} + //_, err5 := recordsDb.PlatformRevenueDataRecordsInsertBySession(session, record) + //if err5 != nil { + // _ = session.Rollback() + // continue + //} + // + //// 更新当前数据 + //platformRevenueData := model.PlatformRevenueData{ + // Id: data.Id, + // Kind: data.Kind, + // TotalAmount: data.TotalAmount, + // BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount), + // Hours: data.Hours, + // BalanceTimes: data.BalanceTimes - 1, + // Frequency: data.Frequency, + // Memo: data.Memo, + // CreateAt: data.CreateAt, + // UpdateAt: now.Format("2006-01-02 15:04:05"), + //} + // + //forceColumns := []string{ + // "balance_times", + // "balance_amount", + //} + //_, err6 := platformRevenueDataDb.PlatformRevenueDataUpdateBySession(session, platformRevenueData, forceColumns...) + //if err6 != nil { + // _ = logx.Error(err6) + // _ = session.Rollback() + // continue + //} } + + cache.Del(DealPlatformRevenueDataKey) }