附近小店
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

4 月之前
4 月之前
4 月之前
4 月之前
4 月之前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package task
  2. import (
  3. "github.com/robfig/cron/v3"
  4. "time"
  5. "applet/app/cfg"
  6. "applet/app/db"
  7. "applet/app/db/model"
  8. "applet/app/md"
  9. "applet/app/utils"
  10. "applet/app/utils/logx"
  11. "xorm.io/xorm"
  12. )
  13. var (
  14. timer *cron.Cron
  15. jobs = map[string]func(*xorm.Engine, string){}
  16. baseEntryId cron.EntryID
  17. entryIds []cron.EntryID
  18. taskCfgList map[string]*[]model.SysCfg
  19. ch = make(chan int, 50)
  20. workerNum = 50 // 智盟跟单并发数量
  21. )
  22. func Init() {
  23. // 初始化任务列表
  24. initTasks()
  25. var err error
  26. timer = cron.New()
  27. if baseEntryId, err = timer.AddFunc("@every 20m", reload); err != nil {
  28. _ = logx.Fatal(err)
  29. }
  30. }
  31. func Run() {
  32. reload()
  33. timer.Start()
  34. _ = logx.Info("auto tasks running...")
  35. }
  36. func reload() {
  37. // 重新初始化数据库
  38. db.InitMapDbs(cfg.DB, cfg.Prd)
  39. if len(taskCfgList) == 0 {
  40. taskCfgList = map[string]*[]model.SysCfg{}
  41. }
  42. // 获取所有站长的配置信息
  43. for dbName, v := range db.DBs {
  44. if conf := db.MapCrontabCfg(v); conf != nil {
  45. if cfg.Debug {
  46. dbInfo := md.SplitDbInfo(v)
  47. // 去掉模版库
  48. if dbName == "000000" {
  49. continue
  50. }
  51. _ = logx.Debugf("【MasterId】%s, 【Host】%s, 【Name】%s, 【User】%s, 【prd】%v, 【Task】%v\n", dbName, dbInfo.Host, dbInfo.Name, dbInfo.User, cfg.Prd, utils.SerializeStr(*conf))
  52. }
  53. taskCfgList[dbName] = conf
  54. }
  55. }
  56. if len(taskCfgList) > 0 {
  57. // 删除原有所有任务
  58. if len(entryIds) > 0 {
  59. for _, v := range entryIds {
  60. if v != baseEntryId {
  61. timer.Remove(v)
  62. }
  63. }
  64. entryIds = nil
  65. }
  66. var (
  67. entryId cron.EntryID
  68. err error
  69. )
  70. // 添加任务
  71. for dbName, v := range taskCfgList {
  72. for _, vv := range *v {
  73. if _, ok := jobs[vv.Key]; ok && vv.Val != "" {
  74. // fmt.Println(vv.Val)
  75. if entryId, err = timer.AddFunc(vv.Val, doTask(dbName, vv.Key)); err == nil {
  76. entryIds = append(entryIds, entryId)
  77. }
  78. }
  79. }
  80. }
  81. }
  82. }
  83. func doTask(dbName, fnName string) func() {
  84. return func() {
  85. begin := time.Now().Local()
  86. jobs[fnName](db.DBs[dbName], dbName)
  87. end := time.Now().Local()
  88. logx.Infof(
  89. "[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>",
  90. dbName,
  91. fnName,
  92. begin.Format("2006-01-02 15:04:05.000"),
  93. end.Format("2006-01-02 15:04:05.000"),
  94. time.Duration(end.UnixNano()-begin.UnixNano()).String(),
  95. )
  96. }
  97. }
  98. // 增加自动任务队列
  99. func initTasks() {
  100. jobs[md.KEY_CFG_CRON_COMMUNITY_TEAM_ORDER_CANCEL] = taskOrderCancel //
  101. jobs[md.KEY_CFG_CRON_COMMUNITY_TEAM_ORDER_STORE_SETTLE] = taskOrderStoreSettle //
  102. }