@@ -110,7 +110,8 @@ func GetAllDatabaseDev() *[]model.DbMapping { | |||||
fmt.Println("cfg.Local is: ", cfg.Local) | fmt.Println("cfg.Local is: ", cfg.Local) | ||||
if cfg.Local { // 本地调试 加快速度 | if cfg.Local { // 本地调试 加快速度 | ||||
fmt.Println("notice:LOCAL TEST, only masterId:** 123456 ** available!") | fmt.Println("notice:LOCAL TEST, only masterId:** 123456 ** available!") | ||||
err = Db.Where("deleted_at != ? AND is_dev = '1' AND db_master_id=?", 1, 123456).Find(&m) | |||||
err = Db.Where("deleted_at != ? AND is_dev = '1' AND db_master_id= ?", 1, 123456). | |||||
Or("db_master_id = ?", 12293740).Find(&m) | |||||
} else { | } else { | ||||
err = Db.Where("deleted_at != ? AND is_dev = '1' ", 1).Find(&m) | err = Db.Where("deleted_at != ? AND is_dev = '1' ", 1).Find(&m) | ||||
} | } | ||||
@@ -0,0 +1,19 @@ | |||||
package model | |||||
import "time" | |||||
type Message struct { | |||||
Id int64 // 自增主键 | |||||
UserId int64 // 所属类型id | |||||
RequestId int64 // 请求id | |||||
SenderType int32 // 发送者类型 | |||||
SenderId int64 // 发送者账户id | |||||
ReceiverType int32 // 接收者账户id | |||||
ReceiverId int64 // 接收者id,如果是单聊信息,则为user_id,如果是群组消息,则为group_id | |||||
ToUserIds string // 需要@的用户id列表,多个用户用,隔开 | |||||
Type int // 消息类型 | |||||
Content []byte // 消息内容 | |||||
Seq int64 // 消息同步序列 | |||||
SendTime time.Time // 消息发送时间 | |||||
Status int32 // 创建时间 | |||||
} |
@@ -0,0 +1,299 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"applet/app/utils/cache" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
db2 "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official/model" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/shopspring/decimal" | |||||
"github.com/streadway/amqp" | |||||
"strings" | |||||
"time" | |||||
) | |||||
const ZhiOsB2cPlaceOrderNumOfPeopleHashMapCacheKey = "%s:zhiOs_b2c_place_order_num_of_people_hash_map_cache:%s" //下单人数缓存hashMap键 | |||||
func CanalB2cOrderForNumericalStatementConsume(queue md.MqQueue) { | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(100) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>CanalB2cOrderForNumericalStatementConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleCanalB2cOrderForNumericalStatementTable(res.Body) | |||||
if err != nil { | |||||
fmt.Println("err ::: ", err) | |||||
utils.FilePutContents("CanalB2cOrderForNumericalStatementConsume_ERR", "[err]:"+err.Error()) | |||||
_ = res.Reject(false) | |||||
////TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
//var msg *md.OneCirclesStructForSignIn | |||||
//json.Unmarshal(res.Body, &msg) | |||||
//ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
} else { | |||||
_ = res.Ack(true) | |||||
} | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleCanalB2cOrderForNumericalStatementTable(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.CanalB2cOrderMessage[md.CanalB2cOrder] | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
now := time.Now() | |||||
//2、获取masterId | |||||
masterId := utils.StrToInt(strings.Split(canalMsg.Database, "_")[1]) | |||||
//TODO::日志记录 | |||||
utils.FilePutContents("handleCanalB2cOrderForNumericalStatementTable_"+utils.IntToStr(masterId)+"_"+now.Format("2006-01-02"), string(msg)) | |||||
//2、判断操作(insert | update) | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType || canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
var isUpdate bool | |||||
//3、查找是否有数据 | |||||
var ordDate string | |||||
for _, item := range canalMsg.Data { | |||||
ordDate = utils.TimeParseStd(item.CreateTime).Format("2006-01-02") | |||||
} | |||||
statistics, err := db2.GetMasterB2cOrderStatistics(db.Db, masterId, ordDate) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if statistics == nil && canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
statistics = &model.MasterB2cOrderStatistics{ | |||||
MasterId: masterId, | |||||
PaymentTotal: "", | |||||
OrderCount: 0, | |||||
LoseOrderCount: 0, | |||||
PlaceOrderNumOfPeople: 0, | |||||
EffectiveOrderCount: 0, | |||||
EffectiveCommission: "", | |||||
ReceiveCommission: "", | |||||
LoseCommission: "", | |||||
AvgCommission: "", | |||||
CustomerUnitPrice: "", | |||||
Date: ordDate, | |||||
CreateAt: now.Format("2006-01-02 15:04:05"), | |||||
UpdateAt: now.Format("2006-01-02 15:04:05"), | |||||
} | |||||
_, err = db2.MasterB2cOrderStatisticsInsert(db.Db, statistics) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
if statistics == nil { | |||||
return errors.New("过期订单数据不予处理") | |||||
} | |||||
paymentTotal := statistics.PaymentTotal | |||||
orderCount := statistics.OrderCount | |||||
loseOrderCount := statistics.LoseOrderCount | |||||
placeOrderNumOfPeople := statistics.PlaceOrderNumOfPeople | |||||
effectiveOrderCount := statistics.EffectiveOrderCount | |||||
effectiveCommission := statistics.EffectiveCommission | |||||
loseCommission := statistics.LoseCommission | |||||
avgCommission := statistics.AvgCommission | |||||
customerUnitPrice := statistics.CustomerUnitPrice | |||||
effectivePaymentTotal := statistics.EffectivePaymentTotal | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
for _, item := range canalMsg.Data { | |||||
orderCount++ | |||||
if item.State == "0" { | |||||
//未支付不统计 | |||||
return errors.New("未支付不统计") | |||||
} else if item.State == "6" { | |||||
//失效 | |||||
loseOrderCount++ | |||||
} else { | |||||
effectiveOrderCount++ | |||||
} | |||||
paymentTotal = utils.Float64ToStr(utils.StrToFloat64(paymentTotal) + utils.StrToFloat64(item.CostPrice)) //付款金额 | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) + utils.StrToFloat64(item.EstimateCommission)) //有效佣金(元) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) + utils.StrToFloat64(item.CostPrice)) //有效付款金额(元) | |||||
if item.State != "6" { | |||||
cacheKey := fmt.Sprintf(ZhiOsB2cPlaceOrderNumOfPeopleHashMapCacheKey, utils.IntToStr(masterId), ordDate) | |||||
get, _ := cache.HGetString(cacheKey, item.Uid) | |||||
if get == "" { | |||||
placeOrderNumOfPeople++ //下单人数 | |||||
cache.HSet(cacheKey, item.Uid, "1") | |||||
} else { | |||||
cache.HSet(cacheKey, item.Uid, utils.IntToStr(utils.StrToInt(get)+1)) | |||||
} | |||||
cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) | |||||
paymentTotalValue, _ := decimal.NewFromString(paymentTotal) | |||||
if placeOrderNumOfPeople == 0 { | |||||
return errors.New("divider cannot be 0 in division operation") | |||||
} | |||||
placeOrderNumOfPeopleValue := decimal.NewFromInt(int64(placeOrderNumOfPeople)) | |||||
customerUnitPrice = paymentTotalValue.Div(placeOrderNumOfPeopleValue).String() | |||||
} | |||||
//客单价 | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
judgeSate := JudgeB2cOrdSate(*canalMsg) | |||||
if judgeSate > 0 { | |||||
if judgeSate == 2 { | |||||
//TODO::未收货失效 | |||||
loseOrderCount++ | |||||
effectiveOrderCount-- | |||||
for _, item := range canalMsg.Data { | |||||
loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) - utils.StrToFloat64(item.CostPrice)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if judgeSate == 3 { | |||||
//TODO::已收货失效 | |||||
loseOrderCount++ | |||||
effectiveOrderCount-- | |||||
for _, item := range canalMsg.Data { | |||||
loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) - utils.StrToFloat64(item.CostPrice)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if judgeSate == 1 || judgeSate == 3 { | |||||
//TODO::收货额外处理 | |||||
//查找是否有数据 | |||||
var ordConfirmAt string | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmAt = utils.TimeParseStd(item.FinishTime).Format("2006-01-02") | |||||
} | |||||
ordConfirmStatistics, err := db2.GetMasterB2cOrderStatistics(db.Db, masterId, ordConfirmAt) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if ordConfirmStatistics == nil { | |||||
return errors.New("过期收货订单数据不予处理") | |||||
} | |||||
if judgeSate == 1 { | |||||
//TODO::收货 | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmStatistics.ReceiveCommission = utils.Float64ToStr(utils.StrToFloat64(ordConfirmStatistics.ReceiveCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
} | |||||
} | |||||
if judgeSate == 3 { | |||||
//TODO::已收货失效 | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmStatistics.ReceiveCommission = utils.Float64ToStr(utils.StrToFloat64(ordConfirmStatistics.ReceiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
} | |||||
} | |||||
ordConfirmStatistics.UpdateAt = now.Format("2006-01-02 15:04:05") | |||||
_, err = db2.MasterB2cOrderStatisticsUpdate(db.Db, ordConfirmStatistics.Id, ordConfirmStatistics, | |||||
"receive_commission", "update_at") | |||||
} | |||||
} | |||||
} | |||||
if isUpdate { | |||||
statistics.PaymentTotal = paymentTotal | |||||
statistics.OrderCount = orderCount | |||||
statistics.LoseOrderCount = loseOrderCount | |||||
statistics.PlaceOrderNumOfPeople = placeOrderNumOfPeople | |||||
statistics.EffectiveOrderCount = effectiveOrderCount | |||||
statistics.EffectiveCommission = effectiveCommission | |||||
statistics.LoseCommission = loseCommission | |||||
statistics.AvgCommission = avgCommission | |||||
statistics.CustomerUnitPrice = customerUnitPrice | |||||
statistics.EffectivePaymentTotal = effectivePaymentTotal | |||||
statistics.UpdateAt = now.Format("2006-01-02 15:04:05") | |||||
_, err = db2.MasterB2cOrderStatisticsUpdate(db.Db, statistics.Id, statistics, | |||||
"payment_total", "order_count", "estimated_profit", "lose_order_count", | |||||
"place_order_num_of_people", "effective_order_count", "effective_commission", "lose_commission", | |||||
"avg_commission", "customer_unit_price", "effective_payment_total", "update_at") | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
} | |||||
return nil | |||||
} | |||||
// JudgeB2cOrdSate 处理订单状态(judgeSate[0:不需要处理 1:收货 2:未收货失效 3:已收货失效]) | |||||
func JudgeB2cOrdSate(message md.CanalB2cOrderMessage[md.CanalB2cOrder]) (judgeSate int) { | |||||
oldData := message.Old | |||||
//1、获取 旧的订单状态 | |||||
var oldOrdState string | |||||
for _, item := range oldData { | |||||
if item.State != "" { | |||||
oldOrdState = item.State | |||||
} | |||||
} | |||||
if oldOrdState == "" { | |||||
return | |||||
} | |||||
//2、获取 新的订单状态 | |||||
var nowOrdState string | |||||
for _, item := range message.Data { | |||||
if item.State != "" { | |||||
nowOrdState = item.State | |||||
} | |||||
} | |||||
if nowOrdState == "" { | |||||
return | |||||
} | |||||
if oldOrdState == nowOrdState { | |||||
return | |||||
} | |||||
//3、进行状态比较判断 | |||||
if oldOrdState == "3" || oldOrdState == "4" || oldOrdState == "5" { | |||||
if nowOrdState == "6" { | |||||
//已收货失效 | |||||
return 3 | |||||
} | |||||
} else { | |||||
if nowOrdState == "6" { | |||||
//未收货失效 | |||||
return 2 | |||||
} | |||||
if nowOrdState == "3" || nowOrdState == "4" || nowOrdState == "5" { | |||||
//收货 | |||||
return 1 | |||||
} | |||||
} | |||||
return | |||||
} |
@@ -0,0 +1,105 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
model2 "applet/app/db/gim/model" | |||||
"applet/app/utils" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/streadway/amqp" | |||||
) | |||||
func CanalGimMessageConsume(queue md.MqQueue) { | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(100) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>CanalGimMessageConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleCanalGimMessageTable(res.Body) | |||||
if err != nil { | |||||
fmt.Println("err ::: ", err) | |||||
utils.FilePutContents("CanalGimMessageConsume", "[err]:"+err.Error()) | |||||
_ = res.Reject(false) | |||||
////TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
//var msg *md.OneCirclesStructForSignIn | |||||
//json.Unmarshal(res.Body, &msg) | |||||
//ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
} else { | |||||
_ = res.Ack(true) | |||||
} | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleCanalGimMessageTable(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.CanalGimMessage[md.Message] | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
//2、判断操作(insert | update) | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
for _, item := range canalMsg.Data { | |||||
var oldM md.Message000 | |||||
get, err := db.ImDb.Table("message_000").ID(item.Id).Get(&oldM) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if get { | |||||
message := &model2.Message{ | |||||
UserId: oldM.UserId, | |||||
RequestId: oldM.RequestId, | |||||
SenderType: oldM.SenderType, | |||||
SenderId: oldM.SenderId, | |||||
ReceiverType: oldM.ReceiverType, | |||||
ReceiverId: oldM.ReceiverId, | |||||
ToUserIds: oldM.ToUserIds, | |||||
Type: oldM.Type, | |||||
Content: oldM.Content, | |||||
Seq: oldM.Seq, | |||||
SendTime: oldM.SendTime, | |||||
Status: oldM.Status, | |||||
} | |||||
_, err = db.ImDb.InsertOne(message) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
} | |||||
} | |||||
if canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
//查找是否有数据 | |||||
for _, item := range canalMsg.Data { | |||||
_, err2 := db.ImDb.Where("user_id =?", item.UserId).And("send_time =?", item.SendTime).Cols("status").Update(&model2.Message{Status: int32(utils.StrToInt(item.Status))}) | |||||
if err2 != nil { | |||||
return err2 | |||||
} | |||||
} | |||||
} | |||||
return nil | |||||
} |
@@ -135,8 +135,6 @@ func handleCanalGuideOrderForNumericalStatementTable(msg []byte) error { | |||||
effectiveOrderCount++ | effectiveOrderCount++ | ||||
} | } | ||||
cacheKey := fmt.Sprintf(ZhiOsGuidePlaceOrderNumOfPeopleHashMapCacheKey, utils.IntToStr(masterId), ordDate) | |||||
get, _ := cache.HGetString(cacheKey, item.Uid) | |||||
paymentTotal = utils.Float64ToStr(utils.StrToFloat64(paymentTotal) + utils.StrToFloat64(item.PaidPrice)) //付款金额 | paymentTotal = utils.Float64ToStr(utils.StrToFloat64(paymentTotal) + utils.StrToFloat64(item.PaidPrice)) //付款金额 | ||||
estimatedCommission = utils.Float64ToStr(utils.StrToFloat64(estimatedCommission) + utils.StrToFloat64(item.BenefitAll)) //预估佣金(元) | estimatedCommission = utils.Float64ToStr(utils.StrToFloat64(estimatedCommission) + utils.StrToFloat64(item.BenefitAll)) //预估佣金(元) | ||||
estimatedProfit = utils.Float64ToStr(utils.StrToFloat64(estimatedProfit) + utils.StrToFloat64(item.SysCommission)) //预估利润(元) | estimatedProfit = utils.Float64ToStr(utils.StrToFloat64(estimatedProfit) + utils.StrToFloat64(item.SysCommission)) //预估利润(元) | ||||
@@ -147,19 +145,23 @@ func handleCanalGuideOrderForNumericalStatementTable(msg []byte) error { | |||||
orderCountValue := decimal.NewFromInt(int64(orderCount)) | orderCountValue := decimal.NewFromInt(int64(orderCount)) | ||||
avgCommission = estimatedCommissionValue.Div(orderCountValue).String() //平均佣金 | avgCommission = estimatedCommissionValue.Div(orderCountValue).String() //平均佣金 | ||||
if get == "" { | |||||
placeOrderNumOfPeople++ //下单人数 | |||||
cache.HSet(cacheKey, item.Uid, "1") | |||||
} else { | |||||
cache.HSet(cacheKey, item.Uid, utils.IntToStr(utils.StrToInt(get)+1)) | |||||
} | |||||
cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) | |||||
paymentTotalValue, _ := decimal.NewFromString(paymentTotal) | |||||
if placeOrderNumOfPeople == 0 { | |||||
return errors.New("divider cannot be 0 in division operation") | |||||
if item.State != "4" { | |||||
cacheKey := fmt.Sprintf(ZhiOsGuidePlaceOrderNumOfPeopleHashMapCacheKey, utils.IntToStr(masterId), ordDate) | |||||
get, _ := cache.HGetString(cacheKey, item.Uid) | |||||
if get == "" { | |||||
placeOrderNumOfPeople++ //下单人数 | |||||
cache.HSet(cacheKey, item.Uid, "1") | |||||
} else { | |||||
cache.HSet(cacheKey, item.Uid, utils.IntToStr(utils.StrToInt(get)+1)) | |||||
} | |||||
cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) | |||||
paymentTotalValue, _ := decimal.NewFromString(paymentTotal) | |||||
if placeOrderNumOfPeople == 0 { | |||||
return errors.New("divider cannot be 0 in division operation") | |||||
} | |||||
placeOrderNumOfPeopleValue := decimal.NewFromInt(int64(placeOrderNumOfPeople)) | |||||
customerUnitPrice = paymentTotalValue.Div(placeOrderNumOfPeopleValue).String() //客单价 | |||||
} | } | ||||
placeOrderNumOfPeopleValue := decimal.NewFromInt(int64(placeOrderNumOfPeople)) | |||||
customerUnitPrice = paymentTotalValue.Div(placeOrderNumOfPeopleValue).String() //客单价 | |||||
isUpdate = true | isUpdate = true | ||||
} | } | ||||
} | } | ||||
@@ -0,0 +1,307 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"applet/app/utils/cache" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
db2 "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official/model" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/shopspring/decimal" | |||||
"github.com/streadway/amqp" | |||||
"strings" | |||||
"time" | |||||
) | |||||
const ZhiOsMallPlaceOrderNumOfPeopleHashMapCacheKey = "%s:zhiOs_mall_place_order_num_of_people_hash_map_cache:%s" //下单人数缓存hashMap键 | |||||
func CanalMallOrderForNumericalStatementConsume(queue md.MqQueue) { | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>CanalMallOrderForNumericalStatementConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleCanalMallOrderForNumericalStatementTable(res.Body) | |||||
if err != nil { | |||||
fmt.Println("err ::: ", err) | |||||
utils.FilePutContents("CanalMallOrderForNumericalStatementConsume_ERR", "[err]:"+err.Error()) | |||||
_ = res.Reject(false) | |||||
////TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
//var msg *md.OneCirclesStructForSignIn | |||||
//json.Unmarshal(res.Body, &msg) | |||||
//ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
} else { | |||||
_ = res.Ack(true) | |||||
} | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleCanalMallOrderForNumericalStatementTable(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.CanalMallOrder | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
now := time.Now() | |||||
//2、获取masterId | |||||
masterId := utils.StrToInt(strings.Split(canalMsg.Database, "_")[1]) | |||||
//TODO::日志记录 | |||||
utils.FilePutContents("handleCanalMallOrderForNumericalStatementTable_"+utils.IntToStr(masterId)+"_"+now.Format("2006-01-02"), string(msg)) | |||||
//2、判断操作(insert | update) | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType || canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
var isUpdate bool | |||||
//3、查找是否有数据 | |||||
var ordDate string | |||||
for _, item := range canalMsg.Data { | |||||
ordDate = utils.TimeParseStd(item.CreateTime).Format("2006-01-02") | |||||
} | |||||
statistics, err := db2.GetMasterMallOrderStatistics(db.Db, masterId, ordDate) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if statistics == nil && canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
statistics = &model.MasterMallOrderStatistics{ | |||||
MasterId: masterId, | |||||
PaymentTotal: "", | |||||
OrderCount: 0, | |||||
EstimatedCommission: "", | |||||
LoseOrderCount: 0, | |||||
PlaceOrderNumOfPeople: 0, | |||||
EffectiveOrderCount: 0, | |||||
EffectiveCommission: "", | |||||
ReceiveCommission: "", | |||||
LoseCommission: "", | |||||
AvgCommission: "", | |||||
CustomerUnitPrice: "", | |||||
Date: ordDate, | |||||
CreateAt: now.Format("2006-01-02 15:04:05"), | |||||
UpdateAt: now.Format("2006-01-02 15:04:05"), | |||||
} | |||||
_, err = db2.MasterMallOrderStatisticsInsert(db.Db, statistics) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
if statistics == nil { | |||||
return errors.New("过期订单数据不予处理") | |||||
} | |||||
paymentTotal := statistics.PaymentTotal | |||||
orderCount := statistics.OrderCount | |||||
estimatedCommission := statistics.EstimatedCommission | |||||
loseOrderCount := statistics.LoseOrderCount | |||||
placeOrderNumOfPeople := statistics.PlaceOrderNumOfPeople | |||||
effectiveOrderCount := statistics.EffectiveOrderCount | |||||
effectiveCommission := statistics.EffectiveCommission | |||||
loseCommission := statistics.LoseCommission | |||||
avgCommission := statistics.AvgCommission | |||||
customerUnitPrice := statistics.CustomerUnitPrice | |||||
effectivePaymentTotal := statistics.EffectivePaymentTotal | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
for _, item := range canalMsg.Data { | |||||
orderCount++ | |||||
if item.State == "0" { | |||||
//未支付不统计 | |||||
return errors.New("未支付不统计") | |||||
} else if item.State == "6" { | |||||
//失效 | |||||
loseOrderCount++ | |||||
} else { | |||||
effectiveOrderCount++ | |||||
} | |||||
paymentTotal = utils.Float64ToStr(utils.StrToFloat64(paymentTotal) + utils.StrToFloat64(item.CostPrice)) //付款金额 | |||||
estimatedCommission = utils.Float64ToStr(utils.StrToFloat64(estimatedCommission) + utils.StrToFloat64(item.EstimateCommission)) //预估佣金(元) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) + utils.StrToFloat64(item.EstimateCommission)) //有效佣金(元) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) + utils.StrToFloat64(item.CostPrice)) //有效付款金额(元) | |||||
estimatedCommissionValue, _ := decimal.NewFromString(estimatedCommission) | |||||
orderCountValue := decimal.NewFromInt(int64(orderCount)) | |||||
avgCommission = estimatedCommissionValue.Div(orderCountValue).String() //平均佣金 | |||||
if item.State != "6" { | |||||
cacheKey := fmt.Sprintf(ZhiOsMallPlaceOrderNumOfPeopleHashMapCacheKey, utils.IntToStr(masterId), ordDate) | |||||
get, _ := cache.HGetString(cacheKey, item.Uid) | |||||
if get == "" { | |||||
placeOrderNumOfPeople++ //下单人数 | |||||
cache.HSet(cacheKey, item.Uid, "1") | |||||
} else { | |||||
cache.HSet(cacheKey, item.Uid, utils.IntToStr(utils.StrToInt(get)+1)) | |||||
} | |||||
cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) | |||||
paymentTotalValue, _ := decimal.NewFromString(paymentTotal) | |||||
if placeOrderNumOfPeople == 0 { | |||||
return errors.New("divider cannot be 0 in division operation") | |||||
} | |||||
placeOrderNumOfPeopleValue := decimal.NewFromInt(int64(placeOrderNumOfPeople)) | |||||
customerUnitPrice = paymentTotalValue.Div(placeOrderNumOfPeopleValue).String() | |||||
} | |||||
//客单价 | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
judgeSate := JudgeMallOrdSate(*canalMsg) | |||||
if judgeSate > 0 { | |||||
if judgeSate == 2 { | |||||
//TODO::未收货失效 | |||||
loseOrderCount++ | |||||
effectiveOrderCount-- | |||||
for _, item := range canalMsg.Data { | |||||
loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) - utils.StrToFloat64(item.CostPrice)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if judgeSate == 3 { | |||||
//TODO::已收货失效 | |||||
loseOrderCount++ | |||||
effectiveOrderCount-- | |||||
for _, item := range canalMsg.Data { | |||||
loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) - utils.StrToFloat64(item.CostPrice)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if judgeSate == 1 || judgeSate == 3 { | |||||
//TODO::收货额外处理 | |||||
//查找是否有数据 | |||||
var ordConfirmAt string | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmAt = utils.TimeParseStd(item.ConfirmTime).Format("2006-01-02") | |||||
} | |||||
ordConfirmStatistics, err := db2.GetMasterMallOrderStatistics(db.Db, masterId, ordConfirmAt) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if ordConfirmStatistics == nil { | |||||
return errors.New("过期收货订单数据不予处理") | |||||
} | |||||
if judgeSate == 1 { | |||||
//TODO::收货 | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmStatistics.ReceiveCommission = utils.Float64ToStr(utils.StrToFloat64(ordConfirmStatistics.ReceiveCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
} | |||||
} | |||||
if judgeSate == 3 { | |||||
//TODO::已收货失效 | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmStatistics.ReceiveCommission = utils.Float64ToStr(utils.StrToFloat64(ordConfirmStatistics.ReceiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
} | |||||
} | |||||
ordConfirmStatistics.UpdateAt = now.Format("2006-01-02 15:04:05") | |||||
_, err = db2.MasterMallOrderStatisticsUpdate(db.Db, ordConfirmStatistics.Id, ordConfirmStatistics, | |||||
"receive_commission", "update_at") | |||||
} | |||||
} | |||||
} | |||||
if isUpdate { | |||||
statistics.PaymentTotal = paymentTotal | |||||
statistics.OrderCount = orderCount | |||||
statistics.EstimatedCommission = estimatedCommission | |||||
statistics.LoseOrderCount = loseOrderCount | |||||
statistics.PlaceOrderNumOfPeople = placeOrderNumOfPeople | |||||
statistics.EffectiveOrderCount = effectiveOrderCount | |||||
statistics.EffectiveCommission = effectiveCommission | |||||
statistics.LoseCommission = loseCommission | |||||
statistics.AvgCommission = avgCommission | |||||
statistics.CustomerUnitPrice = customerUnitPrice | |||||
statistics.EffectivePaymentTotal = effectivePaymentTotal | |||||
statistics.UpdateAt = now.Format("2006-01-02 15:04:05") | |||||
_, err = db2.MasterMallOrderStatisticsUpdate(db.Db, statistics.Id, statistics, | |||||
"payment_total", "order_count", "estimated_commission", "lose_order_count", | |||||
"place_order_num_of_people", "effective_order_count", "effective_commission", "lose_commission", | |||||
"avg_commission", "customer_unit_price", "effective_payment_total", "update_at") | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
} | |||||
return nil | |||||
} | |||||
// JudgeMallOrdSate 处理订单状态(judgeSate[0:不需要处理 1:收货 2:未收货失效 3:已收货失效]) | |||||
func JudgeMallOrdSate(message md.CanalMallOrder) (judgeSate int) { | |||||
oldData := message.Old | |||||
//1、获取 旧的订单状态 | |||||
var oldOrdState string | |||||
for _, item := range oldData { | |||||
if item.State != "" { | |||||
oldOrdState = item.State | |||||
} | |||||
} | |||||
if oldOrdState == "" { | |||||
return | |||||
} | |||||
//2、获取 新的订单状态 | |||||
var nowOrdState string | |||||
for _, item := range message.Data { | |||||
if item.State != "" { | |||||
nowOrdState = item.State | |||||
} | |||||
} | |||||
if nowOrdState == "" { | |||||
return | |||||
} | |||||
if oldOrdState == nowOrdState { | |||||
return | |||||
} | |||||
//3、进行状态比较判断 | |||||
if oldOrdState == "3" || oldOrdState == "4" || oldOrdState == "5" { | |||||
if nowOrdState == "6" { | |||||
//已收货失效 | |||||
return 3 | |||||
} | |||||
} else { | |||||
if nowOrdState == "6" { | |||||
//未收货失效 | |||||
return 2 | |||||
} | |||||
if nowOrdState == "3" || nowOrdState == "4" || nowOrdState == "5" { | |||||
//收货 | |||||
return 1 | |||||
} | |||||
} | |||||
return | |||||
} |
@@ -0,0 +1,307 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"applet/app/utils/cache" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
db2 "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official/model" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/shopspring/decimal" | |||||
"github.com/streadway/amqp" | |||||
"strings" | |||||
"time" | |||||
) | |||||
const ZhiOsO2oPlaceOrderNumOfPeopleHashMapCacheKey = "%s:zhiOs_o2o_place_order_num_of_people_hash_map_cache:%s" //下单人数缓存hashMap键 | |||||
func CanalO2oOrderForNumericalStatementConsume(queue md.MqQueue) { | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(100) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>CanalO2oOrderForNumericalStatementConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleCanalO2oOrderForNumericalStatementTable(res.Body) | |||||
if err != nil { | |||||
fmt.Println("err ::: ", err) | |||||
utils.FilePutContents("CanalO2oOrderForNumericalStatementConsume_ERR", "[err]:"+err.Error()) | |||||
_ = res.Reject(false) | |||||
////TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
//var msg *md.OneCirclesStructForSignIn | |||||
//json.Unmarshal(res.Body, &msg) | |||||
//ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
} else { | |||||
_ = res.Ack(true) | |||||
} | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleCanalO2oOrderForNumericalStatementTable(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.CanalO2oOrderMessage[md.CanalO2oOrder] | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
now := time.Now() | |||||
//2、获取masterId | |||||
masterId := utils.StrToInt(strings.Split(canalMsg.Database, "_")[1]) | |||||
//TODO::日志记录 | |||||
utils.FilePutContents("handleCanalO2oOrderForNumericalStatementTable_"+utils.IntToStr(masterId)+"_"+now.Format("2006-01-02"), string(msg)) | |||||
//2、判断操作(insert | update) | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType || canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
var isUpdate bool | |||||
//3、查找是否有数据 | |||||
var ordDate string | |||||
for _, item := range canalMsg.Data { | |||||
ordDate = utils.TimeParseStd(item.CreateTime).Format("2006-01-02") | |||||
} | |||||
statistics, err := db2.GetMasterO2oOrderStatistics(db.Db, masterId, ordDate) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if statistics == nil && canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
statistics = &model.MasterO2oOrderStatistics{ | |||||
MasterId: masterId, | |||||
PaymentTotal: "", | |||||
OrderCount: 0, | |||||
EstimatedCommission: "", | |||||
LoseOrderCount: 0, | |||||
PlaceOrderNumOfPeople: 0, | |||||
EffectiveOrderCount: 0, | |||||
EffectiveCommission: "", | |||||
ReceiveCommission: "", | |||||
LoseCommission: "", | |||||
AvgCommission: "", | |||||
CustomerUnitPrice: "", | |||||
Date: ordDate, | |||||
CreateAt: now.Format("2006-01-02 15:04:05"), | |||||
UpdateAt: now.Format("2006-01-02 15:04:05"), | |||||
} | |||||
_, err = db2.MasterO2oOrderStatisticsInsert(db.Db, statistics) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
if statistics == nil { | |||||
return errors.New("过期订单数据不予处理") | |||||
} | |||||
paymentTotal := statistics.PaymentTotal | |||||
orderCount := statistics.OrderCount | |||||
estimatedCommission := statistics.EstimatedCommission | |||||
loseOrderCount := statistics.LoseOrderCount | |||||
placeOrderNumOfPeople := statistics.PlaceOrderNumOfPeople | |||||
effectiveOrderCount := statistics.EffectiveOrderCount | |||||
effectiveCommission := statistics.EffectiveCommission | |||||
loseCommission := statistics.LoseCommission | |||||
avgCommission := statistics.AvgCommission | |||||
customerUnitPrice := statistics.CustomerUnitPrice | |||||
effectivePaymentTotal := statistics.EffectivePaymentTotal | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
for _, item := range canalMsg.Data { | |||||
orderCount++ | |||||
if item.State == "0" { | |||||
//未支付不统计 | |||||
return errors.New("未支付不统计") | |||||
} else if item.State == "6" { | |||||
//失效 | |||||
loseOrderCount++ | |||||
} else { | |||||
effectiveOrderCount++ | |||||
} | |||||
paymentTotal = utils.Float64ToStr(utils.StrToFloat64(paymentTotal) + utils.StrToFloat64(item.CostPrice)) //付款金额 | |||||
estimatedCommission = utils.Float64ToStr(utils.StrToFloat64(estimatedCommission) + utils.StrToFloat64(item.EstimateCommission)) //预估佣金(元) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) + utils.StrToFloat64(item.EstimateCommission)) //有效佣金(元) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) + utils.StrToFloat64(item.CostPrice)) //有效付款金额(元) | |||||
estimatedCommissionValue, _ := decimal.NewFromString(estimatedCommission) | |||||
orderCountValue := decimal.NewFromInt(int64(orderCount)) | |||||
avgCommission = estimatedCommissionValue.Div(orderCountValue).String() //平均佣金 | |||||
if item.State != "6" && item.State != "5" { | |||||
cacheKey := fmt.Sprintf(ZhiOsO2oPlaceOrderNumOfPeopleHashMapCacheKey, utils.IntToStr(masterId), ordDate) | |||||
get, _ := cache.HGetString(cacheKey, item.Uid) | |||||
if get == "" { | |||||
placeOrderNumOfPeople++ //下单人数 | |||||
cache.HSet(cacheKey, item.Uid, "1") | |||||
} else { | |||||
cache.HSet(cacheKey, item.Uid, utils.IntToStr(utils.StrToInt(get)+1)) | |||||
} | |||||
cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) | |||||
paymentTotalValue, _ := decimal.NewFromString(paymentTotal) | |||||
if placeOrderNumOfPeople == 0 { | |||||
return errors.New("divider cannot be 0 in division operation") | |||||
} | |||||
placeOrderNumOfPeopleValue := decimal.NewFromInt(int64(placeOrderNumOfPeople)) | |||||
customerUnitPrice = paymentTotalValue.Div(placeOrderNumOfPeopleValue).String() | |||||
} | |||||
//客单价 | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
judgeSate := JudgeO2oOrdSate(*canalMsg) | |||||
if judgeSate > 0 { | |||||
if judgeSate == 2 { | |||||
//TODO::未收货失效 | |||||
loseOrderCount++ | |||||
effectiveOrderCount-- | |||||
for _, item := range canalMsg.Data { | |||||
loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) - utils.StrToFloat64(item.CostPrice)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if judgeSate == 3 { | |||||
//TODO::已收货失效 | |||||
loseOrderCount++ | |||||
effectiveOrderCount-- | |||||
for _, item := range canalMsg.Data { | |||||
loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
effectivePaymentTotal = utils.Float64ToStr(utils.StrToFloat64(effectivePaymentTotal) - utils.StrToFloat64(item.CostPrice)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if judgeSate == 1 || judgeSate == 3 { | |||||
//TODO::收货额外处理 | |||||
//查找是否有数据 | |||||
var ordConfirmAt string | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmAt = utils.TimeParseStd(item.FinishTime).Format("2006-01-02") | |||||
} | |||||
ordConfirmStatistics, err := db2.GetMasterO2oOrderStatistics(db.Db, masterId, ordConfirmAt) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if ordConfirmStatistics == nil { | |||||
return errors.New("过期收货订单数据不予处理") | |||||
} | |||||
if judgeSate == 1 { | |||||
//TODO::收货 | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmStatistics.ReceiveCommission = utils.Float64ToStr(utils.StrToFloat64(ordConfirmStatistics.ReceiveCommission) + utils.StrToFloat64(item.EstimateCommission)) | |||||
} | |||||
} | |||||
if judgeSate == 3 { | |||||
//TODO::已收货失效 | |||||
for _, item := range canalMsg.Data { | |||||
ordConfirmStatistics.ReceiveCommission = utils.Float64ToStr(utils.StrToFloat64(ordConfirmStatistics.ReceiveCommission) - utils.StrToFloat64(item.EstimateCommission)) | |||||
} | |||||
} | |||||
ordConfirmStatistics.UpdateAt = now.Format("2006-01-02 15:04:05") | |||||
_, err = db2.MasterO2oOrderStatisticsUpdate(db.Db, ordConfirmStatistics.Id, ordConfirmStatistics, | |||||
"receive_commission", "update_at") | |||||
} | |||||
} | |||||
} | |||||
if isUpdate { | |||||
statistics.PaymentTotal = paymentTotal | |||||
statistics.OrderCount = orderCount | |||||
statistics.EstimatedCommission = estimatedCommission | |||||
statistics.LoseOrderCount = loseOrderCount | |||||
statistics.PlaceOrderNumOfPeople = placeOrderNumOfPeople | |||||
statistics.EffectiveOrderCount = effectiveOrderCount | |||||
statistics.EffectiveCommission = effectiveCommission | |||||
statistics.LoseCommission = loseCommission | |||||
statistics.AvgCommission = avgCommission | |||||
statistics.CustomerUnitPrice = customerUnitPrice | |||||
statistics.EffectivePaymentTotal = effectivePaymentTotal | |||||
statistics.UpdateAt = now.Format("2006-01-02 15:04:05") | |||||
_, err = db2.MasterO2oOrderStatisticsUpdate(db.Db, statistics.Id, statistics, | |||||
"payment_total", "order_count", "estimated_commission", "lose_order_count", | |||||
"place_order_num_of_people", "effective_order_count", "effective_commission", "lose_commission", | |||||
"avg_commission", "customer_unit_price", "effective_payment_total", "update_at") | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
} | |||||
return nil | |||||
} | |||||
// JudgeO2oOrdSate 处理订单状态(judgeSate[0:不需要处理 1:收货 2:未收货失效 3:已收货失效]) | |||||
func JudgeO2oOrdSate(message md.CanalO2oOrderMessage[md.CanalO2oOrder]) (judgeSate int) { | |||||
oldData := message.Old | |||||
//1、获取 旧的订单状态 | |||||
var oldOrdState string | |||||
for _, item := range oldData { | |||||
if item.State != "" { | |||||
oldOrdState = item.State | |||||
} | |||||
} | |||||
if oldOrdState == "" { | |||||
return | |||||
} | |||||
//2、获取 新的订单状态 | |||||
var nowOrdState string | |||||
for _, item := range message.Data { | |||||
if item.State != "" { | |||||
nowOrdState = item.State | |||||
} | |||||
} | |||||
if nowOrdState == "" { | |||||
return | |||||
} | |||||
if oldOrdState == nowOrdState { | |||||
return | |||||
} | |||||
//3、进行状态比较判断 | |||||
if oldOrdState == "3" || oldOrdState == "4" { | |||||
if nowOrdState == "5" || nowOrdState == "6" { | |||||
//已收货失效 | |||||
return 3 | |||||
} | |||||
} else { | |||||
if nowOrdState == "5" || nowOrdState == "6" { | |||||
//未收货失效 | |||||
return 2 | |||||
} | |||||
if nowOrdState == "3" || nowOrdState == "4" { | |||||
//收货 | |||||
return 1 | |||||
} | |||||
} | |||||
return | |||||
} |
@@ -0,0 +1,173 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"applet/app/utils/cache" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
db2 "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official/model" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/shopspring/decimal" | |||||
"github.com/streadway/amqp" | |||||
"strings" | |||||
"time" | |||||
) | |||||
const ZhiOsO2oPayPlaceOrderNumOfPeopleHashMapCacheKey = "%s:zhiOs_o2o_pay_place_order_num_of_people_hash_map_cache:%s" //下单人数缓存hashMap键 | |||||
func CanalO2oPayOrderForNumericalStatementConsume(queue md.MqQueue) { | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(100) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>CanalO2oPayOrderForNumericalStatementConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleCanalO2oPayOrderForNumericalStatementTable(res.Body) | |||||
if err != nil { | |||||
fmt.Println("err ::: ", err) | |||||
utils.FilePutContents("CanalO2oPayOrderForNumericalStatementConsume_ERR", "[err]:"+err.Error()) | |||||
_ = res.Reject(false) | |||||
////TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
//var msg *md.OneCirclesStructForSignIn | |||||
//json.Unmarshal(res.Body, &msg) | |||||
//ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
} else { | |||||
_ = res.Ack(true) | |||||
} | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleCanalO2oPayOrderForNumericalStatementTable(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.CanalMallOrder | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
now := time.Now() | |||||
//2、获取masterId | |||||
masterId := utils.StrToInt(strings.Split(canalMsg.Database, "_")[1]) | |||||
//TODO::日志记录 | |||||
utils.FilePutContents("handleCanalO2oPayOrderForNumericalStatementTable_"+utils.IntToStr(masterId)+"_"+now.Format("2006-01-02"), string(msg)) | |||||
//2、判断操作(insert | update) | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType || canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
var isUpdate bool | |||||
//3、查找是否有数据 | |||||
var ordDate string | |||||
for _, item := range canalMsg.Data { | |||||
ordDate = utils.TimeParseStd(item.CreateTime).Format("2006-01-02") | |||||
} | |||||
statistics, err := db2.GetMasterO2oPayOrderStatistics(db.Db, masterId, ordDate) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if statistics == nil && canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
statistics = &model.MasterO2oPayOrderStatistics{ | |||||
MasterId: masterId, | |||||
PaymentTotal: "", | |||||
OrderCount: 0, | |||||
EstimatedCommission: "", | |||||
PlaceOrderNumOfPeople: 0, | |||||
AvgCommission: "", | |||||
CustomerUnitPrice: "", | |||||
Date: ordDate, | |||||
CreateAt: now.Format("2006-01-02 15:04:05"), | |||||
UpdateAt: now.Format("2006-01-02 15:04:05"), | |||||
} | |||||
_, err = db2.MasterO2oPayOrderStatisticsInsert(db.Db, statistics) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
if statistics == nil { | |||||
return errors.New("过期订单数据不予处理") | |||||
} | |||||
paymentTotal := statistics.PaymentTotal | |||||
orderCount := statistics.OrderCount | |||||
estimatedCommission := statistics.EstimatedCommission | |||||
placeOrderNumOfPeople := statistics.PlaceOrderNumOfPeople | |||||
avgCommission := statistics.AvgCommission | |||||
customerUnitPrice := statistics.CustomerUnitPrice | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
for _, item := range canalMsg.Data { | |||||
orderCount++ | |||||
if item.State == "0" { | |||||
//未支付不统计 | |||||
return errors.New("未支付不统计") | |||||
} | |||||
if item.State == "1" || item.State == "4" { | |||||
paymentTotal = utils.Float64ToStr(utils.StrToFloat64(paymentTotal) + utils.StrToFloat64(item.CostPrice)) //付款金额 | |||||
estimatedCommission = utils.Float64ToStr(utils.StrToFloat64(estimatedCommission) + utils.StrToFloat64(item.EstimateCommission)) //预估佣金(元) | |||||
estimatedCommissionValue, _ := decimal.NewFromString(estimatedCommission) | |||||
orderCountValue := decimal.NewFromInt(int64(orderCount)) | |||||
avgCommission = estimatedCommissionValue.Div(orderCountValue).String() //平均佣金 | |||||
cacheKey := fmt.Sprintf(ZhiOsO2oPayPlaceOrderNumOfPeopleHashMapCacheKey, utils.IntToStr(masterId), ordDate) | |||||
get, _ := cache.HGetString(cacheKey, item.Uid) | |||||
if get == "" { | |||||
placeOrderNumOfPeople++ //下单人数 | |||||
cache.HSet(cacheKey, item.Uid, "1") | |||||
} else { | |||||
cache.HSet(cacheKey, item.Uid, utils.IntToStr(utils.StrToInt(get)+1)) | |||||
} | |||||
cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) | |||||
paymentTotalValue, _ := decimal.NewFromString(paymentTotal) | |||||
if placeOrderNumOfPeople == 0 { | |||||
return errors.New("divider cannot be 0 in division operation") | |||||
} | |||||
placeOrderNumOfPeopleValue := decimal.NewFromInt(int64(placeOrderNumOfPeople)) | |||||
customerUnitPrice = paymentTotalValue.Div(placeOrderNumOfPeopleValue).String() //客单价 | |||||
isUpdate = true | |||||
} | |||||
} | |||||
} | |||||
if isUpdate { | |||||
statistics.PaymentTotal = paymentTotal | |||||
statistics.OrderCount = orderCount | |||||
statistics.EstimatedCommission = estimatedCommission | |||||
statistics.PlaceOrderNumOfPeople = placeOrderNumOfPeople | |||||
statistics.AvgCommission = avgCommission | |||||
statistics.CustomerUnitPrice = customerUnitPrice | |||||
statistics.UpdateAt = now.Format("2006-01-02 15:04:05") | |||||
_, err = db2.MasterO2oPayOrderStatisticsUpdate(db.Db, statistics.Id, statistics, | |||||
"payment_total", "order_count", "estimated_commission", | |||||
"place_order_num_of_people", | |||||
"avg_commission", "customer_unit_price", "update_at") | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
} | |||||
return nil | |||||
} |
@@ -19,48 +19,48 @@ func Init() { | |||||
func initConsumes() { | func initConsumes() { | ||||
//jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder | //jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder | ||||
jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge | |||||
jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv | |||||
jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume | |||||
jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree | |||||
jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal | |||||
jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond | |||||
jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal | |||||
jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy | |||||
jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle | |||||
jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder | |||||
jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation | |||||
jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser | |||||
jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition | |||||
jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial | |||||
jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter | |||||
jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender | |||||
jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans | |||||
jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv | |||||
jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay | |||||
jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess | |||||
jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund | |||||
jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond | |||||
jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore | |||||
jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail | |||||
jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume | |||||
jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate | |||||
jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate | |||||
jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal | |||||
jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail | |||||
jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward | |||||
jobs[consumeMd.ZhiosTaskTotal] = ZhiosTaskTotal | |||||
jobs[consumeMd.ZhiosAutoUnFreeze] = ZhiosAutoUnFreeze | |||||
//jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge | |||||
//jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv | |||||
//jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume | |||||
//jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree | |||||
//jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal | |||||
//jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond | |||||
// | |||||
//jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal | |||||
//jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy | |||||
//jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle | |||||
// | |||||
//jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder | |||||
// | |||||
//jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation | |||||
//jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser | |||||
// | |||||
//jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition | |||||
// | |||||
//jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial | |||||
//jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter | |||||
//jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender | |||||
//jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans | |||||
//jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv | |||||
// | |||||
//jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay | |||||
//jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess | |||||
//jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund | |||||
//jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond | |||||
// | |||||
//jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore | |||||
// | |||||
//jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail | |||||
// | |||||
//jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume | |||||
//jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate | |||||
//jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate | |||||
// | |||||
//jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal | |||||
//jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail | |||||
//jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward | |||||
//jobs[consumeMd.ZhiosTaskTotal] = ZhiosTaskTotal | |||||
//jobs[consumeMd.ZhiosAutoUnFreeze] = ZhiosAutoUnFreeze | |||||
////jobs[consumeMd.ZhiosUserProfileInviteCode] = ZhiosUserProfileInviteCode | ////jobs[consumeMd.ZhiosUserProfileInviteCode] = ZhiosUserProfileInviteCode | ||||
// | // | ||||
@@ -80,6 +80,10 @@ func initConsumes() { | |||||
//jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume | //jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume | ||||
//jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume | //jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume | ||||
//jobs[consumeMd.CanalGuideOrderForNumericalStatementConsumeFunName] = CanalGuideOrderForNumericalStatementConsume | //jobs[consumeMd.CanalGuideOrderForNumericalStatementConsumeFunName] = CanalGuideOrderForNumericalStatementConsume | ||||
//jobs[consumeMd.CanalMallOrderForNumericalStatementConsumeFunName] = CanalMallOrderForNumericalStatementConsume | |||||
//jobs[consumeMd.CanalO2oOrderForNumericalStatementConsumeFunName] = CanalO2oOrderForNumericalStatementConsume | |||||
//jobs[consumeMd.CanalO2oPayOrderForNumericalStatementConsumeFunName] = CanalO2oPayOrderForNumericalStatementConsume | |||||
//jobs[consumeMd.CanalB2cOrderForNumericalStatementConsumeFunName] = CanalB2cOrderForNumericalStatementConsume | |||||
//////////////////////////////////////// oneCircles ///////////////////////////////////////////////////// | //////////////////////////////////////// oneCircles ///////////////////////////////////////////////////// | ||||
@@ -89,6 +93,7 @@ func initConsumes() { | |||||
//jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume | //jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume | ||||
//jobs[consumeMd.OneCirclesSettlementPublicGiveActivityCoinFunName] = OneCirclesSettlementPublicGiveActivityCoinConsume | //jobs[consumeMd.OneCirclesSettlementPublicGiveActivityCoinFunName] = OneCirclesSettlementPublicGiveActivityCoinConsume | ||||
//jobs[consumeMd.OneCirclesAddPublicPlatoonUserRelationCommissionFunName] = OneCirclesAddPublicPlatoonUserRelationCommissionConsume | //jobs[consumeMd.OneCirclesAddPublicPlatoonUserRelationCommissionFunName] = OneCirclesAddPublicPlatoonUserRelationCommissionConsume | ||||
//jobs[consumeMd.OneCirclesSignInCopyGreenEnergyFunName] = OneCirclesSignInCopyGreenEnergyConsume | //jobs[consumeMd.OneCirclesSignInCopyGreenEnergyFunName] = OneCirclesSignInCopyGreenEnergyConsume | ||||
//////////////////////////////////////// withdraw ///////////////////////////////////////////////////// | //////////////////////////////////////// withdraw ///////////////////////////////////////////////////// | ||||
@@ -103,8 +108,12 @@ func initConsumes() { | |||||
//jobs[consumeMd.InstallmentPaymentAutoRepaidConsumeFunName] = InstallmentPaymentAutoRepaidConsume //分期付 - 自动扣款 | //jobs[consumeMd.InstallmentPaymentAutoRepaidConsumeFunName] = InstallmentPaymentAutoRepaidConsume //分期付 - 自动扣款 | ||||
////////////////////////////////////// SuperCloudIssuance ///////////////////////////////////////////////////// | ////////////////////////////////////// SuperCloudIssuance ///////////////////////////////////////////////////// | ||||
//jobs[consumeMd.SuperCloudIssuanceMsgCallBackFunName] = SuperCloudIssuanceMsgCallBackConsume | |||||
//jobs[consumeMd.SuperCloudIssuanceAsyncMLoginFunName] = SuperCloudIssuanceAsyncMLoginConsume | |||||
jobs[consumeMd.SuperCloudIssuanceMsgCallBackFunName] = SuperCloudIssuanceMsgCallBackConsume | |||||
jobs[consumeMd.SuperCloudIssuanceAsyncMLoginFunName] = SuperCloudIssuanceAsyncMLoginConsume | |||||
////////////////////////////////////// DMS ///////////////////////////////////////////////////// | |||||
//jobs[consumeMd.CanalGimMessageConsumeFunName] = CanalGimMessageConsume | |||||
} | } | ||||
func Run() { | func Run() { | ||||
@@ -83,6 +83,51 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
BindKey: "", | BindKey: "", | ||||
ConsumeFunName: "CanalGuideOrderForNumericalStatementConsume", | ConsumeFunName: "CanalGuideOrderForNumericalStatementConsume", | ||||
}, | }, | ||||
{ | |||||
ExchangeName: "canal.topic", | |||||
Name: "canal_mall_order_for_numerical_statement", | |||||
Type: TopicQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "canal_mall_order", | |||||
BindKey: "", | |||||
ConsumeFunName: "CanalMallOrderForNumericalStatementConsume", | |||||
}, | |||||
{ | |||||
ExchangeName: "canal.topic", | |||||
Name: "canal_b2c_order_for_numerical_statement", | |||||
Type: TopicQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "canal_b2c_order", | |||||
BindKey: "", | |||||
ConsumeFunName: "CanalB2cOrderForNumericalStatementConsume", | |||||
}, | |||||
{ | |||||
ExchangeName: "canal.topic", | |||||
Name: "canal_o2o_order_for_numerical_statement", | |||||
Type: TopicQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "canal_o2o_order", | |||||
BindKey: "", | |||||
ConsumeFunName: "CanalO2oOrderForNumericalStatementConsume", | |||||
}, | |||||
{ | |||||
ExchangeName: "canal.topic", | |||||
Name: "canal_o2o_pay_order_for_numerical_statement", | |||||
Type: TopicQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "canal_o2o_pay_to_merchant", | |||||
BindKey: "", | |||||
ConsumeFunName: "CanalO2oPayOrderForNumericalStatementConsume", | |||||
}, | |||||
{ | |||||
ExchangeName: "canal.topic", | |||||
Name: "canal_gim_message", | |||||
Type: TopicQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "canal_gim_message", | |||||
BindKey: "", | |||||
ConsumeFunName: "CanalGimMessageConsume", | |||||
}, | |||||
{ | { | ||||
ExchangeName: "canal.topic", | ExchangeName: "canal.topic", | ||||
Name: "canal_guide_order", | Name: "canal_guide_order", | ||||
@@ -578,6 +623,10 @@ const ( | |||||
ZhiosSupplierAfterOrderFunName = "ZhiosSupplierAfterOrder" | ZhiosSupplierAfterOrderFunName = "ZhiosSupplierAfterOrder" | ||||
CanalOrderConsumeFunName = "CanalOrderConsume" | CanalOrderConsumeFunName = "CanalOrderConsume" | ||||
CanalGuideOrderForNumericalStatementConsumeFunName = "CanalGuideOrderForNumericalStatementConsume" | CanalGuideOrderForNumericalStatementConsumeFunName = "CanalGuideOrderForNumericalStatementConsume" | ||||
CanalMallOrderForNumericalStatementConsumeFunName = "CanalMallOrderForNumericalStatementConsume" | |||||
CanalO2oOrderForNumericalStatementConsumeFunName = "CanalO2oOrderForNumericalStatementConsume" | |||||
CanalO2oPayOrderForNumericalStatementConsumeFunName = "CanalO2oPayOrderForNumericalStatementConsume" | |||||
CanalB2cOrderForNumericalStatementConsumeFunName = "CanalB2cOrderForNumericalStatementConsume" | |||||
CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume" | CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume" | ||||
ZhiOsUserVisitIpAddressConsumeFunName = "ZhiOsUserVisitIpAddressConsume" | ZhiOsUserVisitIpAddressConsumeFunName = "ZhiOsUserVisitIpAddressConsume" | ||||
DouShenUserRegisterConsumeForOfficialFunName = "DouShenUserRegisterConsumeForOfficial" | DouShenUserRegisterConsumeForOfficialFunName = "DouShenUserRegisterConsumeForOfficial" | ||||
@@ -625,4 +674,5 @@ const ( | |||||
ZhiosUserProfileInviteCode = "ZhiosUserProfileInviteCode" | ZhiosUserProfileInviteCode = "ZhiosUserProfileInviteCode" | ||||
ZhiosAutoUnFreeze = "ZhiosAutoUnFreeze" | ZhiosAutoUnFreeze = "ZhiosAutoUnFreeze" | ||||
InstallmentPaymentAutoRepaidConsumeFunName = "InstallmentPaymentAutoRepaidConsume" | InstallmentPaymentAutoRepaidConsumeFunName = "InstallmentPaymentAutoRepaidConsume" | ||||
CanalGimMessageConsumeFunName = "CanalGimMessageConsume" | |||||
) | ) |
@@ -0,0 +1,74 @@ | |||||
package md | |||||
type CanalB2cOrder struct { | |||||
OrdId string `json:"ord_id" xorm:"not null pk BIGINT(20)"` | |||||
MainOrdId string `json:"main_ord_id" xorm:"not null comment('主订单号') index BIGINT(20)"` | |||||
Uid string `json:"uid" xorm:"comment('用户id') index INT(11)"` | |||||
BelongStoreId string `json:"belong_store_id" xorm:"comment('归属店铺id') INT(11)"` | |||||
BuyerName string `json:"buyer_name" xorm:"comment('购买人') VARCHAR(255)"` | |||||
BuyerPhone string `json:"buyer_phone" xorm:"comment('购买人手机号') VARCHAR(255)"` | |||||
CostPrice string `json:"cost_price" xorm:"comment('价格') DECIMAL(12,2)"` | |||||
CostVirtualCoin string `json:"cost_virtual_coin" xorm:"comment('消耗的虚拟币') DECIMAL(12,2)"` | |||||
VirtualCoinId string `json:"virtual_coin_id" xorm:"comment('使用的虚拟币id') INT(11)"` | |||||
State string `json:"state" xorm:"comment('订单状态:0未支付 1已支付 ...(其余状态根据订单类型不同)') TINYINT(1)"` | |||||
PayTime string `json:"pay_time" xorm:"comment('支付时间') DATETIME"` | |||||
PickUp string `json:"pick_up" xorm:"not null comment('取货方式:1堂食 2打包带走') TINYINT(1)"` | |||||
PayChannel string `json:"pay_channel" xorm:"not null comment('支付方式:1balance 2alipay 3wx_pay 4zhios_pay_alipay') TINYINT(1)"` | |||||
ShippingTime string `json:"shipping_time" xorm:"comment('发货时间') DATETIME"` | |||||
DeliveryWay string `json:"delivery_way" xorm:"default 1 comment('发货方式(1:自己联系)') TINYINT(1)"` | |||||
LogisticCompany string `json:"logistic_company" xorm:"not null default '' comment('物流公司') VARCHAR(255)"` | |||||
LogisticNum string `json:"logistic_num" xorm:"not null default '' comment('物流单号') VARCHAR(255)"` | |||||
ReceiverPhone string `json:"receiver_phone" xorm:"not null default '' comment('收货人手机号') VARCHAR(20)"` | |||||
ReceiverName string `json:"receiver_name" xorm:"not null default '' comment('收货人名字') VARCHAR(255)"` | |||||
ReceiverAddressDetail string `json:"receiver_address_detail" xorm:"not null default '' comment('收货人地址') VARCHAR(255)"` | |||||
ShippingType string `json:"shipping_type" xorm:"not null default 1 comment('运送方式:1快递送货') TINYINT(1)"` | |||||
CouponDiscount string `json:"coupon_discount" xorm:"not null default 0.00 comment('优惠券折扣额') DECIMAL(12,2)"` | |||||
DiscountPrice string `json:"discount_price" xorm:"not null default 0.00 comment('立减') DECIMAL(12,2)"` | |||||
UserCouponId string `json:"user_coupon_id" xorm:"comment('使用的优惠券id') BIGINT(20)"` | |||||
ReturnInsuranceFee string `json:"return_insurance_fee" xorm:"not null default 0.00 comment('退货无忧费用') DECIMAL(12,2)"` | |||||
IsReceipt string `json:"is_receipt" xorm:"not null default 0 comment('是否开具发票 0否 1是') TINYINT(255)"` | |||||
ShippingFee string `json:"shipping_fee" xorm:"not null default 0.00 comment('运费') DECIMAL(12,2)"` | |||||
Comment string `json:"comment" xorm:"not null comment('备注') VARCHAR(2048)"` | |||||
ProvinceName string `json:"province_name" xorm:"not null default '' comment('收货省份') VARCHAR(255)"` | |||||
CityName string `json:"city_name" xorm:"not null default '' comment('收货城市') VARCHAR(255)"` | |||||
CountyName string `json:"county_name" xorm:"not null default '' comment('收货区域') VARCHAR(255)"` | |||||
PayNum string `json:"pay_num" xorm:"not null default '' comment('交易流水') VARCHAR(255)"` | |||||
ConfirmTime string `json:"confirm_time" xorm:"comment('确认时间') DATETIME"` | |||||
EstimateCommission string `json:"estimate_commission" xorm:"not null default 0.0000 comment('预计佣金(三方分账完之后站长的佣金)') DECIMAL(12,4)"` | |||||
CreateTime string `json:"create_time" xorm:"not null default 'CURRENT_TIMESTAMP' comment('创建时间') DATETIME"` | |||||
UpdateTime string `json:"update_time" xorm:"not null default 'CURRENT_TIMESTAMP' comment('更新时间') DATETIME"` | |||||
DeletedTime string `json:"deleted_time" xorm:"comment('删除时间') DATETIME"` | |||||
FinishTime string `json:"finish_time" xorm:"comment('完成时间') DATETIME"` | |||||
OrderType string `json:"order_type" xorm:"not null default 1 comment('订单类型:1(小店订单)') TINYINT(3)"` | |||||
Data string `json:"data" xorm:"not null comment('订单相关的数据') TEXT"` | |||||
SettleTime string `json:"settle_time" xorm:"comment('结算时间') DATETIME"` | |||||
CommissionTime string `json:"commission_time" xorm:"comment('分佣时间') DATETIME"` | |||||
ShareUid string `json:"share_uid" xorm:"comment('分享人') INT(11)"` | |||||
TotalPrice float32 `json:"total_price" xorm:"comment('订单总金额') FLOAT(8,2)"` | |||||
PickUpNum string `json:"pick_up_num" xorm:"comment('取餐号') VARCHAR(255)"` | |||||
TradeNo string `json:"trade_no" xorm:"comment('支付平台的订单号') VARCHAR(50)"` | |||||
PayTradeNo string `json:"pay_trade_no" xorm:"comment('支付联盟支付的订单号') VARCHAR(50)"` | |||||
ConsumptionType string `json:"consumption_type" xorm:"default 1 comment('消费类型:1到店消费2立即制作') TINYINT(1)"` | |||||
TableNum string `json:"table_num" xorm:"default '' comment('桌号') VARCHAR(12)"` | |||||
IsThreePartySplit string `json:"is_three_party_split" xorm:"comment('是否进行了三方分账,0:否,1:是') TINYINT(1)"` | |||||
MainCommission string `json:"main_commission" xorm:"not null default 0.0000 comment('总佣金') DECIMAL(12,4)"` | |||||
SkuPriceInfo string `json:"sku_price_info" xorm:"comment('新版本支付规则(规则计算信息)') VARCHAR(500)"` | |||||
PlatformCommission string `json:"platform_commission" xorm:"comment('平台所得的佣金(平台不是站长)') DECIMAL(12,4)"` | |||||
MealFee string `json:"meal_fee" xorm:"comment('餐位费') VARCHAR(50)"` | |||||
SupplierMerchantId string `json:"supplier_merchant_id" xorm:"comment('供应商id') INT(11)"` | |||||
SupplierOrdId string `json:"supplier_ord_id" xorm:"comment('供应商订单id') VARCHAR(255)"` | |||||
IsSetOutAch string `xorm:"not null default 0 INT(1)" json:"is_set_out_ach"` | |||||
} | |||||
type CanalB2cOrderMessage[T any] struct { | |||||
Data []T `json:"data"` | |||||
Database string `json:"database"` | |||||
ES int64 `json:"es"` | |||||
ID int64 `json:"id"` | |||||
IsDdl bool `json:"isDdl"` | |||||
Old []T `json:"old"` | |||||
PkNames []string `json:"pkNames"` | |||||
Table string `json:"table"` | |||||
TS int64 `json:"ts"` | |||||
Type string `json:"type"` | |||||
} |
@@ -0,0 +1,50 @@ | |||||
package md | |||||
import "time" | |||||
type Message struct { | |||||
Id string `json:"id"` | |||||
UserId string `json:"user_id"` | |||||
RequestId string `json:"request_id"` | |||||
SenderType string `json:"sender_type"` | |||||
SenderId string `json:"sender_id"` | |||||
ReceiverType string `json:"receiver_type"` | |||||
ReceiverId string `json:"receiver_id"` | |||||
ToUserIds string `json:"to_user_ids"` | |||||
Type string `json:"type"` | |||||
Content string `json:"content"` | |||||
Seq string `json:"seq"` | |||||
SendTime string `json:"send_time"` | |||||
Status string `json:"status"` | |||||
CreateTime string `json:"create_time"` | |||||
UpdateTime string `json:"update_time"` | |||||
} | |||||
type Message000 struct { | |||||
Id int64 // 自增主键 | |||||
UserId int64 // 所属类型id | |||||
RequestId int64 // 请求id | |||||
SenderType int32 // 发送者类型 | |||||
SenderId int64 // 发送者账户id | |||||
ReceiverType int32 // 接收者账户id | |||||
ReceiverId int64 // 接收者id,如果是单聊信息,则为user_id,如果是群组消息,则为group_id | |||||
ToUserIds string // 需要@的用户id列表,多个用户用,隔开 | |||||
Type int // 消息类型 | |||||
Content []byte // 消息内容 | |||||
Seq int64 // 消息同步序列 | |||||
SendTime time.Time // 消息发送时间 | |||||
Status int32 // 创建时间 | |||||
} | |||||
type CanalGimMessage[T any] struct { | |||||
Data []T `json:"data"` | |||||
Database string `json:"database"` | |||||
ES int64 `json:"es"` | |||||
ID int64 `json:"id"` | |||||
IsDdl bool `json:"isDdl"` | |||||
Old []T `json:"old"` | |||||
PkNames []string `json:"pkNames"` | |||||
Table string `json:"table"` | |||||
TS int64 `json:"ts"` | |||||
Type string `json:"type"` | |||||
} |
@@ -0,0 +1,297 @@ | |||||
package md | |||||
type CanalMallOrder struct { | |||||
Data []struct { | |||||
OrdId string `json:"ord_id"` | |||||
MainOrdId string `json:"main_ord_id"` | |||||
Uid string `json:"uid"` | |||||
BuyerName string `json:"buyer_name"` | |||||
BuyerPhone string `json:"buyer_phone"` | |||||
CostPrice string `json:"cost_price"` | |||||
State string `json:"state"` | |||||
PayTime interface{} `json:"pay_time"` | |||||
PayChannel string `json:"pay_channel"` | |||||
ShippingTime string `json:"shipping_time"` | |||||
LogisticCompany string `json:"logistic_company"` | |||||
LogisticNum string `json:"logistic_num"` | |||||
ReceiverPhone string `json:"receiver_phone"` | |||||
ReceiverName string `json:"receiver_name"` | |||||
ReceiverAddressDetail string `json:"receiver_address_detail"` | |||||
ShippingType string `json:"shipping_type"` | |||||
CouponDiscount string `json:"coupon_discount"` | |||||
DiscountPrice string `json:"discount_price"` | |||||
ReturnInsuranceFee string `json:"return_insurance_fee"` | |||||
IsReceipt string `json:"is_receipt"` | |||||
ShippingFee string `json:"shipping_fee"` | |||||
Comment string `json:"comment"` | |||||
ProvinceName string `json:"province_name"` | |||||
CityName string `json:"city_name"` | |||||
CountyName string `json:"county_name"` | |||||
PayNum string `json:"pay_num"` | |||||
ConfirmTime string `json:"confirm_time"` | |||||
EstimateIntegral string `json:"estimate_integral"` | |||||
EstimateCommission string `json:"estimate_commission"` | |||||
CreateTime string `json:"create_time"` | |||||
UpdateTime string `json:"update_time"` | |||||
DeletedTime interface{} `json:"deleted_time"` | |||||
FinishTime interface{} `json:"finish_time"` | |||||
OrderType string `json:"order_type"` | |||||
Data string `json:"data"` | |||||
GroupBuyCommission string `json:"group_buy_commission"` | |||||
GroupBuyCommissionTime interface{} `json:"group_buy_commission_time"` | |||||
CommissionTime string `json:"commission_time"` | |||||
GroupBuySettleTime interface{} `json:"group_buy_settle_time"` | |||||
SettleTime interface{} `json:"settle_time"` | |||||
CostVirtualCoin string `json:"cost_virtual_coin"` | |||||
VirtualCoinId string `json:"virtual_coin_id"` | |||||
UserCouponId string `json:"user_coupon_id"` | |||||
ShareUid string `json:"share_uid"` | |||||
IsConsign string `json:"is_consign"` | |||||
UserLevelData string `json:"user_level_data"` | |||||
GoodsId string `json:"goods_id"` | |||||
IsHasUserLevel string `json:"is_has_user_level"` | |||||
UserLevel string `json:"user_level"` | |||||
IsGiveUserLevel string `json:"is_give_user_level"` | |||||
ReturnMoneySettleAt string `json:"return_money_settle_at"` | |||||
IsSetReduce string `json:"is_set_reduce"` | |||||
DeductCoin string `json:"deduct_coin"` | |||||
ConsumptionCoinReward string `json:"consumption_coin_reward"` | |||||
DeductCoinReward string `json:"deduct_coin_reward"` | |||||
IsSubsidyEnd string `json:"is_subsidy_end"` | |||||
RunTime string `json:"run_time"` | |||||
SupplierMerchantId string `json:"supplier_merchant_id"` | |||||
SupplierOrdId string `json:"supplier_ord_id"` | |||||
PayOnBehalfUid string `json:"pay_on_behalf_uid"` | |||||
IsSetSubsidy string `json:"is_set_subsidy"` | |||||
ProvinceId string `json:"province_id"` | |||||
CityId string `json:"city_id"` | |||||
CountyId string `json:"county_id"` | |||||
CurrencyCode string `json:"currency_code"` | |||||
TotalShippingFee string `json:"total_shipping_fee"` | |||||
StoreOrdId string `json:"store_ord_id"` | |||||
IsSetOutAch string `json:"is_set_out_ach"` | |||||
SupplierStoreOrdId string `json:"supplier_store_ord_id"` | |||||
TransactionId string `json:"transaction_id"` | |||||
PayData interface{} `json:"pay_data"` | |||||
SubOrderPayType interface{} `json:"sub_order_pay_type"` | |||||
ConsumptionCoinRewardOut string `json:"consumption_coin_reward_out"` | |||||
DeductCoinRewardOut string `json:"deduct_coin_reward_out"` | |||||
IcbcIntegral string `json:"icbc_integral"` | |||||
ParentUid string `json:"parent_uid"` | |||||
SupplierCloudChainOrdId string `json:"supplier_cloud_chain_ord_id"` | |||||
GoodsType string `json:"goods_type"` | |||||
RewardCoinId string `json:"reward_coin_id"` | |||||
IsVirtualGoods string `json:"is_virtual_goods"` | |||||
RewardCoinAmount string `json:"reward_coin_amount"` | |||||
RewardVirtualUserLv string `json:"reward_virtual_user_lv"` | |||||
BaseCommission string `json:"base_commission"` | |||||
PlatformCostPrice string `json:"platform_cost_price"` | |||||
IsSettle string `json:"is_settle"` | |||||
Pvd string `json:"pvd"` | |||||
PointId string `json:"point_id"` | |||||
MacaoAddress string `json:"macao_address"` | |||||
AddressType string `json:"address_type"` | |||||
VirtualGoodsInfo string `json:"virtual_goods_info"` | |||||
PayWay string `json:"pay_way"` | |||||
} `json:"data"` | |||||
Database string `json:"database"` | |||||
Es int64 `json:"es"` | |||||
Id int `json:"id"` | |||||
IsDdl bool `json:"isDdl"` | |||||
MysqlType struct { | |||||
OrdId string `json:"ord_id"` | |||||
MainOrdId string `json:"main_ord_id"` | |||||
Uid string `json:"uid"` | |||||
BuyerName string `json:"buyer_name"` | |||||
BuyerPhone string `json:"buyer_phone"` | |||||
CostPrice string `json:"cost_price"` | |||||
State string `json:"state"` | |||||
PayTime string `json:"pay_time"` | |||||
PayChannel string `json:"pay_channel"` | |||||
ShippingTime string `json:"shipping_time"` | |||||
LogisticCompany string `json:"logistic_company"` | |||||
LogisticNum string `json:"logistic_num"` | |||||
ReceiverPhone string `json:"receiver_phone"` | |||||
ReceiverName string `json:"receiver_name"` | |||||
ReceiverAddressDetail string `json:"receiver_address_detail"` | |||||
ShippingType string `json:"shipping_type"` | |||||
CouponDiscount string `json:"coupon_discount"` | |||||
DiscountPrice string `json:"discount_price"` | |||||
ReturnInsuranceFee string `json:"return_insurance_fee"` | |||||
IsReceipt string `json:"is_receipt"` | |||||
ShippingFee string `json:"shipping_fee"` | |||||
Comment string `json:"comment"` | |||||
ProvinceName string `json:"province_name"` | |||||
CityName string `json:"city_name"` | |||||
CountyName string `json:"county_name"` | |||||
PayNum string `json:"pay_num"` | |||||
ConfirmTime string `json:"confirm_time"` | |||||
EstimateIntegral string `json:"estimate_integral"` | |||||
EstimateCommission string `json:"estimate_commission"` | |||||
CreateTime string `json:"create_time"` | |||||
UpdateTime string `json:"update_time"` | |||||
DeletedTime string `json:"deleted_time"` | |||||
FinishTime string `json:"finish_time"` | |||||
OrderType string `json:"order_type"` | |||||
Data string `json:"data"` | |||||
GroupBuyCommission string `json:"group_buy_commission"` | |||||
GroupBuyCommissionTime string `json:"group_buy_commission_time"` | |||||
CommissionTime string `json:"commission_time"` | |||||
GroupBuySettleTime string `json:"group_buy_settle_time"` | |||||
SettleTime string `json:"settle_time"` | |||||
CostVirtualCoin string `json:"cost_virtual_coin"` | |||||
VirtualCoinId string `json:"virtual_coin_id"` | |||||
UserCouponId string `json:"user_coupon_id"` | |||||
ShareUid string `json:"share_uid"` | |||||
IsConsign string `json:"is_consign"` | |||||
UserLevelData string `json:"user_level_data"` | |||||
GoodsId string `json:"goods_id"` | |||||
IsHasUserLevel string `json:"is_has_user_level"` | |||||
UserLevel string `json:"user_level"` | |||||
IsGiveUserLevel string `json:"is_give_user_level"` | |||||
ReturnMoneySettleAt string `json:"return_money_settle_at"` | |||||
IsSetReduce string `json:"is_set_reduce"` | |||||
DeductCoin string `json:"deduct_coin"` | |||||
ConsumptionCoinReward string `json:"consumption_coin_reward"` | |||||
DeductCoinReward string `json:"deduct_coin_reward"` | |||||
IsSubsidyEnd string `json:"is_subsidy_end"` | |||||
RunTime string `json:"run_time"` | |||||
SupplierMerchantId string `json:"supplier_merchant_id"` | |||||
SupplierOrdId string `json:"supplier_ord_id"` | |||||
PayOnBehalfUid string `json:"pay_on_behalf_uid"` | |||||
IsSetSubsidy string `json:"is_set_subsidy"` | |||||
ProvinceId string `json:"province_id"` | |||||
CityId string `json:"city_id"` | |||||
CountyId string `json:"county_id"` | |||||
CurrencyCode string `json:"currency_code"` | |||||
TotalShippingFee string `json:"total_shipping_fee"` | |||||
StoreOrdId string `json:"store_ord_id"` | |||||
IsSetOutAch string `json:"is_set_out_ach"` | |||||
SupplierStoreOrdId string `json:"supplier_store_ord_id"` | |||||
TransactionId string `json:"transaction_id"` | |||||
PayData string `json:"pay_data"` | |||||
SubOrderPayType string `json:"sub_order_pay_type"` | |||||
ConsumptionCoinRewardOut string `json:"consumption_coin_reward_out"` | |||||
DeductCoinRewardOut string `json:"deduct_coin_reward_out"` | |||||
IcbcIntegral string `json:"icbc_integral"` | |||||
ParentUid string `json:"parent_uid"` | |||||
SupplierCloudChainOrdId string `json:"supplier_cloud_chain_ord_id"` | |||||
GoodsType string `json:"goods_type"` | |||||
RewardCoinId string `json:"reward_coin_id"` | |||||
IsVirtualGoods string `json:"is_virtual_goods"` | |||||
RewardCoinAmount string `json:"reward_coin_amount"` | |||||
RewardVirtualUserLv string `json:"reward_virtual_user_lv"` | |||||
BaseCommission string `json:"base_commission"` | |||||
PlatformCostPrice string `json:"platform_cost_price"` | |||||
IsSettle string `json:"is_settle"` | |||||
Pvd string `json:"pvd"` | |||||
PointId string `json:"point_id"` | |||||
MacaoAddress string `json:"macao_address"` | |||||
AddressType string `json:"address_type"` | |||||
VirtualGoodsInfo string `json:"virtual_goods_info"` | |||||
PayWay string `json:"pay_way"` | |||||
} `json:"mysqlType"` | |||||
Old []struct { | |||||
State string `json:"state"` | |||||
ConfirmTime interface{} `json:"confirm_time"` | |||||
UpdateTime string `json:"update_time"` | |||||
} `json:"old"` | |||||
PkNames []string `json:"pkNames"` | |||||
Sql string `json:"sql"` | |||||
SqlType struct { | |||||
OrdId int `json:"ord_id"` | |||||
MainOrdId int `json:"main_ord_id"` | |||||
Uid int `json:"uid"` | |||||
BuyerName int `json:"buyer_name"` | |||||
BuyerPhone int `json:"buyer_phone"` | |||||
CostPrice int `json:"cost_price"` | |||||
State int `json:"state"` | |||||
PayTime int `json:"pay_time"` | |||||
PayChannel int `json:"pay_channel"` | |||||
ShippingTime int `json:"shipping_time"` | |||||
LogisticCompany int `json:"logistic_company"` | |||||
LogisticNum int `json:"logistic_num"` | |||||
ReceiverPhone int `json:"receiver_phone"` | |||||
ReceiverName int `json:"receiver_name"` | |||||
ReceiverAddressDetail int `json:"receiver_address_detail"` | |||||
ShippingType int `json:"shipping_type"` | |||||
CouponDiscount int `json:"coupon_discount"` | |||||
DiscountPrice int `json:"discount_price"` | |||||
ReturnInsuranceFee int `json:"return_insurance_fee"` | |||||
IsReceipt int `json:"is_receipt"` | |||||
ShippingFee int `json:"shipping_fee"` | |||||
Comment int `json:"comment"` | |||||
ProvinceName int `json:"province_name"` | |||||
CityName int `json:"city_name"` | |||||
CountyName int `json:"county_name"` | |||||
PayNum int `json:"pay_num"` | |||||
ConfirmTime int `json:"confirm_time"` | |||||
EstimateIntegral int `json:"estimate_integral"` | |||||
EstimateCommission int `json:"estimate_commission"` | |||||
CreateTime int `json:"create_time"` | |||||
UpdateTime int `json:"update_time"` | |||||
DeletedTime int `json:"deleted_time"` | |||||
FinishTime int `json:"finish_time"` | |||||
OrderType int `json:"order_type"` | |||||
Data int `json:"data"` | |||||
GroupBuyCommission int `json:"group_buy_commission"` | |||||
GroupBuyCommissionTime int `json:"group_buy_commission_time"` | |||||
CommissionTime int `json:"commission_time"` | |||||
GroupBuySettleTime int `json:"group_buy_settle_time"` | |||||
SettleTime int `json:"settle_time"` | |||||
CostVirtualCoin int `json:"cost_virtual_coin"` | |||||
VirtualCoinId int `json:"virtual_coin_id"` | |||||
UserCouponId int `json:"user_coupon_id"` | |||||
ShareUid int `json:"share_uid"` | |||||
IsConsign int `json:"is_consign"` | |||||
UserLevelData int `json:"user_level_data"` | |||||
GoodsId int `json:"goods_id"` | |||||
IsHasUserLevel int `json:"is_has_user_level"` | |||||
UserLevel int `json:"user_level"` | |||||
IsGiveUserLevel int `json:"is_give_user_level"` | |||||
ReturnMoneySettleAt int `json:"return_money_settle_at"` | |||||
IsSetReduce int `json:"is_set_reduce"` | |||||
DeductCoin int `json:"deduct_coin"` | |||||
ConsumptionCoinReward int `json:"consumption_coin_reward"` | |||||
DeductCoinReward int `json:"deduct_coin_reward"` | |||||
IsSubsidyEnd int `json:"is_subsidy_end"` | |||||
RunTime int `json:"run_time"` | |||||
SupplierMerchantId int `json:"supplier_merchant_id"` | |||||
SupplierOrdId int `json:"supplier_ord_id"` | |||||
PayOnBehalfUid int `json:"pay_on_behalf_uid"` | |||||
IsSetSubsidy int `json:"is_set_subsidy"` | |||||
ProvinceId int `json:"province_id"` | |||||
CityId int `json:"city_id"` | |||||
CountyId int `json:"county_id"` | |||||
CurrencyCode int `json:"currency_code"` | |||||
TotalShippingFee int `json:"total_shipping_fee"` | |||||
StoreOrdId int `json:"store_ord_id"` | |||||
IsSetOutAch int `json:"is_set_out_ach"` | |||||
SupplierStoreOrdId int `json:"supplier_store_ord_id"` | |||||
TransactionId int `json:"transaction_id"` | |||||
PayData int `json:"pay_data"` | |||||
SubOrderPayType int `json:"sub_order_pay_type"` | |||||
ConsumptionCoinRewardOut int `json:"consumption_coin_reward_out"` | |||||
DeductCoinRewardOut int `json:"deduct_coin_reward_out"` | |||||
IcbcIntegral int `json:"icbc_integral"` | |||||
ParentUid int `json:"parent_uid"` | |||||
SupplierCloudChainOrdId int `json:"supplier_cloud_chain_ord_id"` | |||||
GoodsType int `json:"goods_type"` | |||||
RewardCoinId int `json:"reward_coin_id"` | |||||
IsVirtualGoods int `json:"is_virtual_goods"` | |||||
RewardCoinAmount int `json:"reward_coin_amount"` | |||||
RewardVirtualUserLv int `json:"reward_virtual_user_lv"` | |||||
BaseCommission int `json:"base_commission"` | |||||
PlatformCostPrice int `json:"platform_cost_price"` | |||||
IsSettle int `json:"is_settle"` | |||||
Pvd int `json:"pvd"` | |||||
PointId int `json:"point_id"` | |||||
MacaoAddress int `json:"macao_address"` | |||||
AddressType int `json:"address_type"` | |||||
VirtualGoodsInfo int `json:"virtual_goods_info"` | |||||
PayWay int `json:"pay_way"` | |||||
} `json:"sqlType"` | |||||
Table string `json:"table"` | |||||
Ts int64 `json:"ts"` | |||||
Type string `json:"type"` | |||||
} |
@@ -0,0 +1,73 @@ | |||||
package md | |||||
type CanalO2oOrder struct { | |||||
OrdId string `json:"ord_id" xorm:"not null pk BIGstring(20)"` | |||||
MainOrdId string `json:"main_ord_id" xorm:"not null comment('主订单号') index BIGstring(20)"` | |||||
Uid string `json:"uid" xorm:"comment('用户id') index string(11)"` | |||||
BelongStoreId string `json:"belong_store_id" xorm:"comment('归属店铺id') string(11)"` | |||||
BuyerName string `json:"buyer_name" xorm:"comment('购买人') VARCHAR(255)"` | |||||
BuyerPhone string `json:"buyer_phone" xorm:"comment('购买人手机号') VARCHAR(255)"` | |||||
CostPrice string `json:"cost_price" xorm:"comment('价格') DECIMAL(12,2)"` | |||||
CostVirtualCoin string `json:"cost_virtual_coin" xorm:"comment('消耗的虚拟币') DECIMAL(12,2)"` | |||||
VirtualCoinId string `json:"virtual_coin_id" xorm:"comment('使用的虚拟币id') string(11)"` | |||||
State string `json:"state" xorm:"comment('订单状态:0未支付 1已支付 ...(其余状态根据订单类型不同)') TINYstring(1)"` | |||||
PayTime string `json:"pay_time" xorm:"comment('支付时间') DATETIME"` | |||||
PickUp string `json:"pick_up" xorm:"not null comment('取货方式:1堂食 2打包带走') TINYstring(1)"` | |||||
PayChannel string `json:"pay_channel" xorm:"not null comment('支付方式:1balance 2alipay 3wx_pay 4zhios_pay_alipay') TINYstring(1)"` | |||||
ShippingTime string `json:"shipping_time" xorm:"comment('发货时间') DATETIME"` | |||||
LogisticCompany string `json:"logistic_company" xorm:"not null default '' comment('物流公司') VARCHAR(255)"` | |||||
LogisticNum string `json:"logistic_num" xorm:"not null default '' comment('物流单号') VARCHAR(255)"` | |||||
ReceiverPhone string `json:"receiver_phone" xorm:"not null default '' comment('收货人手机号') VARCHAR(20)"` | |||||
ReceiverName string `json:"receiver_name" xorm:"not null default '' comment('收货人名字') VARCHAR(255)"` | |||||
ReceiverAddressDetail string `json:"receiver_address_detail" xorm:"not null default '' comment('收货人地址') VARCHAR(255)"` | |||||
ShippingType string `json:"shipping_type" xorm:"not null default 1 comment('运送方式:1快递送货') TINYstring(1)"` | |||||
CouponDiscount string `json:"coupon_discount" xorm:"not null default 0.00 comment('优惠券折扣额') DECIMAL(12,2)"` | |||||
DiscountPrice string `json:"discount_price" xorm:"not null default 0.00 comment('立减') DECIMAL(12,2)"` | |||||
UserCouponId string `json:"user_coupon_id" xorm:"comment('使用的优惠券id') BIGstring(20)"` | |||||
ReturnInsuranceFee string `json:"return_insurance_fee" xorm:"not null default 0.00 comment('退货无忧费用') DECIMAL(12,2)"` | |||||
IsReceipt string `json:"is_receipt" xorm:"not null default 0 comment('是否开具发票 0否 1是') TINYstring(255)"` | |||||
ShippingFee string `json:"shipping_fee" xorm:"not null default 0.00 comment('运费') DECIMAL(12,2)"` | |||||
Comment string `json:"comment" xorm:"not null comment('备注') VARCHAR(2048)"` | |||||
ProvinceName string `json:"province_name" xorm:"not null default '' comment('收货省份') VARCHAR(255)"` | |||||
CityName string `json:"city_name" xorm:"not null default '' comment('收货城市') VARCHAR(255)"` | |||||
CountyName string `json:"county_name" xorm:"not null default '' comment('收货区域') VARCHAR(255)"` | |||||
PayNum string `json:"pay_num" xorm:"not null default '' comment('交易流水') VARCHAR(255)"` | |||||
ConfirmTime string `json:"confirm_time" xorm:"comment('确认时间') DATETIME"` | |||||
EstimateCommission string `json:"estimate_commission" xorm:"not null default 0.0000 comment('预计佣金(三方分账完之后站长的佣金)') DECIMAL(12,4)"` | |||||
CreateTime string `json:"create_time" xorm:"not null default 'CURRENT_TIMESTAMP' comment('创建时间') DATETIME"` | |||||
UpdateTime string `json:"update_time" xorm:"not null default 'CURRENT_TIMESTAMP' comment('更新时间') DATETIME"` | |||||
DeletedTime string `json:"deleted_time" xorm:"comment('删除时间') DATETIME"` | |||||
FinishTime string `json:"finish_time" xorm:"comment('完成时间') DATETIME"` | |||||
OrderType string `json:"order_type" xorm:"not null default 1 comment('订单类型:1(小店订单)') TINYstring(3)"` | |||||
Data string `json:"data" xorm:"not null comment('订单相关的数据') TEXT"` | |||||
SettleTime string `json:"settle_time" xorm:"comment('结算时间') DATETIME"` | |||||
CommissionTime string `json:"commission_time" xorm:"comment('分佣时间') DATETIME"` | |||||
ShareUid string `json:"share_uid" xorm:"comment('分享人') string(11)"` | |||||
TotalPrice string `json:"total_price" xorm:"comment('订单总金额') FLOAT(8,2)"` | |||||
PickUpNum string `json:"pick_up_num" xorm:"comment('取餐号') VARCHAR(255)"` | |||||
TradeNo string `json:"trade_no" xorm:"comment('支付平台的订单号') VARCHAR(50)"` | |||||
PayTradeNo string `json:"pay_trade_no" xorm:"comment('支付联盟支付的订单号') VARCHAR(50)"` | |||||
ConsumptionType string `json:"consumption_type" xorm:"not null default 1 comment('消费类型:1到店消费2立即制作') TINYstring(1)"` | |||||
TableNum string `json:"table_num" xorm:"default '' comment('桌号') VARCHAR(12)"` | |||||
IsThreePartySplit string `json:"is_three_party_split" xorm:"comment('是否进行了三方分账,0:否,1:是') TINYstring(1)"` | |||||
MainCommission string `json:"main_commission" xorm:"not null default 0.0000 comment('总佣金') DECIMAL(12,4)"` | |||||
SkuPriceInfo string `json:"sku_price_info" xorm:"comment('新版本支付规则(规则计算信息)') VARCHAR(500)"` | |||||
PlatformCommission string `json:"platform_commission" xorm:"comment('平台所得的佣金(平台不是站长)') DECIMAL(12,4)"` | |||||
MealFee string `json:"meal_fee" xorm:"comment('餐位费') VARCHAR(50)"` | |||||
ReturnMoneySettleAt string `json:"return_money_settle_at" xorm:"default 0 comment('小口袋定制返现时间') string(11)"` | |||||
IsSetReduce string `json:"is_set_reduce" xorm:"default 0 comment('小口袋定制设置退回 0否 1是') string(1)"` | |||||
IsSetOutAch string `xorm:"not null default 0 string(1)" json:"is_set_out_ach"` | |||||
} | |||||
type CanalO2oOrderMessage[T any] struct { | |||||
Data []T `json:"data"` | |||||
Database string `json:"database"` | |||||
ES string `json:"es"` | |||||
ID string `json:"id"` | |||||
IsDdl bool `json:"isDdl"` | |||||
Old []T `json:"old"` | |||||
PkNames []string `json:"pkNames"` | |||||
Table string `json:"table"` | |||||
TS string `json:"ts"` | |||||
Type string `json:"type"` | |||||
} |
@@ -25,7 +25,7 @@ func OneCirclesSignInGreenEnergyConsume(queue md.MqQueue) { | |||||
//1、将自己绑定到交换机上 | //1、将自己绑定到交换机上 | ||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | ||||
//2、取出数据进行消费 | //2、取出数据进行消费 | ||||
ch.Qos(1) | |||||
ch.Qos(5) | |||||
delivery := ch.Consume(queue.Name, false) | delivery := ch.Consume(queue.Name, false) | ||||
one_circles.Init(cfg.RedisAddr) | one_circles.Init(cfg.RedisAddr) | ||||
@@ -39,11 +39,12 @@ func OneCirclesSignInGreenEnergyConsume(queue md.MqQueue) { | |||||
if err != nil { | if err != nil { | ||||
fmt.Println("err ::: ", err) | fmt.Println("err ::: ", err) | ||||
utils.FilePutContents("OneCirclesSignInGreenEnergyConsume_ERR", "[err]:"+err.Error()) | utils.FilePutContents("OneCirclesSignInGreenEnergyConsume_ERR", "[err]:"+err.Error()) | ||||
_ = res.Reject(false) | |||||
//TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
var msg *md.OneCirclesStructForSignIn | |||||
json.Unmarshal(res.Body, &msg) | |||||
ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
_ = res.Reject(true) | |||||
//_ = res.Reject(false) | |||||
////TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
//var msg *md.OneCirclesStructForSignIn | |||||
//json.Unmarshal(res.Body, &msg) | |||||
//ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
} else { | } else { | ||||
_ = res.Ack(true) | _ = res.Ack(true) | ||||
} | } | ||||
@@ -25,7 +25,7 @@ func SuperCloudIssuanceMsgCallBackConsume(queue md.MqQueue) { | |||||
} | } | ||||
defer ch.Release() | defer ch.Release() | ||||
//2、取出数据进行消费 | //2、取出数据进行消费 | ||||
ch.Qos(10) | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | delivery := ch.Consume(queue.Name, false) | ||||
var res amqp.Delivery | var res amqp.Delivery | ||||
@@ -40,8 +40,8 @@ func SuperCloudIssuanceMsgCallBackConsume(queue md.MqQueue) { | |||||
if err != nil { | if err != nil { | ||||
panic(err) | panic(err) | ||||
} | } | ||||
randInt := utils.RandIntRand(100, 200) | |||||
time.Sleep(time.Millisecond * time.Duration(randInt)) // 等待100 ~ 200毫秒 | |||||
randInt := utils.RandIntRand(20, 100) | |||||
time.Sleep(time.Millisecond * time.Duration(randInt)) // 等待20 ~ 100毫秒 | |||||
go func() { | go func() { | ||||
//设置masterId | //设置masterId | ||||
cloudIssuanceRobotRecords, err := db.SuperCloudIssuanceRobotRecordsGetOneByParams(map[string]interface{}{ | cloudIssuanceRobotRecords, err := db.SuperCloudIssuanceRobotRecordsGetOneByParams(map[string]interface{}{ | ||||
@@ -9,10 +9,10 @@ require ( | |||||
code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240607091816-3df1433a2f0d | code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240607091816-3df1433a2f0d | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0 | code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0 | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 | code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240703034234-2ab228956242 | |||||
code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240719082936-c249de79edce | |||||
code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b | code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240611024753-7cd929a03014 | code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240611024753-7cd929a03014 | ||||
code.fnuoos.com/go_rely_warehouse/zyos_model.git v0.0.4-0.20240717064604-5e4000e89365 | |||||
code.fnuoos.com/go_rely_warehouse/zyos_model.git v0.0.4-0.20240723093418-66f0c713f459 | |||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 | github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 | ||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 | github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 | ||||
github.com/boombuler/barcode v1.0.1 | github.com/boombuler/barcode v1.0.1 | ||||
@@ -3,12 +3,12 @@ package svc | |||||
import ( | import ( | ||||
"applet/app/cfg" | "applet/app/cfg" | ||||
"applet/app/db" | "applet/app/db" | ||||
"applet/app/utils/cache" | |||||
"applet/mall/utils" | "applet/mall/utils" | ||||
db2 "applet/super_cloud_issuance/db" | db2 "applet/super_cloud_issuance/db" | ||||
"applet/super_cloud_issuance/enum" | "applet/super_cloud_issuance/enum" | ||||
"applet/super_cloud_issuance/md" | "applet/super_cloud_issuance/md" | ||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git/chain_transfer" | "code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git/chain_transfer" | ||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git/utils/cache" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/implement" | "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/implement" | ||||
"code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models" | "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models" | ||||
"fmt" | "fmt" | ||||
@@ -50,7 +50,7 @@ func (dealSuperCloudIssuanceCallBackService *DealSuperCloudIssuanceCallBackServi | |||||
} | } | ||||
cache.NewRedis(cfg.RedisAddr) | cache.NewRedis(cfg.RedisAddr) | ||||
cloudIssuanceUserRobotBindFollowDb := implement.NewSuperCloudIssuanceUserRobotBindFollowDb(engine) | cloudIssuanceUserRobotBindFollowDb := implement.NewSuperCloudIssuanceUserRobotBindFollowDb(engine) | ||||
cloudIssuanceUserRobotBindFollow, err := cloudIssuanceUserRobotBindFollowDb.GetCloudIssuanceUserRobotBindFollow(bindId) | |||||
cloudIssuanceUserRobotBindFollow, err := cloudIssuanceUserRobotBindFollowDb.GetCloudIssuanceUserRobotBindFollowById(bindId) | |||||
for _, cloudIssuanceUserRobotBindFollowGroup := range cloudIssuanceUserRobotBindFollowGroups { | for _, cloudIssuanceUserRobotBindFollowGroup := range cloudIssuanceUserRobotBindFollowGroups { | ||||
fmt.Println(cloudIssuanceUserRobotBindFollowGroup) | fmt.Println(cloudIssuanceUserRobotBindFollowGroup) | ||||
cloudIssuanceService.Set(nil, enum.RobotMacSendTextMethodName, "127.0.0.1") | cloudIssuanceService.Set(nil, enum.RobotMacSendTextMethodName, "127.0.0.1") | ||||
@@ -164,7 +164,7 @@ func (dealSuperCloudIssuanceCallBackService *DealSuperCloudIssuanceCallBackServi | |||||
return | return | ||||
} | } | ||||
bindId = cloudIssuanceUserRobotBindFollow.ActivateGroupId | |||||
bindId = cloudIssuanceUserRobotBindFollow.Id | |||||
return | return | ||||
} | } | ||||