golang 的 rabbitmq 消费项目
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

2 anos atrás
2 anos atrás
2 anos atrás
2 anos atrás
2 anos atrás
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package task
  2. import (
  3. "time"
  4. "github.com/robfig/cron/v3"
  5. "xorm.io/xorm"
  6. "applet/app/cfg"
  7. "applet/app/db"
  8. "applet/app/db/model"
  9. "applet/app/md"
  10. "applet/app/utils"
  11. "applet/app/utils/logx"
  12. )
  13. var (
  14. timer *cron.Cron
  15. jobs = map[string]func(*xorm.Engine, string){}
  16. baseEntryId cron.EntryID
  17. entryIds []cron.EntryID
  18. taskCfgList map[string]*[]model.SysCfg
  19. )
  20. func Init() {
  21. // 初始化任务列表
  22. initTasks()
  23. var err error
  24. timer = cron.New()
  25. // reload为初始化数据库方法
  26. if baseEntryId, err = timer.AddFunc("@every 15m", reload); err != nil {
  27. _ = logx.Fatal(err)
  28. }
  29. }
  30. func Run() {
  31. reload()
  32. timer.Start()
  33. _ = logx.Info("auto tasks running...")
  34. }
  35. func reload() {
  36. // 重新初始化数据库
  37. db.InitMapDbs(cfg.DB, cfg.Prd)
  38. if len(taskCfgList) == 0 {
  39. taskCfgList = map[string]*[]model.SysCfg{}
  40. }
  41. // 获取所有站长的配置信息
  42. for dbName, v := range db.DBs {
  43. if conf := db.MapCrontabCfg(v); conf != nil {
  44. if cfg.Debug {
  45. dbInfo := md.SplitDbInfo(v)
  46. // 去掉模版库
  47. if dbName == "000000" {
  48. continue
  49. }
  50. _ = logx.Debugf("【MasterId】%s, 【Host】%s, 【Name】%s, 【User】%s, 【prd】%v, 【Task】%v\n", dbName, dbInfo.Host, dbInfo.Name, dbInfo.User, cfg.Prd, utils.SerializeStr(*conf))
  51. }
  52. taskCfgList[dbName] = conf
  53. }
  54. }
  55. if len(taskCfgList) > 0 {
  56. // 删除原有所有任务
  57. if len(entryIds) > 0 {
  58. for _, v := range entryIds {
  59. if v != baseEntryId {
  60. timer.Remove(v)
  61. }
  62. }
  63. entryIds = nil
  64. }
  65. var (
  66. entryId cron.EntryID
  67. err error
  68. )
  69. // 添加任务
  70. for dbName, v := range taskCfgList {
  71. for _, vv := range *v {
  72. if _, ok := jobs[vv.Key]; ok && vv.Val != "" {
  73. // fmt.Println(vv.Val)
  74. if entryId, err = timer.AddFunc(vv.Val, doTask(dbName, vv.Key)); err == nil {
  75. entryIds = append(entryIds, entryId)
  76. }
  77. }
  78. }
  79. }
  80. }
  81. }
  82. func doTask(dbName, fnName string) func() {
  83. return func() {
  84. begin := time.Now().Local()
  85. jobs[fnName](db.DBs[dbName], dbName)
  86. end := time.Now().Local()
  87. logx.Infof(
  88. "[%s] AutoTask <%s> started at <%s>, ended at <%s> duration <%s>",
  89. dbName,
  90. fnName,
  91. begin.Format("2006-01-02 15:04:05.000"),
  92. end.Format("2006-01-02 15:04:05.000"),
  93. time.Duration(end.UnixNano()-begin.UnixNano()).String(),
  94. )
  95. }
  96. }
  // initTasks registers the automatic task handlers in the jobs table.
  // No handler is enabled at the moment; the only registration is commented out.
  func initTasks() {
  	// jobs[taskMd.MallCronOrderCancel] = taskCancelOrder // cancel orders
  }