@@ -0,0 +1,121 @@ | |||
package db | |||
import ( | |||
"applet/app/db/model" | |||
"applet/app/utils" | |||
"applet/app/utils/logx" | |||
"errors" | |||
"fmt" | |||
"reflect" | |||
"xorm.io/xorm" | |||
) | |||
// BatchSelectUserVirtualCoinFlowAggregations 批量查询数据 TODO::和下面的方法重复了,建议采用下面的 `UserVirtualCoinFlowAggregationFindByParams` 方法 | |||
func BatchSelectUserVirtualCoinFlowAggregations(Db *xorm.Engine, params map[string]interface{}) (*[]model.UserVirtualCoinFlowAggregation, error) { | |||
var UserVirtualCoinFlowAggregationData []model.UserVirtualCoinFlowAggregation | |||
if err := Db.In(utils.AnyToString(params["key"]), params["value"]). | |||
Find(&UserVirtualCoinFlowAggregationData); err != nil { | |||
return nil, logx.Warn(err) | |||
} | |||
return &UserVirtualCoinFlowAggregationData, nil | |||
} | |||
// UserVirtualCoinFlowAggregationInsert 插入单条数据 | |||
func UserVirtualCoinFlowAggregationInsert(Db *xorm.Engine, UserVirtualCoinFlowAggregation *model.UserVirtualCoinFlowAggregation) (int64, error) { | |||
_, err := Db.InsertOne(UserVirtualCoinFlowAggregation) | |||
if err != nil { | |||
return 0, err | |||
} | |||
return UserVirtualCoinFlowAggregation.Id, nil | |||
} | |||
// BatchAddUserVirtualCoinFlowAggregations 批量新增数据 | |||
func BatchAddUserVirtualCoinFlowAggregations(Db *xorm.Engine, UserVirtualCoinFlowAggregationData []*model.UserVirtualCoinFlowAggregation) (int64, error) { | |||
affected, err := Db.Insert(UserVirtualCoinFlowAggregationData) | |||
if err != nil { | |||
return 0, err | |||
} | |||
return affected, nil | |||
} | |||
func GetUserVirtualCoinFlowAggregationCount(Db *xorm.Engine) int { | |||
var UserVirtualCoinFlowAggregation model.UserVirtualCoinFlowAggregation | |||
session := Db.Where("") | |||
count, err := session.Count(&UserVirtualCoinFlowAggregation) | |||
if err != nil { | |||
return 0 | |||
} | |||
return int(count) | |||
} | |||
// UserVirtualCoinFlowAggregationDelete 删除记录 | |||
func UserVirtualCoinFlowAggregationDelete(Db *xorm.Engine, id interface{}) (int64, error) { | |||
if reflect.TypeOf(id).Kind() == reflect.Slice { | |||
return Db.In("id", id).Delete(model.UserVirtualCoinFlowAggregation{}) | |||
} else { | |||
return Db.Where("id = ?", id).Delete(model.UserVirtualCoinFlowAggregation{}) | |||
} | |||
} | |||
// UserVirtualCoinFlowAggregationUpdate 更新记录 | |||
func UserVirtualCoinFlowAggregationUpdate(Db *xorm.Engine, id interface{}, UserVirtualCoinFlowAggregation *model.UserVirtualCoinFlowAggregation, forceColums ...string) (int64, error) { | |||
var ( | |||
affected int64 | |||
err error | |||
) | |||
if forceColums != nil { | |||
affected, err = Db.Where("id=?", id).Cols(forceColums...).Update(UserVirtualCoinFlowAggregation) | |||
} else { | |||
affected, err = Db.Where("id=?", id).Update(UserVirtualCoinFlowAggregation) | |||
} | |||
if err != nil { | |||
return 0, err | |||
} | |||
return affected, nil | |||
} | |||
// UserVirtualCoinFlowAggregationGetOneByParams 通过传入的参数查询数据(单条) | |||
func UserVirtualCoinFlowAggregationGetOneByParams(Db *xorm.Engine, params map[string]interface{}) (*model.UserVirtualCoinFlowAggregation, error) { | |||
var m model.UserVirtualCoinFlowAggregation | |||
var query = fmt.Sprintf("%s =?", params["key"]) | |||
has, err := Db.Where(query, params["value"]).Get(&m) | |||
if err != nil { | |||
return nil, logx.Error(err) | |||
} | |||
if has == false { | |||
return nil, nil | |||
} | |||
return &m, nil | |||
} | |||
// UserVirtualCoinFlowAggregationFindByParams 通过传入的参数查询数据(多条) | |||
func UserVirtualCoinFlowAggregationFindByParams(Db *xorm.Engine, params map[string]interface{}) (*[]model.UserVirtualCoinFlowAggregation, error) { | |||
var m []model.UserVirtualCoinFlowAggregation | |||
if params["value"] == nil { | |||
return nil, errors.New("参数有误") | |||
} | |||
if params["key"] == nil { | |||
//查询全部数据 | |||
err := Db.Find(&m) | |||
if err != nil { | |||
return nil, logx.Error(err) | |||
} | |||
return &m, nil | |||
} else { | |||
if reflect.TypeOf(params["value"]).Kind() == reflect.Slice { | |||
//指定In查询 | |||
if err := Db.In(utils.AnyToString(params["key"]), params["value"]).Find(&m); err != nil { | |||
return nil, logx.Warn(err) | |||
} | |||
return &m, nil | |||
} else { | |||
var query = fmt.Sprintf("%s =?", params["key"]) | |||
err := Db.Where(query, params["value"]).Find(&m) | |||
if err != nil { | |||
return nil, logx.Error(err) | |||
} | |||
return &m, nil | |||
} | |||
} | |||
} |
@@ -0,0 +1,13 @@ | |||
package model | |||
type UserVirtualCoinFlowAggregation struct { | |||
Id int64 `json:"id" xorm:"pk autoincr BIGINT(20)"` | |||
Uid int `json:"uid" xorm:"not null comment('用户id') index INT(11)"` | |||
CoinId int `json:"coin_id" xorm:"not null comment('虚拟币id') INT(11)"` | |||
TodayData string `json:"today_data" xorm:"not null comment('今日数量') DECIMAL(20,8)"` | |||
ThisWeekData string `json:"this_week_data" xorm:"not null comment('本周数量') DECIMAL(20,8)"` | |||
ThisMonthData string `json:"this_month_data" xorm:"not null comment('本月数量') DECIMAL(20,8)"` | |||
NowData string `json:"now_data" xorm:"not null comment('当前数量') DECIMAL(20,8)"` | |||
CreateAt string `json:"create_at" xorm:"not null default 'CURRENT_TIMESTAMP' comment('创建时间') DATETIME"` | |||
UpdateAt string `json:"update_at" xorm:"not null default 'CURRENT_TIMESTAMP' comment('更新时间') DATETIME"` | |||
} |
@@ -20,46 +20,46 @@ func initConsumes() { | |||
//jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder | |||
// | |||
jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge | |||
jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv | |||
jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume | |||
jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree | |||
jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal | |||
jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond | |||
//jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge | |||
//jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv | |||
//jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume | |||
//jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree | |||
//jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal | |||
//jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond | |||
//// | |||
//jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal | |||
//jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy | |||
//jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle | |||
//// | |||
//jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder | |||
// | |||
jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal | |||
jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy | |||
jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle | |||
//jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation | |||
//jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser | |||
// | |||
jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder | |||
jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation | |||
jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser | |||
jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition | |||
jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial | |||
jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter | |||
jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender | |||
jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans | |||
jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv | |||
jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay | |||
jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess | |||
jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund | |||
jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond | |||
jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore | |||
jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail | |||
jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume | |||
jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate | |||
jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate | |||
jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal | |||
jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail | |||
jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward | |||
//jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition | |||
// | |||
//jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial | |||
//jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter | |||
//jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender | |||
//jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans | |||
//jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv | |||
// | |||
//jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay | |||
//jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess | |||
//jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund | |||
//jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond | |||
// | |||
//jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore | |||
// | |||
//jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail | |||
// | |||
//jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume | |||
//jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate | |||
//jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate | |||
// | |||
//jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal | |||
//jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail | |||
//jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward | |||
// | |||
@@ -86,7 +86,8 @@ func initConsumes() { | |||
//////////////////////////////////////// withdraw ///////////////////////////////////////////////////// | |||
//jobs[consumeMd.WithdrawConsumeFunName] = WithdrawConsume | |||
//jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume //绿色双链积分 | |||
//jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume //绿色双链积分 | |||
jobs[consumeMd.ZhiosOneCirclesCoinConsumeFunName] = ZhiosOneCirclesCoinConsume //一个圈圈虚拟币变化 | |||
} | |||
func Run() { | |||
@@ -389,6 +389,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||
BindKey: "", | |||
ConsumeFunName: "ZhiosMallGreenCoinConsume", | |||
}, | |||
{ | |||
ExchangeName: "canal.topic", | |||
Name: "user_virtual_coin_flow_aggregation", | |||
Type: TopicQueueType, | |||
IsPersistent: false, | |||
RoutKey: "canal_user_virtual_coin_flow_aggregation", | |||
BindKey: "", | |||
ConsumeFunName: "ZhiosOneCirclesCoinConsume", | |||
}, | |||
{ | |||
ExchangeName: "canal.topic", | |||
Name: "canal_user_virtual_coin_flow", | |||
@@ -440,6 +449,7 @@ const ( | |||
ZhiosUserRelateFunName = "ZhiosUserRelate" | |||
ZhiosIntegralProxyRechargeFunName = "ZhiosIntegralProxyRecharge" | |||
ZhiosMallGreenCoinConsumeFunName = "ZhiosMallGreenCoinConsume" | |||
ZhiosOneCirclesCoinConsumeFunName = "ZhiosOneCirclesCoinConsume" | |||
ZhiosUserUpLvFunName = "ZhiosUserUpLv" | |||
CanalGuideOrderByUserUpLvConsume = "CanalGuideOrderByUserUpLvConsume" | |||
ZhiosOrderFreeFunName = "ZhiosOrderFree" | |||
@@ -0,0 +1,120 @@ | |||
package consume | |||
import ( | |||
"applet/app/db" | |||
"applet/app/db/model" | |||
"applet/app/utils" | |||
"applet/app/utils/logx" | |||
"applet/consume/md" | |||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||
"encoding/json" | |||
"errors" | |||
"fmt" | |||
"github.com/shopspring/decimal" | |||
"github.com/streadway/amqp" | |||
"strings" | |||
"time" | |||
) | |||
func ZhiosOneCirclesCoinConsume(queue md.MqQueue) { | |||
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") | |||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||
if err != nil { | |||
logx.Error(err) | |||
return | |||
} | |||
defer ch.Release() | |||
//1、将自己绑定到交换机上 | |||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||
//2、取出数据进行消费 | |||
ch.Qos(50) | |||
delivery := ch.Consume(queue.Name, false) | |||
var res amqp.Delivery | |||
var ok bool | |||
for { | |||
res, ok = <-delivery | |||
if ok == true { | |||
//fmt.Println(string(res.Body)) | |||
fmt.Println(">>>>>>>>>>>>>>>>ZhiosOneCirclesCoinConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||
err = handleZhiosOneCirclesCoinConsume(res.Body) | |||
if err != nil { | |||
fmt.Println("handleZhiosOneCirclesCoinConsume:::::", err.Error()) | |||
} | |||
//_ = res.Reject(false) | |||
err = res.Ack(true) | |||
fmt.Println("err ::: ", err) | |||
} else { | |||
panic(errors.New("error getting message")) | |||
} | |||
} | |||
fmt.Println("get msg done") | |||
} | |||
func handleZhiosOneCirclesCoinConsume(msg []byte) error { | |||
//1、解析canal采集至mq中queue的数据结构体 | |||
var canalMsg *md.CanalUserVirtualCoinFlowOrderMessage[md.CanalUserVirtualCoinFlowOrder] | |||
err := json.Unmarshal(msg, &canalMsg) | |||
if err != nil { | |||
return err | |||
} | |||
masterId := strings.Split(canalMsg.Database, "_")[1] | |||
if masterId != "31585332" { | |||
return nil | |||
} | |||
engine := db.DBs[masterId] | |||
now := time.Now() | |||
if canalMsg.Type == md.CanalMsgInsertSqlType { | |||
////2、查找 one_circles_green_energy_basic_setting 基础设置 | |||
//userPublicPlatoonDoubleNetworkSetting, err := db.UserPublicPlatoonDoubleNetworkSettingGetOneByParams(engine, map[string]interface{}{ | |||
// "key": "is_open", | |||
// "value": 1, | |||
//}) | |||
//if err != nil { | |||
// return err | |||
//} | |||
if canalMsg.Data[0].CoinId == utils.IntToStr(7) { //TODO::数据量太大,减少查询直接写死 | |||
//3、查找 user_public_platoon_double_network_user_coin_record | |||
userVirtualCoinFlowAggregation, err1 := db.UserVirtualCoinFlowAggregationGetOneByParams(engine, map[string]interface{}{ | |||
"key": "uid", | |||
"value": canalMsg.Data[0].Uid, | |||
}) | |||
if err1 != nil { | |||
return err1 | |||
} | |||
if userVirtualCoinFlowAggregation == nil { | |||
//新增记录 | |||
_, err3 := db.UserVirtualCoinFlowAggregationInsert(engine, &model.UserVirtualCoinFlowAggregation{ | |||
Uid: utils.StrToInt(canalMsg.Data[0].Uid), | |||
CoinId: utils.StrToInt(canalMsg.Data[0].CoinId), | |||
TodayData: canalMsg.Data[0].Amout, | |||
ThisWeekData: canalMsg.Data[0].Amout, | |||
ThisMonthData: canalMsg.Data[0].Amout, | |||
NowData: canalMsg.Data[0].Amout, | |||
CreateAt: now.Format("2006-01-02 15:04:05"), | |||
UpdateAt: now.Format("2006-01-02 15:04:05"), | |||
}) | |||
if err3 != nil { | |||
return err3 | |||
} | |||
} else { | |||
//更新记录 | |||
amount, _ := decimal.NewFromString(canalMsg.Data[0].Amout) | |||
todayData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.TodayData) | |||
thisWeekData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisWeekData) | |||
thisMonthData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisMonthData) | |||
userVirtualCoinFlowAggregation.TodayData = todayData.Add(amount).String() | |||
userVirtualCoinFlowAggregation.ThisWeekData = thisWeekData.Add(amount).String() | |||
userVirtualCoinFlowAggregation.ThisMonthData = thisMonthData.Add(amount).String() | |||
userVirtualCoinFlowAggregation.NowData = canalMsg.Data[0].AfterAmout | |||
_, err2 := db.UserVirtualCoinFlowAggregationUpdate(engine, userVirtualCoinFlowAggregation.Id, userVirtualCoinFlowAggregation, "today_data", "this_week_data", "this_month_data", "now_data") | |||
if err2 != nil { | |||
return err2 | |||
} | |||
} | |||
} | |||
} | |||
return nil | |||
} |