golang-im聊天
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

233 lines
6.8 KiB

  1. package service
  2. import (
  3. "context"
  4. "gim/internal/business/comm/db"
  5. svc "gim/internal/business/comm/service"
  6. repo2 "gim/internal/business/domain/user/repo"
  7. "gim/internal/logic/domain/message/md"
  8. "gim/internal/logic/domain/message/model"
  9. "gim/internal/logic/domain/message/repo"
  10. "gim/internal/logic/proxy"
  11. "gim/pkg/grpclib"
  12. "gim/pkg/grpclib/picker"
  13. "gim/pkg/logger"
  14. "gim/pkg/pb"
  15. "gim/pkg/rpc"
  16. "gim/pkg/util"
  17. "go.uber.org/zap"
  18. "google.golang.org/protobuf/proto"
  19. "strconv"
  20. "time"
  21. )
  22. const MessageLimit = 50 // 最大消息同步数量
  23. const MaxSyncBufLen = 65536 // 最大字节数组长度
  24. type messageService struct{}
  25. var MessageService = new(messageService)
  26. // Sync 消息同步
  27. func (*messageService) Sync(ctx context.Context, userId, seq int64) (*pb.SyncResp, error) {
  28. messages, hasMore, err := MessageService.ListByUserIdAndSeq(ctx, userId, seq)
  29. if err != nil {
  30. return nil, err
  31. }
  32. pbMessages := model.MessagesToPB(messages)
  33. length := len(pbMessages)
  34. resp := &pb.SyncResp{Messages: pbMessages, HasMore: hasMore}
  35. bytes, err := proto.Marshal(resp)
  36. if err != nil {
  37. return nil, err
  38. }
  39. // 如果字节数组大于一个包的长度,需要减少字节数组
  40. for len(bytes) > MaxSyncBufLen {
  41. length = length * 2 / 3
  42. resp = &pb.SyncResp{Messages: pbMessages[0:length], HasMore: true}
  43. bytes, err = proto.Marshal(resp)
  44. if err != nil {
  45. return nil, err
  46. }
  47. }
  48. var userIds = make(map[int64]int32, len(resp.Messages))
  49. for i := range resp.Messages {
  50. if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER {
  51. userIds[resp.Messages[i].Sender.SenderId] = 0
  52. }
  53. }
  54. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  55. if err != nil {
  56. return nil, err
  57. }
  58. for i := range resp.Messages {
  59. if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER {
  60. user, ok := usersResp.Users[resp.Messages[i].Sender.SenderId]
  61. if ok {
  62. resp.Messages[i].Sender.Nickname = user.Nickname
  63. resp.Messages[i].Sender.AvatarUrl = user.AvatarUrl
  64. resp.Messages[i].Sender.Extra = user.Extra
  65. } else {
  66. logger.Logger.Warn("get user failed", zap.Int64("user_id", resp.Messages[i].Sender.SenderId))
  67. }
  68. }
  69. }
  70. return resp, nil
  71. }
  72. // ListByUserIdAndSeq 查询消息
  73. func (*messageService) ListByUserIdAndSeq(ctx context.Context, userId, seq int64) ([]model.Message, bool, error) {
  74. var err error
  75. if seq == 0 {
  76. seq, err = DeviceAckService.GetMaxByUserId(ctx, userId)
  77. if err != nil {
  78. return nil, false, err
  79. }
  80. }
  81. return repo.MessageRepo.ListBySeq(userId, seq, MessageLimit)
  82. }
  83. // SendToUser 将消息发送给用户
  84. func (*messageService) SendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendMessageReq) (int64, error) {
  85. masterId, _ := grpclib.GetCtxMasterId(ctx)
  86. logger.Logger.Debug("SendToUser",
  87. zap.String("master_id", masterId),
  88. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  89. zap.Int64("to_user_id", toUserId))
  90. var (
  91. seq int64 = 0
  92. err error
  93. )
  94. if req.IsPersist {
  95. seq, err = SeqService.GetUserNext(ctx, toUserId)
  96. if err != nil {
  97. return 0, err
  98. }
  99. selfMessage := model.Message{
  100. UserId: toUserId,
  101. RequestId: grpclib.GetCtxRequestId(ctx),
  102. SenderType: int32(sender.SenderType),
  103. SenderId: sender.SenderId,
  104. ReceiverType: int32(req.ReceiverType),
  105. ReceiverId: req.ReceiverId,
  106. ToUserIds: model.FormatUserIds(req.ToUserIds),
  107. Type: int(req.MessageType),
  108. Content: req.MessageContent,
  109. Seq: seq,
  110. SendTime: util.UnunixMilliTime(req.SendTime),
  111. Status: int32(pb.MessageStatus_MS_NORMAL),
  112. }
  113. err = repo.MessageRepo.Save(selfMessage)
  114. if err != nil {
  115. logger.Sugar.Error(err)
  116. return 0, err
  117. }
  118. if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
  119. // 用户需要增加自己的已经同步的序列号
  120. err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
  121. if err != nil {
  122. return 0, err
  123. }
  124. }
  125. }
  126. message := pb.Message{
  127. Sender: sender,
  128. ReceiverType: req.ReceiverType,
  129. ReceiverId: req.ReceiverId,
  130. ToUserIds: req.ToUserIds,
  131. MessageType: req.MessageType,
  132. MessageContent: req.MessageContent,
  133. Seq: seq,
  134. SendTime: req.SendTime,
  135. Status: pb.MessageStatus_MS_NORMAL,
  136. }
  137. // 查询用户在线设备
  138. devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
  139. if err != nil {
  140. logger.Sugar.Error(err)
  141. return 0, err
  142. }
  143. isOpenAppPush := svc.SysCfgGet(masterId, "is_open_app_push")
  144. if sender.SenderType == pb.SenderType_ST_USER && req.ReceiverType == pb.ReceiverType_RT_USER && isOpenAppPush == "1" {
  145. uid := strconv.FormatInt(req.ReceiverId, 10)
  146. alia := db.DbUserPushForJg.UserPushForJgGetWithDb(masterId, uid)
  147. if alia != "" {
  148. if sender.Nickname == "" {
  149. sendUser, err := repo2.UserRepo.Get(sender.SenderId)
  150. if err != nil {
  151. logger.Sugar.Error(err)
  152. return 0, err
  153. }
  154. sender.Nickname = sendUser.Nickname
  155. sender.AvatarUrl = sendUser.AvatarUrl
  156. }
  157. //TODO::接收者类型为`user`, 进行极光推送
  158. CommAddPush(md.PushParams{
  159. MasterId: masterId,
  160. Uid: uid,
  161. PushAlia: alia,
  162. Title: "新消息提醒",
  163. Content: "您收到一条新消息,来自会员\"[消息发送者-会员昵称]\",发送时间\"[时间]\",消息类型\"[消息类型]\",备注\"[备注]\"",
  164. PushType: "zhi_ying_gim",
  165. MessageType: req.MessageType.String(),
  166. SendUserNickname: sender.Nickname,
  167. SendUserAvatarUrl: sender.AvatarUrl,
  168. Memo: sender.SenderType.String(),
  169. Times: time.Now().Format("2006-01-02 15:04:05.000"),
  170. })
  171. }
  172. }
  173. for i := range devices {
  174. if sender.DeviceId == devices[i].DeviceId {
  175. // 消息不需要投递给发送消息的设备
  176. continue
  177. }
  178. err = MessageService.SendToDevice(ctx, devices[i], &message)
  179. if err != nil {
  180. logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
  181. }
  182. }
  183. return seq, nil
  184. }
  185. // SendToDevice 将消息发送给设备
  186. func (*messageService) SendToDevice(ctx context.Context, device *pb.Device, message *pb.Message) error {
  187. messageSend := pb.MessageSend{Message: message}
  188. _, err := rpc.GetConnectIntClient().DeliverMessage(picker.ContextWithAddr(ctx, device.ConnAddr), &pb.DeliverMessageReq{
  189. DeviceId: device.DeviceId,
  190. MessageSend: &messageSend,
  191. })
  192. if err != nil {
  193. logger.Logger.Error("SendToDevice error", zap.Error(err))
  194. return err
  195. }
  196. // todo 其他推送厂商
  197. return nil
  198. }
  199. func (*messageService) AddSenderInfo(sender *pb.Sender) {
  200. if sender.SenderType == pb.SenderType_ST_USER {
  201. user, err := rpc.GetBusinessIntClient().GetUser(context.TODO(), &pb.GetUserReq{UserId: sender.SenderId})
  202. if err == nil && user != nil {
  203. sender.AvatarUrl = user.User.AvatarUrl
  204. sender.Nickname = user.User.Nickname
  205. sender.Extra = user.User.Extra
  206. }
  207. }
  208. }