Browse Source

add 成长值监听

one_orenge
DengBiao 11 months ago
parent
commit
078e408558
8 changed files with 449 additions and 9 deletions
  1. +121
    -0
      app/db/db_user_public_platoon_double_network_setting.go
  2. +121
    -0
      app/db/db_user_public_platoon_double_network_user_coin_record.go
  3. +18
    -0
      app/db/model/user_public_platoon_double_network_setting.go
  4. +12
    -0
      app/db/model/user_public_platoon_double_network_user_coin_record.go
  5. +127
    -0
      consume/canal_user_virtual_coin_flow_consume.go
  6. +2
    -0
      consume/init.go
  7. +10
    -9
      consume/md/consume_key.go
  8. +38
    -0
      consume/md/md_canal_user_virtual_coin_flow.go

+ 121
- 0
app/db/db_user_public_platoon_double_network_setting.go View File

@@ -0,0 +1,121 @@
package db

import (
"applet/app/db/model"
"applet/app/utils"
"applet/app/utils/logx"
"errors"
"fmt"
"reflect"
"xorm.io/xorm"
)

// BatchSelectUserPublicPlatoonDoubleNetworkSettings 批量查询数据 TODO::和下面的方法重复了,建议采用下面的 `UserPublicPlatoonDoubleNetworkSettingFindByParams` 方法
func BatchSelectUserPublicPlatoonDoubleNetworkSettings(Db *xorm.Engine, params map[string]interface{}) (*[]model.UserPublicPlatoonDoubleNetworkSetting, error) {
var UserPublicPlatoonDoubleNetworkSettingData []model.UserPublicPlatoonDoubleNetworkSetting
if err := Db.In(utils.AnyToString(params["key"]), params["value"]).
Find(&UserPublicPlatoonDoubleNetworkSettingData); err != nil {
return nil, logx.Warn(err)
}
return &UserPublicPlatoonDoubleNetworkSettingData, nil
}

// UserPublicPlatoonDoubleNetworkSettingInsert 插入单条数据
func UserPublicPlatoonDoubleNetworkSettingInsert(Db *xorm.Engine, UserPublicPlatoonDoubleNetworkSetting *model.UserPublicPlatoonDoubleNetworkSetting) (int, error) {
_, err := Db.InsertOne(UserPublicPlatoonDoubleNetworkSetting)
if err != nil {
return 0, err
}
return UserPublicPlatoonDoubleNetworkSetting.Id, nil
}

// BatchAddUserPublicPlatoonDoubleNetworkSettings 批量新增数据
func BatchAddUserPublicPlatoonDoubleNetworkSettings(Db *xorm.Engine, UserPublicPlatoonDoubleNetworkSettingData []*model.UserPublicPlatoonDoubleNetworkSetting) (int64, error) {
affected, err := Db.Insert(UserPublicPlatoonDoubleNetworkSettingData)
if err != nil {
return 0, err
}
return affected, nil
}

func GetUserPublicPlatoonDoubleNetworkSettingCount(Db *xorm.Engine) int {
var UserPublicPlatoonDoubleNetworkSetting model.UserPublicPlatoonDoubleNetworkSetting
session := Db.Where("")
count, err := session.Count(&UserPublicPlatoonDoubleNetworkSetting)
if err != nil {
return 0
}
return int(count)
}

// UserPublicPlatoonDoubleNetworkSettingDelete 删除记录
func UserPublicPlatoonDoubleNetworkSettingDelete(Db *xorm.Engine, id interface{}) (int64, error) {
if reflect.TypeOf(id).Kind() == reflect.Slice {
return Db.In("id", id).Delete(model.UserPublicPlatoonDoubleNetworkSetting{})
} else {
return Db.Where("id = ?", id).Delete(model.UserPublicPlatoonDoubleNetworkSetting{})
}
}

// UserPublicPlatoonDoubleNetworkSettingUpdate 更新记录
func UserPublicPlatoonDoubleNetworkSettingUpdate(Db *xorm.Engine, id interface{}, UserPublicPlatoonDoubleNetworkSetting *model.UserPublicPlatoonDoubleNetworkSetting, forceColums ...string) (int64, error) {
var (
affected int64
err error
)
if forceColums != nil {
affected, err = Db.Where("id=?", id).Cols(forceColums...).Update(UserPublicPlatoonDoubleNetworkSetting)
} else {
affected, err = Db.Where("id=?", id).Update(UserPublicPlatoonDoubleNetworkSetting)
}
if err != nil {
return 0, err
}
return affected, nil
}

