@@ -32,7 +32,7 @@ func CanalGuideOrderConsume(queue md.MqQueue) { | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | ||||
//2、取出数据进行消费 | //2、取出数据进行消费 | ||||
ch.Qos(500) | ch.Qos(500) | ||||
delivery := ch.Consume(queue.Name) | |||||
delivery := ch.Consume(queue.Name, true) //设置自动应答 | |||||
geoIp2db, _ := geoip2db.NewGeoipDbByStatik() | geoIp2db, _ := geoip2db.NewGeoipDbByStatik() | ||||
defer geoIp2db.Close() | defer geoIp2db.Close() | ||||
@@ -46,7 +46,7 @@ func CanalGuideOrderConsume(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>>>>>>>>CanalGuideOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | fmt.Println(">>>>>>>>>>>>>>>>>>CanalGuideOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | ||||
err = handleGuideOrdTable(res.Body, geoIp2db) | err = handleGuideOrdTable(res.Body, geoIp2db) | ||||
//_ = res.Reject(false) | //_ = res.Reject(false) | ||||
_ = res.Ack(true) | |||||
//_ = res.Ack(true) | |||||
} else { | } else { | ||||
panic(errors.New("error getting message")) | panic(errors.New("error getting message")) | ||||
} | } | ||||
@@ -28,7 +28,7 @@ func CanalOrderConsume(queue md.MqQueue) { | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | ||||
//2、取出数据进行消费 | //2、取出数据进行消费 | ||||
ch.Qos(1) | ch.Qos(1) | ||||
delivery := ch.Consume(queue.Name) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | var res amqp.Delivery | ||||
var ok bool | var ok bool | ||||
@@ -27,7 +27,7 @@ func WorkReceive(name string) { | |||||
} | } | ||||
defer ch.Release() | defer ch.Release() | ||||
//接收消息时,指定 | //接收消息时,指定 | ||||
msgs := ch.Consume(name) | |||||
msgs := ch.Consume(name, false) | |||||
for msg := range msgs { | for msg := range msgs { | ||||
var message2 Message | var message2 Message | ||||
jsonStr := string(msg.Body) | jsonStr := string(msg.Body) | ||||
@@ -36,7 +36,7 @@ func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) { | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | ||||
//2、取出数据进行消费 | //2、取出数据进行消费 | ||||
ch.Qos(10) | ch.Qos(10) | ||||
delivery := ch.Consume(queue.Name) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | var res amqp.Delivery | ||||
var ok bool | var ok bool | ||||
@@ -3,7 +3,7 @@ module applet | |||||
go 1.18 | go 1.18 | ||||
require ( | require ( | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.3 | |||||
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.4 | |||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 | github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 | ||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 | github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 | ||||
github.com/boombuler/barcode v1.0.1 | github.com/boombuler/barcode v1.0.1 | ||||