Ver código fonte

极光推送

短信
砸蛋
签到
master
huangjiajun 2 semanas atrás
pai
commit
351ca16c3f
10 arquivos alterados com 458 adições e 4 exclusões
  1. +101
    -0
      consume/advertising_sign_consume.go
  2. +75
    -0
      consume/advertising_smash_consume.go
  3. +96
    -0
      consume/aliyun_sms_record_consume.go
  4. +2
    -1
      consume/egg_energy_fund_data_consume.go
  5. +1
    -1
      consume/egg_energy_platform_revenue_data.go
  6. +4
    -0
      consume/init.go
  7. +97
    -0
      consume/jpush_record_consume.go
  8. +40
    -0
      consume/md/consume_key.go
  9. +21
    -0
      consume/md/mq_jpush.go
  10. +21
    -2
      go.mod

+ 101
- 0
consume/advertising_sign_consume.go Ver arquivo

@@ -0,0 +1,101 @@
package consume

import (
"applet/app/cfg"
db "code.fnuoos.com/EggPlanet/egg_models.git/src"

"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
)

func AdvertisingSignConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>JpushRecordConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleAdvertisingSignConsume(res.Body)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleAdvertisingSignConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md.AdvertisingWatch
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
var data model.AdvertisingCallback
db.Db.Where("id=? ", msg.Id).Get(&data)
if data.Id == 0 {
return errors.New("记录不存在")
}
if data.IsRun == 1 {
return nil
}
req := md2.HomePageStartSignInReq{
UID: int64(data.Uid),
}
err = egg_energy.HomePageStartSignIn(db.Db, req)
if err != nil {
return err
}
UpdateUserTime(int64(data.Uid), "sign")
data.IsRun = 1
db.Db.Where("id=?", data.Id).Cols("is_run").Update(&data)
return nil
}
func UpdateUserTime(uid int64, types string) {
count, _ := db.Db.Where("uid=?", uid).Count(&model.UserNoticeTime{})
if count == 0 {
tmp := &model.UserNoticeTime{Uid: int(uid)}
if types == "login" {
tmp.LoginTime = int(time.Now().Unix())
} else {
tmp.SignTime = int(time.Now().Unix())
}
db.Db.Insert(tmp)
} else {
tmp := &model.UserNoticeTime{}
str := ""
if types == "login" {
str = "login_time"
tmp.LoginTime = int(time.Now().Unix())
} else {
str = "sign_time"
tmp.SignTime = int(time.Now().Unix())
}
db.Db.Where("uid=?", uid).Cols(str).Update(tmp)
}
}

+ 75
- 0
consume/advertising_smash_consume.go Ver arquivo

@@ -0,0 +1,75 @@
package consume

import (
"applet/app/cfg"
"applet/app/utils/logx"
"applet/consume/md"
db "code.fnuoos.com/EggPlanet/egg_models.git/src"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"

"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
)

func AdvertisingSmashConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>JpushRecordConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleAdvertisingSmashConsume(res.Body)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleAdvertisingSmashConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md.AdvertisingWatch
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
var data model.AdvertisingCallback
db.Db.Where("id=? ", msg.Id).Get(&data)
if data.Id == 0 {
return errors.New("记录不存在")
}
if data.IsRun == 1 {
return nil
}
req := md2.HomePageWatchOverAdReq{UID: int64(data.Uid)}
err = egg_energy.HomePageWatchOverAd(db.Db, req)
if err != nil {
return err
}
data.IsRun = 1
db.Db.Where("id=?", data.Id).Cols("is_run").Update(&data)
return nil
}

+ 96
- 0
consume/aliyun_sms_record_consume.go Ver arquivo

@@ -0,0 +1,96 @@
package consume

import (
"applet/app/cfg"
db "code.fnuoos.com/EggPlanet/egg_models.git/src"

utils2 "applet/app/utils"
"applet/app/utils/cache"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/aliyun"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
)

func AliyunSmsRecordConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>AliyunSmsRecordConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleAliyunSmsRecordConsume(res.Body)
if err != nil {
fmt.Println("AliyunSmsRecordConsume_ERR:::::", err.Error())
utils2.FilePutContents("AliyunSmsRecordConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
_ = res.Reject(false)
//TODO::重新推回队列末尾,避免造成队列堵塞
var msg *md.JpushRecordFundData
json.Unmarshal(res.Body, &msg)
ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
} else {
//_ = res.Reject(false)
err = res.Ack(true)
}

fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleAliyunSmsRecordConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md.AliyunSmsRecordFundData
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
engine := db.Db
redisConn := cache.GetPool().Get()
sysCfgDb := implement.NewSysCfgDb(engine, redisConn)
aliyunSmsId := sysCfgDb.SysCfgGetWithDb("aliyun_sms_id")
aliyunSmsSecret := sysCfgDb.SysCfgGetWithDb("aliyun_sms_secret")
aliyunSmsSignName := sysCfgDb.SysCfgGetWithDb("aliyun_sms_sign_name")
aliyunSmsSaleCode := sysCfgDb.SysCfgGetWithDb("aliyun_sms_sale_code")
extra := "{\"content\":\"" + msg.Content + "\"}"
if msg.Code != "" {
aliyunSmsSaleCode = msg.Code
extra = msg.Extra
}
err = aliyun.AliyunSendSms(aliyunSmsId, aliyunSmsSecret, msg.Phone, aliyunSmsSignName, aliyunSmsSaleCode, extra)
if err != nil {
return err
}
if msg.Id != "" {
engine.Where("id=?", msg.Id).Cols("state").Update(&model.AliyunSmsRecord{State: 1})
}
return nil
}

+ 2
- 1
consume/egg_energy_fund_data_consume.go Ver arquivo

@@ -2,7 +2,8 @@ package consume

