From a337f7396197293f32e82877aedf9a5688307d2d Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Tue, 4 Jul 2023 10:51:39 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=A8=E7=90=83=E5=88=86=E7=BA=A2=E8=AE=A2?= =?UTF-8?q?=E5=8D=95=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/db/model/capital_pool_order_total.go | 15 ++ app/utils/convert.go | 4 +- consume/init.go | 2 + consume/md/consume_key.go | 10 ++ .../md/md_zhios_capital_pool_order_total.go | 10 ++ consume/zhios_capital_pool_order_total.go | 168 ++++++++++++++++++ 6 files changed, 208 insertions(+), 1 deletion(-) create mode 100644 app/db/model/capital_pool_order_total.go create mode 100644 consume/md/md_zhios_capital_pool_order_total.go create mode 100644 consume/zhios_capital_pool_order_total.go diff --git a/app/db/model/capital_pool_order_total.go b/app/db/model/capital_pool_order_total.go new file mode 100644 index 0000000..04c0534 --- /dev/null +++ b/app/db/model/capital_pool_order_total.go @@ -0,0 +1,15 @@ +package model + +import "time" + +type CapitalPoolOrderTotal struct { + Id int64 `json:"id" xorm:"pk comment('订单id') BIGINT(22)"` + Uid int `json:"uid" xorm:"not null default 0 comment('uid') INT(10)"` + Level int `json:"level" xorm:" default 0 comment('') INT(10)"` + LevelType int `json:"level_type" xorm:" default 0 comment('') INT(10)"` + Sum string `json:"sum" xorm:"not null default 0.00 comment('') DECIMAL(10,2)"` + AllSum string `json:"all_sum" xorm:" default 0.00 comment('') DECIMAL(10,2)"` + RunTime string `json:"run_time" xorm:" default '' comment('') VARCHAR(255)"` + CreateTime string `json:"create_time" xorm:" default '' comment('') VARCHAR(255)"` + UpdateTime time.Time `json:"update_time" xorm:"comment('') TIMESTAMP"` +} diff --git a/app/utils/convert.go b/app/utils/convert.go index a638d37..04cab78 100644 --- a/app/utils/convert.go +++ b/app/utils/convert.go @@ -22,7 +22,9 @@ func ToInt64(raw interface{}, e error) int64 { } return AnyToInt64(raw) } - +func Float64ToStrByPrec(f float64, prec int) string { + return strconv.FormatFloat(f, 'f', prec, 64) +} func AnyToBool(raw interface{}) bool { switch i := raw.(type) { case float32, float64, int, int64, uint, uint8, uint16, uint32, uint64, int8, int16, int32: diff --git a/consume/init.go b/consume/init.go index 146dc43..9d55a56 100644 --- a/consume/init.go +++ b/consume/init.go @@ -40,6 +40,8 @@ func initConsumes() { jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate + jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal + } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 930ce29..b3d6020 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -165,6 +165,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "ZhiosTikTokAllUpdate", }, + { + ExchangeName: "zhios.capital_pool.order_total.exchange", + Name: "zhios_capital_pool_order_total", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "order_total", + BindKey: "", + ConsumeFunName: "ZhiosCapitalPoolOrderTotal", + }, } const ( @@ -182,6 +191,7 @@ const ( CanalMallOrdForYouMiShangFunName = "CanalMallOrdForYouMiShang" YoumishangExchangeStoreFunName = "YoumishangExchangeStore" ZhiosRechargeOrderFailFunName = "ZhiosRechargeOrderFail" + ZhiosCapitalPoolOrderTotalFunName = "ZhiosCapitalPoolOrderTotal" ZhiosTikTokUpdateFunName = "ZhiosTikTokUpdate" ZhiosTikTokAllUpdateFunName = "ZhiosTikTokAllUpdate" CloudIssuanceAsyncMLoginFunName = "CloudIssuanceAsyncMLoginConsume" diff --git a/consume/md/md_zhios_capital_pool_order_total.go b/consume/md/md_zhios_capital_pool_order_total.go new file mode 100644 index 0000000..a7e21b9 --- /dev/null +++ b/consume/md/md_zhios_capital_pool_order_total.go @@ -0,0 +1,10 @@ +package md + +type ZhiosCapitalPoolOrderTotal struct { + Uid []string `json:"uid"` + Mid string `json:"mid"` + Runtime int64 `json:"runtime"` + TotalTime int64 `json:"totalTime"` + BonusLevelType int `json:"bonusLevelType"` + Level string `json:"level"` +} diff --git a/consume/zhios_capital_pool_order_total.go b/consume/zhios_capital_pool_order_total.go new file mode 100644 index 0000000..d5560ac --- /dev/null +++ b/consume/zhios_capital_pool_order_total.go @@ -0,0 +1,168 @@ +package consume + +import ( + "applet/app/db" + model2 "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "github.com/syyongx/php2go" + "time" + "xorm.io/xorm" +) + +func ZhiosCapitalPoolOrderTotal(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(20) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosCapitalPoolOrderTotal(res.Body) + //_ = res.Reject(false) + if err == nil { + _ = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleZhiosCapitalPoolOrderTotal(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosCapitalPoolOrderTotal + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println(err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + err = AddUserOrdTotal(eg, canalMsg.Mid, canalMsg.Uid, canalMsg.Runtime, canalMsg.TotalTime, canalMsg.BonusLevelType, "0") + if err != nil { + fmt.Println(err) + return err + } + return nil +} +func AddUserOrdTotal(eg *xorm.Engine, dbName string, uids []string, runtime, totalTime int64, leveType int, level string) error { + if len(uids) == 0 { + return nil + } + m := GetUserOrdTotal(eg, uids, totalTime) + var data []model2.CapitalPoolOrderTotal + err := eg.In("uid", uids).And("create_time=? and level_type=? and level=?", time.Unix(runtime, 0).Format("2006-01-02"), leveType, level).Find(&data) + var userData = make(map[string]model2.CapitalPoolOrderTotal) + if err == nil { + for _, v := range data { + userData[utils.IntToStr(v.Uid)] = v + } + } + for k, v := range m { + user, ok := userData[k] + if ok { + user.Sum = utils.Float64ToStrByPrec(v, 2) + user.UpdateTime = time.Now() + eg.Where("id=?", user.Id).Cols("sum,update_time").Update(&user) + } else { + fmt.Println(time.Unix(runtime, 0).Format("2006-01-02")) + var userDatas = &model2.CapitalPoolOrderTotal{ + Uid: utils.StrToInt(k), + Sum: utils.Float64ToStrByPrec(v, 2), + CreateTime: time.Unix(runtime, 0).Format("2006-01-02"), + UpdateTime: time.Now(), + LevelType: leveType, + Level: utils.StrToInt(level), + } + fmt.Println(userDatas) + _, err := eg.InsertOne(userDatas) + if err != nil { + utils.FilePutContents(dbName+"capital", utils.SerializeStr(userDatas)) + utils.FilePutContents(dbName+"capital", err.Error()) + return err + } + } + } + sum, _ := eg.Where("create_time=? and level_type=? and level=?", time.Unix(runtime, 0).Format("2006-01-02"), leveType, level).Sum(&model2.CapitalPoolOrderTotal{}, "sum") + fmt.Println(sum) + + sql := `UPDATE capital_pool_order_total SET all_sum=%f WHERE create_time='%s' and level_type=%d and level=%s;` + sql = fmt.Sprintf(sql, sum, time.Unix(runtime, 0).Format("2006-01-02"), leveType, level) + db.QueryNativeString(eg, sql) + return nil +} + +func GetUserOrdTotal(eg *xorm.Engine, uids []string, totalTime int64) map[string]float64 { + totalTimeStr := time.Unix(totalTime, 0) + var userMap = make(map[string]float64, 0) + if len(uids) == 0 { + return userMap + } + uidStr := php2go.Implode(",", uids) + guideSql := `SELECT SUM(paid_price) AS amount,uid FROM ord_list WHERE uid IN (%s) AND state IN(%s) AND confirm_at>=%d GROUP BY uid;` + guideSql = fmt.Sprintf(guideSql, uidStr, "1,2,3,5", totalTime) + guide, err := db.QueryNativeString(eg, guideSql) + fmt.Println(err) + if len(guide) > 0 { + for _, v := range guide { + userMap[v["uid"]] += utils.StrToFloat64(v["amount"]) + } + } + mallSql := `SELECT SUM(cost_price) AS amount,uid FROM mall_ord WHERE uid IN (%s) AND state IN(%s) AND confirm_time>='%s' GROUP BY uid;` + mallSql = fmt.Sprintf(mallSql, uidStr, "3", totalTimeStr) + mall, err := db.QueryNativeString(eg, mallSql) + fmt.Println(err) + + if len(mall) > 0 { + for _, v := range mall { + userMap[v["uid"]] += utils.StrToFloat64(v["amount"]) + } + } + cardSql := `SELECT SUM(paid_price) AS amount,uid FROM privilege_card_ord WHERE uid IN (%s) AND state IN(%s) AND created_at>=%d GROUP BY uid;` + cardSql = fmt.Sprintf(cardSql, uidStr, "1", totalTime) + card, err := db.QueryNativeString(eg, cardSql) + fmt.Println(err) + if len(card) > 0 { + for _, v := range card { + userMap[v["uid"]] += utils.StrToFloat64(v["amount"]) + } + } + rechargeSql := `SELECT SUM(amount) AS amount,uid FROM recharge_order WHERE uid IN (%s) AND status='已付款' AND create_time>='%s' GROUP BY uid;` + rechargeSql = fmt.Sprintf(rechargeSql, uidStr, time.Unix(totalTime, 0).Format("2006-01-02 15:04:05")) + recharge, err := db.QueryNativeString(eg, rechargeSql) + fmt.Println(err) + if len(recharge) > 0 { + for _, v := range recharge { + userMap[v["uid"]] += utils.StrToFloat64(v["amount"]) + } + } + return userMap +}