|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- package connect
-
- import (
- "gim/config"
- "gim/pkg/db"
- "gim/pkg/logger"
- "gim/pkg/mq"
- "gim/pkg/pb"
- "time"
-
- "github.com/go-redis/redis"
-
- "go.uber.org/zap"
- "google.golang.org/protobuf/proto"
- )
-
- // StartSubscribe 启动MQ消息处理逻辑
- func StartSubscribe() {
- pushRoomPriorityChannel := db.RedisCli.Subscribe(mq.PushRoomPriorityTopic).Channel()
- pushRoomChannel := db.RedisCli.Subscribe(mq.PushRoomTopic).Channel()
- for i := 0; i < config.PushRoomSubscribeNum; i++ {
- go handlePushRoomMsg(pushRoomPriorityChannel, pushRoomChannel)
- }
-
- pushAllChannel := db.RedisCli.Subscribe(mq.PushAllTopic).Channel()
- for i := 0; i < config.PushAllSubscribeNum; i++ {
- go handlePushAllMsg(pushAllChannel)
- }
- }
-
- func handlePushRoomMsg(priorityChannel, channel <-chan *redis.Message) {
- for {
- select {
- case msg := <-priorityChannel:
- handlePushRoom([]byte(msg.Payload))
- default:
- select {
- case msg := <-channel:
- handlePushRoom([]byte(msg.Payload))
- default:
- time.Sleep(100 * time.Millisecond)
- continue
- }
- }
- }
- }
-
- func handlePushAllMsg(channel <-chan *redis.Message) {
- for msg := range channel {
- handlePushAll([]byte(msg.Payload))
- }
- }
-
- func handlePushRoom(bytes []byte) {
- var msg pb.PushRoomMsg
- err := proto.Unmarshal(bytes, &msg)
- if err != nil {
- logger.Logger.Error("handlePushRoom error", zap.Error(err))
- return
- }
- PushRoom(msg.RoomId, msg.MessageSend)
- }
-
- func handlePushAll(bytes []byte) {
- var msg pb.PushAllMsg
- err := proto.Unmarshal(bytes, &msg)
- if err != nil {
- logger.Logger.Error("handlePushRoom error", zap.Error(err))
- return
- }
- PushAll(msg.MessageSend)
- }
|