You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

149 lines
2.9 KiB

  1. package room
  2. import (
  3. "context"
  4. "egg-im/pkg/gerrors"
  5. "egg-im/pkg/grpclib/picker"
  6. "egg-im/pkg/logger"
  7. "egg-im/pkg/mq"
  8. "egg-im/pkg/pb"
  9. "egg-im/pkg/rpc"
  10. "egg-im/pkg/util"
  11. "time"
  12. "google.golang.org/protobuf/proto"
  13. )
  14. type roomService struct{}
  15. var RoomService = new(roomService)
  16. func (s *roomService) Push(ctx context.Context, sender *pb.Sender, req *pb.PushRoomReq) error {
  17. s.AddSenderInfo(sender)
  18. seq, err := RoomSeqRepo.GetNextSeq(req.RoomId)
  19. if err != nil {
  20. return err
  21. }
  22. msg := &pb.Message{
  23. Sender: sender,
  24. ReceiverType: pb.ReceiverType_RT_ROOM,
  25. ReceiverId: req.RoomId,
  26. ToUserIds: nil,
  27. MessageType: req.MessageType,
  28. MessageContent: req.MessageContent,
  29. Seq: seq,
  30. SendTime: util.UnixMilliTime(time.Now()),
  31. Status: 0,
  32. }
  33. if req.IsPersist {
  34. err = s.AddMessage(req.RoomId, msg)
  35. if err != nil {
  36. return err
  37. }
  38. }
  39. pushRoomMsg := pb.PushRoomMsg{
  40. RoomId: req.RoomId,
  41. MessageSend: &pb.MessageSend{
  42. Message: msg,
  43. },
  44. }
  45. bytes, err := proto.Marshal(&pushRoomMsg)
  46. if err != nil {
  47. return gerrors.WrapError(err)
  48. }
  49. var topicName = mq.PushRoomTopic
  50. if req.IsPriority {
  51. topicName = mq.PushRoomPriorityTopic
  52. }
  53. err = mq.Publish(topicName, bytes)
  54. if err != nil {
  55. return err
  56. }
  57. return nil
  58. }
  59. func (s *roomService) AddMessage(roomId int64, msg *pb.Message) error {
  60. err := RoomMessageRepo.Add(roomId, msg)
  61. if err != nil {
  62. return err
  63. }
  64. return s.DelExpireMessage(roomId)
  65. }
  66. // DelExpireMessage 删除过期消息
  67. func (s *roomService) DelExpireMessage(roomId int64) error {
  68. var (
  69. index int64 = 0
  70. stop bool
  71. min int64
  72. max int64
  73. )
  74. for {
  75. msgs, err := RoomMessageRepo.ListByIndex(roomId, index, index+20)
  76. if err != nil {
  77. return err
  78. }
  79. if len(msgs) == 0 {
  80. break
  81. }
  82. for _, v := range msgs {
  83. if v.SendTime > util.UnixMilliTime(time.Now().Add(-RoomMessageExpireTime)) {
  84. stop = true
  85. break
  86. }
  87. if min == 0 {
  88. min = v.Seq
  89. }
  90. max = v.Seq
  91. }
  92. if stop {
  93. break
  94. }
  95. }
  96. return RoomMessageRepo.DelBySeq(roomId, min, max)
  97. }
  98. // SubscribeRoom 订阅房间
  99. func (s *roomService) SubscribeRoom(ctx context.Context, req *pb.SubscribeRoomReq) error {
  100. if req.Seq == 0 {
  101. return nil
  102. }
  103. messages, err := RoomMessageRepo.List(req.RoomId, req.Seq)
  104. if err != nil {
  105. return err
  106. }
  107. for i := range messages {
  108. _, err := rpc.GetConnectIntClient().DeliverMessage(picker.ContextWithAddr(ctx, req.ConnAddr), &pb.DeliverMessageReq{
  109. DeviceId: req.DeviceId,
  110. MessageSend: &pb.MessageSend{
  111. Message: messages[i],
  112. },
  113. })
  114. if err != nil {
  115. logger.Sugar.Error(err)
  116. }
  117. }
  118. return nil
  119. }
  120. func (*roomService) AddSenderInfo(sender *pb.Sender) {
  121. if sender.SenderType == pb.SenderType_ST_USER {
  122. user, err := rpc.GetBusinessIntClient().GetUser(context.TODO(), &pb.GetUserReq{UserId: sender.SenderId})
  123. if err == nil && user != nil {
  124. sender.AvatarUrl = user.User.AvatarUrl
  125. sender.Nickname = user.User.Nickname
  126. sender.Extra = user.User.Extra
  127. }
  128. }
  129. }