diff --git a/consume/advertising_sign_consume.go b/consume/advertising_sign_consume.go new file mode 100644 index 0000000..62f3b12 --- /dev/null +++ b/consume/advertising_sign_consume.go @@ -0,0 +1,101 @@ +package consume + +import ( + "applet/app/cfg" + db "code.fnuoos.com/EggPlanet/egg_models.git/src" + + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_models.git/src/model" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" + md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func AdvertisingSignConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>JpushRecordConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleAdvertisingSignConsume(res.Body) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleAdvertisingSignConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md.AdvertisingWatch + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + var data model.AdvertisingCallback + db.Db.Where("id=? ", msg.Id).Get(&data) + if data.Id == 0 { + return errors.New("记录不存在") + } + if data.IsRun == 1 { + return nil + } + req := md2.HomePageStartSignInReq{ + UID: int64(data.Uid), + } + err = egg_energy.HomePageStartSignIn(db.Db, req) + if err != nil { + return err + } + UpdateUserTime(int64(data.Uid), "sign") + data.IsRun = 1 + db.Db.Where("id=?", data.Id).Cols("is_run").Update(&data) + return nil +} +func UpdateUserTime(uid int64, types string) { + count, _ := db.Db.Where("uid=?", uid).Count(&model.UserNoticeTime{}) + if count == 0 { + tmp := &model.UserNoticeTime{Uid: int(uid)} + if types == "login" { + tmp.LoginTime = int(time.Now().Unix()) + } else { + tmp.SignTime = int(time.Now().Unix()) + } + db.Db.Insert(tmp) + } else { + tmp := &model.UserNoticeTime{} + str := "" + if types == "login" { + str = "login_time" + tmp.LoginTime = int(time.Now().Unix()) + } else { + str = "sign_time" + tmp.SignTime = int(time.Now().Unix()) + } + db.Db.Where("uid=?", uid).Cols(str).Update(tmp) + } +} diff --git a/consume/advertising_smash_consume.go b/consume/advertising_smash_consume.go new file mode 100644 index 0000000..84dab30 --- /dev/null +++ b/consume/advertising_smash_consume.go @@ -0,0 +1,75 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/utils/logx" + "applet/consume/md" + db "code.fnuoos.com/EggPlanet/egg_models.git/src" + "code.fnuoos.com/EggPlanet/egg_models.git/src/model" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy" + md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func AdvertisingSmashConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>JpushRecordConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleAdvertisingSmashConsume(res.Body) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleAdvertisingSmashConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md.AdvertisingWatch + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + var data model.AdvertisingCallback + db.Db.Where("id=? ", msg.Id).Get(&data) + if data.Id == 0 { + return errors.New("记录不存在") + } + if data.IsRun == 1 { + return nil + } + req := md2.HomePageWatchOverAdReq{UID: int64(data.Uid)} + err = egg_energy.HomePageWatchOverAd(db.Db, req) + if err != nil { + return err + } + data.IsRun = 1 + db.Db.Where("id=?", data.Id).Cols("is_run").Update(&data) + return nil +} diff --git a/consume/aliyun_sms_record_consume.go b/consume/aliyun_sms_record_consume.go new file mode 100644 index 0000000..ea373af --- /dev/null +++ b/consume/aliyun_sms_record_consume.go @@ -0,0 +1,96 @@ +package consume + +import ( + "applet/app/cfg" + db "code.fnuoos.com/EggPlanet/egg_models.git/src" + + utils2 "applet/app/utils" + "applet/app/utils/cache" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" + "code.fnuoos.com/EggPlanet/egg_models.git/src/model" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/aliyun" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func AliyunSmsRecordConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>AliyunSmsRecordConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleAliyunSmsRecordConsume(res.Body) + if err != nil { + fmt.Println("AliyunSmsRecordConsume_ERR:::::", err.Error()) + utils2.FilePutContents("AliyunSmsRecordConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + _ = res.Reject(false) + //TODO::重新推回队列末尾,避免造成队列堵塞 + var msg *md.JpushRecordFundData + json.Unmarshal(res.Body, &msg) + ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + } else { + //_ = res.Reject(false) + err = res.Ack(true) + } + + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleAliyunSmsRecordConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md.AliyunSmsRecordFundData + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + engine := db.Db + redisConn := cache.GetPool().Get() + sysCfgDb := implement.NewSysCfgDb(engine, redisConn) + aliyunSmsId := sysCfgDb.SysCfgGetWithDb("aliyun_sms_id") + aliyunSmsSecret := sysCfgDb.SysCfgGetWithDb("aliyun_sms_secret") + aliyunSmsSignName := sysCfgDb.SysCfgGetWithDb("aliyun_sms_sign_name") + aliyunSmsSaleCode := sysCfgDb.SysCfgGetWithDb("aliyun_sms_sale_code") + extra := "{\"content\":\"" + msg.Content + "\"}" + if msg.Code != "" { + aliyunSmsSaleCode = msg.Code + extra = msg.Extra + } + err = aliyun.AliyunSendSms(aliyunSmsId, aliyunSmsSecret, msg.Phone, aliyunSmsSignName, aliyunSmsSaleCode, extra) + if err != nil { + return err + } + if msg.Id != "" { + engine.Where("id=?", msg.Id).Cols("state").Update(&model.AliyunSmsRecord{State: 1}) + } + return nil +} diff --git a/consume/egg_energy_fund_data_consume.go b/consume/egg_energy_fund_data_consume.go index 0898d01..45c81b7 100644 --- a/consume/egg_energy_fund_data_consume.go +++ b/consume/egg_energy_fund_data_consume.go @@ -2,7 +2,8 @@ package consume import ( "applet/app/cfg" - "applet/app/db" + db "code.fnuoos.com/EggPlanet/egg_models.git/src" + utils2 "applet/app/utils" "applet/app/utils/logx" "applet/consume/md" diff --git a/consume/egg_energy_platform_revenue_data.go b/consume/egg_energy_platform_revenue_data.go index f5cd933..15889f9 100644 --- a/consume/egg_energy_platform_revenue_data.go +++ b/consume/egg_energy_platform_revenue_data.go @@ -2,10 +2,10 @@ package consume import ( "applet/app/cfg" - "applet/app/db" utils2 "applet/app/utils" "applet/app/utils/logx" "applet/consume/md" + db "code.fnuoos.com/EggPlanet/egg_models.git/src" "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" "code.fnuoos.com/EggPlanet/egg_models.git/src/model" "code.fnuoos.com/EggPlanet/egg_system_rules.git" diff --git a/consume/init.go b/consume/init.go index 88895a8..44afce0 100644 --- a/consume/init.go +++ b/consume/init.go @@ -20,6 +20,10 @@ func initConsumes() { jobs[consumeMd.EggEnergyStartLevelDividendFunName] = EggEnergyStartLevelDividendConsume jobs[consumeMd.EggEnergyDealPlatformRevenueDataFunName] = EggEnergyDealPlatformRevenueDataConsume jobs[consumeMd.EggEnergyDealFundDataFunName] = EggEnergyDealFundDataConsume + jobs[consumeMd.AliyunSmsRecordFunName] = AliyunSmsRecordConsume //阿里云短信 + jobs[consumeMd.JpushRecordFunName] = JpushRecordConsume //极光推送 + jobs[consumeMd.AdvertisingSmashConsume] = AdvertisingSmashConsume //砸蛋 + jobs[consumeMd.AdvertisingSignConsume] = AdvertisingSignConsume //签到 jobs[consumeMd.EggEnergyDealUserVirtualCoinDataFunName] = EggEnergyDealUserVirtualCoinDataConsume jobs[consumeMd.IMEggEnergyBatchSendMessageDataFunName] = IMEggEnergyBatchSendMessageDataConsume jobs[consumeMd.IMEggEnergyDelFriendCircleDataFunName] = IMEggEnergyDelFriendCircleDataConsume diff --git a/consume/jpush_record_consume.go b/consume/jpush_record_consume.go new file mode 100644 index 0000000..2d3d572 --- /dev/null +++ b/consume/jpush_record_consume.go @@ -0,0 +1,97 @@ +package consume + +import ( + "applet/app/cfg" + db "code.fnuoos.com/EggPlanet/egg_models.git/src" + + utils2 "applet/app/utils" + "applet/app/utils/cache" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" + "code.fnuoos.com/EggPlanet/egg_models.git/src/model" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/EggPlanet/egg_system_rules.git/jPush" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "strings" + "time" +) + +func JpushRecordConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>JpushRecordConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleJpushRecordConsume(res.Body) + if err != nil { + fmt.Println("JpushRecordConsume_ERR:::::", err.Error()) + utils2.FilePutContents("JpushRecordConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + _ = res.Reject(false) + //TODO::重新推回队列末尾,避免造成队列堵塞 + var msg *md.JpushRecordFundData + json.Unmarshal(res.Body, &msg) + ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + } else { + //_ = res.Reject(false) + err = res.Ack(true) + } + + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleJpushRecordConsume(msgData []byte) error { + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md.JpushRecordFundData + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + engine := db.Db + redisConn := cache.GetPool().Get() + sysCfgDb := implement.NewSysCfgDb(engine, redisConn) + jpushKey := sysCfgDb.SysCfgGetWithDb("jpush_key") + jpushSecret := sysCfgDb.SysCfgGetWithDb("jpush_secret") + if msg.Target == "0" { //广播全部 + _, err := jPush.PushAllUser(jpushKey, jpushSecret, msg.Title, msg.Content, msg.Platform, nil) + if err != nil { + return err + } + } else { + _, err = jPush.PushMoreUser(jpushKey, jpushSecret, msg.Title, msg.Content, msg.Platform, strings.Split(msg.UserId, ","), nil) + if err != nil { + return err + } + } + if msg.Id != "" { + engine.Where("id=?", msg.Id).Cols("state").Update(&model.JpushRecord{State: 1}) + } + return nil +} diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index e378076..e389a91 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -101,9 +101,49 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "EggEnergyAutoExchangeGreenEnergyConsume", }, + { + ExchangeName: "egg.jpush", + Name: "egg_jpush_record_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "jpush_record", + BindKey: "", + ConsumeFunName: "JpushRecordConsume", + }, + { + ExchangeName: "egg.aliyun_sms", + Name: "egg_aliyun_sms_record_queue", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "aliyun_sms_record", + BindKey: "", + ConsumeFunName: "AliyunSmsRecordConsume", + }, + { + ExchangeName: "egg.advertising", + Name: "egg_advertising_smash", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "advertising_smash", + BindKey: "", + ConsumeFunName: "AdvertisingSmashConsume", + }, + { + ExchangeName: "egg.advertising", + Name: "egg_advertising_sign", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "advertising_sign", + BindKey: "", + ConsumeFunName: "AdvertisingSignConsume", + }, } const ( + AdvertisingSignConsume = "AdvertisingSignConsume" + AdvertisingSmashConsume = "AdvertisingSmashConsume" + JpushRecordFunName = "JpushRecordConsume" + AliyunSmsRecordFunName = "AliyunSmsRecordConsume" EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume" EggEnergyDealFundDataFunName = "EggEnergyDealFundDataConsume" EggEnergyDealPlatformRevenueDataFunName = "EggEnergyDealPlatformRevenueDataConsume" diff --git a/consume/md/mq_jpush.go b/consume/md/mq_jpush.go new file mode 100644 index 0000000..15001e1 --- /dev/null +++ b/consume/md/mq_jpush.go @@ -0,0 +1,21 @@ +package md + +type JpushRecordFundData struct { + Id string `json:"id"` + UserId string `json:"user_id"` + Title string `json:"title"` + Content string `json:"content"` + Platform string `json:"platform"` + Target string `json:"target"` +} +type AliyunSmsRecordFundData struct { + Id string `json:"id"` + Phone string `json:"phone"` + Title string `json:"title"` + Content string `json:"content"` + Code string `json:"code"` + Extra string `json:"extra"` +} +type AdvertisingWatch struct { + Id string `json:"id"` +} diff --git a/go.mod b/go.mod index 47a3ae8..345128b 100644 --- a/go.mod +++ b/go.mod @@ -28,12 +28,32 @@ require ( github.com/syyongx/php2go v0.9.8 github.com/wechatpay-apiv3/wechatpay-go v0.2.20 go.uber.org/zap v1.16.0 + google.golang.org/grpc v1.31.0 google.golang.org/protobuf v1.33.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 xorm.io/xorm v1.3.1 ) +require ( + github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.5 // indirect + github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.10 // indirect + github.com/alibabacloud-go/debug v1.0.1 // indirect + github.com/alibabacloud-go/dypnsapi-20170525/v2 v2.2.3 // indirect + github.com/alibabacloud-go/endpoint-util v1.1.0 // indirect + github.com/alibabacloud-go/openapi-util v0.1.0 // indirect + github.com/alibabacloud-go/tea v1.2.2 // indirect + github.com/alibabacloud-go/tea-utils v1.4.3 // indirect + github.com/alibabacloud-go/tea-utils/v2 v2.0.6 // indirect + github.com/alibabacloud-go/tea-xml v1.1.3 // indirect + github.com/aliyun/credentials-go v1.3.10 // indirect + github.com/clbanning/mxj/v2 v2.5.5 // indirect + github.com/go-pay/crypto v0.0.1 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/tjfoc/gmsm v1.4.1 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect +) + require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/bytedance/sonic v1.11.3 // indirect @@ -48,7 +68,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gookit/color v1.3.6 // indirect - github.com/jinzhu/copier v0.4.0 // indirect + github.com/jinzhu/copier v0.4.0 github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect @@ -74,7 +94,6 @@ require ( golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect - golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect