广告涉及的mq都放这里
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

ad_original_data_application.go 3.7 KiB

3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package consume
  2. import (
  3. "applet/app/cfg"
  4. "applet/app/e"
  5. "applet/app/lib/wechat"
  6. md2 "applet/app/md"
  7. "applet/app/utils"
  8. "applet/app/utils/cache"
  9. "applet/app/utils/logx"
  10. "applet/consume/md"
  11. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  12. "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles"
  13. db "code.fnuoos.com/zhimeng/model.git/src"
  14. implement2 "code.fnuoos.com/zhimeng/model.git/src/implement"
  15. "code.fnuoos.com/zhimeng/model.git/src/super/implement"
  16. "code.fnuoos.com/zhimeng/model.git/src/super/model"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "github.com/streadway/amqp"
  21. "time"
  22. )
  23. func AdOriginalDataApplication(queue md.MqQueue) {
  24. fmt.Println(">>>>>>>>>>>>AdOriginalDataApplication>>>>>>>>>>>>")
  25. ch, err := rabbit.Cfg.Pool.GetChannel()
  26. if err != nil {
  27. logx.Error(err)
  28. return
  29. }
  30. defer ch.Release()
  31. //1、将自己绑定到交换机上
  32. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  33. //2、取出数据进行消费
  34. ch.Qos(1)
  35. delivery := ch.Consume(queue.Name, false)
  36. one_circles.Init(cfg.RedisAddr)
  37. var res amqp.Delivery
  38. var ok bool
  39. for {
  40. res, ok = <-delivery
  41. if ok == true {
  42. err = handleAdOriginalDataApplication(res.Body)
  43. fmt.Println("err ::: ", err)
  44. err = res.Ack(true)
  45. } else {
  46. panic(errors.New("error getting message"))
  47. }
  48. }
  49. fmt.Println("get msg done")
  50. }
  51. func handleAdOriginalDataApplication(msgData []byte) error {
  52. //1、解析mq中queue的数据结构体
  53. var msg md2.ZhiosAdOriginalDataApplication
  54. var tmpString string
  55. err := json.Unmarshal(msgData, &tmpString)
  56. if err != nil {
  57. fmt.Println(err.Error())
  58. return err
  59. }
  60. fmt.Println(tmpString)
  61. err = json.Unmarshal([]byte(tmpString), &msg)
  62. if err != nil {
  63. return err
  64. }
  65. if msg.IsEnd == "1" {
  66. cache.SetEx(msg.Mid+":original.wx.ad.data", "0", 5)
  67. }
  68. time.Sleep(time.Microsecond * 100) // 等待100毫秒
  69. fmt.Println("handleAdOriginalDataApplication:::::::::::>>>>>>>>>")
  70. fmt.Println(msg)
  71. if db.DBs[msg.Mid] == nil {
  72. return nil
  73. }
  74. eg := db.DBs[msg.Mid]
  75. wxApiService, err := wechat.NewWxApiService(msg.Mid, msg.ComponentAppid, msg.ComponentAppsecret)
  76. if err != nil {
  77. return err
  78. }
  79. userWxAppletListDb := implement.NewUserWxAppletListDb(db.Db)
  80. UserWxAppletList, err := userWxAppletListDb.GetUserWxAppletList(msg.Mid)
  81. if err != nil {
  82. return err
  83. }
  84. if UserWxAppletList == nil {
  85. return e.NewErr(400, "未查询到对应记录")
  86. }
  87. err, args := wxApiService.GetAdposDetail(UserWxAppletList.Appid, 1, 1, msg.Date, msg.Date, msg.AdId)
  88. if err != nil {
  89. return err
  90. }
  91. NewAppletApplicationAdSpaceListDb := implement2.NewAppletApplicationAdSpaceListDb(eg)
  92. for _, v := range args.List {
  93. count, _ := db.Db.Where("uuid=? and date=? and app_id=? and slot_id=?", msg.Mid, msg.Date, v.AppId, v.AdUnitId).Count(&model.OriginalWxAdData{})
  94. if count > 0 {
  95. continue
  96. }
  97. adData, _ := NewAppletApplicationAdSpaceListDb.GetAppletApplicationAdSpaceListByAdId(v.AdUnitId)
  98. if adData == nil {
  99. continue
  100. }
  101. tmp := model.OriginalWxAdData{
  102. Uuid: utils.StrToInt(msg.Mid),
  103. MediumId: adData.MediumId,
  104. AppId: v.AppId,
  105. SlotId: v.AdUnitId,
  106. AdSlot: v.StatItem.AdSlot,
  107. Date: msg.Date,
  108. ReqSuccCount: int(v.StatItem.ReqSuccCount),
  109. ExposureCount: int(v.StatItem.ExposureCount),
  110. ExposureRate: utils.Float64ToStr(v.StatItem.ExposureRate),
  111. ClickCount: int(v.StatItem.ClickCount),
  112. ClickRate: utils.Float64ToStr(v.StatItem.ClickRate),
  113. PublisherIncome: int(v.StatItem.PublisherIncome),
  114. Ecpm: utils.Float64ToStr(v.StatItem.Ecpm),
  115. CreateAt: time.Now().Format("2006-01-02 15:04:05"),
  116. UpdateAt: time.Now().Format("2006-01-02 15:04:05"),
  117. Platform: "wx_applet",
  118. }
  119. db.Db.Insert(&tmp)
  120. }
  121. return nil
  122. }