// UserPublicPlatoonDoubleNetworkSettingGetOneByParams 通过传入的参数查询数据(单条)
func UserPublicPlatoonDoubleNetworkSettingGetOneByParams(Db *xorm.Engine, params map[string]interface{}) (*model.UserPublicPlatoonDoubleNetworkSetting, error) {
var m model.UserPublicPlatoonDoubleNetworkSetting
var query = fmt.Sprintf("%s =?", params["key"])
has, err := Db.Where(query, params["value"]).Get(&m)
if err != nil {
return nil, logx.Error(err)
}
if has == false {
return nil, nil
}
return &m, nil
}

// UserPublicPlatoonDoubleNetworkSettingFindByParams 通过传入的参数查询数据(多条)
func UserPublicPlatoonDoubleNetworkSettingFindByParams(Db *xorm.Engine, params map[string]interface{}) (*[]model.UserPublicPlatoonDoubleNetworkSetting, error) {
var m []model.UserPublicPlatoonDoubleNetworkSetting
if params["value"] == nil {
return nil, errors.New("参数有误")
}
if params["key"] == nil {
//查询全部数据
err := Db.Find(&m)
if err != nil {
return nil, logx.Error(err)
}
return &m, nil
} else {
if reflect.TypeOf(params["value"]).Kind() == reflect.Slice {
//指定In查询
if err := Db.In(utils.AnyToString(params["key"]), params["value"]).Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
} else {
var query = fmt.Sprintf("%s =?", params["key"])
err := Db.Where(query, params["value"]).Find(&m)
if err != nil {
return nil, logx.Error(err)
}
return &m, nil
}

}
}

+ 121
- 0
app/db/db_user_public_platoon_double_network_user_coin_record.go View File

@@ -0,0 +1,121 @@
package db

import (
"applet/app/db/model"
"applet/app/utils"
"applet/app/utils/logx"
"errors"
"fmt"
"reflect"
"xorm.io/xorm"
)

// BatchSelectUserPublicPlatoonDoubleNetworkUserCoinRecords 批量查询数据 TODO::和下面的方法重复了,建议采用下面的 `UserPublicPlatoonDoubleNetworkUserCoinRecordFindByParams` 方法
func BatchSelectUserPublicPlatoonDoubleNetworkUserCoinRecords(Db *xorm.Engine, params map[string]interface{}) (*[]model.UserPublicPlatoonDoubleNetworkUserCoinRecord, error) {
var UserPublicPlatoonDoubleNetworkUserCoinRecordData []model.UserPublicPlatoonDoubleNetworkUserCoinRecord
if err := Db.In(utils.AnyToString(params["key"]), params["value"]).
Find(&UserPublicPlatoonDoubleNetworkUserCoinRecordData); err != nil {
return nil, logx.Warn(err)
}
return &UserPublicPlatoonDoubleNetworkUserCoinRecordData, nil
}

// UserPublicPlatoonDoubleNetworkUserCoinRecordInsert 插入单条数据
func UserPublicPlatoonDoubleNetworkUserCoinRecordInsert(Db *xorm.Engine, UserPublicPlatoonDoubleNetworkUserCoinRecord *model.UserPublicPlatoonDoubleNetworkUserCoinRecord) (int, error) {
_, err := Db.InsertOne(UserPublicPlatoonDoubleNetworkUserCoinRecord)
if err != nil {
return 0, err
}
return UserPublicPlatoonDoubleNetworkUserCoinRecord.Id, nil
}

// BatchAddUserPublicPlatoonDoubleNetworkUserCoinRecords 批量新增数据
func BatchAddUserPublicPlatoonDoubleNetworkUserCoinRecords(Db *xorm.Engine, UserPublicPlatoonDoubleNetworkUserCoinRecordData []*model.UserPublicPlatoonDoubleNetworkUserCoinRecord) (int64, error) {
affected, err := Db.Insert(UserPublicPlatoonDoubleNetworkUserCoinRecordData)
if err != nil {
return 0, err
}
return affected, nil
}

