@@ -0,0 +1,44 @@ | |||||
package db | |||||
import ( | |||||
"applet/app/db/model" | |||||
"applet/app/utils" | |||||
"errors" | |||||
"xorm.io/xorm" | |||||
) | |||||
func GetTaskCenterRewardList(sess *xorm.Session, id string) *model.TaskCenterRewardList { | |||||
var data model.TaskCenterRewardList | |||||
get, err := sess.Where("id=?", id).Get(&data) | |||||
if get == false || err != nil { | |||||
return nil | |||||
} | |||||
return &data | |||||
} | |||||
func GetTaskTotal(sess *xorm.Session, uid, date, month, coinId, types, taskType, title, amount string) error { | |||||
var data model.TaskRewardTotal | |||||
get, _ := sess.Where("uid=? and date=? and coin_id=? and type=? and task_type=?", uid, date, coinId, types, taskType).Get(&data) | |||||
if get == false { | |||||
data = model.TaskRewardTotal{ | |||||
Uid: utils.StrToInt(uid), | |||||
Date: utils.StrToInt(date), | |||||
CoinId: utils.StrToInt(coinId), | |||||
Sum: "0", | |||||
Month: utils.StrToInt(month), | |||||
Type: types, | |||||
TaskType: taskType, | |||||
Title: title, | |||||
} | |||||
one, err := sess.InsertOne(&data) | |||||
if one == 0 || err != nil { | |||||
return errors.New("失败") | |||||
} | |||||
} | |||||
data.Sum = utils.Float64ToStrByPrec(utils.StrToFloat64(amount)+utils.StrToFloat64(data.Sum), 9) | |||||
update, err := sess.Where("id=?", data.Id).Cols("sum").Update(&data) | |||||
if update == 0 || err != nil { | |||||
return errors.New("失败") | |||||
} | |||||
return nil | |||||
} |
@@ -0,0 +1,23 @@ | |||||
package model | |||||
type TaskCenterRewardList struct { | |||||
Id int `json:"id" xorm:"not null pk autoincr INT(11)"` | |||||
Uid int `json:"uid" xorm:"default 0 comment('用户id') INT(11)"` | |||||
Type string `json:"type" xorm:"default '' comment('类型 sign签到') VARCHAR(255)"` | |||||
Title string `json:"title" xorm:"default '' comment('任务标题') VARCHAR(255)"` | |||||
IsFinish int `json:"is_finish" xorm:"default 0 comment('是否完成') INT(1)"` | |||||
CreateTime int `json:"create_time" xorm:"default 0 comment('创建时间') INT(11)"` | |||||
FinishTime int `json:"finish_time" xorm:"default 0 comment('完成时间') INT(11)"` | |||||
TaskId int `json:"task_id" xorm:"default 0 comment('任务id') INT(11)"` | |||||
TaskNum int `json:"task_num" xorm:"default 0 comment('任务数量') INT(11)"` | |||||
TaskNumSecond int `json:"task_num_second" xorm:"default 0 comment('任务数量') INT(11)"` | |||||
IsSend int `json:"is_send" xorm:"default 0 comment('任务数量') INT(11)"` | |||||
Oid int64 `json:"oid" xorm:"default 0 comment('任务数量') INT(11)"` | |||||
Reward string `json:"reward" xorm:"default '' comment('奖励 ') VARCHAR(255)"` | |||||
Platform string `json:"platform" xorm:"default '' comment('平台 ') VARCHAR(255)"` | |||||
TaskName string `json:"task_name" xorm:"default '' comment('平台 ') VARCHAR(255)"` | |||||
DeviceModel string `json:"device_model" xorm:"default '' comment('平台 ') VARCHAR(255)"` | |||||
Bili string `json:"bili" xorm:"default '' comment('平台 ') VARCHAR(255)"` | |||||
TaskType string `json:"task_type" xorm:"default '' comment('平台 ') VARCHAR(255)"` | |||||
Price string `json:"price" xorm:"default 0.000000 comment('') DECIMAL(16,6)"` | |||||
} |
@@ -0,0 +1,13 @@ | |||||
package model | |||||
type TaskRewardTotal struct { | |||||
Id int `json:"id" xorm:"not null pk autoincr INT(11)"` | |||||
Uid int `json:"uid" xorm:"default 0 INT(11)"` | |||||
Date int `json:"date" xorm:"default 0 INT(11)"` | |||||
CoinId int `json:"coin_id" xorm:"default 0 INT(11)"` | |||||
Sum string `json:"sum" xorm:"default 0.00000000 DECIMAL(20,8)"` | |||||
Month int `json:"month" xorm:"default 0 INT(11)"` | |||||
Type string `json:"type" xorm:"VARCHAR(255)"` | |||||
TaskType string `json:"task_type" xorm:"VARCHAR(255)"` | |||||
Title string `json:"title" xorm:"VARCHAR(255)"` | |||||
} |
@@ -60,6 +60,7 @@ func initConsumes() { | |||||
jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal | jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal | ||||
jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail | jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail | ||||
jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward | jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward | ||||
jobs[consumeMd.ZhiosTaskTotal] = ZhiosTaskTotal | |||||
// | // | ||||
@@ -263,6 +263,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||||
BindKey: "", | BindKey: "", | ||||
ConsumeFunName: "zhiosWithdrawReward", | ConsumeFunName: "zhiosWithdrawReward", | ||||
}, | }, | ||||
{ | |||||
ExchangeName: "zhios.task.exchange", | |||||
Name: "zhios_task_total", | |||||
Type: DirectQueueType, | |||||
IsPersistent: false, | |||||
RoutKey: "task_total", | |||||
BindKey: "", | |||||
ConsumeFunName: "zhiosTaskTotal", | |||||
}, | |||||
{ | { | ||||
ExchangeName: "zhios.user_valid.exchange", | ExchangeName: "zhios.user_valid.exchange", | ||||
Name: "zhios_user_valid", | Name: "zhios_user_valid", | ||||
@@ -468,6 +477,7 @@ const ( | |||||
ZhiosCapitalPoolOrderTotalFunName = "ZhiosCapitalPoolOrderTotal" | ZhiosCapitalPoolOrderTotalFunName = "ZhiosCapitalPoolOrderTotal" | ||||
ZhiosExpressOrderFail = "zhiosExpressOrderFail" | ZhiosExpressOrderFail = "zhiosExpressOrderFail" | ||||
ZhiosWithdrawReward = "zhiosWithdrawReward" | ZhiosWithdrawReward = "zhiosWithdrawReward" | ||||
ZhiosTaskTotal = "zhiosTaskTotal" | |||||
ZhiosTikTokUpdateFunName = "ZhiosTikTokUpdate" | ZhiosTikTokUpdateFunName = "ZhiosTikTokUpdate" | ||||
ZhiosTikTokAllUpdateFunName = "ZhiosTikTokAllUpdate" | ZhiosTikTokAllUpdateFunName = "ZhiosTikTokAllUpdate" | ||||
CloudIssuanceAsyncMLoginFunName = "CloudIssuanceAsyncMLoginConsume" | CloudIssuanceAsyncMLoginFunName = "CloudIssuanceAsyncMLoginConsume" | ||||
@@ -0,0 +1,102 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/db" | |||||
"applet/app/utils" | |||||
"applet/app/utils/logx" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"github.com/streadway/amqp" | |||||
"time" | |||||
) | |||||
func ZhiosTaskTotal(queue md.MqQueue) { | |||||
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") | |||||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||||
if err != nil { | |||||
logx.Error(err) | |||||
return | |||||
} | |||||
defer ch.Release() | |||||
//1、将自己绑定到交换机上 | |||||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||||
//2、取出数据进行消费 | |||||
ch.Qos(1) | |||||
delivery := ch.Consume(queue.Name, false) | |||||
var res amqp.Delivery | |||||
var ok bool | |||||
for { | |||||
res, ok = <-delivery | |||||
if ok == true { | |||||
//fmt.Println(string(res.Body)) | |||||
fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||||
err = handleZhiosTaskTotal(res.Body) | |||||
//_ = res.Reject(false) | |||||
if err == nil { | |||||
_ = res.Ack(true) | |||||
} | |||||
} else { | |||||
panic(errors.New("error getting message")) | |||||
} | |||||
} | |||||
fmt.Println("get msg done") | |||||
} | |||||
func handleZhiosTaskTotal(msg []byte) error { | |||||
//1、解析canal采集至mq中queue的数据结构体 | |||||
var canalMsg *md.ZhiosAcquisition | |||||
fmt.Println(string(msg)) | |||||
var tmpString string | |||||
err := json.Unmarshal(msg, &tmpString) | |||||
if err != nil { | |||||
fmt.Println(err.Error()) | |||||
return err | |||||
} | |||||
fmt.Println(tmpString) | |||||
err = json.Unmarshal([]byte(tmpString), &canalMsg) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
mid := canalMsg.Mid | |||||
eg := db.DBs[mid] | |||||
if eg == nil { | |||||
return nil | |||||
} | |||||
sess := eg.NewSession() | |||||
defer sess.Close() | |||||
sess.Begin() | |||||
list := db.GetTaskCenterRewardList(sess, canalMsg.Id) | |||||
if list == nil { | |||||
sess.Rollback() | |||||
return nil | |||||
} | |||||
if list.IsSend == 1 { | |||||
sess.Rollback() | |||||
return nil | |||||
} | |||||
var tmp = make(map[string]string) | |||||
json.Unmarshal([]byte(list.Reward), &tmp) | |||||
for k, v := range tmp { | |||||
if utils.StrToFloat64(v) == 0 { | |||||
continue | |||||
} | |||||
err := db.GetTaskTotal(sess, utils.IntToStr(list.Uid), time.Unix(int64(list.CreateTime), 0).Format("20060102"), time.Unix(int64(list.CreateTime), 0).Format("200601"), k, list.Type, list.TaskType, list.Title, v) | |||||
if err != nil { | |||||
sess.Rollback() | |||||
return err | |||||
} | |||||
} | |||||
list.IsSend = 1 | |||||
update, err := sess.Where("id=?", list.Id).Cols("is_send").Update(list) | |||||
if update == 0 || err != nil { | |||||
sess.Rollback() | |||||
return errors.New("失败") | |||||
} | |||||
sess.Commit() | |||||
return nil | |||||
} |