蛋蛋星球RabbitMq消费项目
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

80 行
2.0 KiB

  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. md2 "applet/es/md"
  9. "code.fnuoos.com/EggPlanet/egg_system_rules.git"
  10. "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule"
  11. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  12. "encoding/json"
  13. "errors"
  14. "fmt"
  15. "github.com/streadway/amqp"
  16. "time"
  17. )
  18. func EggCanalUserConsume(queue md.MqQueue) {
  19. fmt.Println(">>>>>>>>>>>>EggCanalUserConsume>>>>>>>>>>>>")
  20. ch, err := rabbit.Cfg.Pool.GetChannel()
  21. if err != nil {
  22. logx.Error(err)
  23. return
  24. }
  25. defer ch.Release()
  26. //1、将自己绑定到交换机上
  27. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  28. //2、取出数据进行消费
  29. ch.Qos(1)
  30. delivery := ch.Consume(queue.Name, false)
  31. egg_system_rules.Init(cfg.RedisAddr)
  32. var res amqp.Delivery
  33. var ok bool
  34. for {
  35. res, ok = <-delivery
  36. if ok == true {
  37. err = handleEggCanalUserConsume(res.Body, ch)
  38. err = res.Ack(true)
  39. fmt.Println("err ::: ", err)
  40. } else {
  41. panic(errors.New("error getting message"))
  42. }
  43. }
  44. fmt.Println("get msg done")
  45. }
  46. func handleEggCanalUserConsume(msg []byte, ch *rabbit.Channel) error {
  47. time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
  48. var canalMsg *md.CanalUserMessage[md.CanalUser]
  49. err := json.Unmarshal(msg, &canalMsg)
  50. if err != nil {
  51. fmt.Println("EggCanalInviteUserNumsConsumeFaliedUnMarshal_ERR:::::", err.Error())
  52. return nil
  53. }
  54. if canalMsg.Type == md2.CanalMsgInsertSqlType || canalMsg.Type == md2.CanalMsgUpdateSqlType {
  55. oldUser := make(map[string]md.CanalUser)
  56. for _, item := range canalMsg.Old {
  57. oldUser[item.Id] = item
  58. }
  59. for _, item := range canalMsg.Data {
  60. if utils.StrToInt(item.ParentUid) == 0 {
  61. continue
  62. }
  63. count := rule.ExtendUserCount(db.Db, utils.StrToInt(item.ParentUid))
  64. if count > 1000 {
  65. msg1 := md.CommUserId{
  66. Uid: item.ParentUid,
  67. }
  68. ch.Publish("egg.app", msg1, "egg_slow_auto_up_lv")
  69. continue
  70. }
  71. rule.UserUpgradeInsert(db.Db, utils.StrToInt(item.ParentUid))
  72. }
  73. }
  74. return nil
  75. }