diff --git a/app/hdl/hdl_wx_open.go b/app/hdl/hdl_wx_open.go index a551a9c..94ef4e4 100644 --- a/app/hdl/hdl_wx_open.go +++ b/app/hdl/hdl_wx_open.go @@ -4,7 +4,7 @@ import ( "applet/app/cfg" "applet/app/e" "applet/app/lib/wechat" - "applet/app/md" + md2 "applet/app/lib/wechat/md" "applet/app/utils" "applet/app/utils/cache" db "code.fnuoos.com/zhimeng/model.git/src" @@ -71,7 +71,7 @@ func SetTicket(c *gin.Context) { fmt.Println("解密结果:", reqWxMessage) utils.FilePutContents("SetTicket_XML", utils.SerializeStr(reqWxMessage)) if reqWxMessage.InfoType == "component_verify_ticket" { //TODO::微信公众平台 验证票据 - cacheKey := fmt.Sprintf(md.MasterComponentVerifyTicket, utils.AnyToString(wxOpenThirdPartyAppList.Uuid)) + cacheKey := fmt.Sprintf(md2.MasterComponentVerifyTicket, utils.AnyToString(wxOpenThirdPartyAppList.Uuid)) cacheComponentVerifyTicket, _ := cache.GetString(cacheKey) if cacheComponentVerifyTicket == "" || cacheComponentVerifyTicket != reqWxMessage.ComponentVerifyTicket { cache.SetEx(cacheKey, reqWxMessage.ComponentVerifyTicket, 43140) @@ -132,6 +132,7 @@ func GetPreAuthCode(c *gin.Context) { wxOpenThirdPartyAppListDb := implement.NewWxOpenThirdPartyAppListDb(db.Db) wxOpenThirdPartyAppList, err := wxOpenThirdPartyAppListDb.GetWxOpenThirdPartyAppList(utils.StrToInt(masterId)) if err != nil { + e.OutErr(c, e.ERR, err.Error()) return } if wxOpenThirdPartyAppList == nil { diff --git a/app/lib/wechat/md/cache_key_md.go b/app/lib/wechat/md/cache_key_md.go new file mode 100644 index 0000000..92574ed --- /dev/null +++ b/app/lib/wechat/md/cache_key_md.go @@ -0,0 +1,6 @@ +package md + +const ( + MasterComponentVerifyTicket = "master_component_verify_ticket:%s" + MasterComponentAccessToken = "master_component_access_token:%s" +) diff --git a/app/lib/wechat/wechat_api.go b/app/lib/wechat/wechat_api.go index 1ff84b9..4e54294 100644 --- a/app/lib/wechat/wechat_api.go +++ b/app/lib/wechat/wechat_api.go @@ -2,8 +2,7 @@ package wechat import ( "applet/app/cfg" - md2 "applet/app/lib/wechat/md" - "applet/app/md" + "applet/app/lib/wechat/md" "applet/app/utils" "applet/app/utils/cache" db "code.fnuoos.com/zhimeng/model.git/src" @@ -53,7 +52,7 @@ func (wxApiService *WxApiService) GetComponentAccessToken() (cacheComponentAcces if err1 != nil { return cacheComponentAccessToken, err1 } - var postData md2.GetComponentAccessToken + var postData md.GetComponentAccessToken err = json.Unmarshal(postBody, &postData) if err != nil { return @@ -96,7 +95,7 @@ func (wxApiService *WxApiService) GetPreAuthCode() (preAuthCode string, err erro if err != nil { return } - var postData md2.GetPreAuthCode + var postData md.GetPreAuthCode err = json.Unmarshal(postBody, &postData) if err != nil { return @@ -110,7 +109,7 @@ func (wxApiService *WxApiService) GetPreAuthCode() (preAuthCode string, err erro } // GetAuthorizerAccessTokenByAuthCode 使用授权码获取授权信息 -func (wxApiService *WxApiService) GetAuthorizerAccessTokenByAuthCode(authCode string) (resp md2.GetAuthorizerAccessTokenByAuthCode, err error) { // set方法 +func (wxApiService *WxApiService) GetAuthorizerAccessTokenByAuthCode(authCode string) (resp md.GetAuthorizerAccessTokenByAuthCode, err error) { // set方法 componentAccessToken, err := wxApiService.GetComponentAccessToken() if err != nil { return diff --git a/app/md/app_redis_key.go b/app/md/app_redis_key.go index b1d517c..cc582e6 100644 --- a/app/md/app_redis_key.go +++ b/app/md/app_redis_key.go @@ -15,7 +15,4 @@ const ( KEY_SYS_CFG_CACHE = "sys_cfg_cache" CfgCacheTime = 86400 - - MasterComponentVerifyTicket = "master_component_verify_ticket:%s" - MasterComponentAccessToken = "master_component_access_token:%s" ) diff --git a/app/task/init.go b/app/task/init.go index 54b3536..5546190 100644 --- a/app/task/init.go +++ b/app/task/init.go @@ -2,25 +2,23 @@ package task import ( taskMd "applet/app/task/md" + db "code.fnuoos.com/zhimeng/model.git/src" + "code.fnuoos.com/zhimeng/model.git/src/model" + "fmt" "time" "github.com/robfig/cron/v3" "xorm.io/xorm" - "applet/app/cfg" - "applet/app/db" - "applet/app/db/model" - "applet/app/md" - "applet/app/utils" "applet/app/utils/logx" ) var ( timer *cron.Cron - jobs = map[string]func(*xorm.Engine, string){} + jobs = map[string]func(*xorm.Engine){} baseEntryId cron.EntryID entryIds []cron.EntryID - taskCfgList map[string]*[]model.SysCfg + taskCfgList *[]model.SysCfg ch = make(chan int, 30) workerNum = 15 // 智盟跟单并发数量 otherCh = make(chan int, 30) @@ -45,28 +43,9 @@ func Run() { } func reload() { - // 重新初始化数据库 - db.InitMapDbs(cfg.DB, cfg.Prd) + taskCfgList = db.MapCrontabCfg(db.Db) - if len(taskCfgList) == 0 { - taskCfgList = map[string]*[]model.SysCfg{} - } - - // 获取所有站长的配置信息 - for dbName, v := range db.DBs { - if conf := db.MapCrontabCfg(v); conf != nil { - if cfg.Debug { - dbInfo := md.SplitDbInfo(v) - // 去掉模版库 - if dbName == "000000" { - continue - } - _ = logx.Debugf("【MasterId】%s, 【Host】%s, 【Name】%s, 【User】%s, 【prd】%v, 【Task】%v\n", dbName, dbInfo.Host, dbInfo.Name, dbInfo.User, cfg.Prd, utils.SerializeStr(*conf)) - } - taskCfgList[dbName] = conf - } - } - if len(taskCfgList) > 0 { + if len(*taskCfgList) > 0 { // 删除原有所有任务 if len(entryIds) > 0 { for _, v := range entryIds { @@ -81,28 +60,24 @@ func reload() { err error ) // 添加任务 - for dbName, v := range taskCfgList { - for _, vv := range *v { - if _, ok := jobs[vv.Key]; ok && vv.Val != "" { - // fmt.Println(vv.Val) - if entryId, err = timer.AddFunc(vv.Val, doTask(dbName, vv.Key)); err == nil { - entryIds = append(entryIds, entryId) - } + for _, v := range *taskCfgList { + if _, ok := jobs[v.K]; ok && v.V != "" { + fmt.Println(v.V) + if entryId, err = timer.AddFunc(v.V, doTask(v.K)); err == nil { + entryIds = append(entryIds, entryId) } } } - } } -func doTask(dbName, fnName string) func() { +func doTask(fnName string) func() { return func() { begin := time.Now().Local() - jobs[fnName](db.DBs[dbName], dbName) + jobs[fnName](db.Db) end := time.Now().Local() logx.Infof( "[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>", - dbName, fnName, begin.Format("2006-01-02 15:04:05.000"), end.Format("2006-01-02 15:04:05.000"), @@ -113,5 +88,5 @@ func doTask(dbName, fnName string) func() { // 增加自动任务队列 func initTasks() { - jobs[taskMd.MallCronOrderCancel] = taskCancelOrder // 取消订单 + jobs[taskMd.CronCalcMasterDataStatistics] = taskCalcMasterDataStatistics // 计算站长数据统计 } diff --git a/app/task/md/cron_key.go b/app/task/md/cron_key.go index b38ccc8..90fbf65 100644 --- a/app/task/md/cron_key.go +++ b/app/task/md/cron_key.go @@ -1,5 +1,5 @@ package md const ( - MallCronOrderCancel = "mall_cron_order_cancel" // 取消订单任务 + CronCalcMasterDataStatistics = "cron_calc_master_data_statistics" // 计算站长数据统计 ) diff --git a/app/task/svc/svc_cancel_order.go b/app/task/svc/svc_cancel_order.go index 0c35b5f..7a299c0 100644 --- a/app/task/svc/svc_cancel_order.go +++ b/app/task/svc/svc_cancel_order.go @@ -1,64 +1,151 @@ package svc import ( - "applet/app/db" + "applet/app/cfg" "applet/app/utils" + "applet/app/utils/cache" "applet/app/utils/logx" + db "code.fnuoos.com/zhimeng/model.git/src" + "code.fnuoos.com/zhimeng/model.git/src/model" + "code.fnuoos.com/zhimeng/model.git/src/super/implement" + model2 "code.fnuoos.com/zhimeng/model.git/src/super/model" "errors" "fmt" "time" "xorm.io/xorm" ) -func CancelOrder(eg *xorm.Engine, dbName string) { - fmt.Println("cancel order...") +const PessimismLockKeyForCalcMasterDataStatistics = "calc_master_data_statistics_pessimism_lock_key" +const PessimismLockValueForCalcMasterDataStatistics = "running" + +func CalcMasterDataStatistics(eg *xorm.Engine) { + now := time.Now() + if now.Hour() > 1 && now.Hour() > 8 { + //TODO::只在凌晨 1 点 ~ 凌晨 8 点运行 + fmt.Println("err>>>>>>>", errors.New("非运行时间")) + logx.Warn(errors.New("非运行时间")) + return + } + + //TODO::增加“悲观锁”防止串行 + getString, _ := cache.GetString(PessimismLockKeyForCalcMasterDataStatistics) + //if err != nil { + // return err + //} + if getString == PessimismLockValueForCalcMasterDataStatistics { + fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "上一次结算未执行完") + logx.Warn(errors.New("上一次结算未执行完")) + return + } + cache.SetEx(PessimismLockKeyForCalcMasterDataStatistics, PessimismLockValueForCalcMasterDataStatistics, 3600*8) //8小时 + + fmt.Println("calc master data statistics...") defer func() { if err := recover(); err != nil { _ = logx.Error(err) } }() - - timeStr, err := getCancelCfg(eg, dbName) - if err != nil { - fmt.Println(err.Error()) - return + var tables *[]model.DbMapping + if cfg.Prd { + tables = db.GetAllDatabasePrd() //默认获取全部 + } else { + tables = db.GetAllDatabaseDev() //默认获取全部 } - now := time.Now() - // x 分钟后取消订单 - expTime := now.Add(-time.Hour * time.Duration(utils.StrToInt64(timeStr))) - expTimeStr := utils.Time2String(expTime, "") - - page := 1 - - for { - isEmpty, err := handleOnePage(eg, dbName, expTimeStr) + masterDataStatisticsDb := implement.NewMasterDataStatisticsDb(eg) + year, month, _ := now.Date() + todayDate := now.AddDate(0, 0, -1).Format("2006-01-02") + sevenDayDate := now.AddDate(0, 0, -7).Format("2006-01-02") + thisMonthDate := time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Format("2006-01-02") + lastMonthDate := time.Date(year, month-1, 1, 0, 0, 0, 0, time.Local).Format("2006-01-02") + for _, table := range *tables { + //1.1、查询今日 generate_wx_ad_data 数据 + var todayGenerateWxAdDatas []model2.GenerateWxAdData + err := eg.Where("uuid =? and date =?", table.DbMasterId, todayDate).Find(&todayGenerateWxAdDatas) if err != nil { - _ = logx.Error(err) - break - } - if isEmpty { - break + fmt.Println("err>>>>>>>", err.Error()) + logx.Warn(err) } - if page > 100 { - break + //1.2、查询7日 generate_wx_ad_data 数据 + var sevenDayGenerateWxAdDatas []model2.GenerateWxAdData + err = eg.Where("uuid =? and date >= ?", table.DbMasterId, sevenDayDate).Find(&sevenDayGenerateWxAdDatas) + if err != nil { + fmt.Println("err>>>>>>>", err.Error()) + logx.Warn(err) } - page += 1 - - } -} + //1.3、查询本月 generate_wx_ad_data 数据 + var thisMonthGenerateWxAdDatas []model2.GenerateWxAdData + err = eg.Where("uuid =? and date >= ?", table.DbMasterId, thisMonthDate).Find(&thisMonthGenerateWxAdDatas) + if err != nil { + fmt.Println("err>>>>>>>", err.Error()) + logx.Warn(err) + } -func handleOnePage(eg *xorm.Engine, dbName, expTimeStr string) (isEmpty bool, err error) { - return false, nil -} + //1.4、查询上月 generate_wx_ad_data 数据 + var lastMonthGenerateWxAdDatas []model2.GenerateWxAdData + err = eg.Where("uuid =? and date >= ? and date < ?", table.DbMasterId, lastMonthDate, thisMonthDate).Find(&lastMonthGenerateWxAdDatas) + if err != nil { + fmt.Println("err>>>>>>>", err.Error()) + logx.Warn(err) + } -func getCancelCfg(eg *xorm.Engine, masterId string) (string, error) { - cfg := db.SysCfgGetWithDb(eg, masterId, "order_expiration_time") + //2、计算数据 + var todayIncome, sevenDayIncome, thisMonthIncome, lastMonthIncome, + mediumTodayIncome, mediumSevenDayIncome, mediumThisMonthIncome, mediumLastMonthIncome, + agentTodayIncome, agentSevenDayIncome, agentThisMonthIncome, agentLastMonthIncome, + mediumTodayExposureCount, mediumSevenDayExposureCount, mediumThisMonthExposureCount, mediumLastMonthExposureCount int - if cfg == "" { - return "", errors.New("order_expiration_time no found") + for _, todayGenerateWxAdData := range todayGenerateWxAdDatas { + todayIncome += todayGenerateWxAdData.PlatformRetention + todayGenerateWxAdData.CommissionRetention + todayGenerateWxAdData.PriceAdjustmentRetention + mediumTodayIncome += todayGenerateWxAdData.MediaRevenue + mediumTodayExposureCount += todayGenerateWxAdData.ExposureCount + agentTodayIncome += todayGenerateWxAdData.AgentRevenue + } + for _, sevenDayGenerateWxAdData := range sevenDayGenerateWxAdDatas { + sevenDayIncome += sevenDayGenerateWxAdData.PlatformRetention + sevenDayGenerateWxAdData.CommissionRetention + sevenDayGenerateWxAdData.PriceAdjustmentRetention + mediumSevenDayIncome += sevenDayGenerateWxAdData.MediaRevenue + mediumSevenDayExposureCount += sevenDayGenerateWxAdData.ExposureCount + agentSevenDayIncome += sevenDayGenerateWxAdData.AgentRevenue + } + for _, thisMonthGenerateWxAdData := range thisMonthGenerateWxAdDatas { + thisMonthIncome += thisMonthGenerateWxAdData.PlatformRetention + thisMonthGenerateWxAdData.CommissionRetention + thisMonthGenerateWxAdData.PriceAdjustmentRetention + mediumThisMonthIncome += thisMonthGenerateWxAdData.MediaRevenue + mediumThisMonthExposureCount += thisMonthGenerateWxAdData.ExposureCount + agentThisMonthIncome += thisMonthGenerateWxAdData.AgentRevenue + } + for _, lastMonthGenerateWxAdData := range lastMonthGenerateWxAdDatas { + lastMonthIncome += lastMonthGenerateWxAdData.PlatformRetention + lastMonthGenerateWxAdData.CommissionRetention + lastMonthGenerateWxAdData.PriceAdjustmentRetention + mediumLastMonthIncome += lastMonthGenerateWxAdData.MediaRevenue + mediumLastMonthExposureCount += lastMonthGenerateWxAdData.ExposureCount + agentLastMonthIncome += lastMonthGenerateWxAdData.AgentRevenue + } + _, err = masterDataStatisticsDb.MasterDataStatisticsInsert(&model2.MasterDataStatistics{ + Uuid: utils.StrToInt(table.DbMasterId), + TodayIncome: todayIncome, + SevenDayIncome: sevenDayIncome, + ThisMonthIncome: thisMonthIncome, + LastMonthIncome: lastMonthIncome, + MediumTodayIncome: mediumTodayIncome, + MediumSevenDayIncome: mediumSevenDayIncome, + MediumThisMonthIncome: mediumThisMonthIncome, + MediumLastMonthIncome: mediumLastMonthIncome, + AgentTodayIncome: agentTodayIncome, + AgentSevenDayIncome: agentSevenDayIncome, + AgentThisMonthIncome: agentThisMonthIncome, + AgentLastMonthIncome: agentLastMonthIncome, + MediumTodayExposureCount: mediumTodayExposureCount, + MediumSevenDayExposureCount: mediumSevenDayExposureCount, + MediumThisMonthExposureCount: mediumThisMonthExposureCount, + MediumLastMonthExposureCount: mediumLastMonthExposureCount, + Date: todayDate, + CreateAt: now.Format("2006-01-02 15:04:05"), + UpdateAt: now.Format("2006-01-02 15:04:05"), + }) + if err != nil { + fmt.Println("err>>>>>>>", err.Error()) + logx.Warn(err) + } } - return cfg, nil } diff --git a/app/task/task_cancel_order.go b/app/task/task_cancel_order.go index 2e45bbb..5ddae31 100644 --- a/app/task/task_cancel_order.go +++ b/app/task/task_cancel_order.go @@ -7,8 +7,8 @@ import ( "xorm.io/xorm" ) -// 取消订单 -func taskCancelOrder(eg *xorm.Engine, dbName string) { +// 计算站长数据统计 +func taskCalcMasterDataStatistics(eg *xorm.Engine) { for { if len(ch) > workerNum { time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000))) @@ -18,6 +18,6 @@ func taskCancelOrder(eg *xorm.Engine, dbName string) { } START: ch <- 1 - svc.CancelOrder(eg, dbName) + svc.CalcMasterDataStatistics(eg) <-ch } diff --git a/go.mod b/go.mod index 28e31b0..6336f5a 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module applet go 1.18 -//replace code.fnuoos.com/zhimeng/model.git => E:/company/ad/models +replace code.fnuoos.com/zhimeng/model.git => E:/company/ad/models require ( code.fnuoos.com/zhimeng/model.git v0.0.3-0.20240821082038-bfd73b32452e diff --git a/main.go b/main.go index 4b1ba53..b094ca9 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,7 @@ package main import ( - "code.fnuoos.com/zhimeng/model.git/src" + db "code.fnuoos.com/zhimeng/model.git/src" "context" "fmt" "log"