|
|
@@ -0,0 +1,166 @@ |
|
|
|
package consume |
|
|
|
|
|
|
|
import ( |
|
|
|
"applet/app/db" |
|
|
|
"applet/app/db/model" |
|
|
|
"applet/app/utils" |
|
|
|
"applet/app/utils/logx" |
|
|
|
"applet/consume/md" |
|
|
|
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" |
|
|
|
"encoding/json" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"github.com/streadway/amqp" |
|
|
|
"time" |
|
|
|
"xorm.io/xorm" |
|
|
|
) |
|
|
|
|
|
|
|
func ZhiosOrderTotal(queue md.MqQueue) { |
|
|
|
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") |
|
|
|
ch, err := rabbit.Cfg.Pool.GetChannel() |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return |
|
|
|
} |
|
|
|
defer ch.Release() |
|
|
|
//1、将自己绑定到交换机上 |
|
|
|
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) |
|
|
|
//2、取出数据进行消费 |
|
|
|
ch.Qos(1) |
|
|
|
delivery := ch.Consume(queue.Name, false) |
|
|
|
|
|
|
|
var res amqp.Delivery |
|
|
|
var ok bool |
|
|
|
for { |
|
|
|
res, ok = <-delivery |
|
|
|
if ok == true { |
|
|
|
//fmt.Println(string(res.Body)) |
|
|
|
fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") |
|
|
|
err = handleZhiosOrderTotal(res.Body) |
|
|
|
//_ = res.Reject(false) |
|
|
|
if err != nil { |
|
|
|
_ = res.Reject(false) |
|
|
|
//TODO::重新推回队列末尾,避免造成队列堵塞 |
|
|
|
var msg *md.ZhiosOrderBuckle |
|
|
|
var tmpString string |
|
|
|
err := json.Unmarshal(res.Body, &tmpString) |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
fmt.Println(tmpString) |
|
|
|
err = json.Unmarshal([]byte(tmpString), &msg) |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
ch.Publish(queue.ExchangeName, utils.SerializeStr(msg), queue.RoutKey) |
|
|
|
} else { |
|
|
|
_ = res.Ack(true) |
|
|
|
} |
|
|
|
} else { |
|
|
|
panic(errors.New("error getting message")) |
|
|
|
} |
|
|
|
} |
|
|
|
fmt.Println("get msg done") |
|
|
|
} |
|
|
|
|
|
|
|
func handleZhiosOrderTotal(msg []byte) error { |
|
|
|
time.Sleep(time.Microsecond * 20) // 等待500毫秒 |
|
|
|
//1、解析canal采集至mq中queue的数据结构体 |
|
|
|
var canalMsg *md.ZhiosOrderBuckle |
|
|
|
fmt.Println(string(msg)) |
|
|
|
var tmpString string |
|
|
|
err := json.Unmarshal(msg, &tmpString) |
|
|
|
if err != nil { |
|
|
|
fmt.Println("===with", err.Error()) |
|
|
|
return err |
|
|
|
} |
|
|
|
fmt.Println(tmpString) |
|
|
|
err = json.Unmarshal([]byte(tmpString), &canalMsg) |
|
|
|
if err != nil { |
|
|
|
fmt.Println("===with", err.Error()) |
|
|
|
return err |
|
|
|
} |
|
|
|
mid := canalMsg.Mid |
|
|
|
eg := db.DBs[mid] |
|
|
|
if eg == nil { |
|
|
|
return nil |
|
|
|
} |
|
|
|
statUserOrderMoney(eg, canalMsg) |
|
|
|
return nil |
|
|
|
} |
|
|
|
func statUserOrderMoney(eg *xorm.Engine, req *md.ZhiosOrderBuckle) (map[string]string, error) { |
|
|
|
result := make(map[string]string) |
|
|
|
thisMonthRange := utils.GetTimeRange("current_month") |
|
|
|
lastMonthRange := utils.GetTimeRange("last_month") |
|
|
|
var thisMonthData model.UserMonthAmount |
|
|
|
thismonth := time.Unix(thisMonthRange["start"], 0).Format("200601") |
|
|
|
eg.Where("uid=? and date=?", req.Uid, thismonth).Get(&data) |
|
|
|
if thisMonthData.Id == 0 { |
|
|
|
thisMonthData.Uid = utils.StrToInt(req.Uid) |
|
|
|
thisMonthData.Date = utils.StrToInt(thismonth) |
|
|
|
eg.Insert(&thisMonthData) |
|
|
|
} |
|
|
|
|
|
|
|
var lastMonthData model.UserMonthAmount |
|
|
|
lastmonth := time.Unix(lastMonthRange["start"], 0).Format("200601") |
|
|
|
eg.Where("uid=? and date=?", req.Uid, lastmonth).Get(&data) |
|
|
|
if lastMonthData.Id == 0 { |
|
|
|
lastMonthData.Uid = utils.StrToInt(req.Uid) |
|
|
|
lastMonthData.Date = utils.StrToInt(lastmonth) |
|
|
|
eg.Insert(&lastMonthData) |
|
|
|
} |
|
|
|
types := "" |
|
|
|
uid := req.Uid |
|
|
|
// 获取时间范围 |
|
|
|
|
|
|
|
// 统计预估 (预估包含结算部分且是有效订单) |
|
|
|
sqlTpl := `SELECT cast(SUM(LEFT(olr.amount,LENGTH(olr.amount)-2)) as decimal(50,4)) AS amount |
|
|
|
FROM ord_list_relate olr |
|
|
|
LEFT JOIN ord_list ol ON olr.oid = ol.ord_id |
|
|
|
LEFT JOIN privilege_card_ord pco ON olr.oid =pco.ord_id |
|
|
|
LEFT JOIN duoyou_ord_list dol ON olr.oid =dol.oid |
|
|
|
LEFT JOIN recharge_order ro ON olr.oid =ro.oid |
|
|
|
LEFT JOIN playlet_sale_order pso ON olr.oid =pso.custom_oid |
|
|
|
WHERE olr.uid = ? %s |
|
|
|
AND olr.create_at >= ? |
|
|
|
AND olr.create_at < ? AND (ol.state<>4 or pco.state=1 or dol.id>0 or ro.status<>'已退款' or pso.status<>'订单退款'); |
|
|
|
` |
|
|
|
sqlTpl = fmt.Sprintf(sqlTpl, types) |
|
|
|
|
|
|
|
fmt.Println(lastMonthRange) |
|
|
|
thisMonthResult, err := db.QueryNativeString(eg, sqlTpl, uid, thisMonthRange["start"], thisMonthRange["end"]) |
|
|
|
if err != nil { |
|
|
|
_ = logx.Warn(err) |
|
|
|
result["thisMonth"] = "0" |
|
|
|
} else { |
|
|
|
result["thisMonth"] = thisMonthResult[0]["amount"] |
|
|
|
} |
|
|
|
thisMonthData.Amount = result["thisMonth"] |
|
|
|
eg.Where("id=?", thisMonthData.Id).Update(&thisMonthData) |
|
|
|
|
|
|
|
// 统计上月预估 (预估包含结算部分且是有效订单) |
|
|
|
sqlTpl2 := `SELECT cast(SUM(IF(ol.state<>4 or pco.state=1 or dol.id>0 or ro.status<>'已退款' or pso.status<>'订单退款', LEFT(olr.amount,LENGTH(olr.amount)-2), 0)) as decimal(50,4)) AS amount |
|
|
|
FROM ord_list_relate olr |
|
|
|
LEFT JOIN ord_list ol ON olr.oid = ol.ord_id |
|
|
|
LEFT JOIN privilege_card_ord pco ON olr.oid =pco.ord_id |
|
|
|
LEFT JOIN duoyou_ord_list dol ON olr.oid =dol.oid |
|
|
|
LEFT JOIN recharge_order ro ON olr.oid =ro.oid |
|
|
|
LEFT JOIN playlet_sale_order pso ON olr.oid =pso.custom_oid |
|
|
|
WHERE olr.uid = ? %s |
|
|
|
AND olr.create_at >= ? |
|
|
|
AND olr.create_at < ? |
|
|
|
` |
|
|
|
sqlTpl2 = fmt.Sprintf(sqlTpl2, types) |
|
|
|
|
|
|
|
lastMonthResult, err := db.QueryNativeString(eg, sqlTpl2, uid, lastMonthRange["start"], lastMonthRange["end"]) |
|
|
|
if err != nil { |
|
|
|
_ = logx.Warn(err) |
|
|
|
result["lastMonth"] = "0" |
|
|
|
} else { |
|
|
|
result["lastMonth"] = lastMonthResult[0]["amount"] |
|
|
|
} |
|
|
|
|
|
|
|
lastMonthData.Amount = result["lastMonth"] |
|
|
|
eg.Where("id=?", lastMonthData.Id).Update(&lastMonthData) |
|
|
|
return result, nil |
|
|
|
} |