@@ -4,7 +4,7 @@ import ( | |||||
"applet/app/cfg" | "applet/app/cfg" | ||||
"applet/app/e" | "applet/app/e" | ||||
"applet/app/lib/wechat" | "applet/app/lib/wechat" | ||||
"applet/app/md" | |||||
md2 "applet/app/lib/wechat/md" | |||||
"applet/app/utils" | "applet/app/utils" | ||||
"applet/app/utils/cache" | "applet/app/utils/cache" | ||||
db "code.fnuoos.com/zhimeng/model.git/src" | db "code.fnuoos.com/zhimeng/model.git/src" | ||||
@@ -71,7 +71,7 @@ func SetTicket(c *gin.Context) { | |||||
fmt.Println("解密结果:", reqWxMessage) | fmt.Println("解密结果:", reqWxMessage) | ||||
utils.FilePutContents("SetTicket_XML", utils.SerializeStr(reqWxMessage)) | utils.FilePutContents("SetTicket_XML", utils.SerializeStr(reqWxMessage)) | ||||
if reqWxMessage.InfoType == "component_verify_ticket" { //TODO::微信公众平台 验证票据 | if reqWxMessage.InfoType == "component_verify_ticket" { //TODO::微信公众平台 验证票据 | ||||
cacheKey := fmt.Sprintf(md.MasterComponentVerifyTicket, utils.AnyToString(wxOpenThirdPartyAppList.Uuid)) | |||||
cacheKey := fmt.Sprintf(md2.MasterComponentVerifyTicket, utils.AnyToString(wxOpenThirdPartyAppList.Uuid)) | |||||
cacheComponentVerifyTicket, _ := cache.GetString(cacheKey) | cacheComponentVerifyTicket, _ := cache.GetString(cacheKey) | ||||
if cacheComponentVerifyTicket == "" || cacheComponentVerifyTicket != reqWxMessage.ComponentVerifyTicket { | if cacheComponentVerifyTicket == "" || cacheComponentVerifyTicket != reqWxMessage.ComponentVerifyTicket { | ||||
cache.SetEx(cacheKey, reqWxMessage.ComponentVerifyTicket, 43140) | cache.SetEx(cacheKey, reqWxMessage.ComponentVerifyTicket, 43140) | ||||
@@ -132,6 +132,7 @@ func GetPreAuthCode(c *gin.Context) { | |||||
wxOpenThirdPartyAppListDb := implement.NewWxOpenThirdPartyAppListDb(db.Db) | wxOpenThirdPartyAppListDb := implement.NewWxOpenThirdPartyAppListDb(db.Db) | ||||
wxOpenThirdPartyAppList, err := wxOpenThirdPartyAppListDb.GetWxOpenThirdPartyAppList(utils.StrToInt(masterId)) | wxOpenThirdPartyAppList, err := wxOpenThirdPartyAppListDb.GetWxOpenThirdPartyAppList(utils.StrToInt(masterId)) | ||||
if err != nil { | if err != nil { | ||||
e.OutErr(c, e.ERR, err.Error()) | |||||
return | return | ||||
} | } | ||||
if wxOpenThirdPartyAppList == nil { | if wxOpenThirdPartyAppList == nil { | ||||
@@ -0,0 +1,6 @@ | |||||
package md | |||||
const ( | |||||
MasterComponentVerifyTicket = "master_component_verify_ticket:%s" | |||||
MasterComponentAccessToken = "master_component_access_token:%s" | |||||
) |
@@ -2,8 +2,7 @@ package wechat | |||||
import ( | import ( | ||||
"applet/app/cfg" | "applet/app/cfg" | ||||
md2 "applet/app/lib/wechat/md" | |||||
"applet/app/md" | |||||
"applet/app/lib/wechat/md" | |||||
"applet/app/utils" | "applet/app/utils" | ||||
"applet/app/utils/cache" | "applet/app/utils/cache" | ||||
db "code.fnuoos.com/zhimeng/model.git/src" | db "code.fnuoos.com/zhimeng/model.git/src" | ||||
@@ -53,7 +52,7 @@ func (wxApiService *WxApiService) GetComponentAccessToken() (cacheComponentAcces | |||||
if err1 != nil { | if err1 != nil { | ||||
return cacheComponentAccessToken, err1 | return cacheComponentAccessToken, err1 | ||||
} | } | ||||
var postData md2.GetComponentAccessToken | |||||
var postData md.GetComponentAccessToken | |||||
err = json.Unmarshal(postBody, &postData) | err = json.Unmarshal(postBody, &postData) | ||||
if err != nil { | if err != nil { | ||||
return | return | ||||
@@ -96,7 +95,7 @@ func (wxApiService *WxApiService) GetPreAuthCode() (preAuthCode string, err erro | |||||
if err != nil { | if err != nil { | ||||
return | return | ||||
} | } | ||||
var postData md2.GetPreAuthCode | |||||
var postData md.GetPreAuthCode | |||||
err = json.Unmarshal(postBody, &postData) | err = json.Unmarshal(postBody, &postData) | ||||
if err != nil { | if err != nil { | ||||
return | return | ||||
@@ -110,7 +109,7 @@ func (wxApiService *WxApiService) GetPreAuthCode() (preAuthCode string, err erro | |||||
} | } | ||||
// GetAuthorizerAccessTokenByAuthCode 使用授权码获取授权信息 | // GetAuthorizerAccessTokenByAuthCode 使用授权码获取授权信息 | ||||
func (wxApiService *WxApiService) GetAuthorizerAccessTokenByAuthCode(authCode string) (resp md2.GetAuthorizerAccessTokenByAuthCode, err error) { // set方法 | |||||
func (wxApiService *WxApiService) GetAuthorizerAccessTokenByAuthCode(authCode string) (resp md.GetAuthorizerAccessTokenByAuthCode, err error) { // set方法 | |||||
componentAccessToken, err := wxApiService.GetComponentAccessToken() | componentAccessToken, err := wxApiService.GetComponentAccessToken() | ||||
if err != nil { | if err != nil { | ||||
return | return | ||||
@@ -15,7 +15,4 @@ const ( | |||||
KEY_SYS_CFG_CACHE = "sys_cfg_cache" | KEY_SYS_CFG_CACHE = "sys_cfg_cache" | ||||
CfgCacheTime = 86400 | CfgCacheTime = 86400 | ||||
MasterComponentVerifyTicket = "master_component_verify_ticket:%s" | |||||
MasterComponentAccessToken = "master_component_access_token:%s" | |||||
) | ) |
@@ -2,25 +2,23 @@ package task | |||||
import ( | import ( | ||||
taskMd "applet/app/task/md" | taskMd "applet/app/task/md" | ||||
db "code.fnuoos.com/zhimeng/model.git/src" | |||||
"code.fnuoos.com/zhimeng/model.git/src/model" | |||||
"fmt" | |||||
"time" | "time" | ||||
"github.com/robfig/cron/v3" | "github.com/robfig/cron/v3" | ||||
"xorm.io/xorm" | "xorm.io/xorm" | ||||
"applet/app/cfg" | |||||
"applet/app/db" | |||||
"applet/app/db/model" | |||||
"applet/app/md" | |||||
"applet/app/utils" | |||||
"applet/app/utils/logx" | "applet/app/utils/logx" | ||||
) | ) | ||||
var ( | var ( | ||||
timer *cron.Cron | timer *cron.Cron | ||||
jobs = map[string]func(*xorm.Engine, string){} | |||||
jobs = map[string]func(*xorm.Engine){} | |||||
baseEntryId cron.EntryID | baseEntryId cron.EntryID | ||||
entryIds []cron.EntryID | entryIds []cron.EntryID | ||||
taskCfgList map[string]*[]model.SysCfg | |||||
taskCfgList *[]model.SysCfg | |||||
ch = make(chan int, 30) | ch = make(chan int, 30) | ||||
workerNum = 15 // 智盟跟单并发数量 | workerNum = 15 // 智盟跟单并发数量 | ||||
otherCh = make(chan int, 30) | otherCh = make(chan int, 30) | ||||
@@ -45,28 +43,9 @@ func Run() { | |||||
} | } | ||||
func reload() { | func reload() { | ||||
// 重新初始化数据库 | |||||
db.InitMapDbs(cfg.DB, cfg.Prd) | |||||
taskCfgList = db.MapCrontabCfg(db.Db) | |||||
if len(taskCfgList) == 0 { | |||||
taskCfgList = map[string]*[]model.SysCfg{} | |||||
} | |||||
// 获取所有站长的配置信息 | |||||
for dbName, v := range db.DBs { | |||||
if conf := db.MapCrontabCfg(v); conf != nil { | |||||
if cfg.Debug { | |||||
dbInfo := md.SplitDbInfo(v) | |||||
// 去掉模版库 | |||||
if dbName == "000000" { | |||||
continue | |||||
} | |||||
_ = logx.Debugf("【MasterId】%s, 【Host】%s, 【Name】%s, 【User】%s, 【prd】%v, 【Task】%v\n", dbName, dbInfo.Host, dbInfo.Name, dbInfo.User, cfg.Prd, utils.SerializeStr(*conf)) | |||||
} | |||||
taskCfgList[dbName] = conf | |||||
} | |||||
} | |||||
if len(taskCfgList) > 0 { | |||||
if len(*taskCfgList) > 0 { | |||||
// 删除原有所有任务 | // 删除原有所有任务 | ||||
if len(entryIds) > 0 { | if len(entryIds) > 0 { | ||||
for _, v := range entryIds { | for _, v := range entryIds { | ||||
@@ -81,28 +60,24 @@ func reload() { | |||||
err error | err error | ||||
) | ) | ||||
// 添加任务 | // 添加任务 | ||||
for dbName, v := range taskCfgList { | |||||
for _, vv := range *v { | |||||
if _, ok := jobs[vv.Key]; ok && vv.Val != "" { | |||||
// fmt.Println(vv.Val) | |||||
if entryId, err = timer.AddFunc(vv.Val, doTask(dbName, vv.Key)); err == nil { | |||||
entryIds = append(entryIds, entryId) | |||||
} | |||||
for _, v := range *taskCfgList { | |||||
if _, ok := jobs[v.K]; ok && v.V != "" { | |||||
fmt.Println(v.V) | |||||
if entryId, err = timer.AddFunc(v.V, doTask(v.K)); err == nil { | |||||
entryIds = append(entryIds, entryId) | |||||
} | } | ||||
} | } | ||||
} | } | ||||
} | } | ||||
} | } | ||||
func doTask(dbName, fnName string) func() { | |||||
func doTask(fnName string) func() { | |||||
return func() { | return func() { | ||||
begin := time.Now().Local() | begin := time.Now().Local() | ||||
jobs[fnName](db.DBs[dbName], dbName) | |||||
jobs[fnName](db.Db) | |||||
end := time.Now().Local() | end := time.Now().Local() | ||||
logx.Infof( | logx.Infof( | ||||
"[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>", | "[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>", | ||||
dbName, | |||||
fnName, | fnName, | ||||
begin.Format("2006-01-02 15:04:05.000"), | begin.Format("2006-01-02 15:04:05.000"), | ||||
end.Format("2006-01-02 15:04:05.000"), | end.Format("2006-01-02 15:04:05.000"), | ||||
@@ -113,5 +88,5 @@ func doTask(dbName, fnName string) func() { | |||||
// 增加自动任务队列 | // 增加自动任务队列 | ||||
func initTasks() { | func initTasks() { | ||||
jobs[taskMd.MallCronOrderCancel] = taskCancelOrder // 取消订单 | |||||
jobs[taskMd.CronCalcMasterDataStatistics] = taskCalcMasterDataStatistics // 计算站长数据统计 | |||||
} | } |
@@ -1,5 +1,5 @@ | |||||
package md | package md | ||||
const ( | const ( | ||||
MallCronOrderCancel = "mall_cron_order_cancel" // 取消订单任务 | |||||
CronCalcMasterDataStatistics = "cron_calc_master_data_statistics" // 计算站长数据统计 | |||||
) | ) |
@@ -1,64 +1,151 @@ | |||||
package svc | package svc | ||||
import ( | import ( | ||||
"applet/app/db" | |||||
"applet/app/cfg" | |||||
"applet/app/utils" | "applet/app/utils" | ||||
"applet/app/utils/cache" | |||||
"applet/app/utils/logx" | "applet/app/utils/logx" | ||||
db "code.fnuoos.com/zhimeng/model.git/src" | |||||
"code.fnuoos.com/zhimeng/model.git/src/model" | |||||
"code.fnuoos.com/zhimeng/model.git/src/super/implement" | |||||
model2 "code.fnuoos.com/zhimeng/model.git/src/super/model" | |||||
"errors" | "errors" | ||||
"fmt" | "fmt" | ||||
"time" | "time" | ||||
"xorm.io/xorm" | "xorm.io/xorm" | ||||
) | ) | ||||
func CancelOrder(eg *xorm.Engine, dbName string) { | |||||
fmt.Println("cancel order...") | |||||
const PessimismLockKeyForCalcMasterDataStatistics = "calc_master_data_statistics_pessimism_lock_key" | |||||
const PessimismLockValueForCalcMasterDataStatistics = "running" | |||||
func CalcMasterDataStatistics(eg *xorm.Engine) { | |||||
now := time.Now() | |||||
if now.Hour() > 1 && now.Hour() > 8 { | |||||
//TODO::只在凌晨 1 点 ~ 凌晨 8 点运行 | |||||
fmt.Println("err>>>>>>>", errors.New("非运行时间")) | |||||
logx.Warn(errors.New("非运行时间")) | |||||
return | |||||
} | |||||
//TODO::增加“悲观锁”防止串行 | |||||
getString, _ := cache.GetString(PessimismLockKeyForCalcMasterDataStatistics) | |||||
//if err != nil { | |||||
// return err | |||||
//} | |||||
if getString == PessimismLockValueForCalcMasterDataStatistics { | |||||
fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "上一次结算未执行完") | |||||
logx.Warn(errors.New("上一次结算未执行完")) | |||||
return | |||||
} | |||||
cache.SetEx(PessimismLockKeyForCalcMasterDataStatistics, PessimismLockValueForCalcMasterDataStatistics, 3600*8) //8小时 | |||||
fmt.Println("calc master data statistics...") | |||||
defer func() { | defer func() { | ||||
if err := recover(); err != nil { | if err := recover(); err != nil { | ||||
_ = logx.Error(err) | _ = logx.Error(err) | ||||
} | } | ||||
}() | }() | ||||
timeStr, err := getCancelCfg(eg, dbName) | |||||
if err != nil { | |||||
fmt.Println(err.Error()) | |||||
return | |||||
var tables *[]model.DbMapping | |||||
if cfg.Prd { | |||||
tables = db.GetAllDatabasePrd() //默认获取全部 | |||||
} else { | |||||
tables = db.GetAllDatabaseDev() //默认获取全部 | |||||
} | } | ||||
now := time.Now() | |||||
// x 分钟后取消订单 | |||||
expTime := now.Add(-time.Hour * time.Duration(utils.StrToInt64(timeStr))) | |||||
expTimeStr := utils.Time2String(expTime, "") | |||||
page := 1 | |||||
for { | |||||
isEmpty, err := handleOnePage(eg, dbName, expTimeStr) | |||||
masterDataStatisticsDb := implement.NewMasterDataStatisticsDb(eg) | |||||
year, month, _ := now.Date() | |||||
todayDate := now.AddDate(0, 0, -1).Format("2006-01-02") | |||||
sevenDayDate := now.AddDate(0, 0, -7).Format("2006-01-02") | |||||
thisMonthDate := time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Format("2006-01-02") | |||||
lastMonthDate := time.Date(year, month-1, 1, 0, 0, 0, 0, time.Local).Format("2006-01-02") | |||||
for _, table := range *tables { | |||||
//1.1、查询今日 generate_wx_ad_data 数据 | |||||
var todayGenerateWxAdDatas []model2.GenerateWxAdData | |||||
err := eg.Where("uuid =? and date =?", table.DbMasterId, todayDate).Find(&todayGenerateWxAdDatas) | |||||
if err != nil { | if err != nil { | ||||
_ = logx.Error(err) | |||||
break | |||||
} | |||||
if isEmpty { | |||||
break | |||||
fmt.Println("err>>>>>>>", err.Error()) | |||||
logx.Warn(err) | |||||
} | } | ||||
if page > 100 { | |||||
break | |||||
//1.2、查询7日 generate_wx_ad_data 数据 | |||||
var sevenDayGenerateWxAdDatas []model2.GenerateWxAdData | |||||
err = eg.Where("uuid =? and date >= ?", table.DbMasterId, sevenDayDate).Find(&sevenDayGenerateWxAdDatas) | |||||
if err != nil { | |||||
fmt.Println("err>>>>>>>", err.Error()) | |||||
logx.Warn(err) | |||||
} | } | ||||
page += 1 | |||||
} | |||||
} | |||||
//1.3、查询本月 generate_wx_ad_data 数据 | |||||
var thisMonthGenerateWxAdDatas []model2.GenerateWxAdData | |||||
err = eg.Where("uuid =? and date >= ?", table.DbMasterId, thisMonthDate).Find(&thisMonthGenerateWxAdDatas) | |||||
if err != nil { | |||||
fmt.Println("err>>>>>>>", err.Error()) | |||||
logx.Warn(err) | |||||
} | |||||
func handleOnePage(eg *xorm.Engine, dbName, expTimeStr string) (isEmpty bool, err error) { | |||||
return false, nil | |||||
} | |||||
//1.4、查询上月 generate_wx_ad_data 数据 | |||||
var lastMonthGenerateWxAdDatas []model2.GenerateWxAdData | |||||
err = eg.Where("uuid =? and date >= ? and date < ?", table.DbMasterId, lastMonthDate, thisMonthDate).Find(&lastMonthGenerateWxAdDatas) | |||||
if err != nil { | |||||
fmt.Println("err>>>>>>>", err.Error()) | |||||
logx.Warn(err) | |||||
} | |||||
func getCancelCfg(eg *xorm.Engine, masterId string) (string, error) { | |||||
cfg := db.SysCfgGetWithDb(eg, masterId, "order_expiration_time") | |||||
//2、计算数据 | |||||
var todayIncome, sevenDayIncome, thisMonthIncome, lastMonthIncome, | |||||
mediumTodayIncome, mediumSevenDayIncome, mediumThisMonthIncome, mediumLastMonthIncome, | |||||
agentTodayIncome, agentSevenDayIncome, agentThisMonthIncome, agentLastMonthIncome, | |||||
mediumTodayExposureCount, mediumSevenDayExposureCount, mediumThisMonthExposureCount, mediumLastMonthExposureCount int | |||||
if cfg == "" { | |||||
return "", errors.New("order_expiration_time no found") | |||||
for _, todayGenerateWxAdData := range todayGenerateWxAdDatas { | |||||
todayIncome += todayGenerateWxAdData.PlatformRetention + todayGenerateWxAdData.CommissionRetention + todayGenerateWxAdData.PriceAdjustmentRetention | |||||
mediumTodayIncome += todayGenerateWxAdData.MediaRevenue | |||||
mediumTodayExposureCount += todayGenerateWxAdData.ExposureCount | |||||
agentTodayIncome += todayGenerateWxAdData.AgentRevenue | |||||
} | |||||
for _, sevenDayGenerateWxAdData := range sevenDayGenerateWxAdDatas { | |||||
sevenDayIncome += sevenDayGenerateWxAdData.PlatformRetention + sevenDayGenerateWxAdData.CommissionRetention + sevenDayGenerateWxAdData.PriceAdjustmentRetention | |||||
mediumSevenDayIncome += sevenDayGenerateWxAdData.MediaRevenue | |||||
mediumSevenDayExposureCount += sevenDayGenerateWxAdData.ExposureCount | |||||
agentSevenDayIncome += sevenDayGenerateWxAdData.AgentRevenue | |||||
} | |||||
for _, thisMonthGenerateWxAdData := range thisMonthGenerateWxAdDatas { | |||||
thisMonthIncome += thisMonthGenerateWxAdData.PlatformRetention + thisMonthGenerateWxAdData.CommissionRetention + thisMonthGenerateWxAdData.PriceAdjustmentRetention | |||||
mediumThisMonthIncome += thisMonthGenerateWxAdData.MediaRevenue | |||||
mediumThisMonthExposureCount += thisMonthGenerateWxAdData.ExposureCount | |||||
agentThisMonthIncome += thisMonthGenerateWxAdData.AgentRevenue | |||||
} | |||||
for _, lastMonthGenerateWxAdData := range lastMonthGenerateWxAdDatas { | |||||
lastMonthIncome += lastMonthGenerateWxAdData.PlatformRetention + lastMonthGenerateWxAdData.CommissionRetention + lastMonthGenerateWxAdData.PriceAdjustmentRetention | |||||
mediumLastMonthIncome += lastMonthGenerateWxAdData.MediaRevenue | |||||
mediumLastMonthExposureCount += lastMonthGenerateWxAdData.ExposureCount | |||||
agentLastMonthIncome += lastMonthGenerateWxAdData.AgentRevenue | |||||
} | |||||
_, err = masterDataStatisticsDb.MasterDataStatisticsInsert(&model2.MasterDataStatistics{ | |||||
Uuid: utils.StrToInt(table.DbMasterId), | |||||
TodayIncome: todayIncome, | |||||
SevenDayIncome: sevenDayIncome, | |||||
ThisMonthIncome: thisMonthIncome, | |||||
LastMonthIncome: lastMonthIncome, | |||||
MediumTodayIncome: mediumTodayIncome, | |||||
MediumSevenDayIncome: mediumSevenDayIncome, | |||||
MediumThisMonthIncome: mediumThisMonthIncome, | |||||
MediumLastMonthIncome: mediumLastMonthIncome, | |||||
AgentTodayIncome: agentTodayIncome, | |||||
AgentSevenDayIncome: agentSevenDayIncome, | |||||
AgentThisMonthIncome: agentThisMonthIncome, | |||||
AgentLastMonthIncome: agentLastMonthIncome, | |||||
MediumTodayExposureCount: mediumTodayExposureCount, | |||||
MediumSevenDayExposureCount: mediumSevenDayExposureCount, | |||||
MediumThisMonthExposureCount: mediumThisMonthExposureCount, | |||||
MediumLastMonthExposureCount: mediumLastMonthExposureCount, | |||||
Date: todayDate, | |||||
CreateAt: now.Format("2006-01-02 15:04:05"), | |||||
UpdateAt: now.Format("2006-01-02 15:04:05"), | |||||
}) | |||||
if err != nil { | |||||
fmt.Println("err>>>>>>>", err.Error()) | |||||
logx.Warn(err) | |||||
} | |||||
} | } | ||||
return cfg, nil | |||||
} | } |
@@ -7,8 +7,8 @@ import ( | |||||
"xorm.io/xorm" | "xorm.io/xorm" | ||||
) | ) | ||||
// 取消订单 | |||||
func taskCancelOrder(eg *xorm.Engine, dbName string) { | |||||
// 计算站长数据统计 | |||||
func taskCalcMasterDataStatistics(eg *xorm.Engine) { | |||||
for { | for { | ||||
if len(ch) > workerNum { | if len(ch) > workerNum { | ||||
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000))) | time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000))) | ||||
@@ -18,6 +18,6 @@ func taskCancelOrder(eg *xorm.Engine, dbName string) { | |||||
} | } | ||||
START: | START: | ||||
ch <- 1 | ch <- 1 | ||||
svc.CancelOrder(eg, dbName) | |||||
svc.CalcMasterDataStatistics(eg) | |||||
<-ch | <-ch | ||||
} | } |
@@ -2,7 +2,7 @@ module applet | |||||
go 1.18 | go 1.18 | ||||
//replace code.fnuoos.com/zhimeng/model.git => E:/company/ad/models | |||||
replace code.fnuoos.com/zhimeng/model.git => E:/company/ad/models | |||||
require ( | require ( | ||||
code.fnuoos.com/zhimeng/model.git v0.0.3-0.20240821082038-bfd73b32452e | code.fnuoos.com/zhimeng/model.git v0.0.3-0.20240821082038-bfd73b32452e | ||||
@@ -1,7 +1,7 @@ | |||||
package main | package main | ||||
import ( | import ( | ||||
"code.fnuoos.com/zhimeng/model.git/src" | |||||
db "code.fnuoos.com/zhimeng/model.git/src" | |||||
"context" | "context" | ||||
"fmt" | "fmt" | ||||
"log" | "log" | ||||