From 2506cac17a59a14e10adf451ec8fd0f6c659ab50 Mon Sep 17 00:00:00 2001 From: dengbiao Date: Sat, 27 Jul 2024 20:30:32 +0800 Subject: [PATCH] update --- consume/one_circles_sign_in_consume.go | 58 +++++++++++++------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/consume/one_circles_sign_in_consume.go b/consume/one_circles_sign_in_consume.go index 2ee4b1a..d9f8808 100644 --- a/consume/one_circles_sign_in_consume.go +++ b/consume/one_circles_sign_in_consume.go @@ -12,8 +12,6 @@ import ( "errors" "fmt" "github.com/streadway/amqp" - "os" - "runtime/pprof" ) func OneCirclesSignInGreenEnergyConsume(queue md.MqQueue) { @@ -34,44 +32,44 @@ func OneCirclesSignInGreenEnergyConsume(queue md.MqQueue) { var res amqp.Delivery var ok bool - //for { - res, ok = <-delivery - if ok == true { - err = handleOneCirclesSignInGreenEnergy(ch, res.Body) - if err != nil { - fmt.Println("err ::: ", err) - utils.FilePutContents("OneCirclesSignInGreenEnergyConsume_ERR", "[err]:"+err.Error()) - _ = res.Reject(true) - //_ = res.Reject(false) - ////TODO::重新推回队列末尾,避免造成队列堵塞 - //var msg *md.OneCirclesStructForSignIn - //json.Unmarshal(res.Body, &msg) - //ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + for { + res, ok = <-delivery + if ok == true { + err = handleOneCirclesSignInGreenEnergy(ch, res.Body) + if err != nil { + fmt.Println("err ::: ", err) + utils.FilePutContents("OneCirclesSignInGreenEnergyConsume_ERR", "[err]:"+err.Error()) + _ = res.Reject(true) + //_ = res.Reject(false) + ////TODO::重新推回队列末尾,避免造成队列堵塞 + //var msg *md.OneCirclesStructForSignIn + //json.Unmarshal(res.Body, &msg) + //ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + } else { + _ = res.Ack(true) + } } else { - _ = res.Ack(true) + panic(errors.New("error getting message")) } - } else { - panic(errors.New("error getting message")) } - //} fmt.Println("get msg done") } func handleOneCirclesSignInGreenEnergy(ch *rabbit.Channel, msgData []byte) error { - cpuProfile, err := os.Create("./cpu_profile") - if err != nil { - fmt.Printf("创建文件失败:%s", err.Error()) - return err - } - defer cpuProfile.Close() - - //采集CPU信息 - pprof.StartCPUProfile(cpuProfile) - defer pprof.StopCPUProfile() + //cpuProfile, err := os.Create("./cpu_profile") + //if err != nil { + // fmt.Printf("创建文件失败:%s", err.Error()) + // return err + //} + //defer cpuProfile.Close() + // + ////采集CPU信息 + //pprof.StartCPUProfile(cpuProfile) + //defer pprof.StopCPUProfile() //1、解析mq中queue的数据结构体 var msg *md.OneCirclesStructForSignIn - err = json.Unmarshal(msgData, &msg) + err := json.Unmarshal(msgData, &msg) if err != nil { return err }