Browse Source

更新

three
huangjiajun 11 months ago
parent
commit
ffedcdc645
1 changed files with 3 additions and 8 deletions
  1. +3
    -8
      consume/canal_guide_order_by_user_up_lv_consume.go

+ 3
- 8
consume/canal_guide_order_by_user_up_lv_consume.go View File

@@ -9,8 +9,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/cc14514/go-geoip2"
geoip2db "github.com/cc14514/go-geoip2-db"
"github.com/streadway/amqp"
"strings"
)
@@ -26,12 +24,9 @@ func CanalGuideOrderByUserUpLvConsume(queue md.MqQueue) {
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(100)
ch.Qos(10)
delivery := ch.Consume(queue.Name, true) //设置自动应答

geoIp2db, _ := geoip2db.NewGeoipDbByStatik()
defer geoIp2db.Close()

var res amqp.Delivery
var ok bool
for {
@@ -39,7 +34,7 @@ func CanalGuideOrderByUserUpLvConsume(queue md.MqQueue) {
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>>>CanalGuideOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
err = handleCanalGuideOrderByUserUpLvConsume(res.Body, geoIp2db)
handleCanalGuideOrderByUserUpLvConsume(res.Body)
//_ = res.Reject(false)
//_ = res.Ack(true)
_ = res.Ack(true)
@@ -50,7 +45,7 @@ func CanalGuideOrderByUserUpLvConsume(queue md.MqQueue) {
fmt.Println("get msg done")
}

func handleCanalGuideOrderByUserUpLvConsume(msg []byte, geoIp2db *geoip2.DBReader) error {
func handleCanalGuideOrderByUserUpLvConsume(msg []byte) error {
//1、解析canal采集至mq中queue的数据结构体
var canalMsg *md.CanalOrderMessage[md.CanalGuideOrder]
err := json.Unmarshal(msg, &canalMsg)


Loading…
Cancel
Save