huangjiajun 1 month ago
parent
commit
14b01fe505
1 changed files with 5 additions and 19 deletions
  1. +5
    -19
      consume/ad_original_data_application.go

+ 5
- 19
consume/ad_original_data_application.go View File

@@ -42,22 +42,7 @@ func AdOriginalDataApplication(queue md.MqQueue) {
if ok == true { if ok == true {
err = handleAdOriginalDataApplication(res.Body) err = handleAdOriginalDataApplication(res.Body)
fmt.Println("err ::: ", err) fmt.Println("err ::: ", err)
if err != nil {
fmt.Println("AdOriginalDataApplication_ERR:::::", err.Error())
//_ = res.Reject(false)
var canalMsg *md2.ZhiosAdOriginalDataApplication
var tmpString string
err := json.Unmarshal(res.Body, &tmpString)
if err == nil {
fmt.Println(tmpString)
err = json.Unmarshal([]byte(tmpString), &canalMsg)
if err == nil {
ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey)
}
}
} else {
err = res.Ack(true)
}
err = res.Ack(true)
} else { } else {
panic(errors.New("error getting message")) panic(errors.New("error getting message"))
} }
@@ -79,6 +64,9 @@ func handleAdOriginalDataApplication(msgData []byte) error {
if err != nil { if err != nil {
return err return err
} }
if msg.IsEnd == "1" {
cache.SetEx(msg.Mid+":original.wx.ad.data", "0", 5)
}
time.Sleep(time.Microsecond * 100) // 等待100毫秒 time.Sleep(time.Microsecond * 100) // 等待100毫秒
fmt.Println("handleAdOriginalDataApplication:::::::::::>>>>>>>>>") fmt.Println("handleAdOriginalDataApplication:::::::::::>>>>>>>>>")
fmt.Println(msg) fmt.Println(msg)
@@ -119,8 +107,6 @@ func handleAdOriginalDataApplication(msgData []byte) error {
} }
eg.Insert(&tmp) eg.Insert(&tmp)
} }
if msg.IsEnd == "1" {
cache.SetEx(msg.Mid+":original.wx.ad.data", "0", 5)
}

return nil return nil
} }

Loading…
Cancel
Save