蛋蛋星球RabbitMq消费项目
Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 

109 linhas
2.4 KiB

  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
  9. "code.fnuoos.com/EggPlanet/egg_models.git/src/model"
  10. "code.fnuoos.com/EggPlanet/egg_system_rules.git"
  11. "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule"
  12. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "github.com/streadway/amqp"
  17. "sort"
  18. "time"
  19. )
  20. func EggCanalUserDownConsume(queue md.MqQueue) {
  21. fmt.Println(">>>>>>>>>>>>EggCanalUserDownConsume>>>>>>>>>>>>")
  22. ch, err := rabbit.Cfg.Pool.GetChannel()
  23. if err != nil {
  24. logx.Error(err)
  25. return
  26. }
  27. defer ch.Release()
  28. //1、将自己绑定到交换机上
  29. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  30. //2、取出数据进行消费
  31. ch.Qos(1)
  32. delivery := ch.Consume(queue.Name, false)
  33. egg_system_rules.Init(cfg.RedisAddr)
  34. var res amqp.Delivery
  35. var ok bool
  36. for {
  37. res, ok = <-delivery
  38. if ok == true {
  39. err = handleEggCanalUserDownConsume(res.Body)
  40. err = res.Ack(true)
  41. fmt.Println("err ::: ", err)
  42. } else {
  43. panic(errors.New("error getting message"))
  44. }
  45. }
  46. fmt.Println("get msg done")
  47. }
  48. func handleEggCanalUserDownConsume(msgData []byte) error {
  49. var msg *md.CommUserId
  50. err := json.Unmarshal(msgData, &msg)
  51. if err != nil {
  52. return err
  53. }
  54. engine := db.Db
  55. NewUserDb := implement.NewUserDb(engine)
  56. user, _ := NewUserDb.GetUser(utils.StrToInt64(msg.Uid))
  57. if user == nil {
  58. return nil
  59. }
  60. levelDb := implement.NewUserLevelDb(engine)
  61. levelAll, _ := levelDb.UserLevelAllByAsc()
  62. level := 1
  63. if len(levelAll) == 0 {
  64. return nil
  65. }
  66. level = levelAll[0].Id
  67. if levelAll[0].Id == user.Level {
  68. return nil
  69. }
  70. sort.Slice(levelAll, func(i, j int) bool {
  71. return levelAll[i].LevelWeight > levelAll[j].LevelWeight
  72. })
  73. isHas := 0
  74. task := make([]map[string]string, 0)
  75. for _, v := range levelAll {
  76. if v.Id == user.Level {
  77. isHas = 1
  78. }
  79. if isHas == 1 {
  80. task, err = rule.UserUpgradeTask(engine, int(user.Id), v.Id)
  81. if err != nil {
  82. continue
  83. }
  84. level = v.Id
  85. }
  86. }
  87. //升级
  88. oldLevel := user.Level
  89. user.Level = level
  90. _, err = engine.Where("id=?", user.Id).Cols("level").Update(user)
  91. if err != nil {
  92. return err
  93. }
  94. var tmp = model.UserUpgradeLevel{
  95. Uid: int(user.Id),
  96. Level: level,
  97. OldLv: oldLevel,
  98. CreateTime: time.Now(),
  99. Task: utils.SerializeStr(task),
  100. }
  101. engine.Insert(&tmp)
  102. return nil
  103. }