func GetUserPublicPlatoonDoubleNetworkUserCoinRecordCount(Db *xorm.Engine) int {
var UserPublicPlatoonDoubleNetworkUserCoinRecord model.UserPublicPlatoonDoubleNetworkUserCoinRecord
session := Db.Where("")
count, err := session.Count(&UserPublicPlatoonDoubleNetworkUserCoinRecord)
if err != nil {
return 0
}
return int(count)
}

// UserPublicPlatoonDoubleNetworkUserCoinRecordDelete 删除记录
func UserPublicPlatoonDoubleNetworkUserCoinRecordDelete(Db *xorm.Engine, id interface{}) (int64, error) {
if reflect.TypeOf(id).Kind() == reflect.Slice {
return Db.In("id", id).Delete(model.UserPublicPlatoonDoubleNetworkUserCoinRecord{})
} else {
return Db.Where("id = ?", id).Delete(model.UserPublicPlatoonDoubleNetworkUserCoinRecord{})
}
}

// UserPublicPlatoonDoubleNetworkUserCoinRecordUpdate 更新记录
func UserPublicPlatoonDoubleNetworkUserCoinRecordUpdate(Db *xorm.Engine, id interface{}, UserPublicPlatoonDoubleNetworkUserCoinRecord *model.UserPublicPlatoonDoubleNetworkUserCoinRecord, forceColums ...string) (int64, error) {
var (
affected int64
err error
)
if forceColums != nil {
affected, err = Db.Where("id=?", id).Cols(forceColums...).Update(UserPublicPlatoonDoubleNetworkUserCoinRecord)
} else {
affected, err = Db.Where("id=?", id).Update(UserPublicPlatoonDoubleNetworkUserCoinRecord)
}
if err != nil {
return 0, err
}
return affected, nil
}

// UserPublicPlatoonDoubleNetworkUserCoinRecordGetOneByParams 通过传入的参数查询数据(单条)
func UserPublicPlatoonDoubleNetworkUserCoinRecordGetOneByParams(Db *xorm.Engine, params map[string]interface{}) (*model.UserPublicPlatoonDoubleNetworkUserCoinRecord, error) {
var m model.UserPublicPlatoonDoubleNetworkUserCoinRecord
var query = fmt.Sprintf("%s =?", params["key"])
has, err := Db.Where(query, params["value"]).Get(&m)
if err != nil {
return nil, logx.Error(err)
}
if has == false {
return nil, nil
}
return &m, nil
}

// UserPublicPlatoonDoubleNetworkUserCoinRecordFindByParams 通过传入的参数查询数据(多条)
func UserPublicPlatoonDoubleNetworkUserCoinRecordFindByParams(Db *xorm.Engine, params map[string]interface{}) (*[]model.UserPublicPlatoonDoubleNetworkUserCoinRecord, error) {
var m []model.UserPublicPlatoonDoubleNetworkUserCoinRecord
if params["value"] == nil {
return nil, errors.New("参数有误")
}
if params["key"] == nil {
//查询全部数据
err := Db.Find(&m)
if err != nil {
return nil, logx.Error(err)
}
return &m, nil
} else {
if reflect.TypeOf(params["value"]).Kind() == reflect.Slice {
//指定In查询
if err := Db.In(utils.AnyToString(params["key"]), params["value"]).Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
} else {
var query = fmt.Sprintf("%s =?", params["key"])
err := Db.Where(query, params["value"]).Find(&m)
if err != nil {
return nil, logx.Error(err)
}
return &m, nil
}

}
}

+ 18
- 0
app/db/model/user_public_platoon_double_network_setting.go View File

@@ -0,0 +1,18 @@
package model

import (
"time"
)

type UserPublicPlatoonDoubleNetworkSetting struct {
Id int `json:"id" xorm:"not null pk autoincr INT(11)"`
IsOpen int `json:"is_open" xorm:"not null default 0 comment('是否开启(0:关闭 1:开启)') TINYINT(1)"`
SeveralTimes int `json:"several_times" xorm:"not null default 0 comment('几乘') TINYINT(3)"`
SeveralRows int `json:"several_rows" xorm:"not null default 0 comment('几排') TINYINT(3)"`
OriginatorUid int `json:"originator_uid" xorm:"not null default -1 comment('创始人uid') INT(11)"`
SettlementDate string `json:"settlement_date" xorm:"not null default '0000-00-00 00:00:00' comment('结算日期') CHAR(50)"`
CoinId int `json:"coin_id" xorm:"not null default 0 comment('虚拟币id(作用于成长值)') INT(11)"`
Ext string `json:"ext" xorm:"comment('拓展字段(json存储)') TEXT"`
CreateAt time.Time `json:"create_at" xorm:"not null default 'CURRENT_TIMESTAMP' DATETIME"`
UpdateAt time.Time `json:"update_at" xorm:"not null default 'CURRENT_TIMESTAMP' DATETIME"`
}

