|
|
@@ -0,0 +1,92 @@ |
|
|
|
package consume |
|
|
|
|
|
|
|
import ( |
|
|
|
"applet/app/db" |
|
|
|
"applet/app/db/model" |
|
|
|
"applet/app/utils" |
|
|
|
"applet/app/utils/logx" |
|
|
|
"applet/consume/md" |
|
|
|
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" |
|
|
|
"encoding/json" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"github.com/streadway/amqp" |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
func CancalUserRelateConsume(queue md.MqQueue) { |
|
|
|
fmt.Println(">>>>>>>>>>>>CancalUserRelateConsume>>>>>>>>>>>>") |
|
|
|
ch, err := rabbit.Cfg.Pool.GetChannel() |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return |
|
|
|
} |
|
|
|
defer ch.Release() |
|
|
|
//1、将自己绑定到交换机上 |
|
|
|
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) |
|
|
|
//2、取出数据进行消费 |
|
|
|
ch.Qos(1) |
|
|
|
delivery := ch.Consume(queue.Name, false) |
|
|
|
|
|
|
|
var res amqp.Delivery |
|
|
|
var ok bool |
|
|
|
for { |
|
|
|
res, ok = <-delivery |
|
|
|
if ok == true { |
|
|
|
//fmt.Println(string(res.Body)) |
|
|
|
fmt.Println(">>>>>>>>>>>>>>>>CancalUserRelateConsume<<<<<<<<<<<<<<<<<<<<<<<<<") |
|
|
|
err = handleCancalUserRelateConsume(res.Body) |
|
|
|
if err != nil { |
|
|
|
fmt.Println("CancalUserRelateConsume_ERR:::::", err.Error()) |
|
|
|
} |
|
|
|
//_ = res.Reject(false) |
|
|
|
err = res.Ack(true) |
|
|
|
fmt.Println("err ::: ", err) |
|
|
|
} else { |
|
|
|
panic(errors.New("error getting message")) |
|
|
|
} |
|
|
|
} |
|
|
|
fmt.Println("get msg done") |
|
|
|
} |
|
|
|
|
|
|
|
func handleCancalUserRelateConsume(msg []byte) error { |
|
|
|
//1、解析canal采集至mq中queue的数据结构体 |
|
|
|
var canalMsg *md.CanalUserRelateMessage[md.CanalUserRelate] |
|
|
|
err := json.Unmarshal(msg, &canalMsg) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
masterId := strings.Split(canalMsg.Database, "_")[1] |
|
|
|
if masterId != "15763466" { |
|
|
|
return nil |
|
|
|
} |
|
|
|
engine := db.DBs["123456"] |
|
|
|
// |
|
|
|
|
|
|
|
if canalMsg.Type == md.CanalMsgInsertSqlType { |
|
|
|
var data1 model.UserExtendTotal |
|
|
|
date := time.Unix(utils.TimeStdParseUnix(canalMsg.Data[0].InviteTime), 0).Format("20060102") |
|
|
|
month := time.Unix(utils.TimeStdParseUnix(canalMsg.Data[0].InviteTime), 0).Format("200601") |
|
|
|
engine.Where("uid=? and date=?", canalMsg.Data[0].ParentUid, date).Get(&data1) |
|
|
|
if data1.Id == 0 { |
|
|
|
data1 = model.UserExtendTotal{ |
|
|
|
Uid: utils.StrToInt(canalMsg.Data[0].ParentUid), |
|
|
|
Date: utils.StrToInt(date), |
|
|
|
Month: utils.StrToInt(month), |
|
|
|
Count: 0, |
|
|
|
TeamCount: 0, |
|
|
|
} |
|
|
|
engine.Insert(&data1) |
|
|
|
} |
|
|
|
if utils.StrToInt(canalMsg.Data[0].Level) == 1 { |
|
|
|
data1.Count++ |
|
|
|
} else { |
|
|
|
data1.TeamCount++ |
|
|
|
} |
|
|
|
engine.Where("id=?", data1).AllCols().Update(&data1) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |