diff --git a/consume/canal_guide_order_consume.go b/consume/canal_guide_order_consume.go index b910d13..81c1771 100644 --- a/consume/canal_guide_order_consume.go +++ b/consume/canal_guide_order_consume.go @@ -34,6 +34,9 @@ func CanalGuideOrderConsume(queue md.MqQueue) { ch.Qos(1) delivery := ch.Consume(queue.Name) + geoIp2db, _ := geoip2db.NewGeoipDbByStatik() + defer geoIp2db.Close() + var res amqp.Delivery var ok bool for { @@ -41,7 +44,7 @@ func CanalGuideOrderConsume(queue md.MqQueue) { if ok == true { //fmt.Println(string(res.Body)) fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - err = handleGuideOrdTable(res.Body) + err = handleGuideOrdTable(res.Body, geoIp2db) //_ = res.Reject(false) _ = res.Ack(true) } else { @@ -51,7 +54,7 @@ func CanalGuideOrderConsume(queue md.MqQueue) { fmt.Println("get msg done") } -func handleGuideOrdTable(msg []byte) error { +func handleGuideOrdTable(msg []byte, geoIp2db *geoip2.DBReader) error { //1、解析canal采集至mq中queue的数据结构体 var canalMsg *md.CanalOrderMessage[md.CanalGuideOrder] err := json.Unmarshal(msg, &canalMsg) @@ -59,9 +62,6 @@ func handleGuideOrdTable(msg []byte) error { return err } - geoIp2db, _ := geoip2db.NewGeoipDbByStatik() - defer geoIp2db.Close() - //2、判断操作(目前只针对insert进行修改) if canalMsg.Type == md.CanalMsgInsertSqlType { //3、TODO::判断es索引是否创建(因为这里我已经手动创建了,省略此步骤)