diff --git a/consume/ad_original_data_application.go b/consume/ad_original_data_application.go index 49abb20..068fe83 100644 --- a/consume/ad_original_data_application.go +++ b/consume/ad_original_data_application.go @@ -42,22 +42,7 @@ func AdOriginalDataApplication(queue md.MqQueue) { if ok == true { err = handleAdOriginalDataApplication(res.Body) fmt.Println("err ::: ", err) - if err != nil { - fmt.Println("AdOriginalDataApplication_ERR:::::", err.Error()) - //_ = res.Reject(false) - var canalMsg *md2.ZhiosAdOriginalDataApplication - var tmpString string - err := json.Unmarshal(res.Body, &tmpString) - if err == nil { - fmt.Println(tmpString) - err = json.Unmarshal([]byte(tmpString), &canalMsg) - if err == nil { - ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey) - } - } - } else { - err = res.Ack(true) - } + err = res.Ack(true) } else { panic(errors.New("error getting message")) } @@ -79,6 +64,9 @@ func handleAdOriginalDataApplication(msgData []byte) error { if err != nil { return err } + if msg.IsEnd == "1" { + cache.SetEx(msg.Mid+":original.wx.ad.data", "0", 5) + } time.Sleep(time.Microsecond * 100) // 等待100毫秒 fmt.Println("handleAdOriginalDataApplication:::::::::::>>>>>>>>>") fmt.Println(msg) @@ -119,8 +107,6 @@ func handleAdOriginalDataApplication(msgData []byte) error { } eg.Insert(&tmp) } - if msg.IsEnd == "1" { - cache.SetEx(msg.Mid+":original.wx.ad.data", "0", 5) - } + return nil }