Przeglądaj źródła

短视频短剧奖励

master
huangjiajun 3 tygodni temu
rodzic
commit
08d43f3831
10 zmienionych plików z 330 dodań i 180 usunięć
  1. +0
    -101
      consume/advertising_sign_consume.go
  2. +0
    -75
      consume/advertising_smash_consume.go
  3. +4
    -3
      consume/aliyun_sms_record_consume.go
  4. +2
    -0
      consume/init.go
  5. +20
    -0
      consume/md/consume_key.go
  6. +8
    -0
      consume/md/md_video.go
  7. +1
    -0
      consume/md/mq_jpush.go
  8. +147
    -0
      consume/playlet_reward_consume.go
  9. +144
    -0
      consume/video_reward_consume.go
  10. +4
    -1
      go.mod

+ 0
- 101
consume/advertising_sign_consume.go Wyświetl plik

@@ -1,101 +0,0 @@
package consume

import (
"applet/app/cfg"
"applet/app/db"

"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
)

func AdvertisingSignConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>JpushRecordConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleAdvertisingSignConsume(res.Body)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleAdvertisingSignConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md.AdvertisingWatch
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
var data model.AdvertisingCallback
db.Db.Where("id=? ", msg.Id).Get(&data)
if data.Id == 0 {
return errors.New("记录不存在")
}
if data.IsRun == 1 {
return nil
}
req := md2.HomePageStartSignInReq{
UID: int64(data.Uid),
}
err = egg_energy.HomePageStartSignIn(db.Db, req)
if err != nil {
return err
}
UpdateUserTime(int64(data.Uid), "sign")
data.IsRun = 1
db.Db.Where("id=?", data.Id).Cols("is_run").Update(&data)
return nil
}
func UpdateUserTime(uid int64, types string) {
count, _ := db.Db.Where("uid=?", uid).Count(&model.UserNoticeTime{})
if count == 0 {
tmp := &model.UserNoticeTime{Uid: int(uid)}
if types == "login" {
tmp.LoginTime = int(time.Now().Unix())
} else {
tmp.SignTime = int(time.Now().Unix())
}
db.Db.Insert(tmp)
} else {
tmp := &model.UserNoticeTime{}
str := ""
if types == "login" {
str = "login_time"
tmp.LoginTime = int(time.Now().Unix())
} else {
str = "sign_time"
tmp.SignTime = int(time.Now().Unix())
}
db.Db.Where("uid=?", uid).Cols(str).Update(tmp)
}
}

+ 0
- 75
consume/advertising_smash_consume.go Wyświetl plik

@@ -1,75 +0,0 @@
package consume

import (
"applet/app/cfg"
"applet/app/db"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"

"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
)

func AdvertisingSmashConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>JpushRecordConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleAdvertisingSmashConsume(res.Body)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleAdvertisingSmashConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md.AdvertisingWatch
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
var data model.AdvertisingCallback
db.Db.Where("id=? ", msg.Id).Get(&data)
if data.Id == 0 {
return errors.New("记录不存在")
}
if data.IsRun == 1 {
return nil
}
req := md2.HomePageWatchOverAdReq{UID: int64(data.Uid)}
err = egg_energy.HomePageWatchOverAd(db.Db, req)
if err != nil {
return err
}
data.IsRun = 1
db.Db.Where("id=?", data.Id).Cols("is_run").Update(&data)
return nil
}

+ 4
- 3
consume/aliyun_sms_record_consume.go Wyświetl plik

