DengBiao před 1 rokem
rodič
revize
b54a706b8c
1 změnil soubory, kde provedl 5 přidání a 5 odebrání
  1. +5
    -5
      consume/canal_guide_order_consume.go

+ 5
- 5
consume/canal_guide_order_consume.go Zobrazit soubor

@@ -34,6 +34,9 @@ func CanalGuideOrderConsume(queue md.MqQueue) {
ch.Qos(1)
delivery := ch.Consume(queue.Name)

geoIp2db, _ := geoip2db.NewGeoipDbByStatik()
defer geoIp2db.Close()

var res amqp.Delivery
var ok bool
for {
@@ -41,7 +44,7 @@ func CanalGuideOrderConsume(queue md.MqQueue) {
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
err = handleGuideOrdTable(res.Body)
err = handleGuideOrdTable(res.Body, geoIp2db)
//_ = res.Reject(false)
_ = res.Ack(true)
} else {
@@ -51,7 +54,7 @@ func CanalGuideOrderConsume(queue md.MqQueue) {
fmt.Println("get msg done")
}

func handleGuideOrdTable(msg []byte) error {
func handleGuideOrdTable(msg []byte, geoIp2db *geoip2.DBReader) error {
//1、解析canal采集至mq中queue的数据结构体
var canalMsg *md.CanalOrderMessage[md.CanalGuideOrder]
err := json.Unmarshal(msg, &canalMsg)
@@ -59,9 +62,6 @@ func handleGuideOrdTable(msg []byte) error {
return err
}

geoIp2db, _ := geoip2db.NewGeoipDbByStatik()
defer geoIp2db.Close()

//2、判断操作(目前只针对insert进行修改)
if canalMsg.Type == md.CanalMsgInsertSqlType {
//3、TODO::判断es索引是否创建(因为这里我已经手动创建了,省略此步骤)


Načítá se…
Zrušit
Uložit