dengbiao 1 місяць тому
джерело
коміт
a1012f8a3b
5 змінених файлів з 107 додано та 121 видалено
  1. +97
    -0
      consume/egg_energy_start_level_dividend_consume.go
  2. +0
    -1
      consume/init.go
  3. +4
    -4
      consume/md/consume_key.go
  4. +0
    -68
      consume/one_circles_sign_in_consume.go
  5. +6
    -48
      go.mod

+ 97
- 0
consume/egg_energy_start_level_dividend_consume.go Переглянути файл

@@ -0,0 +1,97 @@
package consume

import (
"applet/app/cfg"
utils2 "applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
db "code.fnuoos.com/EggPlanet/egg_models.git/src"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/enum"
md3 "code.fnuoos.com/EggPlanet/egg_system_rules.git/md"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
)

func EggEnergyStartLevelDividendConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>EggEnergyStartLevelDividendConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleEggEnergyStartLevelDividendConsume(res.Body)
if err != nil {
fmt.Println("EggEnergyStartLevelDividendConsume_ERR:::::", err.Error())
utils2.FilePutContents("EggEnergyStartLevelDividendConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
}
//_ = res.Reject(false)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleEggEnergyStartLevelDividendConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
//1、解析mq中queue的数据结构体
var msg *md2.EggEnergyStructForStarLevelDividends
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}
engine := db.Db
//2、查找 `egg_energy_basic_setting` 基础设置
eggEnergyBasicSettingDb := implement.NewEggEnergyBasicSettingDb(engine)
eggEnergyBasicSetting, err := eggEnergyBasicSettingDb.EggEnergyBasicSettingGetOneByParams(map[string]interface{}{
"key": "is_open",
"value": 1,
})
if err != nil {
return err
}
if eggEnergyBasicSetting == nil {
return errors.New("蛋蛋能量设置未开启!")
}

//3、给相应的用户加上个人的团队绿色能量
session := engine.NewSession()
err = rule.DealUserVirtualCoin(session, md3.DealUserVirtualCoinReq{
Kind: "add",
Title: enum.UserVirtualAmountFlowTransferType.String(enum.EggEnergyCommunityDividends),
TransferType: int(enum.EggEnergyCommunityDividends),
CoinId: eggEnergyBasicSetting.TeamEggEnergyCoinId,
Uid: msg.Uid,
Amount: msg.SignDividend,
})
if err != nil {
return err
}
return nil
}

+ 0
- 1
consume/init.go Переглянути файл

@@ -17,7 +17,6 @@ func Init() {

// 增加消费任务队列
func initConsumes() {
//////////////////////////////////////// ORDER /////////////////////////////////////////////////////
jobs[consumeMd.JudgePackageOrdStateQueueConsumeFunName] = JudgePackageOrdStateQueueConsume
}



+ 4
- 4
consume/md/consume_key.go Переглянути файл

@@ -12,16 +12,16 @@ type MqQueue struct {

var RabbitMqQueueKeyList = []*MqQueue{
{
ExchangeName: "order",
Name: "judge_package_ord_state_queue",
ExchangeName: "egg.energy",
Name: "egg_energy_star_level_dividend_queue",
Type: TopicQueueType,
IsPersistent: false,
RoutKey: "judge_package_ord_state",
RoutKey: "star_level_dividend",
BindKey: "",
ConsumeFunName: "JudgePackageOrdStateQueueConsume",
},
}

const (
JudgePackageOrdStateQueueConsumeFunName = "JudgePackageOrdStateQueueConsume"
EggEnergyStartLevelDividendFunName = "EggEnergyStartLevelDividendConsume"
)

+ 0
- 68
consume/one_circles_sign_in_consume.go Переглянути файл

@@ -1,68 +0,0 @@
package consume

import (
svc "applet/app/admin/svc/enterprise_manage"
"applet/app/cfg"
"applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/one_circles"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
)

func JudgePackageOrdStateQueueConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>JudgePackageOrdStateQueueConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(5)
delivery := ch.Consume(queue.Name, false)

one_circles.Init(cfg.RedisAddr)

var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleJudgePackageOrdStateQueueConsume(ch, res.Body)
if err != nil {
fmt.Println("err ::: ", err)
utils.FilePutContents("JudgePackageOrdStateQueueConsume", "[err]:"+err.Error())
//_ = res.Reject(true) //TODO::拒绝 Ack
_ = res.Reject(false)
} else {
_ = res.Ack(true)
}
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleJudgePackageOrdStateQueueConsume(ch *rabbit.Channel, msgData []byte) error {
//1、解析mq中queue的数据结构体
var msg *md.JudgePackageOrdOrdState
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}

err = svc.DealJudgePackageOrdOrdState(msg.OrdNo)
fmt.Println("err::::", err)
if err != nil {
return err
}
return nil
}

+ 6
- 48
go.mod Переглянути файл

@@ -1,41 +1,25 @@
module applet

go 1.18
go 1.19

// go.mod文件中
replace code.fnuoos.com/EggPlanet/egg_models.git => E:/company/Egg/egg_models

replace code.fnuoos.com/EggPlanet/egg_system_rules.git => E:/company/Egg/egg_system_rules

require (
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5
code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240903060255-62e7a9ea46fe
github.com/360EntSecGroup-Skylar/excelize v1.4.1
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/boombuler/barcode v1.0.1
github.com/chromedp/chromedp v0.10.0
github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/forgoer/openssl v1.2.1
github.com/gin-contrib/sessions v1.0.0
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/locales v0.14.1
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.19.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.8.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/iGoogle-ink/gopay v1.5.36
github.com/jinzhu/copier v0.4.0
github.com/makiuchi-d/gozxing v0.1.1
github.com/mcuadros/go-defaults v1.2.0
github.com/medivhzhan/weapp/v2 v2.5.0
github.com/mingrammer/commonregex v1.0.1
github.com/qiniu/api.v7/v7 v7.8.2
github.com/robfig/cron/v3 v3.0.1
github.com/shopspring/decimal v1.3.1
github.com/sony/sonyflake v1.0.0
github.com/streadway/amqp v1.0.0
github.com/swaggo/swag v1.7.0
github.com/syyongx/php2go v0.9.8
github.com/wechatpay-apiv3/wechatpay-go v0.2.20
go.uber.org/zap v1.16.0
@@ -43,55 +27,31 @@ require (
google.golang.org/protobuf v1.33.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
xorm.io/xorm v1.3.2
)

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241114063419-cb68a0ed34ee // indirect
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.2 // indirect
github.com/bytedance/sonic v1.11.3 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/chromedp/cdproto v0.0.0-20240801214329-3f85d328b335 // indirect
github.com/chromedp/sysutil v1.0.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/spec v0.20.3 // indirect
github.com/go-openapi/swag v0.19.15 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.4.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/gookit/color v1.3.6 // indirect
github.com/gorilla/context v1.1.2 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
github.com/gorilla/sessions v1.2.2 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/olivere/elastic/v7 v7.0.32 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.19.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tidwall/gjson v1.14.1 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.uber.org/atomic v1.7.0 // indirect
@@ -103,11 +63,9 @@ require (
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
xorm.io/builder v0.3.11-0.20220531020008-1bd24a7dc978 // indirect
)

Завантаження…
Відмінити
Зберегти