Browse Source

update task

master
dengbiao 1 month ago
parent
commit
1530a35eb0
9 changed files with 114 additions and 310 deletions
  1. +26
    -50
      app/task/init.go
  2. +3
    -1
      app/task/md/cron_key.go
  3. +0
    -64
      app/task/svc/svc_cancel_order.go
  4. +13
    -89
      app/task/svc/svc_deal_fund_data.go
  5. +16
    -96
      app/task/svc/svc_deal_platform_revenue_data.go
  6. +7
    -7
      app/task/svc/svc_save_egg_energy_price.go
  7. +23
    -0
      app/task/task_egg_energy_auto_record_price.go
  8. +3
    -3
      app/task/task_egg_energy_deal_fund_data.go
  9. +23
    -0
      app/task/task_egg_energy_deal_platform_revenue_data.go

+ 26
- 50
app/task/init.go View File

@@ -1,30 +1,26 @@
package task

import (
"applet/app/db"
taskMd "applet/app/task/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"fmt"
"time"

"applet/app/utils/logx"
"github.com/robfig/cron/v3"
"xorm.io/xorm"

"applet/app/cfg"
"applet/app/db"
"applet/app/db/model"
"applet/app/md"
"applet/app/utils"
"applet/app/utils/logx"
)

var (
timer *cron.Cron
jobs = map[string]func(*xorm.Engine, string){}
baseEntryId cron.EntryID
entryIds []cron.EntryID
taskCfgList map[string]*[]model.SysCfg
ch = make(chan int, 30)
workerNum = 15 // 智盟跟单并发数量
otherCh = make(chan int, 30)
otherWorkerNum = 18 // 淘宝, 苏宁, 考拉并发量
timer *cron.Cron
jobs = map[string]func(*xorm.Engine){}
baseEntryId cron.EntryID
entryIds []cron.EntryID
taskCfgList *[]model.CronTask
ch = make(chan int, 30)
workerNum = 15 // 并发数量
)

