Przeglądaj źródła

全球分红订单统计

three
huangjiajun 1 rok temu
rodzic
commit
a337f73961
6 zmienionych plików z 208 dodań i 1 usunięć
  1. +15
    -0
      app/db/model/capital_pool_order_total.go
  2. +3
    -1
      app/utils/convert.go
  3. +2
    -0
      consume/init.go
  4. +10
    -0
      consume/md/consume_key.go
  5. +10
    -0
      consume/md/md_zhios_capital_pool_order_total.go
  6. +168
    -0
      consume/zhios_capital_pool_order_total.go

+ 15
- 0
app/db/model/capital_pool_order_total.go Wyświetl plik

@@ -0,0 +1,15 @@
package model

import "time"

type CapitalPoolOrderTotal struct {
Id int64 `json:"id" xorm:"pk comment('订单id') BIGINT(22)"`
Uid int `json:"uid" xorm:"not null default 0 comment('uid') INT(10)"`
Level int `json:"level" xorm:" default 0 comment('') INT(10)"`
LevelType int `json:"level_type" xorm:" default 0 comment('') INT(10)"`
Sum string `json:"sum" xorm:"not null default 0.00 comment('') DECIMAL(10,2)"`
AllSum string `json:"all_sum" xorm:" default 0.00 comment('') DECIMAL(10,2)"`
RunTime string `json:"run_time" xorm:" default '' comment('') VARCHAR(255)"`
CreateTime string `json:"create_time" xorm:" default '' comment('') VARCHAR(255)"`
UpdateTime time.Time `json:"update_time" xorm:"comment('') TIMESTAMP"`
}

+ 3
- 1
app/utils/convert.go Wyświetl plik

@@ -22,7 +22,9 @@ func ToInt64(raw interface{}, e error) int64 {
}
return AnyToInt64(raw)
}