@@ -3,6 +3,7 @@ package consume
import (
"applet/app/cfg"
"applet/app/db"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"

utils2 "applet/app/utils"
"applet/app/utils/cache"
@@ -10,7 +11,6 @@ import (
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/aliyun"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
@@ -49,8 +49,9 @@ func AliyunSmsRecordConsume(queue md.MqQueue) {
}))
_ = res.Reject(false)
//TODO::重新推回队列末尾,避免造成队列堵塞
var msg *md.JpushRecordFundData
var msg *md.AliyunSmsRecordFundData
json.Unmarshal(res.Body, &msg)
msg.Num = "1"
ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
} else {
//_ = res.Reject(false)
@@ -89,7 +90,7 @@ func handleAliyunSmsRecordConsume(msgData []byte) error {
return nil
}
err = aliyun.AliyunSendSms(aliyunSmsId, aliyunSmsSecret, msg.Phone, aliyunSmsSignName, aliyunSmsSaleCode, extra)
if err != nil {
if err != nil && msg.Num != "1" {
return err
}
if msg.Id != "" {


+ 2
- 0
consume/init.go Wyświetl plik

@@ -34,6 +34,8 @@ func initConsumes() {
jobs[consumeMd.EggCanalInviteUserNumsFunName] = EggCanalInviteUserNumsConsume // 处理拉新人数
jobs[consumeMd.EggCanalViolateNumsFunName] = EggCanalViolateNumsConsume // 处理违规次数
jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数
jobs[consumeMd.VideoRewardFunName] = VideoRewardConsume // 短视频奖励
jobs[consumeMd.PlayletRewardFunName] = PlayletRewardConsume // 短剧奖励
}

func Run() {


+ 20
- 0
consume/md/consume_key.go Wyświetl plik

@@ -182,6 +182,24 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "IMEggEnergySendRedPackageConsume",
},
{
ExchangeName: "egg.video_playlet",
Name: "egg_video_reward",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "video",
BindKey: "",
ConsumeFunName: "VideoRewardFunName",
},
{
ExchangeName: "egg.video_playlet",
Name: "egg_playlet_reward",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "playlet",
BindKey: "",
ConsumeFunName: "PlayletRewardFunName",
},
}

const (
@@ -204,4 +222,6 @@ const (
EggCanalInviteUserNumsFunName = "EggCanalInviteUserNumsConsume"
EggCanalViolateNumsFunName = "EggCanalViolateNumsConsume"
IMEggEnergySendRedPackageFunName = "IMEggEnergySendRedPackageConsume"
VideoRewardFunName = "VideoRewardFunName"
PlayletRewardFunName = "PlayletRewardFunName"
)

+ 8
- 0
consume/md/md_video.go Wyświetl plik

@@ -0,0 +1,8 @@
package md

type VideoReward struct {
Uid string `json:"uid"`
}
type PlayletReward struct {
Uid string `json:"uid"`
}

+ 1
- 0
consume/md/mq_jpush.go Wyświetl plik

@@ -15,6 +15,7 @@ type AliyunSmsRecordFundData struct {
Content string `json:"content"`
Code string `json:"code"`
Extra string `json:"extra"`
Num string `json:"num"`
}
type AdvertisingWatch struct {
Id string `json:"id"`


+ 147
- 0
consume/playlet_reward_consume.go Wyświetl plik

@@ -0,0 +1,147 @@
package consume

import (
"applet/app/cfg"
"applet/app/db"
"applet/app/e"
utils2 "applet/app/utils"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/enum"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/svc"

"applet/app/utils/cache"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"github.com/tidwall/gjson"
"time"
)

func PlayletRewardConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>PlayletRewardConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(100)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handlePlayletRewardConsume(ch, res.Body)
if err != nil {
fmt.Println("PlayletRewardConsume_ERR:::::", err.Error())
utils2.FilePutContents("PlayletRewardConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
_ = res.Reject(false)
//TODO::重新推回队列末尾,避免造成队列堵塞
var msg *md.PlayletReward
json.Unmarshal(res.Body, &msg)
ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
} else {
//_ = res.Reject(false)
err = res.Ack(true)
}

fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handlePlayletRewardConsume(ch *rabbit.Channel, msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md.PlayletReward
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
//1、分布式锁阻拦
requestIdPrefix := fmt.Sprintf("playlet.reward.lock:%s", msg.Uid)
cb, err := svc.HandleDistributedLockForComm(msg.Uid, "playlet.reward.lock.update:%s", requestIdPrefix)
if err != nil {
return err
}
if cb != nil {
defer cb() // 释放锁
}
eg := db.Db
redisConn := cache.GetPool().Get()
sysCfgDb := implement.NewSysCfgDb(eg, redisConn)
playletBase := sysCfgDb.SysCfgGetWithDb("playlet_base")
sess := eg.NewSession()
defer sess.Close()
sess.Begin()
NewPlayletTotalDb := implement.NewPlayletTotalDb(db.Db)
total, _ := NewPlayletTotalDb.GetPlayletTotalSess(sess, msg.Uid, time.Now().Format("20060102"))
if total == nil {
total = &model.PlayletTotal{
Uid: utils2.StrToInt(msg.Uid),
Date: utils2.StrToInt(time.Now().Format("20060102")),
Time: time.Now(),
}
insert, err := sess.Insert(total)
if insert == 0 || err != nil {
sess.Rollback()
return e.NewErr(400, "获取奖励失败")
}
}
Leave := utils2.StrToInt(gjson.Get(playletBase, "total").String()) - total.Total
if Leave-1 < 0 {
sess.Rollback()
return nil
}
total.Total++

update, err2 := sess.Where("id=?", total.Id).Cols("total").Update(total)
if update == 0 || err2 != nil {
sess.Rollback()
return e.NewErr(400, "获取奖励失败")
}
sess.Commit()
numKey := "playlet.num:" + time.Now().Format("20060102") + "." + msg.Uid
todayRange := utils2.GetTimeRange("today")
cache.SetEx(numKey, utils2.IntToStr(total.Total), int(todayRange["end"]-time.Now().Unix()))
NewEggEnergyBasicSettingDb := implement.NewEggEnergyBasicSettingDb(db.Db)
eggData, _ := NewEggEnergyBasicSettingDb.EggEnergyBasicSettingGetOne()
err = ch.PublishV2(md2.EggEnergyExchange, md2.EggEnergyStructForDealUserVirtualCoinData{
Kind: "add",
Title: enum.EggEnergyPlayletRewardPersonalActiveCoin.String(),
TransferType: int(enum.EggEnergyPlayletRewardPersonalActiveCoin),
CoinId: eggData.PersonEggPointsCoinId,
Uid: utils2.StrToInt64(msg.Uid),
Amount: utils2.StrToFloat64(gjson.Get(playletBase, "reward").String()),
}, md2.EggEnergyRoutKeyForDealUserVirtualCoinData)
if err != nil {
ch.PublishV2(md2.EggEnergyExchange, md2.EggEnergyStructForDealUserVirtualCoinData{
Kind: "add",
Title: enum.EggEnergyPlayletRewardPersonalActiveCoin.String(),
TransferType: int(enum.EggEnergyPlayletRewardPersonalActiveCoin),
CoinId: eggData.PersonEggPointsCoinId,
Uid: utils2.StrToInt64(msg.Uid),
Amount: utils2.StrToFloat64(gjson.Get(playletBase, "reward").String()),
}, md2.EggEnergyRoutKeyForDealUserVirtualCoinData)
}
return nil
}

+ 144
- 0
consume/video_reward_consume.go Wyświetl plik

@@ -0,0 +1,144 @@
package consume

import (
"applet/app/cfg"
"applet/app/db"
"applet/app/e"
utils2 "applet/app/utils"
"applet/app/utils/cache"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/enum"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/svc"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"github.com/tidwall/gjson"
"time"
)

func VideoRewardConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>VideoRewardConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(100)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleVideoRewardConsume(ch, res.Body)
if err != nil {
fmt.Println("VideoRewardConsume_ERR:::::", err.Error())
utils2.FilePutContents("VideoRewardConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
_ = res.Reject(false)
//TODO::重新推回队列末尾,避免造成队列堵塞
var msg *md.VideoReward
json.Unmarshal(res.Body, &msg)
ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
} else {
//_ = res.Reject(false)
err = res.Ack(true)
}

fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleVideoRewardConsume(ch *rabbit.Channel, msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md.VideoReward
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
//1、分布式锁阻拦
requestIdPrefix := fmt.Sprintf("video.reward.lock:%s", msg.Uid)
cb, err := svc.HandleDistributedLockForComm(msg.Uid, "video.reward.lock.update:%s", requestIdPrefix)
if err != nil {
return err
}
if cb != nil {
defer cb() // 释放锁
}
eg := db.Db
redisConn := cache.GetPool().Get()
sysCfgDb := implement.NewSysCfgDb(eg, redisConn)
videoBase := sysCfgDb.SysCfgGetWithDb("video_base")
sess := eg.NewSession()
defer sess.Close()
sess.Begin()
NewVideoTotalDb := implement.NewVideoTotalDb(db.Db)
total, _ := NewVideoTotalDb.GetVideoTotalSess(sess, msg.Uid, time.Now().Format("20060102"))
if total == nil {
total = &model.VideoTotal{
Uid: utils2.StrToInt(msg.Uid),
Date: utils2.StrToInt(time.Now().Format("20060102")),
Time: time.Now(),
}
insert, err := sess.Insert(total)
if insert == 0 || err != nil {
sess.Rollback()
return e.NewErr(400, "获取奖励失败")
}
}
Leave := utils2.StrToInt(gjson.Get(videoBase, "total").String()) - total.Total
if Leave-1 < 0 {
return nil
}
total.Total++
update, err2 := sess.Where("id=?", total.Id).Cols("total").Update(total)
if update == 0 || err2 != nil {
sess.Rollback()
return e.NewErr(400, "获取奖励失败")
}
sess.Commit()
numKey := "video.num:" + time.Now().Format("20060102") + "." + msg.Uid
todayRange := utils2.GetTimeRange("today")
cache.SetEx(numKey, utils2.IntToStr(total.Total), int(todayRange["end"]-time.Now().Unix()))
NewEggEnergyBasicSettingDb := implement.NewEggEnergyBasicSettingDb(db.Db)
eggData, _ := NewEggEnergyBasicSettingDb.EggEnergyBasicSettingGetOne()
err = ch.PublishV2(md2.EggEnergyExchange, md2.EggEnergyStructForDealUserVirtualCoinData{
Kind: "add",
Title: enum.EggEnergyVideoRewardPersonalActiveCoin.String(),
TransferType: int(enum.EggEnergyVideoRewardPersonalActiveCoin),
CoinId: eggData.PersonEggPointsCoinId,
Uid: utils2.StrToInt64(msg.Uid),
Amount: utils2.StrToFloat64(gjson.Get(videoBase, "reward").String()),
}, md2.EggEnergyRoutKeyForDealUserVirtualCoinData)
if err != nil {
ch.PublishV2(md2.EggEnergyExchange, md2.EggEnergyStructForDealUserVirtualCoinData{
Kind: "add",
Title: enum.EggEnergyVideoRewardPersonalActiveCoin.String(),
TransferType: int(enum.EggEnergyVideoRewardPersonalActiveCoin),
CoinId: eggData.PersonEggPointsCoinId,
Uid: utils2.StrToInt64(msg.Uid),
Amount: utils2.StrToFloat64(gjson.Get(videoBase, "reward").String()),
}, md2.EggEnergyRoutKeyForDealUserVirtualCoinData)
}
return nil
}

+ 4
- 1
go.mod Wyświetl plik

@@ -8,7 +8,7 @@ go 1.19

require (
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241206115326-8cbc93c7c64d
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241207095445-64c8aa0b486e
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241209064215-2bf33ead99e1
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5
github.com/boombuler/barcode v1.0.1
@@ -26,6 +26,7 @@ require (
github.com/sony/sonyflake v1.0.0
github.com/streadway/amqp v1.0.0
github.com/syyongx/php2go v0.9.8
github.com/tidwall/gjson v1.18.0
github.com/wechatpay-apiv3/wechatpay-go v0.2.20
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.32.0
@@ -51,6 +52,8 @@ require (
github.com/go-pay/crypto v0.0.1 // indirect
github.com/go-pay/xtime v0.0.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)


Ładowanie…
Anuluj
Zapisz