From 7b5429a2539bebd0cff81dede9cd72cde8dfadba Mon Sep 17 00:00:00 2001 From: DengBiao <2319963317@qq.com> Date: Fri, 8 Mar 2024 23:41:34 +0800 Subject: [PATCH] update --- app/db/dbs_map.go | 2 +- consume/init.go | 81 ++++++++++---------- consume/md/consume_key.go | 10 +++ consume/withdraw_consume.go | 96 ++++++++++++++++++++++++ consume/zhios_mall_green_coin_consume.go | 4 +- go.mod | 2 +- 6 files changed, 151 insertions(+), 44 deletions(-) create mode 100644 consume/withdraw_consume.go diff --git a/app/db/dbs_map.go b/app/db/dbs_map.go index 5acfe9d..b16ee8c 100644 --- a/app/db/dbs_map.go +++ b/app/db/dbs_map.go @@ -110,7 +110,7 @@ func GetAllDatabaseDev() *[]model.DbMapping { fmt.Println("cfg.Local is: ", cfg.Local) if cfg.Local { // 本地调试 加快速度 fmt.Println("notice:LOCAL TEST, only masterId:** 99813608 ** available!") - err = Db.Where("deleted_at != ? AND db_master_id=?", 1, 31585332).Find(&m) + err = Db.Where("deleted_at != ? AND db_master_id=?", 1, 72190665).Find(&m) } else { err = Db.Where("deleted_at != ? AND is_dev = '1' ", 1).Find(&m) } diff --git a/consume/init.go b/consume/init.go index 979cf9e..3ab1115 100644 --- a/consume/init.go +++ b/consume/init.go @@ -19,47 +19,47 @@ func Init() { func initConsumes() { //jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume - jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge - jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv - jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume - jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree - jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal - jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond + //jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge + //jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv + //jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume + //jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree + //jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal + //jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond + //// + //jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal + //jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy + //jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle + //// + //jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder + //jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder // - jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal - jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy - jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle + //jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation + //jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser // - jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder - jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder - - jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation - jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser - - jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition - - jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial - jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter - jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender - jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans - jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv - - jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay - jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess - jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund - jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond - - jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore - - jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail - - jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume - jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate - jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate - - jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal - jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail - jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward + //jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition + // + //jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial + //jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter + //jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender + //jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans + //jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv + // + //jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay + //jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess + //jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund + //jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond + // + //jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore + // + //jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail + // + //jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume + //jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate + //jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate + // + //jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal + //jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail + //jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward // ////////////////////////////////////// V1 ///////////////////////////////////////////////////// @@ -80,6 +80,9 @@ func initConsumes() { //////////////////////////////////////// oneCircles ///////////////////////////////////////////////////// //jobs[consumeMd.OneCirclesSignInGreenEnergyFunName] = OneCirclesSignInGreenEnergyConsume //jobs[consumeMd.OneCirclesSignInCopyGreenEnergyFunName] = OneCirclesSignInCopyGreenEnergyConsume + + //////////////////////////////////////// withdraw ///////////////////////////////////////////////////// + jobs[consumeMd.WithdrawConsumeFunName] = WithdrawConsume } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index c037600..f7e539a 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -416,6 +416,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "OneCirclesSignInCopyGreenEnergyConsume", }, + { + ExchangeName: "zhios.app.user.withdraw.apply.exchange", + Name: "zhios_app_user_withdraw_apply_queue", + Type: FanOutQueueType, + IsPersistent: false, + RoutKey: "queues_one", + BindKey: "", + ConsumeFunName: "WithdrawConsume", + }, } const ( @@ -465,4 +474,5 @@ const ( CanalUserVirtualCcoinFlowFunName = "CanalUserVirtualCoinFlowConsume" OneCirclesSignInGreenEnergyFunName = "OneCirclesSignInGreenEnergyConsume" OneCirclesSignInCopyGreenEnergyFunName = "OneCirclesSignInCopyGreenEnergyConsume" + WithdrawConsumeFunName = "WithdrawConsume" ) diff --git a/consume/withdraw_consume.go b/consume/withdraw_consume.go new file mode 100644 index 0000000..0f9ae1f --- /dev/null +++ b/consume/withdraw_consume.go @@ -0,0 +1,96 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func WithdrawConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>WithdrawConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + one_circles.Init(cfg.RedisAddr) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleWithdrawConsume(res.Body) + fmt.Println("err ::: ", err) + if err != nil { + fmt.Println("WithdrawConsume_ERR:::::", err.Error()) + //_ = res.Reject(true) + _ = res.Reject(false) + var msg interface{} + json.Unmarshal(res.Body, &msg) + if err.Error() == "Connection timed out" { + //TODO::重新推回队列末尾,避免造成队列堵塞 + ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + } else { + //TODO::推入新的队列中备份 + utils.FilePutContents("WithdrawConsume_ERR", utils.SerializeStr(err.Error())) + ch.Publish("zhios.app.user.withdraw.apply.exception.exchange", msg, "queues_one") + } + } else { + err = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleWithdrawConsume(msgData []byte) error { + time.Sleep(time.Microsecond * 200) // 等待200毫秒 + //1、解析mq中queue的数据结构体 + var msg interface{} + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + fmt.Println("message:::::::::::>>>>>>>>>") + fmt.Println(msg) + post, err := utils.CurlPost("http://admin.99813608.zhiyingos.com/index/transfer", msg, nil) + if err != nil { + return err + } + fmt.Println("transfer:::::::::::<<<<<<<<<") + fmt.Println(string(post), "\n========================================\n\n") + var postResult struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data struct { + IsOk bool `json:"isOk"` + Msg string `json:"msg"` + } `json:"data"` + } + err = json.Unmarshal(post, &postResult) + if err != nil { + return err + } + if !postResult.Data.IsOk { + return errors.New(postResult.Data.Msg) + } + return nil +} diff --git a/consume/zhios_mall_green_coin_consume.go b/consume/zhios_mall_green_coin_consume.go index 0d71687..bdb4326 100644 --- a/consume/zhios_mall_green_coin_consume.go +++ b/consume/zhios_mall_green_coin_consume.go @@ -49,10 +49,8 @@ func ZhiosMallGreenCoinConsume(queue md.MqQueue) { ch.Publish(md.MallGreenCoinConsume, utils.SerializeStr(canalMsg), md.MallGreenCoinConsumeKeyErr) } } - } _ = res.Ack(true) - } else { panic(errors.New("error getting message")) } @@ -82,7 +80,7 @@ func handleZhiosMallGreenCoinConsume(msg []byte) error { return nil } rule.InitForGreenCoinDoubleChainIntegral(cfg.RedisAddr) - _, err = rule.DealUserGreenCoinDoubleChainIntegral(eg, utils.StrToInt(canalMsg.Uid), canalMsg.Amount, canalMsg.Oid, canalMsg.Mid) + err = rule.DealUserGreenCoinDoubleChainIntegral(eg, utils.StrToInt(canalMsg.Uid), canalMsg.Oid, canalMsg.Mid) fmt.Println(err) if err != nil { return err diff --git a/go.mod b/go.mod index 5caad4a..9b2f283 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240222023917-c31b53f7e8cb code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0 code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.4 - code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240306114517-5b62e6dc0082 + code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240308111632-3557d568389b code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240126015516-38ca248db2fd github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5