func Float64ToStrByPrec(f float64, prec int) string {
return strconv.FormatFloat(f, 'f', prec, 64)
}
func AnyToBool(raw interface{}) bool {
switch i := raw.(type) {
case float32, float64, int, int64, uint, uint8, uint16, uint32, uint64, int8, int16, int32:


+ 2
- 0
consume/init.go Wyświetl plik

@@ -40,6 +40,8 @@ func initConsumes() {
jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate
jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate

jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal

}

func Run() {


+ 10
- 0
consume/md/consume_key.go Wyświetl plik

@@ -165,6 +165,15 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "ZhiosTikTokAllUpdate",
},
{
ExchangeName: "zhios.capital_pool.order_total.exchange",
Name: "zhios_capital_pool_order_total",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "order_total",
BindKey: "",
ConsumeFunName: "ZhiosCapitalPoolOrderTotal",
},
}

const (
@@ -182,6 +191,7 @@ const (
CanalMallOrdForYouMiShangFunName = "CanalMallOrdForYouMiShang"
YoumishangExchangeStoreFunName = "YoumishangExchangeStore"
ZhiosRechargeOrderFailFunName = "ZhiosRechargeOrderFail"
ZhiosCapitalPoolOrderTotalFunName = "ZhiosCapitalPoolOrderTotal"
ZhiosTikTokUpdateFunName = "ZhiosTikTokUpdate"
ZhiosTikTokAllUpdateFunName = "ZhiosTikTokAllUpdate"
CloudIssuanceAsyncMLoginFunName = "CloudIssuanceAsyncMLoginConsume"


+ 10
- 0
consume/md/md_zhios_capital_pool_order_total.go Wyświetl plik

@@ -0,0 +1,10 @@
package md

type ZhiosCapitalPoolOrderTotal struct {
Uid []string `json:"uid"`
Mid string `json:"mid"`
Runtime int64 `json:"runtime"`
TotalTime int64 `json:"totalTime"`
BonusLevelType int `json:"bonusLevelType"`
Level string `json:"level"`
}

+ 168
- 0
consume/zhios_capital_pool_order_total.go Wyświetl plik

@@ -0,0 +1,168 @@
package consume

import (
"applet/app/db"
model2 "applet/app/db/model"
"applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"github.com/syyongx/php2go"
"time"
"xorm.io/xorm"
)

func ZhiosCapitalPoolOrderTotal(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(20)
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
err = handleZhiosCapitalPoolOrderTotal(res.Body)
//_ = res.Reject(false)
if err == nil {
_ = res.Ack(true)
}
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleZhiosCapitalPoolOrderTotal(msg []byte) error {
//1、解析canal采集至mq中queue的数据结构体
var canalMsg *md.ZhiosCapitalPoolOrderTotal
fmt.Println(string(msg))
var tmpString string
err := json.Unmarshal(msg, &tmpString)
if err != nil {
fmt.Println(err.Error())
return err
}
fmt.Println(tmpString)
err = json.Unmarshal([]byte(tmpString), &canalMsg)
if err != nil {
return err
}
mid := canalMsg.Mid
eg := db.DBs[mid]
err = AddUserOrdTotal(eg, canalMsg.Mid, canalMsg.Uid, canalMsg.Runtime, canalMsg.TotalTime, canalMsg.BonusLevelType, "0")
if err != nil {
fmt.Println(err)
return err
}
return nil
}
func AddUserOrdTotal(eg *xorm.Engine, dbName string, uids []string, runtime, totalTime int64, leveType int, level string) error {
if len(uids) == 0 {
return nil
}
m := GetUserOrdTotal(eg, uids, totalTime)
var data []model2.CapitalPoolOrderTotal
err := eg.In("uid", uids).And("create_time=? and level_type=? and level=?", time.Unix(runtime, 0).Format("2006-01-02"), leveType, level).Find(&data)
var userData = make(map[string]model2.CapitalPoolOrderTotal)
if err == nil {
for _, v := range data {
userData[utils.IntToStr(v.Uid)] = v
}
}
for k, v := range m {
user, ok := userData[k]
if ok {
user.Sum = utils.Float64ToStrByPrec(v, 2)
user.UpdateTime = time.Now()
eg.Where("id=?", user.Id).Cols("sum,update_time").Update(&user)
} else {
fmt.Println(time.Unix(runtime, 0).Format("2006-01-02"))
var userDatas = &model2.CapitalPoolOrderTotal{
Uid: utils.StrToInt(k),
Sum: utils.Float64ToStrByPrec(v, 2),
CreateTime: time.Unix(runtime, 0).Format("2006-01-02"),
UpdateTime: time.Now(),
LevelType: leveType,
Level: utils.StrToInt(level),
}
fmt.Println(userDatas)
_, err := eg.InsertOne(userDatas)
if err != nil {
utils.FilePutContents(dbName+"capital", utils.SerializeStr(userDatas))
utils.FilePutContents(dbName+"capital", err.Error())
return err
}
}
}
sum, _ := eg.Where("create_time=? and level_type=? and level=?", time.Unix(runtime, 0).Format("2006-01-02"), leveType, level).Sum(&model2.CapitalPoolOrderTotal{}, "sum")
fmt.Println(sum)

sql := `UPDATE capital_pool_order_total SET all_sum=%f WHERE create_time='%s' and level_type=%d and level=%s;`
sql = fmt.Sprintf(sql, sum, time.Unix(runtime, 0).Format("2006-01-02"), leveType, level)
db.QueryNativeString(eg, sql)
return nil
}

func GetUserOrdTotal(eg *xorm.Engine, uids []string, totalTime int64) map[string]float64 {
totalTimeStr := time.Unix(totalTime, 0)
var userMap = make(map[string]float64, 0)
if len(uids) == 0 {
return userMap
}
uidStr := php2go.Implode(",", uids)
guideSql := `SELECT SUM(paid_price) AS amount,uid FROM ord_list WHERE uid IN (%s) AND state IN(%s) AND confirm_at>=%d GROUP BY uid;`
guideSql = fmt.Sprintf(guideSql, uidStr, "1,2,3,5", totalTime)
guide, err := db.QueryNativeString(eg, guideSql)
fmt.Println(err)
if len(guide) > 0 {
for _, v := range guide {
userMap[v["uid"]] += utils.StrToFloat64(v["amount"])
}
}
mallSql := `SELECT SUM(cost_price) AS amount,uid FROM mall_ord WHERE uid IN (%s) AND state IN(%s) AND confirm_time>='%s' GROUP BY uid;`
mallSql = fmt.Sprintf(mallSql, uidStr, "3", totalTimeStr)
mall, err := db.QueryNativeString(eg, mallSql)
fmt.Println(err)

if len(mall) > 0 {
for _, v := range mall {
userMap[v["uid"]] += utils.StrToFloat64(v["amount"])
}
}
cardSql := `SELECT SUM(paid_price) AS amount,uid FROM privilege_card_ord WHERE uid IN (%s) AND state IN(%s) AND created_at>=%d GROUP BY uid;`
cardSql = fmt.Sprintf(cardSql, uidStr, "1", totalTime)
card, err := db.QueryNativeString(eg, cardSql)
fmt.Println(err)
if len(card) > 0 {
for _, v := range card {
userMap[v["uid"]] += utils.StrToFloat64(v["amount"])
}
}
rechargeSql := `SELECT SUM(amount) AS amount,uid FROM recharge_order WHERE uid IN (%s) AND status='已付款' AND create_time>='%s' GROUP BY uid;`
rechargeSql = fmt.Sprintf(rechargeSql, uidStr, time.Unix(totalTime, 0).Format("2006-01-02 15:04:05"))
recharge, err := db.QueryNativeString(eg, rechargeSql)
fmt.Println(err)
if len(recharge) > 0 {
for _, v := range recharge {
userMap[v["uid"]] += utils.StrToFloat64(v["amount"])
}
}
return userMap
}

Ładowanie…
Anuluj
Zapisz