import (
"applet/app/cfg"
"applet/app/db"
db "code.fnuoos.com/EggPlanet/egg_models.git/src"

utils2 "applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"


+ 1
- 1
consume/egg_energy_platform_revenue_data.go Ver arquivo

@@ -2,10 +2,10 @@ package consume

import (
"applet/app/cfg"
"applet/app/db"
utils2 "applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
db "code.fnuoos.com/EggPlanet/egg_models.git/src"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"


+ 4
- 0
consume/init.go Ver arquivo

@@ -20,6 +20,10 @@ func initConsumes() {
jobs[consumeMd.EggEnergyStartLevelDividendFunName] = EggEnergyStartLevelDividendConsume
jobs[consumeMd.EggEnergyDealPlatformRevenueDataFunName] = EggEnergyDealPlatformRevenueDataConsume
jobs[consumeMd.EggEnergyDealFundDataFunName] = EggEnergyDealFundDataConsume
jobs[consumeMd.AliyunSmsRecordFunName] = AliyunSmsRecordConsume //阿里云短信
jobs[consumeMd.JpushRecordFunName] = JpushRecordConsume //极光推送
jobs[consumeMd.AdvertisingSmashConsume] = AdvertisingSmashConsume //砸蛋
jobs[consumeMd.AdvertisingSignConsume] = AdvertisingSignConsume //签到
jobs[consumeMd.EggEnergyDealUserVirtualCoinDataFunName] = EggEnergyDealUserVirtualCoinDataConsume
jobs[consumeMd.IMEggEnergyBatchSendMessageDataFunName] = IMEggEnergyBatchSendMessageDataConsume
jobs[consumeMd.IMEggEnergyDelFriendCircleDataFunName] = IMEggEnergyDelFriendCircleDataConsume


+ 97
- 0
consume/jpush_record_consume.go Ver arquivo

@@ -0,0 +1,97 @@
package consume

import (
"applet/app/cfg"
db "code.fnuoos.com/EggPlanet/egg_models.git/src"

utils2 "applet/app/utils"
"applet/app/utils/cache"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/jPush"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"strings"
"time"
)

func JpushRecordConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>JpushRecordConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleJpushRecordConsume(res.Body)
if err != nil {
fmt.Println("JpushRecordConsume_ERR:::::", err.Error())
utils2.FilePutContents("JpushRecordConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
_ = res.Reject(false)
//TODO::重新推回队列末尾,避免造成队列堵塞
var msg *md.JpushRecordFundData
json.Unmarshal(res.Body, &msg)
ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
} else {
//_ = res.Reject(false)
err = res.Ack(true)
}

fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleJpushRecordConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md.JpushRecordFundData
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
engine := db.Db
redisConn := cache.GetPool().Get()
sysCfgDb := implement.NewSysCfgDb(engine, redisConn)
jpushKey := sysCfgDb.SysCfgGetWithDb("jpush_key")
jpushSecret := sysCfgDb.SysCfgGetWithDb("jpush_secret")
if msg.Target == "0" { //广播全部
_, err := jPush.PushAllUser(jpushKey, jpushSecret, msg.Title, msg.Content, msg.Platform, nil)
if err != nil {
return err
}
} else {
_, err = jPush.PushMoreUser(jpushKey, jpushSecret, msg.Title, msg.Content, msg.Platform, strings.Split(msg.UserId, ","), nil)
if err != nil {
return err
}
}
if msg.Id != "" {
engine.Where("id=?", msg.Id).Cols("state").Update(&model.JpushRecord{State: 1})
}
return nil
}

+ 40
- 0
consume/md/consume_key.go Ver arquivo

@@ -101,9 +101,49 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "EggEnergyAutoExchangeGreenEnergyConsume",
},
{
ExchangeName: "egg.jpush",
Name: "egg_jpush_record_queue",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "jpush_record",
BindKey: "",
ConsumeFunName: "JpushRecordConsume",
},
{
ExchangeName: "egg.aliyun_sms",
Name: "egg_aliyun_sms_record_queue",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "aliyun_sms_record",
BindKey: "",
ConsumeFunName: "AliyunSmsRecordConsume",
},
{
ExchangeName: "egg.advertising",
Name: "egg_advertising_smash",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "advertising_smash",
BindKey: "",
ConsumeFunName: "AdvertisingSmashConsume",
},
{
ExchangeName: "egg.advertising",
Name: "egg_advertising_sign",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "advertising_sign",
BindKey: "",
ConsumeFunName: "AdvertisingSignConsume",
},
}

const (
AdvertisingSignConsume = "AdvertisingSignConsume"
AdvertisingSmashConsume = "AdvertisingSmashConsume"
JpushRecordFunName = "JpushRecordConsume"
AliyunSmsRecordFunName = "AliyunSmsRecordConsume"
EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume"
EggEnergyDealFundDataFunName = "EggEnergyDealFundDataConsume"
EggEnergyDealPlatformRevenueDataFunName = "EggEnergyDealPlatformRevenueDataConsume"


+ 21
- 0
consume/md/mq_jpush.go Ver arquivo

@@ -0,0 +1,21 @@
package md

type JpushRecordFundData struct {
Id string `json:"id"`
UserId string `json:"user_id"`
Title string `json:"title"`
Content string `json:"content"`
Platform string `json:"platform"`
Target string `json:"target"`
}
type AliyunSmsRecordFundData struct {
Id string `json:"id"`
Phone string `json:"phone"`
Title string `json:"title"`
Content string `json:"content"`
Code string `json:"code"`
Extra string `json:"extra"`
}
type AdvertisingWatch struct {
Id string `json:"id"`
}

+ 21
- 2
go.mod Ver arquivo

@@ -28,12 +28,32 @@ require (
github.com/syyongx/php2go v0.9.8
github.com/wechatpay-apiv3/wechatpay-go v0.2.20
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.31.0
google.golang.org/protobuf v1.33.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
xorm.io/xorm v1.3.1
)

require (
github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.5 // indirect
github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.10 // indirect
github.com/alibabacloud-go/debug v1.0.1 // indirect
github.com/alibabacloud-go/dypnsapi-20170525/v2 v2.2.3 // indirect
github.com/alibabacloud-go/endpoint-util v1.1.0 // indirect
github.com/alibabacloud-go/openapi-util v0.1.0 // indirect
github.com/alibabacloud-go/tea v1.2.2 // indirect
github.com/alibabacloud-go/tea-utils v1.4.3 // indirect
github.com/alibabacloud-go/tea-utils/v2 v2.0.6 // indirect
github.com/alibabacloud-go/tea-xml v1.1.3 // indirect
github.com/aliyun/credentials-go v1.3.10 // indirect
github.com/clbanning/mxj/v2 v2.5.5 // indirect
github.com/go-pay/crypto v0.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/bytedance/sonic v1.11.3 // indirect
@@ -48,7 +68,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gookit/color v1.3.6 // indirect
github.com/jinzhu/copier v0.4.0 // indirect
github.com/jinzhu/copier v0.4.0
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
@@ -74,7 +94,6 @@ require (
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect


Carregando…
Cancelar
Salvar