@@ -93,10 +93,10 @@ func initConsumes() { | |||||
jobs[consumeMd.CancalUserIntegralExchangeConsumeFunName] = CancalUserIntegralExchange //兑换 | jobs[consumeMd.CancalUserIntegralExchangeConsumeFunName] = CancalUserIntegralExchange //兑换 | ||||
jobs[consumeMd.ZhiosTaskRewardConsumeFunName] = ZhiosTaskRewardExchange //兑换 | jobs[consumeMd.ZhiosTaskRewardConsumeFunName] = ZhiosTaskRewardExchange //兑换 | ||||
jobs[consumeMd.CanalOneOrengeUserVirtualCcoinFlowFunName] = CanalOneOrengeUserVirtualCoinFlowConsume | jobs[consumeMd.CanalOneOrengeUserVirtualCcoinFlowFunName] = CanalOneOrengeUserVirtualCoinFlowConsume | ||||
jobs[consumeMd.ZhiosTaskVideoRewardConsumeFunName] = ZhiosTaskVideoRewardExchange //视频分佣 | |||||
jobs[consumeMd.ZhiosNewVideoRewardConsumeFunName] = ZhiosNewVideoRewardExchange //短视频奖励 | |||||
jobs[consumeMd.ZhiosRelateRewardConsumeFunName] = ZhiosRelateRewardExchange //分佣结算 | |||||
jobs[consumeMd.ZhiosTaskVideoRewardConsumeFunName] = ZhiosTaskVideoRewardExchange //视频分佣 | |||||
jobs[consumeMd.ZhiosNewVideoRewardConsumeFunName] = ZhiosNewVideoRewardExchange //短视频奖励 | |||||
jobs[consumeMd.ZhiosRelateRewardConsumeFunName] = ZhiosRelateRewardExchange //分佣结算 | |||||
jobs[consumeMd.ZhiosOwnNewVideoRewardConsumeFunName] = ZhiosOwnNewVideoRewardExchange //短视频奖励 | |||||
} | } | ||||
@@ -488,6 +488,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
BindKey: "", | BindKey: "", | ||||
ConsumeFunName: "ZhiosNewVideoRewardExchange", | ConsumeFunName: "ZhiosNewVideoRewardExchange", | ||||
}, | }, | ||||
{ | |||||
ExchangeName: "zhios.new_video_reward.exchange", | |||||
Name: "zhios_own_new_video_reward_dev", | |||||
Type: FanOutQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "own_new_video_reward_dev", | |||||
BindKey: "", | |||||
ConsumeFunName: "ZhiosOwnNewVideoRewardExchange", | |||||
}, | |||||
{ | { | ||||
ExchangeName: "canal.topic", // | ExchangeName: "canal.topic", // | ||||
Name: "canal_fin_user_flow", | Name: "canal_fin_user_flow", | ||||
@@ -565,5 +574,6 @@ const ( | |||||
ZhiosRelateRewardConsumeFunName = "ZhiosRelateRewardExchange" | ZhiosRelateRewardConsumeFunName = "ZhiosRelateRewardExchange" | ||||
ZhiosTaskVideoRewardConsumeFunName = "ZhiosTaskVideoRewardExchange" | ZhiosTaskVideoRewardConsumeFunName = "ZhiosTaskVideoRewardExchange" | ||||
ZhiosNewVideoRewardConsumeFunName = "ZhiosNewVideoRewardExchange" | ZhiosNewVideoRewardConsumeFunName = "ZhiosNewVideoRewardExchange" | ||||
ZhiosOwnNewVideoRewardConsumeFunName = "ZhiosOwnNewVideoRewardExchange" | |||||
CanalOneOrengeUserVirtualCcoinFlowFunName = "CanalOneOrengeUserVirtualCoinFlowConsume" | CanalOneOrengeUserVirtualCcoinFlowFunName = "CanalOneOrengeUserVirtualCoinFlowConsume" | ||||
) | ) |
@@ -0,0 +1,85 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
"applet/app/svc" | |||||
"applet/app/utils" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/streadway/amqp" | |||||
) | |||||
// | |||||
func ZhiosOwnNewVideoRewardExchange(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleZhiosOwnNewVideoRewardExchange(res.Body) | |||||
//_ = res.Reject(false) | |||||
fmt.Println(err) | |||||
_ = res.Ack(true) | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleZhiosOwnNewVideoRewardExchange(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.ZhiosTaskReward | |||||
fmt.Println(string(msg)) | |||||
var tmpString string | |||||
err := json.Unmarshal(msg, &tmpString) | |||||
if err != nil { | |||||
fmt.Println(err.Error()) | |||||
return err | |||||
} | |||||
fmt.Println(tmpString) | |||||
err = json.Unmarshal([]byte(tmpString), &canalMsg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
mid := canalMsg.Mid | |||||
eg := db.DBs[mid] | |||||
if eg == nil { | |||||
return nil | |||||
} | |||||
amount := canalMsg.Money | |||||
//奖励 | |||||
oid := canalMsg.Oid | |||||
uid := canalMsg.Uid | |||||
sess := eg.NewSession() | |||||
defer sess.Close() | |||||
sess.Begin() | |||||
_, err = svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess, | |||||
utils.StrToFloat64(amount), "看视频奖励", "0", 1, 170, utils.StrToInt(uid), utils.StrToInt(canalMsg.CoinId), 0, utils.StrToInt64(oid), "", 0, 0) | |||||
if err != nil { | |||||
sess.Rollback() | |||||
return err | |||||
} | |||||
sess.Commit() | |||||
return nil | |||||
} |