蛋蛋星球RabbitMq消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

146 lines
3.9 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. utils2 "applet/app/utils"
  5. "applet/app/utils/logx"
  6. "applet/consume/md"
  7. md2 "applet/es/md"
  8. "code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
  9. "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
  10. "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum"
  11. es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es"
  12. "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es"
  13. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/olivere/elastic/v7"
  19. "github.com/streadway/amqp"
  20. "strings"
  21. "time"
  22. )
  23. // EggCanalViolateNumsConsume 更新违规次数
  24. func EggCanalViolateNumsConsume(queue md.MqQueue) {
  25. fmt.Println(">>>>>>>>>>>EggCanalViolateNumsConsume>>>>>>>>>>>>>")
  26. ch, err := rabbit.Cfg.Pool.GetChannel()
  27. if err != nil {
  28. logx.Error(err)
  29. return
  30. }
  31. defer ch.Release()
  32. //1、将自己绑定到交换机上
  33. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  34. //2、取出数据进行消费
  35. ch.Qos(1000)
  36. delivery := ch.Consume(queue.Name, true) //设置自动应答
  37. var res amqp.Delivery
  38. var ok bool
  39. for {
  40. res, ok = <-delivery
  41. if ok == true {
  42. //fmt.Println(string(res.Body))
  43. fmt.Println(">>>>>>>>>>>>>>>>>>EggCanalViolateNumsConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  44. err = handleEggCanalViolateNumsConsume(res.Body)
  45. if err != nil {
  46. fmt.Println("EggCanalViolateNumsConsume_ERR:::::", err.Error())
  47. utils2.FilePutContents("EggCanalViolateNumsConsume_ERR", utils2.SerializeStr(map[string]interface{}{
  48. "body": res.Body,
  49. "err": err.Error(),
  50. }))
  51. }
  52. //_ = res.Reject(false)
  53. //_ = res.Ack(true)
  54. } else {
  55. panic(errors.New("error getting message"))
  56. }
  57. }
  58. }
  59. func handleEggCanalViolateNumsConsume(msg []byte) error {
  60. //1、解析canal采集至mq中queue的数据结构体
  61. var canalMsg *md.CanalTagRecordsMessage[md.CanalTagRecords]
  62. err := json.Unmarshal(msg, &canalMsg)
  63. if err != nil {
  64. return nil
  65. }
  66. year, week := time.Now().ISOWeek()
  67. yearStr := utils2.IntToStr(year)
  68. weekStr := utils2.IntToStr(week)
  69. index := es2.GetAppointIndexFromAlias(yearStr, weekStr)
  70. // 2. 监听插入信息
  71. if canalMsg.Type == md2.CanalMsgInsertSqlType {
  72. tagDb := implement.NewUserTagDb(db.Db)
  73. for _, item := range canalMsg.Data {
  74. tag, err1 := tagDb.UserTagGetOneByParams(map[string]interface{}{
  75. "key": "id",
  76. "value": item.TagId,
  77. })
  78. if err1 != nil {
  79. return err1
  80. }
  81. // 2.1. 判断是否为处罚标签
  82. if tag.IsPunish == 0 {
  83. continue
  84. }
  85. uid := item.Uid
  86. id := fmt.Sprintf("%d%d_%s", year, week, uid)
  87. // 2.2. 增加违规次数记录
  88. script := elastic.NewScript("ctx._source.violate_nums += params.inc").Param("inc", 1)
  89. _, err = es.EsClient.Update().
  90. Index(index).
  91. Id(id).
  92. Script(script).
  93. Do(context.Background())
  94. if err != nil {
  95. if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") {
  96. // 蛋蛋分数据还不存在,创建蛋蛋分数据
  97. now := time.Now().Format("2006-01-02 15:04:05")
  98. err1 := egg_energy.CreateEsScoreAndAssignValuesDoc(index, id, utils2.StrToInt64(uid), enum.ViolateNums, "1", now)
  99. if err1 != nil {
  100. return err1
  101. }
  102. return nil
  103. }
  104. return err
  105. }
  106. }
  107. }
  108. // 3. 监听删除信息
  109. if canalMsg.Type == md2.CanalMsgDeleteSqlType {
  110. tagDb := implement.NewUserTagDb(db.Db)
  111. for _, item := range canalMsg.Data {
  112. tag, err1 := tagDb.UserTagGetOneByParams(map[string]interface{}{
  113. "key": "id",
  114. "value": item.TagId,
  115. })
  116. if err1 != nil {
  117. return err1
  118. }
  119. // 3.1 判断是否为处罚标签
  120. if tag.IsPunish == 0 {
  121. continue
  122. }
  123. uid := item.Uid
  124. id := fmt.Sprintf("%d%d_%s", year, week, uid)
  125. // 3.2 减少违规次数记录
  126. script := elastic.NewScript("ctx._source.violate_nums -= params.dec").Param("dec", 1)
  127. _, err = es.EsClient.Update().
  128. Index(index).
  129. Id(id).
  130. Script(script).
  131. Do(context.Background())
  132. if err != nil {
  133. return err
  134. }
  135. }
  136. }
  137. return nil
  138. }