diff --git a/consume/canal_guide_order_for_numerical_statement_consume.go b/consume/canal_guide_order_for_numerical_statement_consume.go new file mode 100644 index 0000000..527cc0e --- /dev/null +++ b/consume/canal_guide_order_for_numerical_statement_consume.go @@ -0,0 +1,259 @@ +package consume + +import ( + "applet/app/db" + "applet/app/utils" + "applet/app/utils/cache" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + db2 "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official" + "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models/official/model" + "encoding/json" + "errors" + "fmt" + "github.com/shopspring/decimal" + "github.com/streadway/amqp" + "strings" + "time" +) + +const ZhiOsGuidePlaceOrderNumOfPeopleHashMapCacheKey = "%s:zhiOs_guide_place_order_num_of_people_hash_map_cache:%s" //下单人数缓存hashMap键 + +func CanalGuideOrderForNumericalStatementConsume(queue md.MqQueue) { + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(100) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalGuideOrderForNumericalStatementConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleCanalGuideOrderForNumericalStatementTable(res.Body) + if err != nil { + fmt.Println("err ::: ", err) + utils.FilePutContents("CanalGuideOrderForNumericalStatementConsume_ERR", "[err]:"+err.Error()) + _ = res.Reject(false) + ////TODO::重新推回队列末尾,避免造成队列堵塞 + //var msg *md.OneCirclesStructForSignIn + //json.Unmarshal(res.Body, &msg) + //ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + } else { + _ = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleCanalGuideOrderForNumericalStatementTable(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalGuideOrderMessage[md.CanalGuideOrder] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return err + } + now := time.Now() + + //2、获取masterId + masterId := utils.StrToInt(strings.Split(canalMsg.Database, "_")[1]) + + //2、判断操作(insert | update) + if canalMsg.Type == md.CanalMsgInsertSqlType || canalMsg.Type == md.CanalMsgUpdateSqlType { + var isUpdate bool + //3、查找是否有数据 + var ordDate string + for _, item := range canalMsg.Data { + ordDate = time.Unix(utils.StrToInt64(item.CreateAt), 0).Format("2006-01-02") + } + statistics, err := db2.GetMasterGuideOrderStatistics(db.Db, masterId, ordDate) + if err != nil { + return err + } + if statistics == nil { + statistics = &model.MasterGuideOrderStatistics{ + MasterId: masterId, + PaymentTotal: "", + OrderCount: 0, + EstimatedCommission: "", + EstimatedProfit: "", + LoseOrderCount: 0, + PlaceOrderNumOfPeople: 0, + EffectiveOrderCount: 0, + EffectiveCommission: "", + ReceiveCommission: "", + LoseCommission: "", + AvgCommission: "", + CustomerUnitPrice: "", + Date: ordDate, + CreateAt: now.Format("2006-01-02 15:04:05"), + UpdateAt: now.Format("2006-01-02 15:04:05"), + } + _, err = db2.MasterGuideOrderStatisticsInsert(db.Db, statistics) + if err != nil { + return err + } + } + + paymentTotal := statistics.PaymentTotal + orderCount := statistics.OrderCount + estimatedCommission := statistics.EstimatedCommission + estimatedProfit := statistics.EstimatedProfit + loseOrderCount := statistics.LoseOrderCount + placeOrderNumOfPeople := statistics.PlaceOrderNumOfPeople + effectiveOrderCount := statistics.EffectiveOrderCount + effectiveCommission := statistics.EffectiveCommission + receiveCommission := statistics.ReceiveCommission + loseCommission := statistics.LoseCommission + avgCommission := statistics.AvgCommission + customerUnitPrice := statistics.CustomerUnitPrice + if canalMsg.Type == md.CanalMsgInsertSqlType { + for _, item := range canalMsg.Data { + orderCount++ + effectiveOrderCount++ + cacheKey := fmt.Sprintf(ZhiOsGuidePlaceOrderNumOfPeopleHashMapCacheKey, utils.IntToStr(masterId), ordDate) + get, _ := cache.HGetString(cacheKey, item.Uid) + paymentTotal = utils.Float64ToStr(utils.StrToFloat64(paymentTotal) + utils.StrToFloat64(item.PaidPrice)) + estimatedCommission = utils.Float64ToStr(utils.StrToFloat64(estimatedCommission) + utils.StrToFloat64(item.BenefitAll)) + estimatedProfit = utils.Float64ToStr(utils.StrToFloat64(estimatedProfit) + utils.StrToFloat64(item.SysCommission)) + effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) + utils.StrToFloat64(item.BenefitAll)) + + estimatedCommissionValue, _ := decimal.NewFromString(estimatedCommission) + orderCountValue := decimal.NewFromInt(int64(orderCount)) + avgCommission = estimatedCommissionValue.Div(orderCountValue).String() //平均佣金 + + if get == "" { + placeOrderNumOfPeople++ //下单人数 + cache.HSet(cacheKey, item.Uid, "1") + } else { + cache.HSet(cacheKey, item.Uid, utils.IntToStr(utils.StrToInt(get)+1)) + } + cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) + + paymentTotalValue, _ := decimal.NewFromString(paymentTotal) + if placeOrderNumOfPeople == 0 { + return errors.New("divider cannot be 0 in division operation") + } + placeOrderNumOfPeopleValue := decimal.NewFromInt(int64(placeOrderNumOfPeople)) + customerUnitPrice = paymentTotalValue.Div(placeOrderNumOfPeopleValue).String() //客单价 + isUpdate = true + } + } + + if canalMsg.Type == md.CanalMsgUpdateSqlType { + judgeSate := JudgeSate(*canalMsg) + if judgeSate > 0 { + if judgeSate == 1 { + //TODO::收货 + for _, item := range canalMsg.Data { + receiveCommission = utils.Float64ToStr(utils.StrToFloat64(receiveCommission) + utils.StrToFloat64(item.BenefitAll)) + isUpdate = true + } + } + if judgeSate == 2 { + //TODO::未收货失效 + loseOrderCount++ + effectiveOrderCount-- + for _, item := range canalMsg.Data { + loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.BenefitAll)) + effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.BenefitAll)) + isUpdate = true + } + } + if judgeSate == 3 { + //TODO::已收货失效 + loseOrderCount++ + effectiveOrderCount-- + for _, item := range canalMsg.Data { + loseCommission = utils.Float64ToStr(utils.StrToFloat64(loseCommission) + utils.StrToFloat64(item.BenefitAll)) + effectiveCommission = utils.Float64ToStr(utils.StrToFloat64(effectiveCommission) - utils.StrToFloat64(item.BenefitAll)) + receiveCommission = utils.Float64ToStr(utils.StrToFloat64(receiveCommission) - utils.StrToFloat64(item.BenefitAll)) + isUpdate = true + } + } + } + } + + if isUpdate { + statistics.PaymentTotal = paymentTotal + statistics.OrderCount = orderCount + statistics.EstimatedCommission = estimatedCommission + statistics.EstimatedProfit = estimatedProfit + statistics.LoseOrderCount = loseOrderCount + statistics.PlaceOrderNumOfPeople = placeOrderNumOfPeople + statistics.EffectiveOrderCount = effectiveOrderCount + statistics.EffectiveCommission = effectiveCommission + statistics.ReceiveCommission = receiveCommission + statistics.LoseCommission = loseCommission + statistics.AvgCommission = avgCommission + statistics.CustomerUnitPrice = customerUnitPrice + statistics.UpdateAt = now.Format("2006-01-02 15:04:05") + _, err = db2.MasterGuideOrderStatisticsUpdate(db.Db, statistics.Id, statistics, + "payment_total", "order_count", "estimated_commission", "estimated_profit", "lose_order_count", + "place_order_num_of_people", "effective_order_count", "effective_commission", "receive_commission", "lose_commission", + "avg_commission", "customer_unit_price", "update_at") + if err != nil { + return err + } + } + } + return nil +} + +func SecondsUntilTomorrow(now time.Time) int64 { + tomorrow := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).AddDate(0, 0, 1) + return int64(tomorrow.Sub(now).Seconds()) +} + +// JudgeSate 处理订单状态(judgeSate[0:不需要处理 1:收货 2:未收货失效 3:已收货失效]) +func JudgeSate(message md.CanalGuideOrderMessage[md.CanalGuideOrder]) (judgeSate int) { + oldData := message.Old + + //1、获取 旧的订单状态 + var oldOrdState string + for _, item := range oldData { + if item.State != "" { + oldOrdState = item.State + } + } + if oldOrdState == "" { + return + } + + //2、获取 新的订单状态 + var nowOrdState string + for _, item := range message.Data { + nowOrdState = item.State + } + if oldOrdState == nowOrdState { + return + } + + //3、进行状态比较判断 + if oldOrdState == "0" { + if nowOrdState == "4" { //未收货失效 + return 2 + } + if nowOrdState == "1" || nowOrdState == "2" || nowOrdState == "3" || nowOrdState == "5" { //收货 + return 1 + } + } else { + if nowOrdState == "4" { //已收货失效 + return 3 + } + } + return +} diff --git a/consume/init.go b/consume/init.go index 994fbfd..3756ab4 100644 --- a/consume/init.go +++ b/consume/init.go @@ -19,48 +19,48 @@ func Init() { func initConsumes() { //jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder - jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge - jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv - jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume - jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree - jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal - jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond - // - jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal - jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy - jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle - // - jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder + //jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge + //jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv + //jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume + //jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree + //jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal + //jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond + + //jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal + //jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy + //jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle + + //jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder - jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation - jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser + //jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation + //jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser - jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition + //jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition - jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial - jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter - jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender - jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans - jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv + //jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial + //jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter + //jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender + //jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans + //jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv - jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay - jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess - jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund - jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond + //jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay + //jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess + //jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund + //jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond - jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore + //jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore - jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail + //jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail - jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume - jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate - jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate + //jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume + //jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate + //jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate - jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal - jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail - jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward - jobs[consumeMd.ZhiosTaskTotal] = ZhiosTaskTotal - jobs[consumeMd.ZhiosAutoUnFreeze] = ZhiosAutoUnFreeze + //jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal + //jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail + //jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward + //jobs[consumeMd.ZhiosTaskTotal] = ZhiosTaskTotal + //jobs[consumeMd.ZhiosAutoUnFreeze] = ZhiosAutoUnFreeze ////jobs[consumeMd.ZhiosUserProfileInviteCode] = ZhiosUserProfileInviteCode // @@ -79,15 +79,17 @@ func initConsumes() { //jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume //jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume //jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume + //jobs[consumeMd.CanalGuideOrderForNumericalStatementConsumeFunName] = CanalGuideOrderForNumericalStatementConsume //////////////////////////////////////// oneCircles ///////////////////////////////////////////////////// - //jobs[consumeMd.OneCirclesSignInGreenEnergyFunName] = OneCirclesSignInGreenEnergyConsume - //jobs[consumeMd.OneCirclesStartLevelDividendFunName] = OneCirclesStartLevelDividendConsume - //jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyConsume - //jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume - //jobs[consumeMd.OneCirclesSettlementPublicGiveActivityCoinFunName] = OneCirclesSettlementPublicGiveActivityCoinConsume - ////jobs[consumeMd.OneCirclesSignInCopyGreenEnergyFunName] = OneCirclesSignInCopyGreenEnergyConsume + jobs[consumeMd.OneCirclesSignInGreenEnergyFunName] = OneCirclesSignInGreenEnergyConsume + jobs[consumeMd.OneCirclesStartLevelDividendFunName] = OneCirclesStartLevelDividendConsume + jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyConsume + jobs[consumeMd.OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName] = OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume + jobs[consumeMd.OneCirclesSettlementPublicGiveActivityCoinFunName] = OneCirclesSettlementPublicGiveActivityCoinConsume + jobs[consumeMd.OneCirclesAddPublicPlatoonUserRelationCommissionFunName] = OneCirclesAddPublicPlatoonUserRelationCommissionConsume + jobs[consumeMd.OneCirclesSignInCopyGreenEnergyFunName] = OneCirclesSignInCopyGreenEnergyConsume //////////////////////////////////////// withdraw ///////////////////////////////////////////////////// //jobs[consumeMd.WithdrawConsumeFunName] = WithdrawConsume diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 5edf3d6..2a1b197 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -74,6 +74,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "CanalOrderConsume", }, + { + ExchangeName: "canal.topic", + Name: "canal_guide_order_for_numerical_statement", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "canal_order_list", + BindKey: "", + ConsumeFunName: "CanalGuideOrderForNumericalStatementConsume", + }, { ExchangeName: "canal.topic", Name: "canal_guide_order", @@ -497,6 +506,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "OneCirclesSettlementPublicGiveActivityCoinConsume", }, + { + ExchangeName: "one.circles", + Name: "one_circles_add_public_platoon_user_relation_commission", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "add_public_platoon_user_relation_commission", + BindKey: "", + ConsumeFunName: "OneCirclesAddPublicPlatoonUserRelationCommissionConsume", + }, { ExchangeName: "one.circles", Name: "one_circles_sign_in_green_energy_copy", @@ -559,6 +577,7 @@ const ( ZhiosOrderBuckleFunName = "ZhiosOrderBuckle" ZhiosSupplierAfterOrderFunName = "ZhiosSupplierAfterOrder" CanalOrderConsumeFunName = "CanalOrderConsume" + CanalGuideOrderForNumericalStatementConsumeFunName = "CanalGuideOrderForNumericalStatementConsume" CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume" ZhiOsUserVisitIpAddressConsumeFunName = "ZhiOsUserVisitIpAddressConsume" DouShenUserRegisterConsumeForOfficialFunName = "DouShenUserRegisterConsumeForOfficial" @@ -598,6 +617,7 @@ const ( OneCirclesActivityCoinAutoExchangeGreenEnergyFunName = "OneCirclesActivityCoinAutoExchangeGreenEnergyConsume" OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamFunName = "OneCirclesActivityCoinAutoExchangeGreenEnergyForTeamConsume" OneCirclesSettlementPublicGiveActivityCoinFunName = "OneCirclesSettlementPublicGiveActivityCoinConsume" + OneCirclesAddPublicPlatoonUserRelationCommissionFunName = "OneCirclesAddPublicPlatoonUserRelationCommissionConsume" WithdrawConsumeFunName = "WithdrawConsume" FlexibleEmploymentWithdrawForGongMaoConsumeFunName = "FlexibleEmploymentWithdrawForGongMaoConsume" FlexibleEmploymentWithdrawForPupiaoConsumeFunName = "FlexibleEmploymentWithdrawForPupiaoConsume" diff --git a/consume/md/md_canal_guide_order_consume.go b/consume/md/md_canal_guide_order_consume.go index e8e80a0..00ea12a 100644 --- a/consume/md/md_canal_guide_order_consume.go +++ b/consume/md/md_canal_guide_order_consume.go @@ -9,13 +9,14 @@ type CanalGuideOrder struct { ItemId string `json:"item_id"` Uid string `json:"uid"` CostPrice string `json:"cost_price"` - PaidPrice string `json:"paid_price"` ItemPrice string `json:"item_price"` State string `json:"state"` OrderType string `json:"order_type"` ItemNum string `json:"item_num"` - SysCommission string `json:"sys_commission"` CreateAt string `json:"create_at"` + PaidPrice string `json:"paid_price"` //付款金额 + BenefitAll string `json:"benefit_all"` //分润总额,供应商总额 + SysCommission string `json:"sys_commission"` //平台占佣金 } type CanalGuideOrderMessage[T any] struct { diff --git a/consume/md/md_one_circles.go b/consume/md/md_one_circles.go index f9f793f..d2da999 100644 --- a/consume/md/md_one_circles.go +++ b/consume/md/md_one_circles.go @@ -1,5 +1,7 @@ package md +import "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md" + type OneCirclesStructForSignIn struct { MasterId string `json:"master_id"` Uid int `json:"uid"` @@ -12,3 +14,8 @@ type OneCirclesStructForStarLevelDividends struct { Uid int `json:"uid"` SignDividend float64 `json:"sign_dividend"` } + +type OneCirclesStructForAddPublicPlatoonUserRelationCommissionConsume struct { + MasterId string `json:"master_id"` + Data md.AddOneCirclesPublicPlatoonUserRelationCommissionReq `json:"data"` +} diff --git a/consume/one_circles_activity_coin_auto_exchange_green_energy_consume.go b/consume/one_circles_activity_coin_auto_exchange_green_energy_consume.go index 4d5d4a9..c933849 100644 --- a/consume/one_circles_activity_coin_auto_exchange_green_energy_consume.go +++ b/consume/one_circles_activity_coin_auto_exchange_green_energy_consume.go @@ -32,7 +32,7 @@ func OneCirclesActivityCoinAutoExchangeGreenEnergyConsume(queue md.MqQueue) { //1、将自己绑定到交换机上 ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) //2、取出数据进行消费 - ch.Qos(40) + ch.Qos(50) delivery := ch.Consume(queue.Name, false) one_circles.Init(cfg.RedisAddr) @@ -61,7 +61,7 @@ func OneCirclesActivityCoinAutoExchangeGreenEnergyConsume(queue md.MqQueue) { } func handleOneCirclesActivityCoinAutoExchangeGreenEnergyConsume(msgData []byte) error { - time.Sleep(time.Duration(100) * time.Microsecond) //休眠0.1毫秒 + time.Sleep(time.Duration(10) * time.Microsecond) //休眠0.1毫秒 //1、解析mq中queue的数据结构体 var msg *md2.OneCirclesStructForAutoExchangeGreenEnergy err := json.Unmarshal(msgData, &msg) @@ -77,16 +77,6 @@ func handleOneCirclesActivityCoinAutoExchangeGreenEnergyConsume(msgData []byte) if cb != nil { defer cb() // 释放锁 } - //oneCirclesGreenEnergyBasicSetting, err := db2.OneCirclesGreenEnergyBasicSettingGetOneByParams(engine, map[string]interface{}{ - // "key": "is_open", - // "value": 1, - //}) - //if err != nil { - // return err - //} - //if oneCirclesGreenEnergyBasicSetting == nil { - // return nil - //} session := engine.NewSession() defer func() { @@ -96,30 +86,8 @@ func handleOneCirclesActivityCoinAutoExchangeGreenEnergyConsume(msgData []byte) } }() session.Begin() - //3.1计算涨价公式 - err1, values, _, afterPriceValue := one_circles.NewCalcPriceIncreaseFormula(msg.AutoExchangeNumsAmount, oneCirclesGreenEnergyBasicSetting) - if err1 != nil { - _ = session.Rollback() - return err1 - } - //3.2给相应的用户加上个人的绿色积分(可用数量) - err = rule.DealUserCoin(session, md2.DealUserCoinReq{ - Kind: "add", - Mid: msg.MasterId, - Title: md2.OneCirclesPersonalActiveCoinExchangeGreenEnergy, - TransferType: md2.OneCirclesPersonalActiveCoinExchangeGreenEnergyForUserVirtualCoinFlow, - OrdId: "", - CoinId: oneCirclesGreenEnergyBasicSetting.PersonGreenEnergyCoinId, - Uid: msg.Uid, - Amount: utils.StrToFloat64(values), - }) - if err != nil { - _ = session.Rollback() - fmt.Println("err:::::33333", err.Error()) - return err - } - //4.1给相应的用户减去个人活跃积分 + //3给相应的用户减去个人活跃积分 err = rule.DealUserCoin(session, md2.DealUserCoinReq{ Kind: "sub", Mid: msg.MasterId, @@ -135,21 +103,45 @@ func handleOneCirclesActivityCoinAutoExchangeGreenEnergyConsume(msgData []byte) fmt.Println("err:::::33333", err.Error()) return err } - //4.2减少“原始数量”中的绿色能量 - err = one_circles.DealAvailableGreenEnergyCoin(session, int(enum.PersonalActivePointRedemption), utils.StrToFloat64(values), utils.StrToFloat64(msg.AutoExchangeNumsAmount), enum.PersonalActivePointRedemption.String(), oneCirclesGreenEnergyBasicSetting, afterPriceValue) + + //4.1计算涨价公式 + err1, values, _, afterPriceValue := one_circles.NewCalcPriceIncreaseFormula(msg.AutoExchangeNumsAmount, oneCirclesGreenEnergyBasicSetting) + if err1 != nil { + _ = session.Rollback() + return err1 + } + //4.2给相应的用户加上个人的绿色积分(可用数量) + err = rule.DealUserCoin(session, md2.DealUserCoinReq{ + Kind: "add", + Mid: msg.MasterId, + Title: md2.OneCirclesPersonalActiveCoinExchangeGreenEnergy, + TransferType: md2.OneCirclesPersonalActiveCoinExchangeGreenEnergyForUserVirtualCoinFlow, + OrdId: "", + CoinId: oneCirclesGreenEnergyBasicSetting.PersonGreenEnergyCoinId, + Uid: msg.Uid, + Amount: utils.StrToFloat64(values), + }) if err != nil { _ = session.Rollback() fmt.Println("err:::::44444", err.Error()) return err } - //5、修改 one_circles_green_energy_basic_setting 的 now_price - _, err = db2.OneCirclesGreenEnergyBasicSettingUpdate(session, oneCirclesGreenEnergyBasicSetting.Id, oneCirclesGreenEnergyBasicSetting) + //5减少“原始数量”中的绿色能量 + err = one_circles.DealAvailableGreenEnergyCoin(session, int(enum.PersonalActivePointRedemption), utils.StrToFloat64(values), utils.StrToFloat64(msg.AutoExchangeNumsAmount), enum.PersonalActivePointRedemption.String(), oneCirclesGreenEnergyBasicSetting, afterPriceValue) if err != nil { _ = session.Rollback() - fmt.Println("err:::::77777", err.Error()) + fmt.Println("err:::::555555", err.Error()) return err } + + //6、修改 one_circles_green_energy_basic_setting 的 now_price + db2.OneCirclesGreenEnergyBasicSettingUpdate(session, oneCirclesGreenEnergyBasicSetting.Id, oneCirclesGreenEnergyBasicSetting) + //if err != nil { + // _ = session.Rollback() + // fmt.Println("err:::::666666", err.Error()) + // return err + //} err = session.Commit() if err != nil { _ = session.Rollback() diff --git a/consume/one_circles_add_public_platoon_user_relation_commission.go b/consume/one_circles_add_public_platoon_user_relation_commission.go new file mode 100644 index 0000000..ed83e06 --- /dev/null +++ b/consume/one_circles_add_public_platoon_user_relation_commission.go @@ -0,0 +1,75 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" +) + +func OneCirclesAddPublicPlatoonUserRelationCommissionConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>OneCirclesAddPublicPlatoonUserRelationCommissionConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + one_circles.Init(cfg.RedisAddr) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleOneCirclesAddPublicPlatoonUserRelationCommissionConsume(res.Body) + if err != nil { + fmt.Println("err ::: ", err) + utils.FilePutContents("OneCirclesAddPublicPlatoonUserRelationCommissionConsume_ERR", "[err]:"+err.Error()) + _ = res.Reject(false) + //TODO::重新推回队列末尾,避免造成队列堵塞 + var msg *md.OneCirclesStructForAddPublicPlatoonUserRelationCommissionConsume + json.Unmarshal(res.Body, &msg) + ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + } else { + _ = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleOneCirclesAddPublicPlatoonUserRelationCommissionConsume(msgData []byte) error { + //1、解析mq中queue的数据结构体 + var msg *md.OneCirclesStructForAddPublicPlatoonUserRelationCommissionConsume + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + + engine := db.DBs[msg.MasterId] + var req []*md2.AddOneCirclesPublicPlatoonUserRelationCommissionReq + req = append(req, &msg.Data) + _, err = one_circles.AddOneCirclesPublicPlatoonUserRelationCommission(engine, req) + fmt.Println("err::::", err) + if err != nil { + return err + } + return nil +} diff --git a/go.mod b/go.mod index 2d2d74b..b543312 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,9 @@ module applet go 1.18 +// go.mod文件中 +//replace code.fnuoos.com/go_rely_warehouse/zyos_model.git => E:/company/go_rely_warehouse/zyos_model/ + require ( code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240607091816-3df1433a2f0d code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0 @@ -9,7 +12,7 @@ require ( code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240703034234-2ab228956242 code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240611024753-7cd929a03014 - code.fnuoos.com/go_rely_warehouse/zyos_model.git v0.0.4-0.20240701102131-0408d7ee8572 + code.fnuoos.com/go_rely_warehouse/zyos_model.git v0.0.4-0.20240711033658-057f89bb825f github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/boombuler/barcode v1.0.1