@@ -0,0 +1,259 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"applet/app/utils/cache" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
db2 "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official/model" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/shopspring/decimal" | |||||
"github.com/streadway/amqp" | |||||
"strings" | |||||
"time" | |||||
) | |||||
const ZhiOsGuidePlaceOrderNumOfPeopleHashMapCacheKey = "%s:zhiOs_guide_place_order_num_of_people_hash_map_cache:%s" //下单人数缓存hashMap键 | |||||
func CanalGuideOrderForNumericalStatementConsume(queue md.MqQueue) { | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(100) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>CanalGuideOrderForNumericalStatementConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleCanalGuideOrderForNumericalStatementTable(res.Body) | |||||
if err != nil { | |||||
fmt.Println("err ::: ", err) | |||||
utils.FilePutContents("CanalGuideOrderForNumericalStatementConsume_ERR", "[err]:"+err.Error()) | |||||
_ = res.Reject(false) | |||||
////TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
//var msg *md.OneCirclesStructForSignIn | |||||
//json.Unmarshal(res.Body, &msg) | |||||
//ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
} else { | |||||
_ = res.Ack(true) | |||||
} | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleCanalGuideOrderForNumericalStatementTable(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.CanalGuideOrderMessage[md.CanalGuideOrder] | |||||
err := json.Unmarshal(msg, &canalMsg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
now := time.Now() | |||||
//2、获取masterId | |||||
masterId := utils.StrToInt(strings.Split(canalMsg.Database, "_")[1]) | |||||
//2、判断操作(insert | update) | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType || canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
var isUpdate bool | |||||
//3、查找是否有数据 | |||||
var ordDate string | |||||
for _, item := range canalMsg.Data { | |||||
ordDate = time.Unix(utils.StrToInt64(item.CreateAt), 0).Format("2006-01-02") | |||||
} | |||||
statistics, err := db2.GetMasterGuideOrderStatistics(db.Db, masterId, ordDate) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if statistics == nil { | |||||
statistics = &model.MasterGuideOrderStatistics{ | |||||
MasterId: masterId, | |||||
PaymentTotal: "", | |||||
OrderCount: 0, | |||||
EstimatedCommission: "", | |||||
EstimatedProfit: "", | |||||
LoseOrderCount: 0, | |||||
PlaceOrderNumOfPeople: 0, | |||||
EffectiveOrderCount: 0, | |||||
EffectiveCommission: "", | |||||
ReceiveCommission: "", | |||||
LoseCommission: "", | |||||
AvgCommission: "", | |||||
CustomerUnitPrice: "", | |||||
Date: ordDate, | |||||
CreateAt: now.Format("2006-01-02 15:04:05"), | |||||
UpdateAt: now.Format("2006-01-02 15:04:05"), | |||||
} | |||||
_, err = db2.MasterGuideOrderStatisticsInsert(db.Db, statistics) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
paymentTotal := statistics.PaymentTotal | |||||
orderCount := statistics.OrderCount | |||||
estimatedCommission := statistics.EstimatedCommission | |||||
estimatedProfit := statistics.EstimatedProfit | |||||
loseOrderCount := statistics.LoseOrderCount | |||||
placeOrderNumOfPeople := statistics.PlaceOrderNumOfPeople | |||||
effectiveOrderCount := statistics.EffectiveOrderCount | |||||
effectiveCommission := statistics.EffectiveCommission | |||||
receiveCommission := statistics.ReceiveCommission | |||||
loseCommission := statistics.LoseCommission | |||||
avgCommission := statistics.AvgCommission | |||||
customerUnitPrice := statistics.CustomerUnitPrice | |||||
if canalMsg.Type == md.CanalMsgInsertSqlType { | |||||
for _, item := range canalMsg.Data { | |||||
orderCount++ | |||||
effectiveOrderCount++ | |||||
cacheKey := fmt.Sprintf(ZhiOsGuidePlaceOrderNumOfPeopleHashMapCacheKey, utils.IntToStr(masterId), ordDate) | |||||
get, _ := cache.HGetString(cacheKey, item.Uid) | |||||
paymentTotal = utils.Float64ToStr(utils.StrToFloat64(paymentTotal) + utils.StrToFloat64(item.PaidPrice)) | |||||
estimatedCommission = utils.Float64ToStr(utils.StrToFloat64(estimatedCommission) + utils.StrToFloat64(item.BenefitAll)) | |||||
estimatedProfit = utils.Float64ToStr(utils.StrToFloat64(estimatedProfit) + utils.StrToFloat64(item.SysCommission)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) + utils.StrToFloat64(item.BenefitAll)) | |||||
estimatedCommissionValue, _ := decimal.NewFromString(estimatedCommission) | |||||
orderCountValue := decimal.NewFromInt(int64(orderCount)) | |||||
avgCommission = estimatedCommissionValue.Div(orderCountValue).String() //平均佣金 | |||||
if get == "" { | |||||
placeOrderNumOfPeople++ //下单人数 | |||||
cache.HSet(cacheKey, item.Uid, "1") | |||||
} else { | |||||
cache.HSet(cacheKey, item.Uid, utils.IntToStr(utils.StrToInt(get)+1)) | |||||
} | |||||
cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) | |||||
paymentTotalValue, _ := decimal.NewFromString(paymentTotal) | |||||
if placeOrderNumOfPeople == 0 { | |||||
return errors.New("divider cannot be 0 in division operation") | |||||
} | |||||
placeOrderNumOfPeopleValue := decimal.NewFromInt(int64(placeOrderNumOfPeople)) | |||||
customerUnitPrice = paymentTotalValue.Div(placeOrderNumOfPeopleValue).String() //客单价 | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if canalMsg.Type == md.CanalMsgUpdateSqlType { | |||||
judgeSate := JudgeSate(*canalMsg) | |||||
if judgeSate > 0 { | |||||
if judgeSate == 1 { | |||||
//TODO::收货 | |||||
for _, item := range canalMsg.Data { | |||||
receiveCommission = utils.Float64ToStr(utils.StrToFloat64(receiveCommission) + utils.StrToFloat64(item.BenefitAll)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if judgeSate == 2 { | |||||
//TODO::未收货失效 | |||||
loseOrderCount++ | |||||
effectiveOrderCount-- | |||||
for _, item := range canalMsg.Data { | |||||
loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.BenefitAll)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.BenefitAll)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
if judgeSate == 3 { | |||||
//TODO::已收货失效 | |||||
loseOrderCount++ | |||||
effectiveOrderCount-- | |||||
for _, item := range canalMsg.Data { | |||||
loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.BenefitAll)) | |||||
effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.BenefitAll)) | |||||
receiveCommission = utils.Float64ToStr(utils.StrToFloat64(receiveCommission) - utils.StrToFloat64(item.BenefitAll)) | |||||
isUpdate = true | |||||
} | |||||
} | |||||
} | |||||
} | |||||
if isUpdate { | |||||
statistics.PaymentTotal = paymentTotal | |||||
statistics.OrderCount = orderCount | |||||
statistics.EstimatedCommission = estimatedCommission | |||||
statistics.EstimatedProfit = estimatedProfit | |||||
statistics.LoseOrderCount = loseOrderCount | |||||
statistics.PlaceOrderNumOfPeople = placeOrderNumOfPeople | |||||
statistics.EffectiveOrderCount = effectiveOrderCount | |||||
statistics.EffectiveCommission = effectiveCommission | |||||
statistics.ReceiveCommission = receiveCommission | |||||
statistics.LoseCommission = loseCommission | |||||
statistics.AvgCommission = avgCommission | |||||
statistics.CustomerUnitPrice = customerUnitPrice | |||||
statistics.UpdateAt = now.Format("2006-01-02 15:04:05") | |||||
_, err = db2.MasterGuideOrderStatisticsUpdate(db.Db, statistics.Id, statistics, | |||||
"payment_total", "order_count", "estimated_commission", "estimated_profit", "lose_order_count", | |||||
"place_order_num_of_people", "effective_order_count", "effective_commission", "receive_commission", "lose_commission", | |||||
"avg_commission", "customer_unit_price", "update_at") | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
} | |||||
return nil | |||||
} | |||||
func SecondsUntilTomorrow(now time.Time) int64 { | |||||
tomorrow := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).AddDate(0, 0, 1) | |||||
return int64(tomorrow.Sub(now).Seconds()) | |||||
} | |||||
// JudgeSate 处理订单状态(judgeSate[0:不需要处理 1:收货 2:未收货失效 3:已收货失效]) | |||||
func JudgeSate(message md.CanalGuideOrderMessage[md.CanalGuideOrder]) (judgeSate int) { | |||||
oldData := message.Old | |||||
//1、获取 旧的订单状态 | |||||
var oldOrdState string | |||||
for _, item := range oldData { | |||||
if item.State != "" { | |||||
oldOrdState = item.State | |||||
} | |||||
} | |||||
if oldOrdState == "" { | |||||
return | |||||
} | |||||
//2、获取 新的订单状态 | |||||
var nowOrdState string | |||||
for _, item := range message.Data { | |||||
nowOrdState = item.State | |||||
} | |||||
if oldOrdState == nowOrdState { | |||||
return | |||||
} | |||||
//3、进行状态比较判断 | |||||
if oldOrdState == "0" { | |||||
if nowOrdState == "4" { //未收货失效 | |||||
return 2 | |||||
} | |||||
if nowOrdState == "1" || nowOrdState == "2" || nowOrdState == "3" || nowOrdState == "5" { //收货 | |||||
return 1 | |||||
} | |||||
} else { | |||||
if nowOrdState == "4" { //已收货失效 | |||||
return 3 | |||||
} | |||||
} | |||||
return | |||||
} |
@@ -73,18 +73,20 @@ func initConsumes() { | |||||
//jobs[consumeMd.MallAddSupplyGoodsFunName] = MallAddSupplyGoodsConsume | //jobs[consumeMd.MallAddSupplyGoodsFunName] = MallAddSupplyGoodsConsume | ||||
//////////////////////////////////////// bigData ///////////////////////////////////////////////////// | //////////////////////////////////////// bigData ///////////////////////////////////////////////////// | ||||
//jobs[consumeMd.CanalOrderConsumeFunName] = CanalOrderConsume | |||||
//jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume | |||||
//jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume | |||||
//jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume | |||||
jobs[consumeMd.CanalOrderConsumeFunName] = CanalOrderConsume | |||||
jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume | |||||
jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume | |||||
jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume | |||||
jobs[consumeMd.CanalGuideOrderForNumericalStatementConsumeFunName] = CanalGuideOrderForNumericalStatementConsume | |||||
//////////////////////////////////////// oneCircles ///////////////////////////////////////////////////// | //////////////////////////////////////// oneCircles ///////////////////////////////////////////////////// | ||||
jobs[consumeMd.OneCirclesSignInGreenEnergyFunName] = OneCirclesSignInGreenEnergyConsume | |||||
jobs[consumeMd.OneCirclesStartLevelDividendFunName] = OneCirclesStartLevelDividendConsume | |||||
jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyConsume | |||||
jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume | |||||
jobs[consumeMd.OneCirclesSettlementPublicGiveActivityCoinFunName] = OneCirclesSettlementPublicGiveActivityCoinConsume | |||||
//jobs[consumeMd.OneCirclesSignInGreenEnergyFunName] = OneCirclesSignInGreenEnergyConsume | |||||
//jobs[consumeMd.OneCirclesStartLevelDividendFunName] = OneCirclesStartLevelDividendConsume | |||||
//jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyConsume | |||||
//jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume | |||||
//jobs[consumeMd.OneCirclesSettlementPublicGiveActivityCoinFunName] = OneCirclesSettlementPublicGiveActivityCoinConsume | |||||
//jobs[consumeMd.OneCirclesAddPublicPlatoonUserRelationCommissionFunName] = OneCirclesAddPublicPlatoonUserRelationCommissionConsume | |||||
//jobs[consumeMd.OneCirclesSignInCopyGreenEnergyFunName] = OneCirclesSignInCopyGreenEnergyConsume | //jobs[consumeMd.OneCirclesSignInCopyGreenEnergyFunName] = OneCirclesSignInCopyGreenEnergyConsume | ||||
//////////////////////////////////////// withdraw ///////////////////////////////////////////////////// | //////////////////////////////////////// withdraw ///////////////////////////////////////////////////// | ||||
@@ -74,6 +74,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
BindKey: "", | BindKey: "", | ||||
ConsumeFunName: "CanalOrderConsume", | ConsumeFunName: "CanalOrderConsume", | ||||
}, | }, | ||||
{ | |||||
ExchangeName: "canal.topic", | |||||
Name: "canal_guide_order_for_numerical_statement", | |||||
Type: TopicQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "canal_order_list", | |||||
BindKey: "", | |||||
ConsumeFunName: "CanalGuideOrderForNumericalStatementConsume", | |||||
}, | |||||
{ | { | ||||
ExchangeName: "canal.topic", | ExchangeName: "canal.topic", | ||||
Name: "canal_guide_order", | Name: "canal_guide_order", | ||||
@@ -479,6 +488,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
BindKey: "", | BindKey: "", | ||||
ConsumeFunName: "OneCirclesSettlementPublicGiveActivityCoinConsume", | ConsumeFunName: "OneCirclesSettlementPublicGiveActivityCoinConsume", | ||||
}, | }, | ||||
{ | |||||
ExchangeName: "one.circles", | |||||
Name: "one_circles_add_public_platoon_user_relation_commission", | |||||
Type: TopicQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "add_public_platoon_user_relation_commission", | |||||
BindKey: "", | |||||
ConsumeFunName: "OneCirclesAddPublicPlatoonUserRelationCommissionConsume", | |||||
}, | |||||
{ | { | ||||
ExchangeName: "one.circles", | ExchangeName: "one.circles", | ||||
Name: "one_circles_sign_in_green_energy_copy", | Name: "one_circles_sign_in_green_energy_copy", | ||||
@@ -541,6 +559,7 @@ const ( | |||||
ZhiosOrderBuckleFunName = "ZhiosOrderBuckle" | ZhiosOrderBuckleFunName = "ZhiosOrderBuckle" | ||||
ZhiosSupplierAfterOrderFunName = "ZhiosSupplierAfterOrder" | ZhiosSupplierAfterOrderFunName = "ZhiosSupplierAfterOrder" | ||||
CanalOrderConsumeFunName = "CanalOrderConsume" | CanalOrderConsumeFunName = "CanalOrderConsume" | ||||
CanalGuideOrderForNumericalStatementConsumeFunName = "CanalGuideOrderForNumericalStatementConsume" | |||||
CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume" | CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume" | ||||
ZhiOsUserVisitIpAddressConsumeFunName = "ZhiOsUserVisitIpAddressConsume" | ZhiOsUserVisitIpAddressConsumeFunName = "ZhiOsUserVisitIpAddressConsume" | ||||
DouShenUserRegisterConsumeForOfficialFunName = "DouShenUserRegisterConsumeForOfficial" | DouShenUserRegisterConsumeForOfficialFunName = "DouShenUserRegisterConsumeForOfficial" | ||||
@@ -580,6 +599,7 @@ const ( | |||||
OneCirclesActivityCoinAutoExchangeGreenEnergyFunName = "OneCirclesActivityCoinAutoExchangeGreenEnergyConsume" | OneCirclesActivityCoinAutoExchangeGreenEnergyFunName = "OneCirclesActivityCoinAutoExchangeGreenEnergyConsume" | ||||
OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName = "OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume" | OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName = "OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume" | ||||
OneCirclesSettlementPublicGiveActivityCoinFunName = "OneCirclesSettlementPublicGiveActivityCoinConsume" | OneCirclesSettlementPublicGiveActivityCoinFunName = "OneCirclesSettlementPublicGiveActivityCoinConsume" | ||||
OneCirclesAddPublicPlatoonUserRelationCommissionFunName = "OneCirclesAddPublicPlatoonUserRelationCommissionConsume" | |||||
WithdrawConsumeFunName = "WithdrawConsume" | WithdrawConsumeFunName = "WithdrawConsume" | ||||
FlexibleEmploymentWithdrawForGongMaoConsumeFunName = "FlexibleEmploymentWithdrawForGongMaoConsume" | FlexibleEmploymentWithdrawForGongMaoConsumeFunName = "FlexibleEmploymentWithdrawForGongMaoConsume" | ||||
FlexibleEmploymentWithdrawForPupiaoConsumeFunName = "FlexibleEmploymentWithdrawForPupiaoConsume" | FlexibleEmploymentWithdrawForPupiaoConsumeFunName = "FlexibleEmploymentWithdrawForPupiaoConsume" | ||||
@@ -9,13 +9,14 @@ type CanalGuideOrder struct { | |||||
ItemId string `json:"item_id"` | ItemId string `json:"item_id"` | ||||
Uid string `json:"uid"` | Uid string `json:"uid"` | ||||
CostPrice string `json:"cost_price"` | CostPrice string `json:"cost_price"` | ||||
PaidPrice string `json:"paid_price"` | |||||
ItemPrice string `json:"item_price"` | ItemPrice string `json:"item_price"` | ||||
State string `json:"state"` | State string `json:"state"` | ||||
OrderType string `json:"order_type"` | OrderType string `json:"order_type"` | ||||
ItemNum string `json:"item_num"` | ItemNum string `json:"item_num"` | ||||
SysCommission string `json:"sys_commission"` | |||||
CreateAt string `json:"create_at"` | CreateAt string `json:"create_at"` | ||||
PaidPrice string `json:"paid_price"` //付款金额 | |||||
BenefitAll string `json:"benefit_all"` //分润总额,供应商总额 | |||||
SysCommission string `json:"sys_commission"` //平台占佣金 | |||||
} | } | ||||
type CanalGuideOrderMessage[T any] struct { | type CanalGuideOrderMessage[T any] struct { | ||||
@@ -1,5 +1,7 @@ | |||||
package md | package md | ||||
import "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md" | |||||
type OneCirclesStructForSignIn struct { | type OneCirclesStructForSignIn struct { | ||||
MasterId string `json:"master_id"` | MasterId string `json:"master_id"` | ||||
Uid int `json:"uid"` | Uid int `json:"uid"` | ||||
@@ -12,3 +14,8 @@ type OneCirclesStructForStarLevelDividends struct { | |||||
Uid int `json:"uid"` | Uid int `json:"uid"` | ||||
SignDividend float64 `json:"sign_dividend"` | SignDividend float64 `json:"sign_dividend"` | ||||
} | } | ||||
type OneCirclesStructForAddPublicPlatoonUserRelationCommissionConsume struct { | |||||
MasterId string `json:"master_id"` | |||||
Data md.AddOneCirclesPublicPlatoonUserRelationCommissionReq `json:"data"` | |||||
} |
@@ -0,0 +1,75 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/cfg" | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/streadway/amqp" | |||||
) | |||||
func OneCirclesAddPublicPlatoonUserRelationCommissionConsume(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>>OneCirclesAddPublicPlatoonUserRelationCommissionConsume>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
one_circles.Init(cfg.RedisAddr) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
err = handleOneCirclesAddPublicPlatoonUserRelationCommissionConsume(res.Body) | |||||
if err != nil { | |||||
fmt.Println("err ::: ", err) | |||||
utils.FilePutContents("OneCirclesAddPublicPlatoonUserRelationCommissionConsume_ERR", "[err]:"+err.Error()) | |||||
_ = res.Reject(false) | |||||
//TODO::重新推回队列末尾,避免造成队列堵塞 | |||||
var msg *md.OneCirclesStructForAddPublicPlatoonUserRelationCommissionConsume | |||||
json.Unmarshal(res.Body, &msg) | |||||
ch.Publish(queue.ExchangeName, msg, queue.RoutKey) | |||||
} else { | |||||
_ = res.Ack(true) | |||||
} | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleOneCirclesAddPublicPlatoonUserRelationCommissionConsume(msgData []byte) error { | |||||
//1、解析mq中queue的数据结构体 | |||||
var msg *md.OneCirclesStructForAddPublicPlatoonUserRelationCommissionConsume | |||||
err := json.Unmarshal(msgData, &msg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
engine := db.DBs[msg.MasterId] | |||||
var req []*md2.AddOneCirclesPublicPlatoonUserRelationCommissionReq | |||||
req = append(req, &msg.Data) | |||||
_, err = one_circles.AddOneCirclesPublicPlatoonUserRelationCommission(engine, req) | |||||
fmt.Println("err::::", err) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
return nil | |||||
} |
@@ -2,6 +2,9 @@ module applet | |||||
go 1.18 | go 1.18 | ||||
// go.mod文件中 | |||||
//replace code.fnuoos.com/go_rely_warehouse/zyos_model.git => E:/company/go_rely_warehouse/zyos_model/ | |||||
require ( | require ( | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240607091816-3df1433a2f0d | code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240607091816-3df1433a2f0d | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0 | code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0 | ||||
@@ -9,7 +12,7 @@ require ( | |||||
code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240703034234-2ab228956242 | code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240703034234-2ab228956242 | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b | code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b | ||||
code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240611024753-7cd929a03014 | code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240611024753-7cd929a03014 | ||||
code.fnuoos.com/go_rely_warehouse/zyos_model.git v0.0.4-0.20240701102131-0408d7ee8572 | |||||
code.fnuoos.com/go_rely_warehouse/zyos_model.git v0.0.4-0.20240711033658-057f89bb825f | |||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 | github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 | ||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 | github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 | ||||
github.com/boombuler/barcode v1.0.1 | github.com/boombuler/barcode v1.0.1 | ||||