蛋蛋星球RabbitMq消费项目
Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

egg_energy_user_activity_consume.go 2.1 KiB

2 settimane fa
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. "applet/app/utils"
  6. md2 "applet/es/md"
  7. "code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
  8. "code.fnuoos.com/EggPlanet/egg_system_rules.git"
  9. "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule"
  10. "applet/app/utils/logx"
  11. "applet/consume/md"
  12. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "github.com/streadway/amqp"
  17. "time"
  18. )
  19. func EggEnergyUserActivityConsume(queue md.MqQueue) {
  20. fmt.Println(">>>>>>>>>>>>EggEnergyUserActivityConsume>>>>>>>>>>>>")
  21. ch, err := rabbit.Cfg.Pool.GetChannel()
  22. if err != nil {
  23. logx.Error(err)
  24. return
  25. }
  26. defer ch.Release()
  27. //1、将自己绑定到交换机上
  28. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  29. //2、取出数据进行消费
  30. ch.Qos(1)
  31. delivery := ch.Consume(queue.Name, false)
  32. egg_system_rules.Init(cfg.RedisAddr)
  33. var res amqp.Delivery
  34. var ok bool
  35. for {
  36. res, ok = <-delivery
  37. if ok == true {
  38. err = handleEggEnergyUserActivityConsume(res.Body, ch)
  39. err = res.Ack(true)
  40. fmt.Println("err ::: ", err)
  41. } else {
  42. panic(errors.New("error getting message"))
  43. }
  44. }
  45. fmt.Println("get msg done")
  46. }
  47. func handleEggEnergyUserActivityConsume(msg []byte, ch *rabbit.Channel) error {
  48. time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
  49. var canalMsg *md.CanalEggEnergyUserActivityMessage[md.CanalEggEnergyUserActivity]
  50. err := json.Unmarshal(msg, &canalMsg)
  51. if err != nil {
  52. fmt.Println("EggCanalInviteUserNumsConsumeFaliedUnMarshal_ERR:::::", err.Error())
  53. return nil
  54. }
  55. if canalMsg.Type == md2.CanalMsgInsertSqlType {
  56. for _, item := range canalMsg.Data {
  57. userDb := implement.NewUserDb(db.Db)
  58. user, _ := userDb.GetUser(utils.StrToInt64(item.Uid))
  59. if user == nil {
  60. continue
  61. }
  62. if user.ParentUid == 0 {
  63. continue
  64. }
  65. count := rule.ExtendUserCount(db.Db, int(user.ParentUid))
  66. if count > 1000 {
  67. msg1 := md.CommUserId{
  68. Uid: utils.Int64ToStr(user.ParentUid),
  69. }
  70. ch.Publish("egg.app", msg1, "egg_slow_auto_up_lv")
  71. continue
  72. }
  73. rule.UserUpgradeInsert(db.Db, int(user.ParentUid))
  74. }
  75. }
  76. return nil
  77. }