huangjiajun 1 mês atrás
pai
commit
76a28f1830
1 arquivos alterados com 17 adições e 6 exclusões
  1. +17
    -6
      consume/ad_original_data_application.go

+ 17
- 6
consume/ad_original_data_application.go Ver arquivo

@@ -46,11 +46,15 @@ func AdOriginalDataApplication(queue md.MqQueue) {
fmt.Println("AdOriginalDataApplication_ERR:::::", err.Error())
_ = res.Reject(true) //TODO::拒绝 Ack
//_ = res.Reject(false)
var msg interface{}
json.Unmarshal(res.Body, &msg)
if err != nil {
//TODO::重新推回队列末尾,避免造成队列堵塞
ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
var canalMsg *md2.ZhiosAdOriginalDataApplication
var tmpString string
err := json.Unmarshal(res.Body, &tmpString)
if err == nil {
fmt.Println(tmpString)
err = json.Unmarshal([]byte(tmpString), &canalMsg)
if err == nil {
ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey)
}
}
} else {
err = res.Ack(true)
@@ -65,7 +69,14 @@ func AdOriginalDataApplication(queue md.MqQueue) {
func handleAdOriginalDataApplication(msgData []byte) error {
//1、解析mq中queue的数据结构体
var msg md2.ZhiosAdOriginalDataApplication
err := json.Unmarshal(msgData, &msg)
var tmpString string
err := json.Unmarshal(msgData, &tmpString)
if err != nil {
fmt.Println(err.Error())
return err
}
fmt.Println(tmpString)
err = json.Unmarshal([]byte(tmpString), &msg)
if err != nil {
return err
}


Carregando…
Cancelar
Salvar