+ 12
- 0
app/db/model/user_public_platoon_double_network_user_coin_record.go View File

@@ -0,0 +1,12 @@
package model

type UserPublicPlatoonDoubleNetworkUserCoinRecord struct {
Id int `json:"id" xorm:"not null pk autoincr INT(11)"`
Uid int `json:"uid" xorm:"not null default 0 comment('uid') INT(11)"`
RecommendUid int `json:"recommend_uid" xorm:"not null default 0 comment('推荐人uid') INT(11)"`
LastAmount string `json:"last_amount" xorm:"not null default 0.0000 comment('上次金额') DECIMAL(10,4)"`
Amount string `json:"amount" xorm:"not null default 0.0000 comment('当前金额') DECIMAL(10,4)"`
CoinId int `json:"coin_id" xorm:"not null default 0 comment('虚拟币id(作用于成长值)') INT(11)"`
CreateAt string `json:"create_at" xorm:"not null default 'CURRENT_TIMESTAMP' DATETIME"`
UpdateAt string `json:"update_at" xorm:"not null default 'CURRENT_TIMESTAMP' DATETIME"`
}

+ 127
- 0
consume/canal_user_virtual_coin_flow_consume.go View File

@@ -0,0 +1,127 @@
package consume

import (
"applet/app/db"
"applet/app/db/model"
"applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/shopspring/decimal"
"github.com/streadway/amqp"
"strings"
"time"
)

func CanalUserVirtualCoinFlowConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>CanalUserVirtualCoinFlowConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>CanalUserVirtualCoinFlowConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
err = handleCanalUserVirtualCoinFlow(res.Body)
if err != nil {
fmt.Println("handleCanalUserVirtualCoinFlow_ERR:::::", err.Error())
}
//_ = res.Reject(false)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleCanalUserVirtualCoinFlow(msg []byte) error {
//1、解析canal采集至mq中queue的数据结构体
var canalMsg *md.CanalUserVirtualCoinFlowOrderMessage[md.CanalUserVirtualCoinFlowOrder]
err := json.Unmarshal(msg, &canalMsg)
if err != nil {
return err
}

masterId := strings.Split(canalMsg.Database, "_")[1]
if masterId != "32053480" {
return nil
}
engine := db.DBs[masterId]
now := time.Now()

//2、查找 one_circles_green_energy_basic_setting 基础设置
userPublicPlatoonDoubleNetworkSetting, err := db.UserPublicPlatoonDoubleNetworkSettingGetOneByParams(engine, map[string]interface{}{
"key": "is_open",
"value": 1,
})
if err != nil {
return err
}
if userPublicPlatoonDoubleNetworkSetting == nil {
return errors.New("公排双网未开启")
}
if canalMsg.Type == md.CanalMsgInsertSqlType {
if canalMsg.Data[0].CoinId == utils.IntToStr(userPublicPlatoonDoubleNetworkSetting.CoinId) {
//3、查找 user_public_platoon_double_network_user_coin_record
userPublicPlatoonDoubleNetworkUserCoinRecord, err1 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordGetOneByParams(engine, map[string]interface{}{
"key": "uid",
"value": canalMsg.Data[0].Uid,
})
if err1 != nil {
return err1
}
if userPublicPlatoonDoubleNetworkUserCoinRecord == nil {
userProfile, err2 := db.UserProfileFindByIDSess(engine.NewSession(), canalMsg.Data[0].Uid)
if err2 != nil {
return err2
}
//新增记录
_, err3 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordInsert(engine, &model.UserPublicPlatoonDoubleNetworkUserCoinRecord{
Uid: utils.StrToInt(canalMsg.Data[0].Uid),
LastAmount: canalMsg.Data[0].AfterAmout,
Amount: canalMsg.Data[0].AfterAmout,
RecommendUid: userProfile.ParentUid,
CoinId: utils.StrToInt(canalMsg.Data[0].CoinId),
CreateAt: now.Format("2006-01-02 15:04:05"),
UpdateAt: now.Format("2006-01-02 15:04:05"),
})
if err3 != nil {
return err3
}
} else {
//更新记录
afterAmount, _ := decimal.NewFromString(canalMsg.Data[0].AfterAmout)
amount, _ := decimal.NewFromString(userPublicPlatoonDoubleNetworkUserCoinRecord.Amount)
lastAmount, _ := decimal.NewFromString(userPublicPlatoonDoubleNetworkUserCoinRecord.LastAmount)
if canalMsg.Data[0].Direction == "1" || canalMsg.Data[0].Direction == "2" {
//收入 && 支出
userPublicPlatoonDoubleNetworkUserCoinRecord.Amount = amount.Add(afterAmount.Sub(lastAmount)).String()
}
userPublicPlatoonDoubleNetworkUserCoinRecord.LastAmount = canalMsg.Data[0].AfterAmout
_, err2 := db.UserPublicPlatoonDoubleNetworkUserCoinRecordUpdate(engine, userPublicPlatoonDoubleNetworkUserCoinRecord.Id, userPublicPlatoonDoubleNetworkUserCoinRecord, "amount", "last_amount")
if err2 != nil {
return err2
}
}
}
}

