From 4a1bb56fdc40b35e1717b0c349a1158f74b0d2a2 Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Mon, 5 Aug 2024 11:12:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consume/init.go | 8 +- consume/md/consume_key.go | 10 +++ .../zhios_own_new_video_reward_exchange.go | 85 +++++++++++++++++++ 3 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 consume/zhios_own_new_video_reward_exchange.go diff --git a/consume/init.go b/consume/init.go index 51cadda..8646f0e 100644 --- a/consume/init.go +++ b/consume/init.go @@ -93,10 +93,10 @@ func initConsumes() { jobs[consumeMd.CancalUserIntegralExchangeConsumeFunName] = CancalUserIntegralExchange //兑换 jobs[consumeMd.ZhiosTaskRewardConsumeFunName] = ZhiosTaskRewardExchange //兑换 jobs[consumeMd.CanalOneOrengeUserVirtualCcoinFlowFunName] = CanalOneOrengeUserVirtualCoinFlowConsume - jobs[consumeMd.ZhiosTaskVideoRewardConsumeFunName] = ZhiosTaskVideoRewardExchange //视频分佣 - jobs[consumeMd.ZhiosNewVideoRewardConsumeFunName] = ZhiosNewVideoRewardExchange //短视频奖励 - - jobs[consumeMd.ZhiosRelateRewardConsumeFunName] = ZhiosRelateRewardExchange //分佣结算 + jobs[consumeMd.ZhiosTaskVideoRewardConsumeFunName] = ZhiosTaskVideoRewardExchange //视频分佣 + jobs[consumeMd.ZhiosNewVideoRewardConsumeFunName] = ZhiosNewVideoRewardExchange //短视频奖励 + jobs[consumeMd.ZhiosRelateRewardConsumeFunName] = ZhiosRelateRewardExchange //分佣结算 + jobs[consumeMd.ZhiosOwnNewVideoRewardConsumeFunName] = ZhiosOwnNewVideoRewardExchange //短视频奖励 } diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 2ce12b5..5f4271d 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -488,6 +488,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "ZhiosNewVideoRewardExchange", }, + { + ExchangeName: "zhios.new_video_reward.exchange", + Name: "zhios_own_new_video_reward_dev", + Type: FanOutQueueType, + IsPersistent: false, + RoutKey: "own_new_video_reward_dev", + BindKey: "", + ConsumeFunName: "ZhiosOwnNewVideoRewardExchange", + }, { ExchangeName: "canal.topic", // Name: "canal_fin_user_flow", @@ -565,5 +574,6 @@ const ( ZhiosRelateRewardConsumeFunName = "ZhiosRelateRewardExchange" ZhiosTaskVideoRewardConsumeFunName = "ZhiosTaskVideoRewardExchange" ZhiosNewVideoRewardConsumeFunName = "ZhiosNewVideoRewardExchange" + ZhiosOwnNewVideoRewardConsumeFunName = "ZhiosOwnNewVideoRewardExchange" CanalOneOrengeUserVirtualCcoinFlowFunName = "CanalOneOrengeUserVirtualCoinFlowConsume" ) diff --git a/consume/zhios_own_new_video_reward_exchange.go b/consume/zhios_own_new_video_reward_exchange.go new file mode 100644 index 0000000..b97cc7f --- /dev/null +++ b/consume/zhios_own_new_video_reward_exchange.go @@ -0,0 +1,85 @@ +package consume + +import ( + "applet/app/db" + "applet/app/svc" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" +) + +// +func ZhiosOwnNewVideoRewardExchange(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosOwnNewVideoRewardExchange(res.Body) + //_ = res.Reject(false) + fmt.Println(err) + _ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleZhiosOwnNewVideoRewardExchange(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosTaskReward + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println(err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + if eg == nil { + return nil + } + amount := canalMsg.Money + //奖励 + oid := canalMsg.Oid + uid := canalMsg.Uid + sess := eg.NewSession() + defer sess.Close() + sess.Begin() + _, err = svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess, + utils.StrToFloat64(amount), "看视频奖励", "0", 1, 170, utils.StrToInt(uid), utils.StrToInt(canalMsg.CoinId), 0, utils.StrToInt64(oid), "", 0, 0) + if err != nil { + sess.Rollback() + return err + } + sess.Commit() + + return nil +}