蛋蛋星球RabbitMq消费项目
Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 
 

159 rindas
4.2 KiB

  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/db"
  5. utils2 "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. db2 "code.fnuoos.com/EggPlanet/egg_models.git/src"
  9. "code.fnuoos.com/EggPlanet/egg_models.git/src/model"
  10. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "github.com/jinzhu/copier"
  15. "github.com/streadway/amqp"
  16. "time"
  17. "xorm.io/xorm"
  18. )
  19. func UserDeleteConsume(queue md.MqQueue) {
  20. var backCfg db2.BackUpDBCfg
  21. copier.Copy(&backCfg, &cfg.BackUpDb)
  22. backUpDb, err := db2.InitBackUpDB(&backCfg)
  23. if err != nil {
  24. logx.Error(err)
  25. return
  26. }
  27. fmt.Println(">>>>>>>>>>>>UserDeleteConsume>>>>>>>>>>>>")
  28. ch, err := rabbit.Cfg.Pool.GetChannel()
  29. if err != nil {
  30. logx.Error(err)
  31. return
  32. }
  33. defer ch.Release()
  34. //1、将自己绑定到交换机上
  35. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  36. //2、取出数据进行消费
  37. ch.Qos(100)
  38. delivery := ch.Consume(queue.Name, false)
  39. egg_system_rules.Init(cfg.RedisAddr)
  40. var res amqp.Delivery
  41. var ok bool
  42. for {
  43. res, ok = <-delivery
  44. if ok == true {
  45. err = handleUserDeleteConsume(backUpDb, ch, res.Body)
  46. if err != nil {
  47. fmt.Println("UserDeleteConsume_ERR:::::", err.Error())
  48. utils2.FilePutContents("UserDeleteConsume_ERR", utils2.SerializeStr(map[string]interface{}{
  49. "body": res.Body,
  50. "err": err.Error(),
  51. }))
  52. _ = res.Reject(false)
  53. //TODO::重新推回队列末尾,避免造成队列堵塞
  54. var msg *md.CommUserId
  55. json.Unmarshal(res.Body, &msg)
  56. ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
  57. } else {
  58. //_ = res.Reject(false)
  59. err = res.Ack(true)
  60. }
  61. fmt.Println("err ::: ", err)
  62. } else {
  63. panic(errors.New("error getting message"))
  64. }
  65. }
  66. fmt.Println("get msg done")
  67. }
  68. func handleUserDeleteConsume(backEg *xorm.Engine, ch *rabbit.Channel, msgData []byte) error {
  69. if backEg == nil {
  70. return nil
  71. }
  72. time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
  73. // 1.解析mq中queue的数据结构体
  74. var msg *md.CommUserId
  75. err := json.Unmarshal(msgData, &msg)
  76. if err != nil {
  77. return err
  78. }
  79. eg := db.Db
  80. //1.用户信息
  81. var user model.User
  82. exist, err := eg.Where("id=?", msg.Uid).Get(&user)
  83. if exist {
  84. backEg.Insert(&user)
  85. user.Phone = ""
  86. user.State = 2
  87. user.Nickname = "注销用户"
  88. db.Db.Where("id=?", user.Id).Cols("state,phone,nickname").Update(&user)
  89. }
  90. //2.用户关系链
  91. //查出所有下级
  92. var relate []model.UserRelate
  93. err = eg.Where("parent_uid=?", msg.Uid).Find(&relate)
  94. if len(relate) > 0 {
  95. backEg.Insert(&relate)
  96. }
  97. var relateParent []model.UserRelate
  98. err = eg.Where("uid=?", msg.Uid).Find(&relateParent)
  99. var relateParent1 []model.UserRelate
  100. err = eg.Where("uid=?", msg.ParentUid).Find(&relateParent1)
  101. var parentIds = make([]int64, 0)
  102. if len(relateParent) > 0 {
  103. backEg.Insert(&relateParent)
  104. for _, v := range relateParent {
  105. parentIds = append(parentIds, v.ParentUid)
  106. }
  107. }
  108. var ids = make([]int64, 0)
  109. for _, v := range relate {
  110. ids = append(ids, v.Uid)
  111. }
  112. //读出所有下级重新处理
  113. var allRelate []model.UserRelate
  114. err = eg.In("uid", ids).Desc("level").Find(&allRelate)
  115. for _, v := range allRelate {
  116. if utils2.InArrInt64(v.ParentUid, parentIds) {
  117. eg.Where("id=?", v.Id).Delete(&model.UserRelate{})
  118. continue
  119. }
  120. if v.ParentUid == utils2.StrToInt64(msg.Uid) && utils2.StrToInt64(msg.ParentUid) > 0 { //如果相等
  121. level := v.Level
  122. var tmp = model.UserRelate{
  123. ParentUid: utils2.StrToInt64(msg.ParentUid),
  124. Uid: v.Uid,
  125. Level: level,
  126. InviteTime: v.InviteTime,
  127. }
  128. eg.Insert(&tmp)
  129. for _, v1 := range relateParent1 {
  130. level++
  131. var tmp1 = model.UserRelate{
  132. ParentUid: v1.ParentUid,
  133. Uid: v.Uid,
  134. Level: level,
  135. InviteTime: v.InviteTime,
  136. }
  137. eg.Insert(&tmp1)
  138. }
  139. }
  140. }
  141. eg.Where("parent_uid=?", msg.Uid).Cols("parent_uid").Update(&model.User{ParentUid: utils2.StrToInt64(msg.ParentUid)})
  142. _, err = eg.Where("parent_uid=?", msg.Uid).Delete(&model.UserRelate{})
  143. _, err = eg.Where("uid=?", msg.Uid).Delete(&model.UserRelate{})
  144. var UserRealNameAuth model.UserRealNameAuth
  145. exist, err = eg.Where("uid=?", msg.Uid).Get(&UserRealNameAuth)
  146. if exist {
  147. backEg.Insert(&UserRealNameAuth)
  148. }
  149. eg.Where("uid=?", msg.Uid).Delete(&model.UserRealNameAuth{})
  150. return nil
  151. }