return nil
}

+ 2
- 0
consume/init.go View File

@@ -62,6 +62,8 @@ func initConsumes() {
jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail
jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward

jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume

////////////////////////////////////// V1 /////////////////////////////////////////////////////
//jobs[consumeMd.CloudIssuanceMsgCallBackFunName] = CloudIssuanceMsgCallBackConsume



+ 10
- 9
consume/md/consume_key.go View File

@@ -380,15 +380,15 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "ZhiosIntegralProxyRecharge",
},
//{
// ExchangeName: "zhios.order_buckle.exchange",
// Name: "zhios_order_buckle_dev",
// Type: DirectQueueType,
// IsPersistent: false,
// RoutKey: "order_buckle_dev",
// BindKey: "",
// ConsumeFunName: "ZhiosOrderBuckle",
//},
{
ExchangeName: "canal.topic",
Name: "canal_user_virtual_coin_flow",
Type: TopicQueueType,
IsPersistent: false,
RoutKey: "canal_user_virtual_coin_flow",
BindKey: "",
ConsumeFunName: "CanalUserVirtualCcoinFlowConsume",
},
}

const (
@@ -433,4 +433,5 @@ const (
ZhiosAcquisitionConditionDevFunName = "ZhiosAcquisitionConditionDev"
SupplyCloudChainFenxiaoNewChangeFunName = "SupplyCloudChainFenxiaoNewChangeConsume"
MallAddSupplyGoodsFunName = "MallAddSupplyGoodsConsume"
CanalUserVirtualCcoinFlowFunName = "CanalUserVirtualCcoinFlowConsume"
)

+ 38
- 0
consume/md/md_canal_user_virtual_coin_flow.go View File

@@ -0,0 +1,38 @@
package md

type CanalUserVirtualCoinFlowOrder struct {
Id string `json:"id"`
Uid string `json:"uid"`
CoinId string `json:"coin_id"`
Direction string `json:"direction"`
Title string `json:"title"`
OrdId string `json:"ord_id"`
Amout string `json:"amout"`
BeforeAmout string `json:"before_amout"`
AfterAmout string `json:"after_amout"`
SysFee string `json:"sys_fee"`
CreateTime string `json:"create_time"`
CoinIdTo string `json:"coin_id_to"`
TransferType string `json:"transfer_type"`
IsRevoke string `json:"is_revoke"`
TransferId string `json:"transfer_id"`
TransferMoney string `json:"transfer_money"`
ToUid string `json:"to_uid"`
State string `json:"state"`
ThirdPartId string `json:"third_part_id"`
Date string `json:"date"`
Price string `json:"price"`
}

type CanalUserVirtualCoinFlowOrderMessage[T any] struct {
Data []T `json:"data"`
Database string `json:"database"`
ES int64 `json:"es"`
ID int64 `json:"id"`
IsDdl bool `json:"isDdl"`
Old []T `json:"old"`
PkNames []string `json:"pkNames"`
Table string `json:"table"`
TS int64 `json:"ts"`
Type string `json:"type"`
}

Loading…
Cancel
Save