From ffedcdc6453cf39693ad94b03dffb1ece83e14fb Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Wed, 3 Jan 2024 18:33:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consume/canal_guide_order_by_user_up_lv_consume.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/consume/canal_guide_order_by_user_up_lv_consume.go b/consume/canal_guide_order_by_user_up_lv_consume.go index c78fa60..88747ed 100644 --- a/consume/canal_guide_order_by_user_up_lv_consume.go +++ b/consume/canal_guide_order_by_user_up_lv_consume.go @@ -9,8 +9,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/cc14514/go-geoip2" - geoip2db "github.com/cc14514/go-geoip2-db" "github.com/streadway/amqp" "strings" ) @@ -26,12 +24,9 @@ func CanalGuideOrderByUserUpLvConsume(queue md.MqQueue) { //1、将自己绑定到交换机上 ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) //2、取出数据进行消费 - ch.Qos(100) + ch.Qos(10) delivery := ch.Consume(queue.Name, true) //设置自动应答 - geoIp2db, _ := geoip2db.NewGeoipDbByStatik() - defer geoIp2db.Close() - var res amqp.Delivery var ok bool for { @@ -39,7 +34,7 @@ func CanalGuideOrderByUserUpLvConsume(queue md.MqQueue) { if ok == true { //fmt.Println(string(res.Body)) fmt.Println(">>>>>>>>>>>>>>>>>>CanalGuideOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") - err = handleCanalGuideOrderByUserUpLvConsume(res.Body, geoIp2db) + handleCanalGuideOrderByUserUpLvConsume(res.Body) //_ = res.Reject(false) //_ = res.Ack(true) _ = res.Ack(true) @@ -50,7 +45,7 @@ func CanalGuideOrderByUserUpLvConsume(queue md.MqQueue) { fmt.Println("get msg done") } -func handleCanalGuideOrderByUserUpLvConsume(msg []byte, geoIp2db *geoip2.DBReader) error { +func handleCanalGuideOrderByUserUpLvConsume(msg []byte) error { //1、解析canal采集至mq中queue的数据结构体 var canalMsg *md.CanalOrderMessage[md.CanalGuideOrder] err := json.Unmarshal(msg, &canalMsg)