golang-im聊天
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

198 lines
5.5 KiB

  1. package service
  2. import (
  3. "context"
  4. "gim/internal/business/comm/utils"
  5. "gim/internal/logic/domain/message/model"
  6. "gim/internal/logic/domain/message/repo"
  7. "gim/internal/logic/proxy"
  8. "gim/pkg/grpclib"
  9. "gim/pkg/grpclib/picker"
  10. "gim/pkg/logger"
  11. "gim/pkg/pb"
  12. "gim/pkg/rpc"
  13. "gim/pkg/util"
  14. "go.uber.org/zap"
  15. "google.golang.org/protobuf/proto"
  16. )
  17. const MessageLimit = 50 // 最大消息同步数量
  18. const MaxSyncBufLen = 65536 // 最大字节数组长度
  19. type messageService struct{}
  20. var MessageService = new(messageService)
  21. // Sync 消息同步
  22. func (*messageService) Sync(ctx context.Context, userId, seq int64) (*pb.SyncResp, error) {
  23. messages, hasMore, err := MessageService.ListByUserIdAndSeq(ctx, userId, seq)
  24. if err != nil {
  25. return nil, err
  26. }
  27. pbMessages := model.MessagesToPB(messages)
  28. length := len(pbMessages)
  29. resp := &pb.SyncResp{Messages: pbMessages, HasMore: hasMore}
  30. bytes, err := proto.Marshal(resp)
  31. if err != nil {
  32. return nil, err
  33. }
  34. // 如果字节数组大于一个包的长度,需要减少字节数组
  35. for len(bytes) > MaxSyncBufLen {
  36. length = length * 2 / 3
  37. resp = &pb.SyncResp{Messages: pbMessages[0:length], HasMore: true}
  38. bytes, err = proto.Marshal(resp)
  39. if err != nil {
  40. return nil, err
  41. }
  42. }
  43. var userIds = make(map[int64]int32, len(resp.Messages))
  44. for i := range resp.Messages {
  45. if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER {
  46. userIds[resp.Messages[i].Sender.SenderId] = 0
  47. }
  48. }
  49. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  50. if err != nil {
  51. return nil, err
  52. }
  53. for i := range resp.Messages {
  54. if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER {
  55. user, ok := usersResp.Users[resp.Messages[i].Sender.SenderId]
  56. if ok {
  57. resp.Messages[i].Sender.Nickname = user.Nickname
  58. resp.Messages[i].Sender.AvatarUrl = user.AvatarUrl
  59. resp.Messages[i].Sender.Extra = user.Extra
  60. } else {
  61. logger.Logger.Warn("get user failed", zap.Int64("user_id", resp.Messages[i].Sender.SenderId))
  62. }
  63. }
  64. }
  65. return resp, nil
  66. }
  67. // ListByUserIdAndSeq 查询消息
  68. func (*messageService) ListByUserIdAndSeq(ctx context.Context, userId, seq int64) ([]model.Message, bool, error) {
  69. var err error
  70. if seq == 0 {
  71. seq, err = DeviceAckService.GetMaxByUserId(ctx, userId)
  72. if err != nil {
  73. return nil, false, err
  74. }
  75. }
  76. return repo.MessageRepo.ListBySeq(userId, seq, MessageLimit)
  77. }
  78. // SendToUser 将消息发送给用户
  79. func (*messageService) SendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendMessageReq) (int64, error) {
  80. logger.Logger.Debug("SendToUser",
  81. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  82. zap.Int64("to_user_id", toUserId))
  83. var (
  84. seq int64 = 0
  85. err error
  86. )
  87. if req.IsPersist {
  88. seq, err = SeqService.GetUserNext(ctx, toUserId)
  89. if err != nil {
  90. return 0, err
  91. }
  92. selfMessage := model.Message{
  93. UserId: toUserId,
  94. RequestId: grpclib.GetCtxRequestId(ctx),
  95. SenderType: int32(sender.SenderType),
  96. SenderId: sender.SenderId,
  97. ReceiverType: int32(req.ReceiverType),
  98. ReceiverId: req.ReceiverId,
  99. ToUserIds: model.FormatUserIds(req.ToUserIds),
  100. Type: int(req.MessageType),
  101. Content: req.MessageContent,
  102. Seq: seq,
  103. SendTime: util.UnunixMilliTime(req.SendTime),
  104. Status: int32(pb.MessageStatus_MS_NORMAL),
  105. }
  106. err = repo.MessageRepo.Save(selfMessage)
  107. if err != nil {
  108. logger.Sugar.Error(err)
  109. return 0, err
  110. }
  111. if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
  112. // 用户需要增加自己的已经同步的序列号
  113. err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
  114. if err != nil {
  115. return 0, err
  116. }
  117. }
  118. }
  119. message := pb.Message{
  120. Sender: sender,
  121. ReceiverType: req.ReceiverType,
  122. ReceiverId: req.ReceiverId,
  123. ToUserIds: req.ToUserIds,
  124. MessageType: req.MessageType,
  125. MessageContent: req.MessageContent,
  126. Seq: seq,
  127. SendTime: req.SendTime,
  128. Status: pb.MessageStatus_MS_NORMAL,
  129. }
  130. // 查询用户在线设备
  131. devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
  132. if err != nil {
  133. logger.Sugar.Error(err)
  134. return 0, err
  135. }
  136. utils.FilePutContents("devices", utils.SerializeStr(map[string]interface{}{
  137. "data": devices,
  138. }))
  139. for i := range devices {
  140. // 消息不需要投递给发送消息的设备
  141. if sender.DeviceId == devices[i].DeviceId {
  142. continue
  143. }
  144. err = MessageService.SendToDevice(ctx, devices[i], &message)
  145. if err != nil {
  146. logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
  147. }
  148. }
  149. return seq, nil
  150. }
  151. // SendToDevice 将消息发送给设备
  152. func (*messageService) SendToDevice(ctx context.Context, device *pb.Device, message *pb.Message) error {
  153. messageSend := pb.MessageSend{Message: message}
  154. _, err := rpc.GetConnectIntClient().DeliverMessage(picker.ContextWithAddr(ctx, device.ConnAddr), &pb.DeliverMessageReq{
  155. DeviceId: device.DeviceId,
  156. MessageSend: &messageSend,
  157. })
  158. if err != nil {
  159. logger.Logger.Error("SendToDevice error", zap.Error(err))
  160. return err
  161. }
  162. // todo 其他推送厂商
  163. return nil
  164. }
  165. func (*messageService) AddSenderInfo(sender *pb.Sender) {
  166. if sender.SenderType == pb.SenderType_ST_USER {
  167. user, err := rpc.GetBusinessIntClient().GetUser(context.TODO(), &pb.GetUserReq{UserId: sender.SenderId})
  168. if err == nil && user != nil {
  169. sender.AvatarUrl = user.User.AvatarUrl
  170. sender.Nickname = user.User.Nickname
  171. sender.Extra = user.User.Extra
  172. }
  173. }
  174. }