From dc144e9cfcceff3ad7f707d2a06efd7b27620b59 Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Sat, 27 Apr 2024 14:00:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/db/db_task_reward.go | 44 ++++++++++ app/db/model/task_center_reward_list.go | 23 ++++++ app/db/model/task_reward_total.go | 13 +++ consume/init.go | 1 + consume/md/consume_key.go | 10 +++ consume/zhios_task_total.go | 102 ++++++++++++++++++++++++ 6 files changed, 193 insertions(+) create mode 100644 app/db/db_task_reward.go create mode 100644 app/db/model/task_center_reward_list.go create mode 100644 app/db/model/task_reward_total.go create mode 100644 consume/zhios_task_total.go diff --git a/app/db/db_task_reward.go b/app/db/db_task_reward.go new file mode 100644 index 0000000..7984d50 --- /dev/null +++ b/app/db/db_task_reward.go @@ -0,0 +1,44 @@ +package db + +import ( + "applet/app/db/model" + "applet/app/utils" + "errors" + "xorm.io/xorm" +) + +func GetTaskCenterRewardList(sess *xorm.Session, id string) *model.TaskCenterRewardList { + var data model.TaskCenterRewardList + get, err := sess.Where("id=?", id).Get(&data) + if get == false || err != nil { + return nil + } + return &data +} + +func GetTaskTotal(sess *xorm.Session, uid, date, month, coinId, types, taskType, title, amount string) error { + var data model.TaskRewardTotal + get, _ := sess.Where("uid=? and date=? and coin_id=? and type=? and task_type=?", uid, date, coinId, types, taskType).Get(&data) + if get == false { + data = model.TaskRewardTotal{ + Uid: utils.StrToInt(uid), + Date: utils.StrToInt(date), + CoinId: utils.StrToInt(coinId), + Sum: "0", + Month: utils.StrToInt(month), + Type: types, + TaskType: taskType, + Title: title, + } + one, err := sess.InsertOne(&data) + if one == 0 || err != nil { + return errors.New("失败") + } + } + data.Sum = utils.Float64ToStrByPrec(utils.StrToFloat64(amount)+utils.StrToFloat64(data.Sum), 9) + update, err := sess.Where("id=?", data.Id).Cols("sum").Update(&data) + if update == 0 || err != nil { + return errors.New("失败") + } + return nil +} diff --git a/app/db/model/task_center_reward_list.go b/app/db/model/task_center_reward_list.go new file mode 100644 index 0000000..2389f67 --- /dev/null +++ b/app/db/model/task_center_reward_list.go @@ -0,0 +1,23 @@ +package model + +type TaskCenterRewardList struct { + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + Uid int `json:"uid" xorm:"default 0 comment('用户id') INT(11)"` + Type string `json:"type" xorm:"default '' comment('类型 sign签到') VARCHAR(255)"` + Title string `json:"title" xorm:"default '' comment('任务标题') VARCHAR(255)"` + IsFinish int `json:"is_finish" xorm:"default 0 comment('是否完成') INT(1)"` + CreateTime int `json:"create_time" xorm:"default 0 comment('创建时间') INT(11)"` + FinishTime int `json:"finish_time" xorm:"default 0 comment('完成时间') INT(11)"` + TaskId int `json:"task_id" xorm:"default 0 comment('任务id') INT(11)"` + TaskNum int `json:"task_num" xorm:"default 0 comment('任务数量') INT(11)"` + TaskNumSecond int `json:"task_num_second" xorm:"default 0 comment('任务数量') INT(11)"` + IsSend int `json:"is_send" xorm:"default 0 comment('任务数量') INT(11)"` + Oid int64 `json:"oid" xorm:"default 0 comment('任务数量') INT(11)"` + Reward string `json:"reward" xorm:"default '' comment('奖励 ') VARCHAR(255)"` + Platform string `json:"platform" xorm:"default '' comment('平台 ') VARCHAR(255)"` + TaskName string `json:"task_name" xorm:"default '' comment('平台 ') VARCHAR(255)"` + DeviceModel string `json:"device_model" xorm:"default '' comment('平台 ') VARCHAR(255)"` + Bili string `json:"bili" xorm:"default '' comment('平台 ') VARCHAR(255)"` + TaskType string `json:"task_type" xorm:"default '' comment('平台 ') VARCHAR(255)"` + Price string `json:"price" xorm:"default 0.000000 comment('') DECIMAL(16,6)"` +} diff --git a/app/db/model/task_reward_total.go b/app/db/model/task_reward_total.go new file mode 100644 index 0000000..6b618ff --- /dev/null +++ b/app/db/model/task_reward_total.go @@ -0,0 +1,13 @@ +package model + +type TaskRewardTotal struct { + Id int `json:"id" xorm:"not null pk autoincr INT(11)"` + Uid int `json:"uid" xorm:"default 0 INT(11)"` + Date int `json:"date" xorm:"default 0 INT(11)"` + CoinId int `json:"coin_id" xorm:"default 0 INT(11)"` + Sum string `json:"sum" xorm:"default 0.00000000 DECIMAL(20,8)"` + Month int `json:"month" xorm:"default 0 INT(11)"` + Type string `json:"type" xorm:"VARCHAR(255)"` + TaskType string `json:"task_type" xorm:"VARCHAR(255)"` + Title string `json:"title" xorm:"VARCHAR(255)"` +} diff --git a/consume/init.go b/consume/init.go index 8c0f34f..cd220be 100644 --- a/consume/init.go +++ b/consume/init.go @@ -60,6 +60,7 @@ func initConsumes() { jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward + jobs[consumeMd.ZhiosTaskTotal] = ZhiosTaskTotal // diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index e44e3e9..53e163e 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -263,6 +263,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "zhiosWithdrawReward", }, + { + ExchangeName: "zhios.task.exchange", + Name: "zhios_task_total", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "task_total", + BindKey: "", + ConsumeFunName: "zhiosTaskTotal", + }, { ExchangeName: "zhios.user_valid.exchange", Name: "zhios_user_valid", @@ -468,6 +477,7 @@ const ( ZhiosCapitalPoolOrderTotalFunName = "ZhiosCapitalPoolOrderTotal" ZhiosExpressOrderFail = "zhiosExpressOrderFail" ZhiosWithdrawReward = "zhiosWithdrawReward" + ZhiosTaskTotal = "zhiosTaskTotal" ZhiosTikTokUpdateFunName = "ZhiosTikTokUpdate" ZhiosTikTokAllUpdateFunName = "ZhiosTikTokAllUpdate" CloudIssuanceAsyncMLoginFunName = "CloudIssuanceAsyncMLoginConsume" diff --git a/consume/zhios_task_total.go b/consume/zhios_task_total.go new file mode 100644 index 0000000..9ed65ff --- /dev/null +++ b/consume/zhios_task_total.go @@ -0,0 +1,102 @@ +package consume + +import ( + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" +) + +func ZhiosTaskTotal(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosTaskTotal(res.Body) + //_ = res.Reject(false) + if err == nil { + _ = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleZhiosTaskTotal(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosAcquisition + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println(err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + if eg == nil { + return nil + } + sess := eg.NewSession() + defer sess.Close() + sess.Begin() + + list := db.GetTaskCenterRewardList(sess, canalMsg.Id) + if list == nil { + sess.Rollback() + return nil + } + if list.IsSend == 1 { + sess.Rollback() + return nil + } + var tmp = make(map[string]string) + json.Unmarshal([]byte(list.Reward), &tmp) + for k, v := range tmp { + if utils.StrToFloat64(v) == 0 { + continue + } + err := db.GetTaskTotal(sess, utils.IntToStr(list.Uid), time.Unix(int64(list.CreateTime), 0).Format("20060102"), time.Unix(int64(list.CreateTime), 0).Format("200601"), k, list.Type, list.TaskType, list.Title, v) + if err != nil { + sess.Rollback() + return err + } + } + list.IsSend = 1 + update, err := sess.Where("id=?", list.Id).Cols("is_send").Update(list) + if update == 0 || err != nil { + sess.Rollback() + return errors.New("失败") + } + sess.Commit() + return nil +}