// Package task registers the application's periodic jobs with a cron
// scheduler and keeps their schedules in sync with the `cron_*` rows of the
// system configuration table.
package task

import (
	"fmt"
	"time"

	taskMd "applet/app/task/md"
	"applet/app/utils/logx"
	db "code.fnuoos.com/zhimeng/model.git/src"
	"code.fnuoos.com/zhimeng/model.git/src/model"
	"github.com/robfig/cron/v3"
	"xorm.io/xorm"
)

var (
	// timer is the scheduler that runs reload and every configured job.
	timer *cron.Cron
	// jobs maps a configuration key to the function executed for that key.
	jobs = map[string]func(*xorm.Engine){}
	// baseEntryId identifies the periodic reload entry; reload never removes it.
	baseEntryId cron.EntryID
	// entryIds holds the entries registered by the most recent successful reload.
	entryIds []cron.EntryID
	// taskCfgList is the configuration returned by the latest reload; it is nil
	// when the query failed or matched no rows.
	taskCfgList *[]model.SysCfg

	// NOTE(review): ch, workerNum and otherCh are not used in this file;
	// presumably they are consumed by task implementations elsewhere in the
	// package — confirm before removing.
	ch        = make(chan int, 30)
	workerNum = 15 // concurrency for Zhimeng order tracking
	otherCh   = make(chan int, 30)
)

// Init fills the job table, creates the scheduler and registers a reload of
// the task configuration every 15 minutes. A failure to register the reload
// entry is reported through logx.Fatal.
func Init() {
	// Populate the key -> job table before anything can be scheduled.
	initTasks()

	var err error
	timer = cron.New()
	// reload re-reads the task configuration from the database.
	if baseEntryId, err = timer.AddFunc("@every 15m", reload); err != nil {
		_ = logx.Fatal(err)
	}
}

// Run loads the task configuration once and then starts the scheduler.
// Init must have been called first, because Run uses the scheduler it creates.
func Run() {
	reload()
	timer.Start()
	_ = logx.Info("auto tasks running...")
}

// MapCrontabCfg returns the configuration rows whose key starts with "cron_"
// and whose value is non-empty. It returns nil when the query fails or when
// no row matches; only a query failure is logged.
func MapCrontabCfg(eg *xorm.Engine) *[]model.SysCfg {
	var c []model.SysCfg
	// In a LIKE pattern an underscore matches any single character, so it is
	// escaped here to match a literal "_".
	err := eg.Where("`k` LIKE 'cron\\_%' AND v != ''").Cols("`k`,`v`").Find(&c)
	if err != nil {
		logx.Warn(err)
		return nil
	}
	if len(c) == 0 {
		return nil
	}
	return &c
}

// reload replaces the scheduled jobs with the ones currently configured in
// the database. When no configuration is available (query error or no rows)
// the existing schedule is left untouched.
func reload() {
	taskCfgList = MapCrontabCfg(db.Db)
	// MapCrontabCfg returns nil on error or empty result; dereferencing it
	// unguarded would panic inside the scheduler.
	if taskCfgList == nil || len(*taskCfgList) == 0 {
		return
	}

	// Drop every previously registered job, keeping the reload entry itself.
	for _, id := range entryIds {
		if id != baseEntryId {
			timer.Remove(id)
		}
	}
	entryIds = nil

	// Register each configured task that has a known job and a schedule.
	for _, cfg := range *taskCfgList {
		if _, ok := jobs[cfg.K]; !ok || cfg.V == "" {
			continue
		}
		fmt.Println(cfg.V)
		id, err := timer.AddFunc(cfg.V, doTask(cfg.K))
		if err != nil {
			// A bad spec must not prevent the remaining tasks from loading.
			logx.Warn(fmt.Errorf("registering auto task %q with spec %q: %w", cfg.K, cfg.V, err))
			continue
		}
		entryIds = append(entryIds, id)
	}
}

// doTask wraps the job registered under fnName in a function that runs it
// against db.Db and logs its start time, end time and duration.
func doTask(fnName string) func() {
	return func() {
		begin := time.Now().Local()
		jobs[fnName](db.Db)
		end := time.Now().Local()
		logx.Infof(
			"[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>",
			fnName,
			begin.Format("2006-01-02 15:04:05.000"),
			end.Format("2006-01-02 15:04:05.000"),
			end.Sub(begin).String(),
		)
	}
}

// initTasks registers the available jobs under their configuration keys.
func initTasks() {
	jobs[taskMd.CronCalcMasterDataStatistics] = taskCalcMasterDataStatistics // compute site-master data statistics
	jobs[taskMd.CronMediumSettlementState] = taskMediumSettlementState       // medium settlement statement state
	jobs[taskMd.CronAgentSettlementState] = taskAgentSettlementState         // agent settlement statement state
}