广告涉及的mq都放这里
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
 
 
 
 
 
 

131 行
3.8 KiB

  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/e"
  5. "applet/app/lib/wechat"
  6. md2 "applet/app/md"
  7. "applet/app/utils"
  8. "applet/app/utils/cache"
  9. "applet/app/utils/logx"
  10. "applet/consume/md"
  11. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  12. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles"
  13. db "code.fnuoos.com/zhimeng/model.git/src"
  14. implement2 "code.fnuoos.com/zhimeng/model.git/src/implement"
  15. "code.fnuoos.com/zhimeng/model.git/src/super/implement"
  16. "code.fnuoos.com/zhimeng/model.git/src/super/model"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "github.com/streadway/amqp"
  21. "time"
  22. )
  23. func AdOriginalDataApplication(queue md.MqQueue) {
  24. fmt.Println(">>>>>>>>>>>>AdOriginalDataApplication>>>>>>>>>>>>")
  25. ch, err := rabbit.Cfg.Pool.GetChannel()
  26. if err != nil {
  27. logx.Error(err)
  28. return
  29. }
  30. defer ch.Release()
  31. //1、将自己绑定到交换机上
  32. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  33. //2、取出数据进行消费
  34. ch.Qos(1)
  35. delivery := ch.Consume(queue.Name, false)
  36. one_circles.Init(cfg.RedisAddr)
  37. var res amqp.Delivery
  38. var ok bool
  39. for {
  40. res, ok = <-delivery
  41. if ok == true {
  42. err = handleAdOriginalDataApplication(res.Body)
  43. fmt.Println("err ::: ", err)
  44. err = res.Ack(true)
  45. } else {
  46. panic(errors.New("error getting message"))
  47. }
  48. }
  49. fmt.Println("get msg done")
  50. }
  51. func handleAdOriginalDataApplication(msgData []byte) error {
  52. //1、解析mq中queue的数据结构体
  53. var msg md2.ZhiosAdOriginalDataApplication
  54. var tmpString string
  55. err := json.Unmarshal(msgData, &tmpString)
  56. if err != nil {
  57. fmt.Println(err.Error())
  58. return err
  59. }
  60. fmt.Println(tmpString)
  61. err = json.Unmarshal([]byte(tmpString), &msg)
  62. if err != nil {
  63. return err
  64. }
  65. if msg.IsEnd == "1" {
  66. cache.SetEx(msg.Mid+":original.wx.ad.data", "0", 5)
  67. }
  68. time.Sleep(time.Microsecond * 100) // 等待100毫秒
  69. fmt.Println("handleAdOriginalDataApplication:::::::::::>>>>>>>>>")
  70. fmt.Println(msg)
  71. if db.DBs[msg.Mid] == nil {
  72. return nil
  73. }
  74. eg := db.DBs[msg.Mid]
  75. wxApiService, err := wechat.NewWxApiService(msg.Mid, msg.ComponentAppid, msg.ComponentAppsecret)
  76. if err != nil {
  77. return err
  78. }
  79. userWxAppletListDb := implement.NewUserWxAppletListDb(db.Db)
  80. UserWxAppletList, err := userWxAppletListDb.GetUserWxAppletList(msg.Mid)
  81. if err != nil {
  82. return err
  83. }
  84. if UserWxAppletList == nil {
  85. return e.NewErr(400, "未查询到对应记录")
  86. }
  87. err, args := wxApiService.GetAdposDetail(UserWxAppletList.Appid, 1, 1, msg.Date, msg.Date, msg.AdId)
  88. if err != nil {
  89. return err
  90. }
  91. NewAppletApplicationAdSpaceListDb := implement2.NewAppletApplicationAdSpaceListDb(eg)
  92. for _, v := range args.List {
  93. count, _ := db.Db.Where("uuid=? and date=? and app_id=? and slot_id=?", msg.Mid, msg.Date, v.AppId, v.AdUnitId).Count(&model.OriginalWxAdData{})
  94. if count > 0 {
  95. continue
  96. }
  97. adData, _ := NewAppletApplicationAdSpaceListDb.GetAppletApplicationAdSpaceListByAdId(v.AdUnitId)
  98. if adData == nil {
  99. continue
  100. }
  101. tmp := model.OriginalWxAdData{
  102. Uuid: utils.StrToInt(msg.Mid),
  103. MediumId: adData.MediumId,
  104. AppId: adData.AppId,
  105. PlatformAppId: v.AppId,
  106. SlotId: v.AdUnitId,
  107. AdSlot: v.StatItem.AdSlot,
  108. Date: msg.Date,
  109. ReqSuccCount: int(v.StatItem.ReqSuccCount),
  110. ExposureCount: int(v.StatItem.ExposureCount),
  111. ExposureRate: utils.Float64ToStr(v.StatItem.ExposureRate),
  112. ClickCount: int(v.StatItem.ClickCount),
  113. ClickRate: utils.Float64ToStr(v.StatItem.ClickRate),
  114. PublisherIncome: int(v.StatItem.PublisherIncome),
  115. Ecpm: utils.Float64ToStr(v.StatItem.Ecpm),
  116. CreateAt: time.Now().Format("2006-01-02 15:04:05"),
  117. UpdateAt: time.Now().Format("2006-01-02 15:04:05"),
  118. Platform: "wx_applet",
  119. }
  120. db.Db.Insert(&tmp)
  121. }
  122. return nil
  123. }