From 3911f82d97d66ea4c90ade1ce94f923ba0420f79 Mon Sep 17 00:00:00 2001 From: dengbiao Date: Thu, 18 Jul 2024 18:22:54 +0800 Subject: [PATCH] update --- app/db/gim/model/message.go | 19 ++++ consume/canal_gim_message_consume.go | 107 +++++++++++++++++++++ consume/init.go | 6 +- consume/md/consume_key.go | 10 ++ consume/md/md_canal_gim_message_consume.go | 50 ++++++++++ 5 files changed, 190 insertions(+), 2 deletions(-) create mode 100644 app/db/gim/model/message.go create mode 100644 consume/canal_gim_message_consume.go create mode 100644 consume/md/md_canal_gim_message_consume.go diff --git a/app/db/gim/model/message.go b/app/db/gim/model/message.go new file mode 100644 index 0000000..d5436a7 --- /dev/null +++ b/app/db/gim/model/message.go @@ -0,0 +1,19 @@ +package model + +import "time" + +type Message struct { + Id int64 // 自增主键 + UserId int64 // 所属类型id + RequestId int64 // 请求id + SenderType int32 // 发送者类型 + SenderId int64 // 发送者账户id + ReceiverType int32 // 接收者账户id + ReceiverId int64 // 接收者id,如果是单聊信息,则为user_id,如果是群组消息,则为group_id + ToUserIds string // 需要@的用户id列表,多个用户用,隔开 + Type int // 消息类型 + Content []byte // 消息内容 + Seq int64 // 消息同步序列 + SendTime time.Time // 消息发送时间 + Status int32 // 创建时间 +} diff --git a/consume/canal_gim_message_consume.go b/consume/canal_gim_message_consume.go new file mode 100644 index 0000000..0abf827 --- /dev/null +++ b/consume/canal_gim_message_consume.go @@ -0,0 +1,107 @@ +package consume + +import ( + "applet/app/db" + model2 "applet/app/db/gim/model" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" +) + +func CanalGimMessageConsume(queue md.MqQueue) { + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(100) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalGimMessageConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleCanalGimMessageTable(res.Body) + if err != nil { + fmt.Println("err ::: ", err) + utils.FilePutContents("CanalGimMessageConsume", "[err]:"+err.Error()) + _ = res.Reject(false) + ////TODO::重新推回队列末尾,避免造成队列堵塞 + //var msg *md.OneCirclesStructForSignIn + //json.Unmarshal(res.Body, &msg) + //ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + } else { + _ = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleCanalGimMessageTable(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalGimMessage[md.Message] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + return err + } + + //2、判断操作(insert | update) + if canalMsg.Type == md.CanalMsgInsertSqlType { + for _, item := range canalMsg.Data { + var oldM md.Message000 + get, err := db.ImDb.Table("message_000").ID(item.Id).Get(&oldM) + if err != nil { + return err + } + if get { + message := &model2.Message{ + UserId: oldM.UserId, + RequestId: oldM.RequestId, + SenderType: oldM.SenderType, + SenderId: oldM.SenderId, + ReceiverType: oldM.ReceiverType, + ReceiverId: oldM.ReceiverId, + ToUserIds: oldM.ToUserIds, + Type: oldM.Type, + Content: oldM.Content, + Seq: oldM.Seq, + SendTime: oldM.SendTime, + Status: oldM.Status, + } + _, err = db.ImDb.InsertOne(message) + if err != nil { + return err + } + } + } + } + + if canalMsg.Type == md.CanalMsgUpdateSqlType { + //查找是否有数据 + m := new(model2.Message) + for _, item := range canalMsg.Data { + m.Status = int32(utils.StrToInt(item.Status)) + _, err2 := db.ImDb.Where("user_id =?", item.UserId).And("send_time =?", item.SendTime).Cols("status").Update(&m) + if err2 != nil { + return err2 + } + } + } + + return nil +} diff --git a/consume/init.go b/consume/init.go index 6f22461..a9b4eae 100644 --- a/consume/init.go +++ b/consume/init.go @@ -103,8 +103,10 @@ func initConsumes() { //jobs[consumeMd.InstallmentPaymentAutoRepaidConsumeFunName] = InstallmentPaymentAutoRepaidConsume //分期付 - 自动扣款 ////////////////////////////////////// SuperCloudIssuance ///////////////////////////////////////////////////// - jobs[consumeMd.SuperCloudIssuanceMsgCallBackFunName] = SuperCloudIssuanceMsgCallBackConsume - jobs[consumeMd.SuperCloudIssuanceAsyncMLoginFunName] = SuperCloudIssuanceAsyncMLoginConsume + //jobs[consumeMd.SuperCloudIssuanceMsgCallBackFunName] = SuperCloudIssuanceMsgCallBackConsume + //jobs[consumeMd.SuperCloudIssuanceAsyncMLoginFunName] = SuperCloudIssuanceAsyncMLoginConsume + + jobs[consumeMd.CanalGimMessageConsumeFunName] = CanalGimMessageConsume } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 2a1b197..6f04f26 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -83,6 +83,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "CanalGuideOrderForNumericalStatementConsume", }, + { + ExchangeName: "canal.topic", + Name: "canal_gim_message", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "canal_gim_message", + BindKey: "", + ConsumeFunName: "CanalGimMessageConsume", + }, { ExchangeName: "canal.topic", Name: "canal_guide_order", @@ -625,4 +634,5 @@ const ( ZhiosUserProfileInviteCode = "ZhiosUserProfileInviteCode" ZhiosAutoUnFreeze = "ZhiosAutoUnFreeze" InstallmentPaymentAutoRepaidConsumeFunName = "InstallmentPaymentAutoRepaidConsume" + CanalGimMessageConsumeFunName = "CanalGimMessageConsume" ) diff --git a/consume/md/md_canal_gim_message_consume.go b/consume/md/md_canal_gim_message_consume.go new file mode 100644 index 0000000..22d8664 --- /dev/null +++ b/consume/md/md_canal_gim_message_consume.go @@ -0,0 +1,50 @@ +package md + +import "time" + +type Message struct { + Id string `json:"id"` + UserId string `json:"user_id"` + RequestId string `json:"request_id"` + SenderType string `json:"sender_type"` + SenderId string `json:"sender_id"` + ReceiverType string `json:"receiver_type"` + ReceiverId string `json:"receiver_id"` + ToUserIds string `json:"to_user_ids"` + Type string `json:"type"` + Content string `json:"content"` + Seq string `json:"seq"` + SendTime string `json:"send_time"` + Status string `json:"status"` + CreateTime string `json:"create_time"` + UpdateTime string `json:"update_time"` +} + +type Message000 struct { + Id int64 // 自增主键 + UserId int64 // 所属类型id + RequestId int64 // 请求id + SenderType int32 // 发送者类型 + SenderId int64 // 发送者账户id + ReceiverType int32 // 接收者账户id + ReceiverId int64 // 接收者id,如果是单聊信息,则为user_id,如果是群组消息,则为group_id + ToUserIds string // 需要@的用户id列表,多个用户用,隔开 + Type int // 消息类型 + Content []byte // 消息内容 + Seq int64 // 消息同步序列 + SendTime time.Time // 消息发送时间 + Status int32 // 创建时间 +} + +type CanalGimMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []T `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +}