|
@@ -25,7 +25,7 @@ func ZhiosSettleTotal(queue md.MqQueue) { |
|
|
//1、将自己绑定到交换机上 |
|
|
//1、将自己绑定到交换机上 |
|
|
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) |
|
|
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) |
|
|
//2、取出数据进行消费 |
|
|
//2、取出数据进行消费 |
|
|
ch.Qos(1000) |
|
|
|
|
|
|
|
|
ch.Qos(1) |
|
|
delivery := ch.Consume(queue.Name, false) |
|
|
delivery := ch.Consume(queue.Name, false) |
|
|
|
|
|
|
|
|
var res amqp.Delivery |
|
|
var res amqp.Delivery |
|
@@ -38,6 +38,7 @@ func ZhiosSettleTotal(queue md.MqQueue) { |
|
|
err = handleZhiosOrderSettleTotal(res.Body) |
|
|
err = handleZhiosOrderSettleTotal(res.Body) |
|
|
//_ = res.Reject(false) |
|
|
//_ = res.Reject(false) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
|
|
|
fmt.Println("==================", err.Error()) |
|
|
_ = res.Reject(false) |
|
|
_ = res.Reject(false) |
|
|
//TODO::重新推回队列末尾,避免造成队列堵塞 |
|
|
//TODO::重新推回队列末尾,避免造成队列堵塞 |
|
|
var msg *md.ZhiosOrderBuckle |
|
|
var msg *md.ZhiosOrderBuckle |
|
@@ -63,7 +64,7 @@ func ZhiosSettleTotal(queue md.MqQueue) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func handleZhiosOrderSettleTotal(msg []byte) error { |
|
|
func handleZhiosOrderSettleTotal(msg []byte) error { |
|
|
time.Sleep(time.Microsecond * 20) // 等待500毫秒 |
|
|
|
|
|
|
|
|
//time.Sleep(time.Microsecond * 20) // 等待500毫秒 |
|
|
//1、解析canal采集至mq中queue的数据结构体 |
|
|
//1、解析canal采集至mq中queue的数据结构体 |
|
|
var canalMsg *md.ZhiosOrderBuckle |
|
|
var canalMsg *md.ZhiosOrderBuckle |
|
|
fmt.Println(string(msg)) |
|
|
fmt.Println(string(msg)) |
|
@@ -87,18 +88,24 @@ func handleZhiosOrderSettleTotal(msg []byte) error { |
|
|
sess := eg.NewSession() |
|
|
sess := eg.NewSession() |
|
|
defer sess.Close() |
|
|
defer sess.Close() |
|
|
sess.Begin() |
|
|
sess.Begin() |
|
|
|
|
|
|
|
|
|
|
|
times := time.Now() |
|
|
|
|
|
fmt.Println("==================", time.Since(times)) |
|
|
ordData, err := db.OrderListByPvdOid(sess, canalMsg.Oid) |
|
|
ordData, err := db.OrderListByPvdOid(sess, canalMsg.Oid) |
|
|
if err != nil || ordData == nil { |
|
|
if err != nil || ordData == nil { |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
fmt.Println("==================1", time.Since(times)) |
|
|
|
|
|
|
|
|
var ordRelate = make([]model.OrdListRelate, 0) |
|
|
var ordRelate = make([]model.OrdListRelate, 0) |
|
|
sess.Where("oid=?", canalMsg.Oid).Find(&ordRelate) |
|
|
sess.Where("oid=?", canalMsg.Oid).Find(&ordRelate) |
|
|
|
|
|
fmt.Println("==================2", time.Since(times)) |
|
|
|
|
|
|
|
|
if len(ordRelate) > 0 { |
|
|
if len(ordRelate) > 0 { |
|
|
for _, v := range ordRelate { |
|
|
for _, v := range ordRelate { |
|
|
if v.Amount == 0 && utils.StrToFloat64(v.AdditionalSubsidy) == 0 { |
|
|
if v.Amount == 0 && utils.StrToFloat64(v.AdditionalSubsidy) == 0 { |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
var userStatistics model.UserStatistics |
|
|
var userStatistics model.UserStatistics |
|
|
sess.Where("uid=?", v.Uid).Get(&userStatistics) |
|
|
sess.Where("uid=?", v.Uid).Get(&userStatistics) |
|
|
if userStatistics.Id == 0 { |
|
|
if userStatistics.Id == 0 { |
|
@@ -118,6 +125,8 @@ func handleZhiosOrderSettleTotal(msg []byte) error { |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
fmt.Println("==================3", time.Since(times)) |
|
|
|
|
|
|
|
|
var userStatistics1 model.UserWaitAmount |
|
|
var userStatistics1 model.UserWaitAmount |
|
|
sess.Where("uid=? ", v.Uid).Get(&userStatistics1) |
|
|
sess.Where("uid=? ", v.Uid).Get(&userStatistics1) |
|
|
if userStatistics1.Id == 0 { |
|
|
if userStatistics1.Id == 0 { |
|
@@ -129,6 +138,8 @@ func handleZhiosOrderSettleTotal(msg []byte) error { |
|
|
sess.Rollback() |
|
|
sess.Rollback() |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
fmt.Println("==================4", time.Since(times)) |
|
|
|
|
|
|
|
|
//if update1 == 0 { |
|
|
//if update1 == 0 { |
|
|
// sess.Rollback() |
|
|
// sess.Rollback() |
|
|
// return errors.New("失败") |
|
|
// return errors.New("失败") |
|
@@ -140,6 +151,8 @@ func handleZhiosOrderSettleTotal(msg []byte) error { |
|
|
} |
|
|
} |
|
|
now := time.Unix(int64(ordData.SettleAt), 0).Format("200601") |
|
|
now := time.Unix(int64(ordData.SettleAt), 0).Format("200601") |
|
|
sess.Where("uid=? and date=?", v.Uid, now).Get(&userStatistics2) |
|
|
sess.Where("uid=? and date=?", v.Uid, now).Get(&userStatistics2) |
|
|
|
|
|
fmt.Println("==================31", time.Since(times)) |
|
|
|
|
|
|
|
|
if userStatistics2.Id == 0 { |
|
|
if userStatistics2.Id == 0 { |
|
|
userStatistics2 = model.UserMonthAmount{ |
|
|
userStatistics2 = model.UserMonthAmount{ |
|
|
Date: utils.StrToInt(now), |
|
|
Date: utils.StrToInt(now), |
|
@@ -155,12 +168,16 @@ func handleZhiosOrderSettleTotal(msg []byte) error { |
|
|
return errors.New("失败") |
|
|
return errors.New("失败") |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
fmt.Println("==================41", time.Since(times)) |
|
|
|
|
|
|
|
|
userStatistics2.SettleAmount = utils.Float64ToStrByPrec(utils.StrToFloat64(userStatistics2.SettleAmount)+v.Amount+utils.StrToFloat64(v.AdditionalSubsidy), 4) |
|
|
userStatistics2.SettleAmount = utils.Float64ToStrByPrec(utils.StrToFloat64(userStatistics2.SettleAmount)+v.Amount+utils.StrToFloat64(v.AdditionalSubsidy), 4) |
|
|
_, err2 := eg.Where("id=?", userStatistics2.Id).Cols("settle_amount").Update(&userStatistics2) |
|
|
|
|
|
|
|
|
_, err2 := sess.Where("id=?", userStatistics2.Id).Cols("settle_amount").Update(&userStatistics2) |
|
|
if err2 != nil { |
|
|
if err2 != nil { |
|
|
sess.Rollback() |
|
|
sess.Rollback() |
|
|
return err2 |
|
|
return err2 |
|
|
} |
|
|
} |
|
|
|
|
|
fmt.Println("==================5", time.Since(times)) |
|
|
|
|
|
|
|
|
//if update2 == 0 { |
|
|
//if update2 == 0 { |
|
|
// sess.Rollback() |
|
|
// sess.Rollback() |
|
|
// return errors.New("失败") |
|
|
// return errors.New("失败") |
|
@@ -168,5 +185,6 @@ func handleZhiosOrderSettleTotal(msg []byte) error { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
sess.Commit() |
|
|
sess.Commit() |
|
|
|
|
|
|
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |