广告平台(总站长使用)

4 月之前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package task
  2. import (
  3. taskMd "applet/app/task/md"
  4. "time"
  5. "github.com/robfig/cron/v3"
  6. "xorm.io/xorm"
  7. "applet/app/cfg"
  8. "applet/app/db"
  9. "applet/app/db/model"
  10. "applet/app/md"
  11. "applet/app/utils"
  12. "applet/app/utils/logx"
  13. )
  14. var (
  15. timer *cron.Cron
  16. jobs = map[string]func(*xorm.Engine, string){}
  17. baseEntryId cron.EntryID
  18. entryIds []cron.EntryID
  19. taskCfgList map[string]*[]model.SysCfg
  20. ch = make(chan int, 30)
  21. workerNum = 15 // 智盟跟单并发数量
  22. otherCh = make(chan int, 30)
  23. otherWorkerNum = 18 // 淘宝, 苏宁, 考拉并发量
  24. )
  25. func Init() {
  26. // 初始化任务列表
  27. initTasks()
  28. var err error
  29. timer = cron.New()
  30. // reload为初始化数据库方法
  31. if baseEntryId, err = timer.AddFunc("@every 15m", reload); err != nil {
  32. _ = logx.Fatal(err)
  33. }
  34. }
  35. func Run() {
  36. reload()
  37. timer.Start()
  38. _ = logx.Info("auto tasks running...")
  39. }
  40. func reload() {
  41. // 重新初始化数据库
  42. db.InitMapDbs(cfg.DB, cfg.Prd)
  43. if len(taskCfgList) == 0 {
  44. taskCfgList = map[string]*[]model.SysCfg{}
  45. }
  46. // 获取所有站长的配置信息
  47. for dbName, v := range db.DBs {
  48. if conf := db.MapCrontabCfg(v); conf != nil {
  49. if cfg.Debug {
  50. dbInfo := md.SplitDbInfo(v)
  51. // 去掉模版库
  52. if dbName == "000000" {
  53. continue
  54. }
  55. _ = logx.Debugf("【MasterId】%s, 【Host】%s, 【Name】%s, 【User】%s, 【prd】%v, 【Task】%v\n", dbName, dbInfo.Host, dbInfo.Name, dbInfo.User, cfg.Prd, utils.SerializeStr(*conf))
  56. }
  57. taskCfgList[dbName] = conf
  58. }
  59. }
  60. if len(taskCfgList) > 0 {
  61. // 删除原有所有任务
  62. if len(entryIds) > 0 {
  63. for _, v := range entryIds {
  64. if v != baseEntryId {
  65. timer.Remove(v)
  66. }
  67. }
  68. entryIds = nil
  69. }
  70. var (
  71. entryId cron.EntryID
  72. err error
  73. )
  74. // 添加任务
  75. for dbName, v := range taskCfgList {
  76. for _, vv := range *v {
  77. if _, ok := jobs[vv.Key]; ok && vv.Val != "" {
  78. // fmt.Println(vv.Val)
  79. if entryId, err = timer.AddFunc(vv.Val, doTask(dbName, vv.Key)); err == nil {
  80. entryIds = append(entryIds, entryId)
  81. }
  82. }
  83. }
  84. }
  85. }
  86. }
  87. func doTask(dbName, fnName string) func() {
  88. return func() {
  89. begin := time.Now().Local()
  90. jobs[fnName](db.DBs[dbName], dbName)
  91. end := time.Now().Local()
  92. logx.Infof(
  93. "[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>",
  94. dbName,
  95. fnName,
  96. begin.Format("2006-01-02 15:04:05.000"),
  97. end.Format("2006-01-02 15:04:05.000"),
  98. time.Duration(end.UnixNano()-begin.UnixNano()).String(),
  99. )
  100. }
  101. }
  102. // 增加自动任务队列
  103. func initTasks() {
  104. jobs[taskMd.MallCronOrderCancel] = taskCancelOrder // 取消订单
  105. }