蛋蛋星球RabbitMq消费项目
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

egg_canal_user_down_consume.go 2.3 KiB

5 days ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
  9. "code.fnuoos.com/EggPlanet/egg_models.git/src/model"
  10. "code.fnuoos.com/EggPlanet/egg_system_rules.git"
  11. "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule"
  12. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "github.com/streadway/amqp"
  17. "sort"
  18. "time"
  19. )
  20. func EggCanalUserDownConsume(queue md.MqQueue) {
  21. fmt.Println(">>>>>>>>>>>>EggCanalUserDownConsume>>>>>>>>>>>>")
  22. ch, err := rabbit.Cfg.Pool.GetChannel()
  23. if err != nil {
  24. logx.Error(err)
  25. return
  26. }
  27. defer ch.Release()
  28. //1、将自己绑定到交换机上
  29. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  30. //2、取出数据进行消费
  31. ch.Qos(1)
  32. delivery := ch.Consume(queue.Name, false)
  33. egg_system_rules.Init(cfg.RedisAddr)
  34. var res amqp.Delivery
  35. var ok bool
  36. for {
  37. res, ok = <-delivery
  38. if ok == true {
  39. err = handleEggCanalUserDownConsume(res.Body)
  40. err = res.Ack(true)
  41. fmt.Println("err ::: ", err)
  42. } else {
  43. panic(errors.New("error getting message"))
  44. }
  45. }
  46. fmt.Println("get msg done")
  47. }
  48. func handleEggCanalUserDownConsume(msgData []byte) error {
  49. var msg *md.CommUserId
  50. err := json.Unmarshal(msgData, &msg)
  51. if err != nil {
  52. return err
  53. }
  54. engine := db.Db
  55. NewUserDb := implement.NewUserDb(engine)
  56. user, _ := NewUserDb.GetUser(utils.StrToInt64(msg.Uid))
  57. levelDb := implement.NewUserLevelDb(engine)
  58. levelAll, _ := levelDb.UserLevelAllByAsc()
  59. level := 1
  60. if len(levelAll) > 0 {
  61. level = levelAll[0].Id
  62. }
  63. sort.Slice(levelAll, func(i, j int) bool {
  64. return levelAll[i].LevelWeight > levelAll[j].LevelWeight
  65. })
  66. isHas := 0
  67. task := make([]map[string]string, 0)
  68. for _, v := range levelAll {
  69. if v.Id == user.Level {
  70. isHas = 1
  71. }
  72. if isHas == 1 {
  73. task, err = rule.UserUpgradeTask(engine, int(user.Id), v.Id)
  74. if err != nil {
  75. continue
  76. }
  77. level = v.Id
  78. }
  79. }
  80. //升级
  81. oldLevel := user.Level
  82. user.Level = level
  83. _, err = engine.Where("id=?", user.Id).Cols("level").Update(user)
  84. if err != nil {
  85. return err
  86. }
  87. var tmp = model.UserUpgradeLevel{
  88. Uid: int(user.Id),
  89. Level: level,
  90. OldLv: oldLevel,
  91. CreateTime: time.Now(),
  92. Task: utils.SerializeStr(task),
  93. }
  94. engine.Insert(&tmp)
  95. return nil
  96. }