diff --git a/app/db/db_user_virtual_coin_flow_aggregation.go b/app/db/db_user_virtual_coin_flow_aggregation.go new file mode 100644 index 0000000..0878fe5 --- /dev/null +++ b/app/db/db_user_virtual_coin_flow_aggregation.go @@ -0,0 +1,121 @@ +package db + +import ( + "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/logx" + "errors" + "fmt" + "reflect" + "xorm.io/xorm" +) + +// BatchSelectUserVirtualCoinFlowAggregations 批量查询数据 TODO::和下面的方法重复了,建议采用下面的 `UserVirtualCoinFlowAggregationFindByParams` 方法 +func BatchSelectUserVirtualCoinFlowAggregations(Db *xorm.Engine, params map[string]interface{}) (*[]model.UserVirtualCoinFlowAggregation, error) { + var UserVirtualCoinFlowAggregationData []model.UserVirtualCoinFlowAggregation + if err := Db.In(utils.AnyToString(params["key"]), params["value"]). + Find(&UserVirtualCoinFlowAggregationData); err != nil { + return nil, logx.Warn(err) + } + return &UserVirtualCoinFlowAggregationData, nil +} + +// UserVirtualCoinFlowAggregationInsert 插入单条数据 +func UserVirtualCoinFlowAggregationInsert(Db *xorm.Engine, UserVirtualCoinFlowAggregation *model.UserVirtualCoinFlowAggregation) (int64, error) { + _, err := Db.InsertOne(UserVirtualCoinFlowAggregation) + if err != nil { + return 0, err + } + return UserVirtualCoinFlowAggregation.Id, nil +} + +// BatchAddUserVirtualCoinFlowAggregations 批量新增数据 +func BatchAddUserVirtualCoinFlowAggregations(Db *xorm.Engine, UserVirtualCoinFlowAggregationData []*model.UserVirtualCoinFlowAggregation) (int64, error) { + affected, err := Db.Insert(UserVirtualCoinFlowAggregationData) + if err != nil { + return 0, err + } + return affected, nil +} + +func GetUserVirtualCoinFlowAggregationCount(Db *xorm.Engine) int { + var UserVirtualCoinFlowAggregation model.UserVirtualCoinFlowAggregation + session := Db.Where("") + count, err := session.Count(&UserVirtualCoinFlowAggregation) + if err != nil { + return 0 + } + return int(count) +} + +// UserVirtualCoinFlowAggregationDelete 删除记录 +func UserVirtualCoinFlowAggregationDelete(Db *xorm.Engine, id interface{}) (int64, error) { + if reflect.TypeOf(id).Kind() == reflect.Slice { + return Db.In("id", id).Delete(model.UserVirtualCoinFlowAggregation{}) + } else { + return Db.Where("id = ?", id).Delete(model.UserVirtualCoinFlowAggregation{}) + } +} + +// UserVirtualCoinFlowAggregationUpdate 更新记录 +func UserVirtualCoinFlowAggregationUpdate(Db *xorm.Engine, id interface{}, UserVirtualCoinFlowAggregation *model.UserVirtualCoinFlowAggregation, forceColums ...string) (int64, error) { + var ( + affected int64 + err error + ) + if forceColums != nil { + affected, err = Db.Where("id=?", id).Cols(forceColums...).Update(UserVirtualCoinFlowAggregation) + } else { + affected, err = Db.Where("id=?", id).Update(UserVirtualCoinFlowAggregation) + } + if err != nil { + return 0, err + } + return affected, nil +} + +// UserVirtualCoinFlowAggregationGetOneByParams 通过传入的参数查询数据(单条) +func UserVirtualCoinFlowAggregationGetOneByParams(Db *xorm.Engine, params map[string]interface{}) (*model.UserVirtualCoinFlowAggregation, error) { + var m model.UserVirtualCoinFlowAggregation + var query = fmt.Sprintf("%s =?", params["key"]) + has, err := Db.Where(query, params["value"]).Get(&m) + if err != nil { + return nil, logx.Error(err) + } + if has == false { + return nil, nil + } + return &m, nil +} + +// UserVirtualCoinFlowAggregationFindByParams 通过传入的参数查询数据(多条) +func UserVirtualCoinFlowAggregationFindByParams(Db *xorm.Engine, params map[string]interface{}) (*[]model.UserVirtualCoinFlowAggregation, error) { + var m []model.UserVirtualCoinFlowAggregation + if params["value"] == nil { + return nil, errors.New("参数有误") + } + if params["key"] == nil { + //查询全部数据 + err := Db.Find(&m) + if err != nil { + return nil, logx.Error(err) + } + return &m, nil + } else { + if reflect.TypeOf(params["value"]).Kind() == reflect.Slice { + //指定In查询 + if err := Db.In(utils.AnyToString(params["key"]), params["value"]).Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil + } else { + var query = fmt.Sprintf("%s =?", params["key"]) + err := Db.Where(query, params["value"]).Find(&m) + if err != nil { + return nil, logx.Error(err) + } + return &m, nil + } + + } +} diff --git a/app/db/model/user_virtual_coin_flow_aggregation.go b/app/db/model/user_virtual_coin_flow_aggregation.go new file mode 100644 index 0000000..027884c --- /dev/null +++ b/app/db/model/user_virtual_coin_flow_aggregation.go @@ -0,0 +1,13 @@ +package model + +type UserVirtualCoinFlowAggregation struct { + Id int64 `json:"id" xorm:"pk autoincr BIGINT(20)"` + Uid int `json:"uid" xorm:"not null comment('用户id') index INT(11)"` + CoinId int `json:"coin_id" xorm:"not null comment('虚拟币id') INT(11)"` + TodayData string `json:"today_data" xorm:"not null comment('今日数量') DECIMAL(20,8)"` + ThisWeekData string `json:"this_week_data" xorm:"not null comment('本周数量') DECIMAL(20,8)"` + ThisMonthData string `json:"this_month_data" xorm:"not null comment('本月数量') DECIMAL(20,8)"` + NowData string `json:"now_data" xorm:"not null comment('当前数量') DECIMAL(20,8)"` + CreateAt string `json:"create_at" xorm:"not null default 'CURRENT_TIMESTAMP' comment('创建时间') DATETIME"` + UpdateAt string `json:"update_at" xorm:"not null default 'CURRENT_TIMESTAMP' comment('更新时间') DATETIME"` +} diff --git a/consume/init.go b/consume/init.go index 8c0f34f..e09dbf0 100644 --- a/consume/init.go +++ b/consume/init.go @@ -20,46 +20,46 @@ func initConsumes() { //jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder // - jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge - jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv - jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume - jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree - jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal - jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond + //jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge + //jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv + //jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume + //jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree + //jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal + //jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond + //// + //jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal + //jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy + //jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle + //// + //jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder // - jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal - jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy - jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle + //jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation + //jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser // - jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder - - jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation - jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser - - jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition - - jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial - jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter - jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender - jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans - jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv - - jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay - jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess - jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund - jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond - - jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore - - jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail - - jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume - jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate - jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate - - jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal - jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail - jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward + //jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition + // + //jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial + //jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter + //jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender + //jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans + //jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv + // + //jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay + //jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess + //jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund + //jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond + // + //jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore + // + //jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail + // + //jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume + //jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate + //jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate + // + //jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal + //jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail + //jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward // @@ -86,7 +86,8 @@ func initConsumes() { //////////////////////////////////////// withdraw ///////////////////////////////////////////////////// //jobs[consumeMd.WithdrawConsumeFunName] = WithdrawConsume - //jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume //绿色双链积分 + //jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume //绿色双链积分 + jobs[consumeMd.ZhiosOneCirclesCoinConsumeFunName] = ZhiosOneCirclesCoinConsume //一个圈圈虚拟币变化 } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index e44e3e9..6d2513e 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -389,6 +389,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "ZhiosMallGreenCoinConsume", }, + { + ExchangeName: "canal.topic", + Name: "user_virtual_coin_flow_aggregation", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "canal_user_virtual_coin_flow_aggregation", + BindKey: "", + ConsumeFunName: "ZhiosOneCirclesCoinConsume", + }, { ExchangeName: "canal.topic", Name: "canal_user_virtual_coin_flow", @@ -440,6 +449,7 @@ const ( ZhiosUserRelateFunName = "ZhiosUserRelate" ZhiosIntegralProxyRechargeFunName = "ZhiosIntegralProxyRecharge" ZhiosMallGreenCoinConsumeFunName = "ZhiosMallGreenCoinConsume" + ZhiosOneCirclesCoinConsumeFunName = "ZhiosOneCirclesCoinConsume" ZhiosUserUpLvFunName = "ZhiosUserUpLv" CanalGuideOrderByUserUpLvConsume = "CanalGuideOrderByUserUpLvConsume" ZhiosOrderFreeFunName = "ZhiosOrderFree" diff --git a/consume/zhios_one_circles_coin_consume.go b/consume/zhios_one_circles_coin_consume.go new file mode 100644 index 0000000..e90b10f --- /dev/null +++ b/consume/zhios_one_circles_coin_consume.go @@ -0,0 +1,120 @@ +package consume + +import ( + "applet/app/db" + "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/shopspring/decimal" + "github.com/streadway/amqp" + "strings" + "time" +) + +func ZhiosOneCirclesCoinConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(50) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>ZhiosOneCirclesCoinConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosOneCirclesCoinConsume(res.Body) + if err != nil { + fmt.Println("handleZhiosOneCirclesCoinConsume:::::", err.Error()) + } + //_ = res.Reject(false) + err = res.Ack(true) + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleZhiosOneCirclesCoinConsume(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalUserVirtualCoinFlowOrderMessage[md.CanalUserVirtualCoinFlowOrder] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return err + } + masterId := strings.Split(canalMsg.Database, "_")[1] + if masterId != "31585332" { + return nil + } + engine := db.DBs[masterId] + now := time.Now() + + if canalMsg.Type == md.CanalMsgInsertSqlType { + ////2、查找 one_circles_green_energy_basic_setting 基础设置 + //userPublicPlatoonDoubleNetworkSetting, err := db.UserPublicPlatoonDoubleNetworkSettingGetOneByParams(engine, map[string]interface{}{ + // "key": "is_open", + // "value": 1, + //}) + //if err != nil { + // return err + //} + + if canalMsg.Data[0].CoinId == utils.IntToStr(7) { //TODO::数据量太大,减少查询直接写死 + //3、查找 user_public_platoon_double_network_user_coin_record + userVirtualCoinFlowAggregation, err1 := db.UserVirtualCoinFlowAggregationGetOneByParams(engine, map[string]interface{}{ + "key": "uid", + "value": canalMsg.Data[0].Uid, + }) + if err1 != nil { + return err1 + } + if userVirtualCoinFlowAggregation == nil { + //新增记录 + _, err3 := db.UserVirtualCoinFlowAggregationInsert(engine, &model.UserVirtualCoinFlowAggregation{ + Uid: utils.StrToInt(canalMsg.Data[0].Uid), + CoinId: utils.StrToInt(canalMsg.Data[0].CoinId), + TodayData: canalMsg.Data[0].Amout, + ThisWeekData: canalMsg.Data[0].Amout, + ThisMonthData: canalMsg.Data[0].Amout, + NowData: canalMsg.Data[0].Amout, + CreateAt: now.Format("2006-01-02 15:04:05"), + UpdateAt: now.Format("2006-01-02 15:04:05"), + }) + if err3 != nil { + return err3 + } + } else { + //更新记录 + amount, _ := decimal.NewFromString(canalMsg.Data[0].Amout) + todayData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.TodayData) + thisWeekData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisWeekData) + thisMonthData, _ := decimal.NewFromString(userVirtualCoinFlowAggregation.ThisMonthData) + userVirtualCoinFlowAggregation.TodayData = todayData.Add(amount).String() + userVirtualCoinFlowAggregation.ThisWeekData = thisWeekData.Add(amount).String() + userVirtualCoinFlowAggregation.ThisMonthData = thisMonthData.Add(amount).String() + userVirtualCoinFlowAggregation.NowData = canalMsg.Data[0].AfterAmout + _, err2 := db.UserVirtualCoinFlowAggregationUpdate(engine, userVirtualCoinFlowAggregation.Id, userVirtualCoinFlowAggregation, "today_data", "this_week_data", "this_month_data", "now_data") + if err2 != nil { + return err2 + } + } + } + } + return nil +}