dengbiao vor 4 Monaten
Ursprung
Commit
3911f82d97
5 geänderte Dateien mit 190 neuen und 2 gelöschten Zeilen
  1. +19
    -0
      app/db/gim/model/message.go
  2. +107
    -0
      consume/canal_gim_message_consume.go
  3. +4
    -2
      consume/init.go
  4. +10
    -0
      consume/md/consume_key.go
  5. +50
    -0
      consume/md/md_canal_gim_message_consume.go

+ 19
- 0
app/db/gim/model/message.go Datei anzeigen

@@ -0,0 +1,19 @@
package model

import "time"

type Message struct {
Id int64 // 自增主键
UserId int64 // 所属类型id
RequestId int64 // 请求id
SenderType int32 // 发送者类型
SenderId int64 // 发送者账户id
ReceiverType int32 // 接收者账户id
ReceiverId int64 // 接收者id,如果是单聊信息,则为user_id,如果是群组消息,则为group_id
ToUserIds string // 需要@的用户id列表,多个用户用,隔开
Type int // 消息类型
Content []byte // 消息内容
Seq int64 // 消息同步序列
SendTime time.Time // 消息发送时间
Status int32 // 创建时间
}

+ 107
- 0
consume/canal_gim_message_consume.go Datei anzeigen

@@ -0,0 +1,107 @@
package consume

import (
"applet/app/db"
model2 "applet/app/db/gim/model"
"applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
)

func CanalGimMessageConsume(queue md.MqQueue) {
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(100)
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>CanalGimMessageConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
err = handleCanalGimMessageTable(res.Body)
if err != nil {
fmt.Println("err ::: ", err)
utils.FilePutContents("CanalGimMessageConsume", "[err]:"+err.Error())
_ = res.Reject(false)
////TODO::重新推回队列末尾,避免造成队列堵塞
//var msg *md.OneCirclesStructForSignIn
//json.Unmarshal(res.Body, &msg)
//ch.Publish(queue.ExchangeName, msg, queue.RoutKey)
} else {
_ = res.Ack(true)
}
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleCanalGimMessageTable(msg []byte) error {
//1、解析canal采集至mq中queue的数据结构体
var canalMsg *md.CanalGimMessage[md.Message]
err := json.Unmarshal(msg, &canalMsg)
if err != nil {
return err
}

//2、判断操作(insert | update)
if canalMsg.Type == md.CanalMsgInsertSqlType {
for _, item := range canalMsg.Data {
var oldM md.Message000
get, err := db.ImDb.Table("message_000").ID(item.Id).Get(&oldM)
if err != nil {
return err
}
if get {
message := &model2.Message{
UserId: oldM.UserId,
RequestId: oldM.RequestId,
SenderType: oldM.SenderType,
SenderId: oldM.SenderId,
ReceiverType: oldM.ReceiverType,
ReceiverId: oldM.ReceiverId,
ToUserIds: oldM.ToUserIds,
Type: oldM.Type,
Content: oldM.Content,
Seq: oldM.Seq,
SendTime: oldM.SendTime,
Status: oldM.Status,
}
_, err = db.ImDb.InsertOne(message)
if err != nil {
return err
}
}
}
}

if canalMsg.Type == md.CanalMsgUpdateSqlType {
//查找是否有数据
m := new(model2.Message)
for _, item := range canalMsg.Data {
m.Status = int32(utils.StrToInt(item.Status))
_, err2 := db.ImDb.Where("user_id =?", item.UserId).And("send_time =?", item.SendTime).Cols("status").Update(&m)
if err2 != nil {
return err2
}
}
}

return nil
}

+ 4
- 2
consume/init.go Datei anzeigen

@@ -103,8 +103,10 @@ func initConsumes() {
//jobs[consumeMd.InstallmentPaymentAutoRepaidConsumeFunName] = InstallmentPaymentAutoRepaidConsume //分期付 - 自动扣款

////////////////////////////////////// SuperCloudIssuance /////////////////////////////////////////////////////
jobs[consumeMd.SuperCloudIssuanceMsgCallBackFunName] = SuperCloudIssuanceMsgCallBackConsume
jobs[consumeMd.SuperCloudIssuanceAsyncMLoginFunName] = SuperCloudIssuanceAsyncMLoginConsume
//jobs[consumeMd.SuperCloudIssuanceMsgCallBackFunName] = SuperCloudIssuanceMsgCallBackConsume
//jobs[consumeMd.SuperCloudIssuanceAsyncMLoginFunName] = SuperCloudIssuanceAsyncMLoginConsume

jobs[consumeMd.CanalGimMessageConsumeFunName] = CanalGimMessageConsume
}

func Run() {


+ 10
- 0
consume/md/consume_key.go Datei anzeigen

@@ -83,6 +83,15 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "CanalGuideOrderForNumericalStatementConsume",
},
{
ExchangeName: "canal.topic",
Name: "canal_gim_message",
Type: TopicQueueType,
IsPersistent: false,
RoutKey: "canal_gim_message",
BindKey: "",
ConsumeFunName: "CanalGimMessageConsume",
},
{
ExchangeName: "canal.topic",
Name: "canal_guide_order",
@@ -625,4 +634,5 @@ const (
ZhiosUserProfileInviteCode = "ZhiosUserProfileInviteCode"
ZhiosAutoUnFreeze = "ZhiosAutoUnFreeze"
InstallmentPaymentAutoRepaidConsumeFunName = "InstallmentPaymentAutoRepaidConsume"
CanalGimMessageConsumeFunName = "CanalGimMessageConsume"
)

+ 50
- 0
consume/md/md_canal_gim_message_consume.go Datei anzeigen

@@ -0,0 +1,50 @@
package md

import "time"

type Message struct {
Id string `json:"id"`
UserId string `json:"user_id"`
RequestId string `json:"request_id"`
SenderType string `json:"sender_type"`
SenderId string `json:"sender_id"`
ReceiverType string `json:"receiver_type"`
ReceiverId string `json:"receiver_id"`
ToUserIds string `json:"to_user_ids"`
Type string `json:"type"`
Content string `json:"content"`
Seq string `json:"seq"`
SendTime string `json:"send_time"`
Status string `json:"status"`
CreateTime string `json:"create_time"`
UpdateTime string `json:"update_time"`
}

type Message000 struct {
Id int64 // 自增主键
UserId int64 // 所属类型id
RequestId int64 // 请求id
SenderType int32 // 发送者类型
SenderId int64 // 发送者账户id
ReceiverType int32 // 接收者账户id
ReceiverId int64 // 接收者id,如果是单聊信息,则为user_id,如果是群组消息,则为group_id
ToUserIds string // 需要@的用户id列表,多个用户用,隔开
Type int // 消息类型
Content []byte // 消息内容
Seq int64 // 消息同步序列
SendTime time.Time // 消息发送时间
Status int32 // 创建时间
}

type CanalGimMessage[T any] struct {
Data []T `json:"data"`
Database string `json:"database"`
ES int64 `json:"es"`
ID int64 `json:"id"`
IsDdl bool `json:"isDdl"`
Old []T `json:"old"`
PkNames []string `json:"pkNames"`
Table string `json:"table"`
TS int64 `json:"ts"`
Type string `json:"type"`
}

Laden…
Abbrechen
Speichern