diff --git a/app/db/model/user_month_amount.go b/app/db/model/user_month_amount.go new file mode 100644 index 0000000..4f914f0 --- /dev/null +++ b/app/db/model/user_month_amount.go @@ -0,0 +1,8 @@ +package model + +type UserMonthAmount struct { + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + Uid int `json:"uid" xorm:"default 0 INT(11)"` + Date int `json:"date" xorm:"comment('202301') INT(11)"` + Amount string `json:"amount" xorm:"default 0.000000 DECIMAL(30,6)"` +} diff --git a/app/db/model/user_statistics.go b/app/db/model/user_statistics.go index befbced..730b42b 100644 --- a/app/db/model/user_statistics.go +++ b/app/db/model/user_statistics.go @@ -1,8 +1,14 @@ package model type UserStatistics struct { - Id int `json:"id" xorm:"not null pk autoincr INT(11)"` - ImportFinTotal string `json:"import_fin_total" xorm:"comment('导入的累计收益') DECIMAL(30,4)"` - FinTotal string `json:"fin_total" xorm:"comment('系统统计的累计收益') DECIMAL(30,4)"` - Uid int `json:"uid" xorm:"default 0 INT(11)"` + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + ImportFinTotal string `json:"import_fin_total" xorm:"comment('导入的累计收益') DECIMAL(30,4)"` + FinTotal string `json:"fin_total" xorm:"comment('系统统计的累计收益') DECIMAL(30,4)"` + Uid int `json:"uid" xorm:"default 0 index INT(11)"` + Month string `json:"month" xorm:"default 0.0000 DECIMAL(30,4)"` + LastMonth string `json:"last_month" xorm:"default 0.0000 DECIMAL(30,4)"` + MonthChangeTime int `json:"month_change_time" xorm:"default 0 INT(11)"` + WaitSettle string `json:"wait_settle" xorm:"default 0.0000 DECIMAL(30,4)"` + Today string `json:"today" xorm:"DECIMAL(30,4)"` + Yestday string `json:"yestday" xorm:"DECIMAL(30,4)"` } diff --git a/consume/init.go b/consume/init.go index 51d741f..248c6d0 100644 --- a/consume/init.go +++ b/consume/init.go @@ -17,6 +17,7 @@ func Init() { // 增加消费任务队列 func initConsumes() { + jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 8e46b33..1d5ce5e 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -299,6 +299,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "ZhiosOrderSettleTotal", }, + { + ExchangeName: "zhios.order_total.exchange", + Name: "zhios_order_total", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "order_total", + BindKey: "", + ConsumeFunName: "ZhiosOrderTotal", + }, //{ // ExchangeName: "zhios.order_buckle.exchange", // Name: "zhios_order_buckle_dev", @@ -312,6 +321,7 @@ var RabbitMqQueueKeyList = []*MqQueue{ const ( ZhiosOrderSettleTotalFunName = "ZhiosOrderSettleTotal" + ZhiosOrderTotalFunName = "ZhiosOrderTotal" ZhiosOrderHjyFunName = "ZhiosOrderHjy" ZhiosOrderBuckleFunName = "ZhiosOrderBuckle" ZhiosSupplierAfterOrderFunName = "ZhiosSupplierAfterOrder" diff --git a/consume/md/md_zhios_capital_pool_order_total.go b/consume/md/md_zhios_capital_pool_order_total.go index 3e23ffb..a6d8690 100644 --- a/consume/md/md_zhios_capital_pool_order_total.go +++ b/consume/md/md_zhios_capital_pool_order_total.go @@ -20,6 +20,7 @@ type ZhiosWithdraw struct { } type ZhiosOrderBuckle struct { Oid string `json:"oid"` + Uid string `json:"uid"` Mid string `json:"mid"` } diff --git a/consume/zhios_order_total.go b/consume/zhios_order_total.go new file mode 100644 index 0000000..e43a55d --- /dev/null +++ b/consume/zhios_order_total.go @@ -0,0 +1,166 @@ +package consume + +import ( + "applet/app/db" + "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" + "xorm.io/xorm" +) + +func ZhiosOrderTotal(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosOrderTotal(res.Body) + //_ = res.Reject(false) + if err != nil { + _ = res.Reject(false) + //TODO::重新推回队列末尾,避免造成队列堵塞 + var msg *md.ZhiosOrderBuckle + var tmpString string + err := json.Unmarshal(res.Body, &tmpString) + if err != nil { + return + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &msg) + if err != nil { + return + } + ch.Publish(queue.ExchangeName, utils.SerializeStr(msg), queue.RoutKey) + } else { + _ = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleZhiosOrderTotal(msg []byte) error { + time.Sleep(time.Microsecond * 20) // 等待500毫秒 + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosOrderBuckle + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println("===with", err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + fmt.Println("===with", err.Error()) + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + if eg == nil { + return nil + } + statUserOrderMoney(eg, canalMsg) + return nil +} +func statUserOrderMoney(eg *xorm.Engine, req *md.ZhiosOrderBuckle) (map[string]string, error) { + result := make(map[string]string) + thisMonthRange := utils.GetTimeRange("current_month") + lastMonthRange := utils.GetTimeRange("last_month") + var thisMonthData model.UserMonthAmount + thismonth := time.Unix(thisMonthRange["start"], 0).Format("200601") + eg.Where("uid=? and date=?", req.Uid, thismonth).Get(&data) + if thisMonthData.Id == 0 { + thisMonthData.Uid = utils.StrToInt(req.Uid) + thisMonthData.Date = utils.StrToInt(thismonth) + eg.Insert(&thisMonthData) + } + + var lastMonthData model.UserMonthAmount + lastmonth := time.Unix(lastMonthRange["start"], 0).Format("200601") + eg.Where("uid=? and date=?", req.Uid, lastmonth).Get(&data) + if lastMonthData.Id == 0 { + lastMonthData.Uid = utils.StrToInt(req.Uid) + lastMonthData.Date = utils.StrToInt(lastmonth) + eg.Insert(&lastMonthData) + } + types := "" + uid := req.Uid + // 获取时间范围 + + // 统计预估 (预估包含结算部分且是有效订单) + sqlTpl := `SELECT cast(SUM(LEFT(olr.amount,LENGTH(olr.amount)-2)) as decimal(50,4)) AS amount +FROM ord_list_relate olr + LEFT JOIN ord_list ol ON olr.oid = ol.ord_id + LEFT JOIN privilege_card_ord pco ON olr.oid =pco.ord_id + LEFT JOIN duoyou_ord_list dol ON olr.oid =dol.oid + LEFT JOIN recharge_order ro ON olr.oid =ro.oid + LEFT JOIN playlet_sale_order pso ON olr.oid =pso.custom_oid +WHERE olr.uid = ? %s + AND olr.create_at >= ? + AND olr.create_at < ? AND (ol.state<>4 or pco.state=1 or dol.id>0 or ro.status<>'已退款' or pso.status<>'订单退款'); +` + sqlTpl = fmt.Sprintf(sqlTpl, types) + + fmt.Println(lastMonthRange) + thisMonthResult, err := db.QueryNativeString(eg, sqlTpl, uid, thisMonthRange["start"], thisMonthRange["end"]) + if err != nil { + _ = logx.Warn(err) + result["thisMonth"] = "0" + } else { + result["thisMonth"] = thisMonthResult[0]["amount"] + } + thisMonthData.Amount = result["thisMonth"] + eg.Where("id=?", thisMonthData.Id).Update(&thisMonthData) + + // 统计上月预估 (预估包含结算部分且是有效订单) + sqlTpl2 := `SELECT cast(SUM(IF(ol.state<>4 or pco.state=1 or dol.id>0 or ro.status<>'已退款' or pso.status<>'订单退款', LEFT(olr.amount,LENGTH(olr.amount)-2), 0)) as decimal(50,4)) AS amount +FROM ord_list_relate olr + LEFT JOIN ord_list ol ON olr.oid = ol.ord_id + LEFT JOIN privilege_card_ord pco ON olr.oid =pco.ord_id + LEFT JOIN duoyou_ord_list dol ON olr.oid =dol.oid +LEFT JOIN recharge_order ro ON olr.oid =ro.oid + LEFT JOIN playlet_sale_order pso ON olr.oid =pso.custom_oid +WHERE olr.uid = ? %s + AND olr.create_at >= ? + AND olr.create_at < ? +` + sqlTpl2 = fmt.Sprintf(sqlTpl2, types) + + lastMonthResult, err := db.QueryNativeString(eg, sqlTpl2, uid, lastMonthRange["start"], lastMonthRange["end"]) + if err != nil { + _ = logx.Warn(err) + result["lastMonth"] = "0" + } else { + result["lastMonth"] = lastMonthResult[0]["amount"] + } + + lastMonthData.Amount = result["lastMonth"] + eg.Where("id=?", lastMonthData.Id).Update(&lastMonthData) + return result, nil +}