From 8025d9f98cc87c140a7a3b2c5aea5dfef801b9a8 Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Thu, 28 Nov 2024 19:18:44 +0800 Subject: [PATCH] add to deal user virtual coin consume --- ...g_energy_deal_user_virtual_coin_consume.go | 83 +++++++++++++++++++ consume/init.go | 1 + consume/md/consume_key.go | 10 +++ go.mod | 2 +- 4 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 consume/egg_energy_deal_user_virtual_coin_consume.go diff --git a/consume/egg_energy_deal_user_virtual_coin_consume.go b/consume/egg_energy_deal_user_virtual_coin_consume.go new file mode 100644 index 0000000..625f736 --- /dev/null +++ b/consume/egg_energy_deal_user_virtual_coin_consume.go @@ -0,0 +1,83 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + md3 "code.fnuoos.com/EggPlanet/egg_system_rules.git/md" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" + md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func EggEnergyDealUserVirtualCoinDataConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>EggEnergyDealUserVirtualCoinDataConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleEggEnergyDealUserVirtualCoinDataConsume(res.Body) + if err != nil { + fmt.Println("EggEnergyDealUserVirtualCoinDataConsume_ERR:::::", err.Error()) + utils2.FilePutContents("EggEnergyDealUserVirtualCoinDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleEggEnergyDealUserVirtualCoinDataConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md2.EggEnergyStructForDealUserVirtualCoinData + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + + engine := db.Db + session := engine.NewSession() + err = rule.DealUserVirtualCoin(session, md3.DealUserVirtualCoinReq{ + Kind: msg.Kind, + Title: msg.Title, + TransferType: msg.TransferType, + CoinId: msg.CoinId, + Uid: msg.Uid, + Amount: msg.Amount, + }) + if err != nil { + return err + } + + return nil +} diff --git a/consume/init.go b/consume/init.go index 52361ba..f10ce26 100644 --- a/consume/init.go +++ b/consume/init.go @@ -20,6 +20,7 @@ func initConsumes() { jobs[consumeMd.EggEnergyStartLevelDividendFunName] = EggEnergyStartLevelDividendConsume jobs[consumeMd.EggEnergyDealPlatformRevenueDataFunName] = EggEnergyDealPlatformRevenueDataConsume jobs[consumeMd.EggEnergyDealFundDataFunName] = EggEnergyDealFundDataConsume + jobs[consumeMd.EggEnergyDealUserVirtualCoinDataFunName] = EggEnergyDealUserVirtualCoinDataConsume jobs[consumeMd.IMEggEnergyBatchSendMessageDataFunName] = IMEggEnergyBatchSendMessageDataConsume } diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 488b6be..4674cb4 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -38,6 +38,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "EggEnergyDealPlatformRevenueDataConsume", }, + { + ExchangeName: "egg.energy", + Name: "egg_deal_user_virtual_coin_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "deal_user_virtual_coin", + BindKey: "", + ConsumeFunName: "EggEnergyDealUserVirtualCoinDataConsume", + }, { ExchangeName: "im.egg.energy", Name: "im_egg_energy_batch_send_message_queue", @@ -53,5 +62,6 @@ const ( EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" EggEnergyDealFundDataFunName = "EggEnergyDealFundDataConsume" EggEnergyDealPlatformRevenueDataFunName = "EggEnergyDealPlatformRevenueDataConsume" + EggEnergyDealUserVirtualCoinDataFunName = "EggEnergyDealUserVirtualCoinDataConsume" IMEggEnergyBatchSendMessageDataFunName = "IMEggEnergyBatchSendMessageDataConsume" ) diff --git a/go.mod b/go.mod index e339f64..5e88ebe 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ replace code.fnuoos.com/EggPlanet/egg_models.git => E:/company/Egg/egg_models replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules require ( - code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241119093836-37be936b83fc + code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241128102555-fc839292d728 code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.2 code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 github.com/boombuler/barcode v1.0.1