golang-im聊天
 
 
 
 

73 řádky
1.6 KiB

  1. package connect
  2. import (
  3. "gim/config"
  4. "gim/pkg/db"
  5. "gim/pkg/logger"
  6. "gim/pkg/mq"
  7. "gim/pkg/pb"
  8. "time"
  9. "github.com/go-redis/redis"
  10. "go.uber.org/zap"
  11. "google.golang.org/protobuf/proto"
  12. )
  13. // StartSubscribe 启动MQ消息处理逻辑
  14. func StartSubscribe() {
  15. pushRoomPriorityChannel := db.RedisCli.Subscribe(mq.PushRoomPriorityTopic).Channel()
  16. pushRoomChannel := db.RedisCli.Subscribe(mq.PushRoomTopic).Channel()
  17. for i := 0; i < config.PushRoomSubscribeNum; i++ {
  18. go handlePushRoomMsg(pushRoomPriorityChannel, pushRoomChannel)
  19. }
  20. pushAllChannel := db.RedisCli.Subscribe(mq.PushAllTopic).Channel()
  21. for i := 0; i < config.PushAllSubscribeNum; i++ {
  22. go handlePushAllMsg(pushAllChannel)
  23. }
  24. }
  25. func handlePushRoomMsg(priorityChannel, channel <-chan *redis.Message) {
  26. for {
  27. select {
  28. case msg := <-priorityChannel:
  29. handlePushRoom([]byte(msg.Payload))
  30. default:
  31. select {
  32. case msg := <-channel:
  33. handlePushRoom([]byte(msg.Payload))
  34. default:
  35. time.Sleep(100 * time.Millisecond)
  36. continue
  37. }
  38. }
  39. }
  40. }
  41. func handlePushAllMsg(channel <-chan *redis.Message) {
  42. for msg := range channel {
  43. handlePushAll([]byte(msg.Payload))
  44. }
  45. }
  46. func handlePushRoom(bytes []byte) {
  47. var msg pb.PushRoomMsg
  48. err := proto.Unmarshal(bytes, &msg)
  49. if err != nil {
  50. logger.Logger.Error("handlePushRoom error", zap.Error(err))
  51. return
  52. }
  53. PushRoom(msg.RoomId, msg.MessageSend)
  54. }
  55. func handlePushAll(bytes []byte) {
  56. var msg pb.PushAllMsg
  57. err := proto.Unmarshal(bytes, &msg)
  58. if err != nil {
  59. logger.Logger.Error("handlePushRoom error", zap.Error(err))
  60. return
  61. }
  62. PushAll(msg.MessageSend)
  63. }