From ad5fc6d65e275066cd5a51306a472d3d860e861e Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Mon, 17 Jun 2024 18:11:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/db/dbs_map.go | 3 +- consume/init.go | 1 + consume/md/consume_key.go | 10 ++ consume/md/md.go | 2 + consume/zhios_new_video_reward_exchange.go | 102 +++++++++++++++++++++ 5 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 consume/zhios_new_video_reward_exchange.go diff --git a/app/db/dbs_map.go b/app/db/dbs_map.go index 5acfe9d..91ec1b1 100644 --- a/app/db/dbs_map.go +++ b/app/db/dbs_map.go @@ -110,7 +110,8 @@ func GetAllDatabaseDev() *[]model.DbMapping { fmt.Println("cfg.Local is: ", cfg.Local) if cfg.Local { // 本地调试 加快速度 fmt.Println("notice:LOCAL TEST, only masterId:** 99813608 ** available!") - err = Db.Where("deleted_at != ? AND db_master_id=?", 1, 31585332).Find(&m) + err = Db.Where("deleted_at != ? AND is_dev = '1' AND db_master_id=?", 1, 123456).Find(&m) + } else { err = Db.Where("deleted_at != ? AND is_dev = '1' ", 1).Find(&m) } diff --git a/consume/init.go b/consume/init.go index 18a64a6..55ba141 100644 --- a/consume/init.go +++ b/consume/init.go @@ -94,6 +94,7 @@ func initConsumes() { jobs[consumeMd.ZhiosTaskRewardConsumeFunName] = ZhiosTaskRewardExchange //兑换 jobs[consumeMd.CanalOneOrengeUserVirtualCcoinFlowFunName] = CanalOneOrengeUserVirtualCoinFlowConsume jobs[consumeMd.ZhiosTaskVideoRewardConsumeFunName] = ZhiosTaskVideoRewardExchange //视频分佣 + jobs[consumeMd.ZhiosNewVideoRewardConsumeFunName] = ZhiosNewVideoRewardExchange //短视频奖励 } diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index fb74f75..4f82f62 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -470,6 +470,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "ZhiosTaskVideoRewardExchange", }, + { + ExchangeName: "zhios.new_video_reward.exchange", + Name: "zhios_new_video_reward", + Type: FanOutQueueType, + IsPersistent: false, + RoutKey: "new_video_reward", + BindKey: "", + ConsumeFunName: "ZhiosNewVideoRewardExchange", + }, { ExchangeName: "canal.topic", // Name: "canal_fin_user_flow", @@ -545,5 +554,6 @@ const ( CancalUserIntegralExchangeConsumeFunName = "CancalUserIntegralExchange" ZhiosTaskRewardConsumeFunName = "ZhiosTaskRewardExchange" ZhiosTaskVideoRewardConsumeFunName = "ZhiosTaskVideoRewardExchange" + ZhiosNewVideoRewardConsumeFunName = "ZhiosNewVideoRewardExchange" CanalOneOrengeUserVirtualCcoinFlowFunName = "CanalOneOrengeUserVirtualCoinFlowConsume" ) diff --git a/consume/md/md.go b/consume/md/md.go index 1bcc00d..fe6dec7 100644 --- a/consume/md/md.go +++ b/consume/md/md.go @@ -8,6 +8,8 @@ type ZhiosAcquisition struct { Id string `json:"id"` } type ZhiosTaskReward struct { + CoinId string `json:"coin_id"` + Money string `json:"money"` Uid string `json:"uid"` Mid string `json:"mid"` Reward string `json:"reward"` diff --git a/consume/zhios_new_video_reward_exchange.go b/consume/zhios_new_video_reward_exchange.go new file mode 100644 index 0000000..9e339aa --- /dev/null +++ b/consume/zhios_new_video_reward_exchange.go @@ -0,0 +1,102 @@ +package consume + +import ( + "applet/app/db" + "applet/app/svc" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + md3 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" +) + +// +func ZhiosNewVideoRewardExchange(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosNewVideoRewardExchange(res.Body) + //_ = res.Reject(false) + fmt.Println(err) + _ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleZhiosNewVideoRewardExchange(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosTaskReward + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println(err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + if eg == nil { + return nil + } + amount := canalMsg.Money + //奖励 + oid := canalMsg.Oid + uid := canalMsg.Uid + sess := eg.NewSession() + defer sess.Close() + sess.Begin() + _, err = svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess, + utils.StrToFloat64(amount), "看视频奖励", "0", 1, 170, utils.StrToInt(uid), utils.StrToInt(canalMsg.CoinId), 0, utils.StrToInt64(oid), "", 0, 0) + if err != nil { + sess.Rollback() + return err + } + sess.Commit() + if canalMsg.Mode != "" { + //计算佣金 + var CommissionParam md3.CommissionFirstParam + CommissionParam.CommissionParam.Commission = canalMsg.Reward + CommissionParam.Uid = uid + CommissionParam.Provider = canalMsg.PlanType + title := canalMsg.Title + var mapData = map[string]string{ + "coin_id_type": canalMsg.CoinIdType, + "mode": canalMsg.Mode, + "title": title, + "device_model": canalMsg.DeviceModel, + "reward_type": canalMsg.RewardType, + } + svc.GetLvUser(eg, CommissionParam, utils.StrToInt64(canalMsg.Oid), mid, mapData) + } + + return nil +}