func Init() {
@@ -33,7 +29,7 @@ func Init() {
var err error
timer = cron.New()
// reload为初始化数据库方法
if baseEntryId, err = timer.AddFunc("@every 15m", reload); err != nil {
if baseEntryId, err = timer.AddFunc("@every 30m", reload); err != nil {
_ = logx.Fatal(err)
}
}
@@ -45,28 +41,10 @@ func Run() {
}

func reload() {
// 重新初始化数据库
db.InitMapDbs(cfg.DB, cfg.Prd)

if len(taskCfgList) == 0 {
taskCfgList = map[string]*[]model.SysCfg{}
}
cronTaskDb := implement.NewCronTaskDb(db.Db)
taskCfgList = cronTaskDb.MapCrontabCfg()

// 获取所有站长的配置信息
for dbName, v := range db.DBs {
if conf := db.MapCrontabCfg(v); conf != nil {
if cfg.Debug {
dbInfo := md.SplitDbInfo(v)
// 去掉模版库
if dbName == "000000" {
continue
}
_ = logx.Debugf("【MasterId】%s, 【Host】%s, 【Name】%s, 【User】%s, 【prd】%v, 【Task】%v\n", dbName, dbInfo.Host, dbInfo.Name, dbInfo.User, cfg.Prd, utils.SerializeStr(*conf))
}
taskCfgList[dbName] = conf
}
}
if len(taskCfgList) > 0 {
if len(*taskCfgList) > 0 {
// 删除原有所有任务
if len(entryIds) > 0 {
for _, v := range entryIds {
@@ -81,28 +59,24 @@ func reload() {
err error
)
// 添加任务
for dbName, v := range taskCfgList {
for _, vv := range *v {
if _, ok := jobs[vv.Key]; ok && vv.Val != "" {
// fmt.Println(vv.Val)
if entryId, err = timer.AddFunc(vv.Val, doTask(dbName, vv.Key)); err == nil {
entryIds = append(entryIds, entryId)
}
for _, v := range *taskCfgList {
if _, ok := jobs[v.Key]; ok && v.Val != "" {
fmt.Println(v.Val)
if entryId, err = timer.AddFunc(v.Val, doTask(v.Key)); err == nil {
entryIds = append(entryIds, entryId)
}
}
}

}
}

func doTask(dbName, fnName string) func() {
func doTask(fnName string) func() {
return func() {
begin := time.Now().Local()
jobs[fnName](db.DBs[dbName], dbName)
jobs[fnName](db.Db)
end := time.Now().Local()
logx.Infof(
"[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>",
dbName,
fnName,
begin.Format("2006-01-02 15:04:05.000"),
end.Format("2006-01-02 15:04:05.000"),
@@ -113,5 +87,7 @@ func doTask(dbName, fnName string) func() {

// 增加自动任务队列
func initTasks() {
jobs[taskMd.MallCronOrderCancel] = taskCancelOrder // 取消订单
jobs[taskMd.CronEggEnergyAutoRecordPrices] = taskEggEnergyAutoRecordPrices
jobs[taskMd.CronEggEnergyDealPlatformRevenueData] = taskEggEnergyDealPlatformRevenueData
jobs[taskMd.CronEggEnergyDealFundData] = taskEggEnergyDealFundData
}

+ 3
- 1
app/task/md/cron_key.go View File

@@ -1,5 +1,7 @@
package md

const (
MallCronOrderCancel = "mall_cron_order_cancel" // 取消订单任务
CronEggEnergyAutoRecordPrices = "cron_egg_energy_auto_record_price" // 自动记录价格
CronEggEnergyDealPlatformRevenueData = "cron_egg_energy_deal_platform_revenue_data" // 处理平台收益
CronEggEnergyDealFundData = "cron_egg_energy_deal_fund_data" // 处理价值投入
)

+ 0
- 64
app/task/svc/svc_cancel_order.go View File

@@ -1,64 +0,0 @@
package svc

import (
"applet/app/db"
"applet/app/utils"
"applet/app/utils/logx"
"errors"
"fmt"
"time"
"xorm.io/xorm"
)

func CancelOrder(eg *xorm.Engine, dbName string) {
fmt.Println("cancel order...")
defer func() {
if err := recover(); err != nil {
_ = logx.Error(err)
}
}()

timeStr, err := getCancelCfg(eg, dbName)
if err != nil {
fmt.Println(err.Error())
return
}

now := time.Now()
// x 分钟后取消订单
expTime := now.Add(-time.Hour * time.Duration(utils.StrToInt64(timeStr)))
expTimeStr := utils.Time2String(expTime, "")

page := 1

for {
isEmpty, err := handleOnePage(eg, dbName, expTimeStr)
if err != nil {
_ = logx.Error(err)
break
}
if isEmpty {
break
}

if page > 100 {
break
}

page += 1

}
}

func handleOnePage(eg *xorm.Engine, dbName, expTimeStr string) (isEmpty bool, err error) {
return false, nil
}

func getCancelCfg(eg *xorm.Engine, masterId string) (string, error) {
cfg := db.SysCfgGetWithDb(eg, masterId, "order_expiration_time")

if cfg == "" {
return "", errors.New("order_expiration_time no found")
}
return cfg, nil
}

+ 13
- 89
app/task/svc/svc_deal_fund_data.go View File

@@ -12,10 +12,11 @@ import (
"xorm.io/xorm"
)

const DealFundDataKey = "deal_fund_data_key"
const EggEnergyDealFundDataKey = "egg_energy_deal_fund_data_key"

func DealFundData(eg *xorm.Engine, dbName string, ch *rabbit.Channel) {
fmt.Println("deal_fund_data...")
// EggEnergyDealFundData 处理价值投入
func EggEnergyDealFundData(eg *xorm.Engine) {
fmt.Println("egg_energy_deal_fund_data...")
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
@@ -24,12 +25,13 @@ func DealFundData(eg *xorm.Engine, dbName string, ch *rabbit.Channel) {
}()

// 悲观锁防止串行
getString, _ := cache.GetString(DealFundDataKey)
getString, _ := cache.GetString(EggEnergyDealFundDataKey)
if getString != "" {
fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "上一次处理未执行完")
return
}
cache.SetEx(DealFundDataKey, "running", 60*30) //30分钟
cache.SetEx(EggEnergyDealFundDataKey, "running", 60*30) //30分钟
defer cache.Del(EggEnergyDealFundDataKey)

// 查询所有未被执行完的数据
fundDataDb := implement.NewEggEnergyFundDataDb(eg)
@@ -39,6 +41,12 @@ func DealFundData(eg *xorm.Engine, dbName string, ch *rabbit.Channel) {
return
}

ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
return
}
defer ch.Release()

now := time.Now()
fundDataRecordsDb := implement.NewEggEnergyFundDataRecordsDb(eg)
for _, data := range fundDataList {
@@ -75,89 +83,5 @@ func DealFundData(eg *xorm.Engine, dbName string, ch *rabbit.Channel) {
ID: data.Id,
Amount: amount.String(),
}, md2.EggEnergyRoutKeyForEggEnergyFundData)

//session := eg.NewSession()
//session.Begin()
//
//eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(eg)
//if err2 != nil {
// _ = logx.Error(err2)
//}
//if cb != nil {
// defer cb() // 释放锁
//}
////计算涨价公式
//err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils.AnyToString(amount), eggEnergyCoreData)
//if err3 != nil {
// _ = logx.Error(err3)
// _ = session.Rollback()
// session.Close()
// continue
//}
//
//dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{
// Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums,
// AmountFee: "",
// BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice,
// AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice,
// BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue,
// AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue,
// BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums,
// AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums,
//}
//
//err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq)
//if err4 != nil {
// fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4)
// _ = session.Rollback()
// session.Close()
// continue
//}
//
//// 插入资金数据详细数据
//record := model.EggEnergyFundDataRecords{
// RecordsId: data.Id,
// TotalAmount: data.TotalAmount,
// BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount),
// BalanceTimes: data.BalanceTimes - 1,
// BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice,
// AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice,
// BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue,
// AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue,
// CreateAt: now.Format("2006-01-02 15:04:05"),
// UpdateAt: now.Format("2006-01-02 15:04:05"),
//}
//_, err5 := fundDataRecordsDb.EggEnergyFundDataRecordsInsertBySession(session, record)
//if err5 != nil {
// _ = session.Rollback()
// continue
//}
//
//// 更新当前数据
//fundData := model.EggEnergyFundData{
// Id: data.Id,
// Kind: data.Kind,
// TotalAmount: data.TotalAmount,
// BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount),
// Hours: data.Hours,
// BalanceTimes: data.BalanceTimes - 1,
// Frequency: data.Frequency,
// Memo: data.Memo,
// CreateAt: data.CreateAt,
// UpdateAt: now.Format("2006-01-02 15:04:05"),
//}
//
//forceColumns := []string{
// "balance_times",
// "balance_amount",
//}
//_, err6 := fundDataDb.EggEnergyFundDataUpdateBySession(session, fundData, forceColumns...)
//if err6 != nil {
// _ = logx.Error(err6)
// _ = session.Rollback()
// continue
//}
}

cache.Del(DealFundDataKey)
}

+ 16
- 96
app/task/svc/svc_deal_platform_revenue_data.go View File

@@ -12,10 +12,11 @@ import (
"xorm.io/xorm"
)

const DealPlatformRevenueDataKey = "deal_platform_revenue_data"
const EggEnergyDealPlatformRevenueDataKey = "egg_energy_deal_platform_revenue_data"

func DealPlatformRevenueData(eg *xorm.Engine, dbName string, ch *rabbit.Channel) {
fmt.Println("deal_platform_revenue_data...")
// EggEnergyDealPlatformRevenueData 蛋蛋能量-处理平台收益
func EggEnergyDealPlatformRevenueData(eg *xorm.Engine) {
fmt.Println("egg_energy_deal_platform_revenue_data...")
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
@@ -24,28 +25,35 @@ func DealPlatformRevenueData(eg *xorm.Engine, dbName string, ch *rabbit.Channel)
}()

// 悲观锁防止串行
getString, _ := cache.GetString(DealFundDataKey)
getString, _ := cache.GetString(EggEnergyDealPlatformRevenueDataKey)
if getString != "" {
fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "上一次处理未执行完")
return
}
cache.SetEx(DealFundDataKey, "running", 60*30) //30分钟
cache.SetEx(EggEnergyDealPlatformRevenueDataKey, "running", 60*30) //30分钟
defer cache.Del(EggEnergyDealPlatformRevenueDataKey)

// 查询所有未被执行完的数据
platformRevenueDataDb := implement.NewPlatformRevenueDataDb(eg)
platformRevenueDataList, err := platformRevenueDataDb.PlatformRevenueDataFindNotFinish()
if err != nil {
fmt.Println("DealPlatformRevenueData_ERR:::::", err.Error())
fmt.Println("EggEnergyDealPlatformRevenueData_ERR:::::", err.Error())
return
}

ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
return
}
defer ch.Release()

now := time.Now()
recordsDb := implement.NewPlatformRevenueDataRecordsDb(eg)
for _, data := range platformRevenueDataList {
// 1、判断是否是第一次执行
lastRecord, err1 := recordsDb.PlatformRevenueDataRecordsGetLast(data.Id)
if err1 != nil {
fmt.Println("DealPlatformRevenueData_ERR:::::", err1.Error())
fmt.Println("EggEnergyDealPlatformRevenueData_ERR:::::", err1.Error())
continue
}
if lastRecord == nil {
@@ -65,7 +73,7 @@ func DealPlatformRevenueData(eg *xorm.Engine, dbName string, ch *rabbit.Channel)
times := decimal.NewFromInt(int64(data.Hours * 60)).Div(frequency).RoundFloor(8)
totalAmount, err2 := decimal.NewFromString(data.TotalAmount)
if err2 != nil {
fmt.Println("DealPlatformRevenueData_ERR:::::", err2.Error())
fmt.Println("EggEnergyDealPlatformRevenueData_ERR:::::", err2.Error())
continue
}
amount := totalAmount.Div(times)
@@ -75,93 +83,5 @@ func DealPlatformRevenueData(eg *xorm.Engine, dbName string, ch *rabbit.Channel)
ID: data.Id,
Amount: amount.String(),
}, md2.EggEnergyRoutKeyForPlatformRevenueData)

//session := eg.NewSession()
//defer func() {
// session.Close()
// if err := recover(); err != nil {
// _ = zhios_order_relate_logx.Error(err)
// }
//}()
//session.Begin()
//
//eggEnergyCoreData, cb, err2 := svc.GetEggEnergyCoreData(eg)
//if err2 != nil {
// _ = logx.Error(err2)
//}
//if cb != nil {
// defer cb() // 释放锁
//}
////计算涨价公式
//err3, calcPriceIncreaseFormulaResp := egg_energy.CalcPriceIncreaseFormula(utils.AnyToString(amount), eggEnergyCoreData)
//if err3 != nil {
// _ = logx.Error(err3)
// _ = session.Rollback()
// continue
//}
//
//dealAvailableEggEnergyCoinReq := md2.DealAvailableEggEnergyCoinReq{
// Amount: calcPriceIncreaseFormulaResp.GetEggEnergyNums,
// AmountFee: "",
// BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice,
// AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice,
// BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue,
// AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue,
// BeforeEnergyTotalNums: calcPriceIncreaseFormulaResp.BeforeEnergyTotalNums,
// AfterEnergyTotalNums: calcPriceIncreaseFormulaResp.AfterEnergyTotalNums,
//}
//
//err4 := egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, dealAvailableEggEnergyCoinReq)
//if err4 != nil {
// fmt.Println("ActivityCoinAutoExchangeEggPersonEnergy:::::err111:::", err4)
// _ = session.Rollback()
// continue
//}
//
//// 插入平台营收详细数据
//record := model.PlatformRevenueDataRecords{
// RecordsId: data.Id,
// TotalAmount: data.TotalAmount,
// BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount),
// BalanceTimes: data.BalanceTimes - 1,
// BeforePrice: calcPriceIncreaseFormulaResp.BeforePrice,
// AfterPrice: calcPriceIncreaseFormulaResp.AfterPrice,
// BeforePlanetTotalValue: calcPriceIncreaseFormulaResp.BeforePlanetTotalValue,
// AfterPlanetTotalValue: calcPriceIncreaseFormulaResp.AfterPlanetTotalValue,
// CreateAt: now.Format("2006-01-02 15:04:05"),
// UpdateAt: now.Format("2006-01-02 15:04:05"),
//}
//_, err5 := recordsDb.PlatformRevenueDataRecordsInsertBySession(session, record)
//if err5 != nil {
// _ = session.Rollback()
// continue
//}
//
//// 更新当前数据
//platformRevenueData := model.PlatformRevenueData{
// Id: data.Id,
// Kind: data.Kind,
// TotalAmount: data.TotalAmount,
// BalanceAmount: utils.AnyToString(utils.AnyToFloat64(data.BalanceAmount) - amount),
// Hours: data.Hours,
// BalanceTimes: data.BalanceTimes - 1,
// Frequency: data.Frequency,
// Memo: data.Memo,
// CreateAt: data.CreateAt,
// UpdateAt: now.Format("2006-01-02 15:04:05"),
//}
//
//forceColumns := []string{
// "balance_times",
// "balance_amount",
//}
//_, err6 := platformRevenueDataDb.PlatformRevenueDataUpdateBySession(session, platformRevenueData, forceColumns...)
//if err6 != nil {
// _ = logx.Error(err6)
// _ = session.Rollback()
// continue
//}
}

cache.Del(DealPlatformRevenueDataKey)
}

+ 7
- 7
app/task/svc/svc_save_egg_energy_price.go View File

@@ -11,9 +11,9 @@ import (
"xorm.io/xorm"
)

// SaveEggEnergyPrice 记录蛋蛋能量价格
func SaveEggEnergyPrice(engine *xorm.Engine, dbName string) {
fmt.Println("save_egg_energy_price...")
// EggEnergyAutoRecordPrices 蛋蛋能量-自动记录价格
func EggEnergyAutoRecordPrices(engine *xorm.Engine) {
fmt.Println("egg_energy_auto_record_price...")
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
@@ -29,7 +29,7 @@ func SaveEggEnergyPrice(engine *xorm.Engine, dbName string) {
priceDb := implement.NewEggEnergyPriceDb(engine)
lastPrice, has, err := priceDb.EggEnergyPriceGetLastOne()
if err != nil {
fmt.Println("SaveEggEnergyPrice_ERR:::::", err.Error())
fmt.Println("EggEnergyAutoRecordPrices_ERR:::::", err.Error())
return
}
if has {
@@ -43,7 +43,7 @@ func SaveEggEnergyPrice(engine *xorm.Engine, dbName string) {
// 3. 查询当前价格
eggEnergyCoreData, cb, err1 := svc.GetEggEnergyCoreData(engine)
if err1 != nil {
fmt.Println("SaveEggEnergyPrice_ERR:::::", err1.Error())
fmt.Println("EggEnergyAutoRecordPrices_ERR:::::", err1.Error())
return
}
if cb != nil {
@@ -58,11 +58,11 @@ func SaveEggEnergyPrice(engine *xorm.Engine, dbName string) {
}
id, err2 := priceDb.EggEnergyPriceInsert(&priceData)
if err2 != nil {
fmt.Println("SaveEggEnergyPrice_ERR:::::", err2.Error())
fmt.Println("EggEnergyAutoRecordPrices_ERR:::::", err2.Error())
return
}
if id == 0 {
fmt.Println("SaveEggEnergyPrice_ERR:::::", errors.New(fmt.Sprintf("%s 新增价格失败, 当前价格 %s", now.Format("2006-01-02 15:04:05"), eggEnergyCoreData.NowPrice)))
fmt.Println("EggEnergyAutoRecordPrices_ERR:::::", errors.New(fmt.Sprintf("%s 新增价格失败, 当前价格 %s", now.Format("2006-01-02 15:04:05"), eggEnergyCoreData.NowPrice)))
return
}
return


+ 23
- 0
app/task/task_egg_energy_auto_record_price.go View File

@@ -0,0 +1,23 @@
package task

import (
"applet/app/task/svc"
"math/rand"
"time"
"xorm.io/xorm"
)

// 自动记录价格
func taskEggEnergyAutoRecordPrices(eg *xorm.Engine) {
for {
if len(ch) > workerNum {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
} else {
goto START
}
}
START:
ch <- 1
svc.EggEnergyAutoRecordPrices(eg)
<-ch
}

app/task/task_cancel_order.go → app/task/task_egg_energy_deal_fund_data.go View File

@@ -7,8 +7,8 @@ import (
"xorm.io/xorm"
)

// 取消订单
func taskCancelOrder(eg *xorm.Engine, dbName string) {
// 处理价值投入
func taskEggEnergyDealFundData(eg *xorm.Engine) {
for {
if len(ch) > workerNum {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
@@ -18,6 +18,6 @@ func taskCancelOrder(eg *xorm.Engine, dbName string) {
}
START:
ch <- 1
svc.CancelOrder(eg, dbName)
svc.EggEnergyDealFundData(eg)
<-ch
}

+ 23
- 0
app/task/task_egg_energy_deal_platform_revenue_data.go View File

@@ -0,0 +1,23 @@
package task

import (
"applet/app/task/svc"
"math/rand"
"time"
"xorm.io/xorm"
)

// 处理平台收益
func taskEggEnergyDealPlatformRevenueData(eg *xorm.Engine) {
for {
if len(ch) > workerNum {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
} else {
goto START
}
}
START:
ch <- 1
svc.EggEnergyDealPlatformRevenueData(eg)
<-ch
}

Loading…
Cancel
Save