diff --git a/consume/init.go b/consume/init.go index 5bcaceb..e8df20c 100644 --- a/consume/init.go +++ b/consume/init.go @@ -25,7 +25,7 @@ func initConsumes() { jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond // - //jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal + jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle // diff --git a/consume/zhios_order_settle_total.go b/consume/zhios_order_settle_total.go index cab364d..11ece81 100644 --- a/consume/zhios_order_settle_total.go +++ b/consume/zhios_order_settle_total.go @@ -25,7 +25,7 @@ func ZhiosSettleTotal(queue md.MqQueue) { //1、将自己绑定到交换机上 ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) //2、取出数据进行消费 - ch.Qos(1000) + ch.Qos(1) delivery := ch.Consume(queue.Name, false) var res amqp.Delivery @@ -38,6 +38,7 @@ func ZhiosSettleTotal(queue md.MqQueue) { err = handleZhiosOrderSettleTotal(res.Body) //_ = res.Reject(false) if err != nil { + fmt.Println("==================", err.Error()) _ = res.Reject(false) //TODO::重新推回队列末尾,避免造成队列堵塞 var msg *md.ZhiosOrderBuckle @@ -63,7 +64,7 @@ func ZhiosSettleTotal(queue md.MqQueue) { } func handleZhiosOrderSettleTotal(msg []byte) error { - time.Sleep(time.Microsecond * 20) // 等待500毫秒 + //time.Sleep(time.Microsecond * 20) // 等待500毫秒 //1、解析canal采集至mq中queue的数据结构体 var canalMsg *md.ZhiosOrderBuckle fmt.Println(string(msg)) @@ -87,18 +88,24 @@ func handleZhiosOrderSettleTotal(msg []byte) error { sess := eg.NewSession() defer sess.Close() sess.Begin() - + times := time.Now() + fmt.Println("==================", time.Since(times)) ordData, err := db.OrderListByPvdOid(sess, canalMsg.Oid) if err != nil || ordData == nil { return nil } + fmt.Println("==================1", time.Since(times)) + var ordRelate = make([]model.OrdListRelate, 0) sess.Where("oid=?", canalMsg.Oid).Find(&ordRelate) + fmt.Println("==================2", time.Since(times)) + if len(ordRelate) > 0 { for _, v := range ordRelate { if v.Amount == 0 && utils.StrToFloat64(v.AdditionalSubsidy) == 0 { continue } + var userStatistics model.UserStatistics sess.Where("uid=?", v.Uid).Get(&userStatistics) if userStatistics.Id == 0 { @@ -118,6 +125,8 @@ func handleZhiosOrderSettleTotal(msg []byte) error { if err != nil { return err } + fmt.Println("==================3", time.Since(times)) + var userStatistics1 model.UserWaitAmount sess.Where("uid=? ", v.Uid).Get(&userStatistics1) if userStatistics1.Id == 0 { @@ -129,6 +138,8 @@ func handleZhiosOrderSettleTotal(msg []byte) error { sess.Rollback() return err } + fmt.Println("==================4", time.Since(times)) + //if update1 == 0 { // sess.Rollback() // return errors.New("失败") @@ -140,6 +151,8 @@ func handleZhiosOrderSettleTotal(msg []byte) error { } now := time.Unix(int64(ordData.SettleAt), 0).Format("200601") sess.Where("uid=? and date=?", v.Uid, now).Get(&userStatistics2) + fmt.Println("==================31", time.Since(times)) + if userStatistics2.Id == 0 { userStatistics2 = model.UserMonthAmount{ Date: utils.StrToInt(now), @@ -155,12 +168,16 @@ func handleZhiosOrderSettleTotal(msg []byte) error { return errors.New("失败") } } + fmt.Println("==================41", time.Since(times)) + userStatistics2.SettleAmount = utils.Float64ToStrByPrec(utils.StrToFloat64(userStatistics2.SettleAmount)+v.Amount+utils.StrToFloat64(v.AdditionalSubsidy), 4) - _, err2 := eg.Where("id=?", userStatistics2.Id).Cols("settle_amount").Update(&userStatistics2) + _, err2 := sess.Where("id=?", userStatistics2.Id).Cols("settle_amount").Update(&userStatistics2) if err2 != nil { sess.Rollback() return err2 } + fmt.Println("==================5", time.Since(times)) + //if update2 == 0 { // sess.Rollback() // return errors.New("失败") @@ -168,5 +185,6 @@ func handleZhiosOrderSettleTotal(msg []byte) error { } } sess.Commit() + return nil }