Browse Source

更新

one_orenge
huangjiajun 8 months ago
parent
commit
00e6685320
1 changed files with 3 additions and 4 deletions
  1. +3
    -4
      consume/canal_user_virtual_coin_flow_consume.go

+ 3
- 4
consume/canal_user_virtual_coin_flow_consume.go View File

@@ -14,7 +14,6 @@ import (
)

func CanalOneOrengeUserVirtualCoinFlowConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>CanalUserVirtualCoinFlowConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
@@ -24,7 +23,7 @@ func CanalOneOrengeUserVirtualCoinFlowConsume(queue md.MqQueue) {
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
ch.Qos(1000)
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery
@@ -33,10 +32,10 @@ func CanalOneOrengeUserVirtualCoinFlowConsume(queue md.MqQueue) {
res, ok = <-delivery
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>CanalUserVirtualCoinFlowConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
fmt.Println(">>>>>>>>>>>>>>>>CanalOneOrengeUserVirtualCoinFlowConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
err = handleCanalOneOrengeUserVirtualCoinFlow(res.Body)
if err != nil {
fmt.Println("handleCanalUserVirtualCoinFlow_ERR:::::", err.Error())
fmt.Println("CanalOneOrengeUserVirtualCoinFlowConsume_ERR:::::", err.Error())
}
//_ = res.Reject(false)
err = res.Ack(true)


Loading…
Cancel
Save