huangjiajun 1 год назад
Родитель
Сommit
a33466ae3e
3 измененных файлов: 43 добавлений и 0 удалений
  1. +1
    -0
      consume/init.go
  2. +10
    -0
      consume/md/consume_key.go
  3. +32
    -0
      consume/zhios_tikTok_goods_update.go

+ 1
- 0
consume/init.go Просмотреть файл

@@ -37,6 +37,7 @@ func initConsumes() {

jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume
jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate
jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate

}



+ 10
- 0
consume/md/consume_key.go Просмотреть файл

@@ -156,6 +156,15 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "ZhiosTikTokUpdate",
},
{
ExchangeName: "zhios.tikTok.exchange",
Name: "zhios_tikTok_all_update",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "all_update",
BindKey: "",
ConsumeFunName: "ZhiosTikTokAllUpdate",
},
}

const (
@@ -174,5 +183,6 @@ const (
YoumishangExchangeStoreFunName = "YoumishangExchangeStore"
ZhiosRechargeOrderFailFunName = "ZhiosRechargeOrderFail"
ZhiosTikTokUpdateFunName = "ZhiosTikTokUpdate"
ZhiosTikTokAllUpdateFunName = "ZhiosTikTokAllUpdate"
CloudIssuanceAsyncMLoginFunName = "CloudIssuanceAsyncMLoginConsume"
)

+ 32
- 0
consume/zhios_tikTok_goods_update.go Просмотреть файл

@@ -46,6 +46,38 @@ func ZhiosTikTokUpdate(queue md.MqQueue) {
}
fmt.Println("get msg done")
}
func ZhiosTikTokAllUpdate(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(15)
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
err = handleZhiosTikTokGoodsUpdate(res.Body)
//_ = res.Reject(false)
//if err == nil {
_ = res.Ack(true)
//}
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleZhiosTikTokGoodsUpdate(msg []byte) error {
//1、解析canal采集至mq中queue的数据结构体


Загрузка…
Отмена
Сохранить