蛋蛋星球RabbitMq消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

egg_canal_user_down_consume.go 2.4 KiB

5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
  9. "code.fnuoos.com/EggPlanet/egg_models.git/src/model"
  10. "code.fnuoos.com/EggPlanet/egg_system_rules.git"
  11. "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule"
  12. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "github.com/streadway/amqp"
  17. "sort"
  18. "time"
  19. )
  20. func EggCanalUserDownConsume(queue md.MqQueue) {
  21. fmt.Println(">>>>>>>>>>>>EggCanalUserDownConsume>>>>>>>>>>>>")
  22. ch, err := rabbit.Cfg.Pool.GetChannel()
  23. if err != nil {
  24. logx.Error(err)
  25. return
  26. }
  27. defer ch.Release()
  28. //1、将自己绑定到交换机上
  29. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  30. //2、取出数据进行消费
  31. ch.Qos(100)
  32. delivery := ch.Consume(queue.Name, false)
  33. egg_system_rules.Init(cfg.RedisAddr)
  34. var res amqp.Delivery
  35. var ok bool
  36. for {
  37. res, ok = <-delivery
  38. if ok == true {
  39. err = handleEggCanalUserDownConsume(res.Body, ch)
  40. err = res.Ack(true)
  41. fmt.Println("err ::: ", err)
  42. } else {
  43. panic(errors.New("error getting message"))
  44. }
  45. }
  46. fmt.Println("get msg done")
  47. }
  48. func handleEggCanalUserDownConsume(msgData []byte, ch *rabbit.Channel) error {
  49. var msg *md.CommUserId
  50. err := json.Unmarshal(msgData, &msg)
  51. if err != nil {
  52. return err
  53. }
  54. engine := db.Db
  55. NewUserDb := implement.NewUserDb(engine)
  56. user, _ := NewUserDb.GetUser(utils.StrToInt64(msg.Uid))
  57. if user == nil {
  58. return nil
  59. }
  60. levelDb := implement.NewUserLevelDb(engine)
  61. levelAll, _ := levelDb.UserLevelAllByAsc()
  62. level := 1
  63. if len(levelAll) == 0 {
  64. return nil
  65. }
  66. level = levelAll[0].Id
  67. if levelAll[0].Id == user.Level {
  68. return nil
  69. }
  70. sort.Slice(levelAll, func(i, j int) bool {
  71. return levelAll[i].LevelWeight <= levelAll[j].LevelWeight
  72. })
  73. task := make([]map[string]string, 0)
  74. for _, v := range levelAll {
  75. task1, err := rule.UserUpgradeTask(engine, int(user.Id), v.Id)
  76. if err != nil {
  77. continue
  78. }
  79. level = v.Id
  80. task = task1
  81. }
  82. //升级
  83. oldLevel := user.Level
  84. user.Level = level
  85. _, err = engine.Where("id=?", user.Id).Cols("level").Update(user)
  86. if err != nil {
  87. return err
  88. }
  89. var tmp = model.UserUpgradeLevel{
  90. Uid: int(user.Id),
  91. Level: level,
  92. OldLv: oldLevel,
  93. CreateTime: time.Now(),
  94. Task: utils.SerializeStr(task),
  95. }
  96. engine.Insert(&tmp)
  97. return nil
  98. }