|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- package task
-
- import (
- taskMd "applet/app/task/md"
- "time"
-
- "github.com/robfig/cron/v3"
- "xorm.io/xorm"
-
- "applet/app/cfg"
- "applet/app/db"
- "applet/app/db/model"
- "applet/app/md"
- "applet/app/utils"
- "applet/app/utils/logx"
- )
-
- var (
- timer *cron.Cron
- jobs = map[string]func(*xorm.Engine, string){}
- baseEntryId cron.EntryID
- entryIds []cron.EntryID
- taskCfgList map[string]*[]model.SysCfg
- ch = make(chan int, 30)
- workerNum = 15 // 智盟跟单并发数量
- otherCh = make(chan int, 30)
- otherWorkerNum = 18 // 淘宝, 苏宁, 考拉并发量
- )
-
- func Init() {
- // 初始化任务列表
- initTasks()
- var err error
- timer = cron.New()
- // reload为初始化数据库方法
- if baseEntryId, err = timer.AddFunc("@every 15m", reload); err != nil {
- _ = logx.Fatal(err)
- }
- }
-
- func Run() {
- reload()
- timer.Start()
- _ = logx.Info("auto tasks running...")
- }
-
- func reload() {
- // 重新初始化数据库
- db.InitMapDbs(cfg.DB, cfg.Prd)
-
- if len(taskCfgList) == 0 {
- taskCfgList = map[string]*[]model.SysCfg{}
- }
-
- // 获取所有站长的配置信息
- for dbName, v := range db.DBs {
- if conf := db.MapCrontabCfg(v); conf != nil {
- if cfg.Debug {
- dbInfo := md.SplitDbInfo(v)
- // 去掉模版库
- if dbName == "000000" {
- continue
- }
- _ = logx.Debugf("【MasterId】%s, 【Host】%s, 【Name】%s, 【User】%s, 【prd】%v, 【Task】%v\n", dbName, dbInfo.Host, dbInfo.Name, dbInfo.User, cfg.Prd, utils.SerializeStr(*conf))
- }
- taskCfgList[dbName] = conf
- }
- }
- if len(taskCfgList) > 0 {
- // 删除原有所有任务
- if len(entryIds) > 0 {
- for _, v := range entryIds {
- if v != baseEntryId {
- timer.Remove(v)
- }
- }
- entryIds = nil
- }
- var (
- entryId cron.EntryID
- err error
- )
- // 添加任务
- for dbName, v := range taskCfgList {
- for _, vv := range *v {
- if _, ok := jobs[vv.Key]; ok && vv.Val != "" {
- // fmt.Println(vv.Val)
- if entryId, err = timer.AddFunc(vv.Val, doTask(dbName, vv.Key)); err == nil {
- entryIds = append(entryIds, entryId)
- }
- }
- }
- }
-
- }
- }
-
- func doTask(dbName, fnName string) func() {
- return func() {
- begin := time.Now().Local()
- jobs[fnName](db.DBs[dbName], dbName)
- end := time.Now().Local()
- logx.Infof(
- "[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>",
- dbName,
- fnName,
- begin.Format("2006-01-02 15:04:05.000"),
- end.Format("2006-01-02 15:04:05.000"),
- time.Duration(end.UnixNano()-begin.UnixNano()).String(),
- )
- }
- }
-
- // 增加自动任务队列
- func initTasks() {
- jobs[taskMd.MallCronOrderCancel] = taskCancelOrder // 取消订单
- }
|