package service import ( "context" "gim/internal/business/comm/db" svc "gim/internal/business/comm/service" "gim/internal/logic/domain/message/md" "gim/internal/logic/domain/message/model" "gim/internal/logic/domain/message/repo" "gim/internal/logic/proxy" "gim/pkg/grpclib" "gim/pkg/grpclib/picker" "gim/pkg/logger" "gim/pkg/pb" "gim/pkg/rpc" "gim/pkg/util" "go.uber.org/zap" "google.golang.org/protobuf/proto" "strconv" "time" ) const MessageLimit = 50 // 最大消息同步数量 const MaxSyncBufLen = 65536 // 最大字节数组长度 type messageService struct{} var MessageService = new(messageService) // Sync 消息同步 func (*messageService) Sync(ctx context.Context, userId, seq int64) (*pb.SyncResp, error) { messages, hasMore, err := MessageService.ListByUserIdAndSeq(ctx, userId, seq) if err != nil { return nil, err } pbMessages := model.MessagesToPB(messages) length := len(pbMessages) resp := &pb.SyncResp{Messages: pbMessages, HasMore: hasMore} bytes, err := proto.Marshal(resp) if err != nil { return nil, err } // 如果字节数组大于一个包的长度,需要减少字节数组 for len(bytes) > MaxSyncBufLen { length = length * 2 / 3 resp = &pb.SyncResp{Messages: pbMessages[0:length], HasMore: true} bytes, err = proto.Marshal(resp) if err != nil { return nil, err } } var userIds = make(map[int64]int32, len(resp.Messages)) for i := range resp.Messages { if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER { userIds[resp.Messages[i].Sender.SenderId] = 0 } } usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds}) if err != nil { return nil, err } for i := range resp.Messages { if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER { user, ok := usersResp.Users[resp.Messages[i].Sender.SenderId] if ok { resp.Messages[i].Sender.Nickname = user.Nickname resp.Messages[i].Sender.AvatarUrl = user.AvatarUrl resp.Messages[i].Sender.Extra = user.Extra } else { logger.Logger.Warn("get user failed", zap.Int64("user_id", resp.Messages[i].Sender.SenderId)) } } } return resp, nil } // ListByUserIdAndSeq 查询消息 func (*messageService) ListByUserIdAndSeq(ctx context.Context, userId, seq int64) ([]model.Message, bool, error) { var err error if seq == 0 { seq, err = DeviceAckService.GetMaxByUserId(ctx, userId) if err != nil { return nil, false, err } } return repo.MessageRepo.ListBySeq(userId, seq, MessageLimit) } // SendToUser 将消息发送给用户 func (*messageService) SendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendMessageReq) (int64, error) { masterId, _ := grpclib.GetCtxMasterId(ctx) logger.Logger.Debug("SendToUser", zap.String("master_id", masterId), zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)), zap.Int64("to_user_id", toUserId)) var ( seq int64 = 0 err error ) if req.IsPersist { seq, err = SeqService.GetUserNext(ctx, toUserId) if err != nil { return 0, err } selfMessage := model.Message{ UserId: toUserId, RequestId: grpclib.GetCtxRequestId(ctx), SenderType: int32(sender.SenderType), SenderId: sender.SenderId, ReceiverType: int32(req.ReceiverType), ReceiverId: req.ReceiverId, ToUserIds: model.FormatUserIds(req.ToUserIds), Type: int(req.MessageType), Content: req.MessageContent, Seq: seq, SendTime: util.UnunixMilliTime(req.SendTime), Status: int32(pb.MessageStatus_MS_NORMAL), } err = repo.MessageRepo.Save(selfMessage) if err != nil { logger.Sugar.Error(err) return 0, err } if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId { // 用户需要增加自己的已经同步的序列号 err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq) if err != nil { return 0, err } } } message := pb.Message{ Sender: sender, ReceiverType: req.ReceiverType, ReceiverId: req.ReceiverId, ToUserIds: req.ToUserIds, MessageType: req.MessageType, MessageContent: req.MessageContent, Seq: seq, SendTime: req.SendTime, Status: pb.MessageStatus_MS_NORMAL, } // 查询用户在线设备 devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId) if err != nil { logger.Sugar.Error(err) return 0, err } isOpenAppPush := svc.SysCfgGet(masterId, "is_open_app_push") if req.ReceiverType == 1 && isOpenAppPush == "1" { uid := strconv.FormatInt(req.ReceiverId, 10) alia := db.DbUserPushForJg.UserPushForJgGetWithDb(masterId, uid) if alia != "" { //TODO::接收者类型为`user`, 进行极光推送 CommAddPush(md.PushParams{ MasterId: masterId, Uid: uid, PushAlia: "", Title: "新消息提醒", Content: "", PushType: "zhi_ying_gim", MessageType: req.MessageType.String(), SendUserNickname: sender.Nickname, SendUserAvatarUrl: sender.AvatarUrl, Memo: sender.SenderType.String(), Times: time.Now().Format("2006-01-02 15:04:05.000"), }) } } for i := range devices { if sender.DeviceId == devices[i].DeviceId { // 消息不需要投递给发送消息的设备 continue } err = MessageService.SendToDevice(ctx, devices[i], &message) if err != nil { logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err)) } } return seq, nil } // SendToDevice 将消息发送给设备 func (*messageService) SendToDevice(ctx context.Context, device *pb.Device, message *pb.Message) error { messageSend := pb.MessageSend{Message: message} _, err := rpc.GetConnectIntClient().DeliverMessage(picker.ContextWithAddr(ctx, device.ConnAddr), &pb.DeliverMessageReq{ DeviceId: device.DeviceId, MessageSend: &messageSend, }) if err != nil { logger.Logger.Error("SendToDevice error", zap.Error(err)) return err } // todo 其他推送厂商 return nil } func (*messageService) AddSenderInfo(sender *pb.Sender) { if sender.SenderType == pb.SenderType_ST_USER { user, err := rpc.GetBusinessIntClient().GetUser(context.TODO(), &pb.GetUserReq{UserId: sender.SenderId}) if err == nil && user != nil { sender.AvatarUrl = user.User.AvatarUrl sender.Nickname = user.User.Nickname sender.Extra = user.User.Extra } } }