package service

import (
	"context"
	"strconv"
	"time"

	"gim/internal/business/comm/db"
	svc "gim/internal/business/comm/service"
	"gim/internal/business/comm/utils"
	repo2 "gim/internal/business/domain/user/repo"
	"gim/internal/logic/domain/message/md"
	"gim/internal/logic/domain/message/model"
	"gim/internal/logic/domain/message/repo"
	"gim/internal/logic/proxy"
	"gim/pkg/grpclib"
	"gim/pkg/grpclib/picker"
	"gim/pkg/logger"
	"gim/pkg/pb"
	"gim/pkg/rpc"
	"gim/pkg/util"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
)

const (
	// MessageLimit is the maximum number of messages fetched by one sync.
	MessageLimit = 50
	// MaxSyncBufLen is the maximum marshalled size, in bytes, of one sync response.
	MaxSyncBufLen = 65536
)

type messageService struct{}

// MessageService is the package-level message service instance.
var MessageService = new(messageService)

// Sync returns the messages of userId starting from seq, enriched with the
// nickname, avatar and extra data of every user sender.
//
// The response is trimmed (keeping two thirds of the messages per round, with
// HasMore forced to true) until its marshalled size fits into MaxSyncBufLen.
// Any error from listing, marshalling or the user lookup is returned as is.
func (*messageService) Sync(ctx context.Context, userId, seq int64) (*pb.SyncResp, error) {
	messages, hasMore, err := MessageService.ListByUserIdAndSeq(ctx, userId, seq)
	if err != nil {
		return nil, err
	}

	pbMessages := model.MessagesToPB(messages)
	resp := &pb.SyncResp{Messages: pbMessages, HasMore: hasMore}
	buf, err := proto.Marshal(resp)
	if err != nil {
		return nil, err
	}

	// Shrink the batch while the encoded response is larger than one packet.
	// NOTE(review): when a single message alone exceeds MaxSyncBufLen this ends
	// with zero messages and HasMore=true — confirm clients cope with that.
	for length := len(pbMessages); len(buf) > MaxSyncBufLen; {
		length = length * 2 / 3
		resp = &pb.SyncResp{Messages: pbMessages[:length], HasMore: true}
		if buf, err = proto.Marshal(resp); err != nil {
			return nil, err
		}
	}

	// Collect the distinct user senders; the map value is unused.
	userIds := make(map[int64]int32, len(resp.Messages))
	for i := range resp.Messages {
		sender := resp.Messages[i].Sender
		if sender == nil || sender.SenderType != pb.SenderType_ST_USER {
			continue
		}
		userIds[sender.SenderId] = 0
	}
	if len(userIds) == 0 {
		// Nothing to enrich, so the user lookup RPC is not needed.
		return resp, nil
	}

	usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
	if err != nil {
		return nil, err
	}

	for i := range resp.Messages {
		sender := resp.Messages[i].Sender
		if sender == nil || sender.SenderType != pb.SenderType_ST_USER {
			continue
		}
		user, ok := usersResp.Users[sender.SenderId]
		if !ok {
			logger.Logger.Warn("get user failed", zap.Int64("user_id", sender.SenderId))
			continue
		}
		sender.Nickname = user.Nickname
		sender.AvatarUrl = user.AvatarUrl
		sender.Extra = user.Extra
	}
	return resp, nil
}

// ListByUserIdAndSeq lists up to MessageLimit messages of userId starting from
// seq. When seq is 0 the starting point is taken from
// DeviceAckService.GetMaxByUserId. The three results are passed through from
// repo.MessageRepo.ListBySeq; Sync treats the boolean as "has more".
func (*messageService) ListByUserIdAndSeq(ctx context.Context, userId, seq int64) ([]model.Message, bool, error) {
	if seq == 0 {
		maxSeq, err := DeviceAckService.GetMaxByUserId(ctx, userId)
		if err != nil {
			return nil, false, err
		}
		seq = maxSeq
	}
	return repo.MessageRepo.ListBySeq(userId, seq, MessageLimit)
}

// saveUserMessage allocates the next sequence number of toUserId, stores msg
// under it and returns that sequence number.
//
// The caller fills the request-specific fields of msg; UserId, RequestId,
// SenderType, SenderId, Seq and Status are set here. When a user sends to
// themselves, the sending device's ack is advanced to the new sequence number.
func (*messageService) saveUserMessage(ctx context.Context, sender *pb.Sender, toUserId int64, msg model.Message) (int64, error) {
	seq, err := SeqService.GetUserNext(ctx, toUserId)
	if err != nil {
		return 0, err
	}

	msg.UserId = toUserId
	msg.RequestId = grpclib.GetCtxRequestId(ctx)
	msg.SenderType = int32(sender.SenderType)
	msg.SenderId = sender.SenderId
	msg.Seq = seq
	msg.Status = int32(pb.MessageStatus_MS_NORMAL)

	if err = repo.MessageRepo.Save(msg); err != nil {
		logger.Sugar.Error(err)
		return 0, err
	}

	if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
		// The sender's own device has already seen this message, so advance
		// its synced sequence number.
		if err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq); err != nil {
			return 0, err
		}
	}
	return seq, nil
}

// deliverToDevices sends message to every device except the one it was sent
// from. Delivery is best effort: a failure is logged under caller's name and
// the remaining devices are still tried.
func (*messageService) deliverToDevices(ctx context.Context, caller string, sender *pb.Sender, devices []*pb.Device, message *pb.Message) {
	for _, device := range devices {
		if sender.DeviceId == device.DeviceId {
			// The sending device does not need its own message back.
			continue
		}
		if err := MessageService.SendToDevice(ctx, device, message); err != nil {
			logger.Logger.Error("deliver message to device failed",
				zap.String("caller", caller),
				zap.Any("device", device),
				zap.Error(err))
		}
	}
}

// pushContentFor builds the notification text for req: the raw content for
// text messages, a fixed per-type sentence for the other known types, and an
// empty string for any type not listed.
func pushContentFor(req *pb.SendMessageReq) string {
	switch req.MessageType {
	case pb.MessageType_MT_TEXT:
		return string(req.MessageContent)
	case pb.MessageType_MT_UNKNOWN:
		return "您收到一条\"未知\"消息"
	case pb.MessageType_MT_FACE:
		return "您收到一条表情消息"
	case pb.MessageType_MT_VOICE:
		return "您收到一条语音消息"
	case pb.MessageType_MT_IMAGE:
		return "您收到一条图片消息"
	case pb.MessageType_MT_FILE:
		return "您收到一条视频消息"
	case pb.MessageType_MT_LOCATION:
		return "您收到一条地理位置消息"
	case pb.MessageType_MT_COMMAND:
		return "您收到一条指令推送消息"
	case pb.MessageType_MT_CUSTOM:
		return "您收到一条\"自定义\"推送消息"
	default:
		return ""
	}
}

// pushToReceiver queues an app push for the receiver of req when a push alias
// is registered for them; without an alias it does nothing.
//
// If sender has no nickname, the nickname and avatar are loaded from the user
// repository and written back to sender; an error from that lookup is returned.
func (*messageService) pushToReceiver(masterId string, sender *pb.Sender, req *pb.SendMessageReq) error {
	uid := strconv.FormatInt(req.ReceiverId, 10)
	alia := db.DbUserPushForJg.UserPushForJgGetWithDb(masterId, uid)
	if alia == "" {
		return nil
	}

	if sender.Nickname == "" {
		sendUser, err := repo2.UserRepo.Get(sender.SenderId)
		if err != nil {
			logger.Sugar.Error(err)
			return err
		}
		// Guard against a missing user instead of dereferencing a nil result.
		if sendUser != nil {
			sender.Nickname = sendUser.Nickname
			sender.AvatarUrl = sendUser.AvatarUrl
		}
	}

	// TODO: the receiver type is `user`; push through JPush.
	CommAddPush(md.PushParams{
		MasterId:          masterId,
		Uid:               uid,
		PushAlia:          alia,
		Title:             sender.Nickname,
		Content:           pushContentFor(req),
		PushType:          "zhi_ying_gim",
		MessageType:       req.MessageType.String(),
		SendUserNickname:  sender.Nickname,
		SendUserAvatarUrl: sender.AvatarUrl,
		Memo:              sender.SenderType.String(),
		Times:             time.Now().Format("2006-01-02 15:04:05"),
	})
	return nil
}

// SendToUser sends a message to toUserId.
//
// When req.IsPersist is set the message is stored under the next sequence
// number of toUserId. The message is then delivered to every online device of
// toUserId except the sending one. If the receiver named in req has no online
// device, and the message is user-to-user, app push is enabled and the
// receiver is not toUserId, an app push is queued as well.
//
// It returns the sequence number of the stored message (0 when not persisted).
// Errors from persisting, device lookup or the push's sender lookup are
// returned; delivery failures to individual devices are only logged.
func (*messageService) SendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendMessageReq) (int64, error) {
	// NOTE(review): the second result of GetCtxMasterId is discarded — confirm
	// that continuing with an empty master id is acceptable.
	masterId, _ := grpclib.GetCtxMasterId(ctx)
	logger.Logger.Debug("SendToUser",
		zap.String("master_id", masterId),
		zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
		zap.Int64("to_user_id", toUserId))

	var (
		seq int64
		err error
	)
	if req.IsPersist {
		seq, err = MessageService.saveUserMessage(ctx, sender, toUserId, model.Message{
			ReceiverType: int32(req.ReceiverType),
			ReceiverId:   req.ReceiverId,
			ToUserIds:    model.FormatUserIds(req.ToUserIds),
			Type:         int(req.MessageType),
			Content:      req.MessageContent,
			SendTime:     util.UnunixMilliTime(req.SendTime),
		})
		if err != nil {
			return 0, err
		}
	}

	message := &pb.Message{
		Sender:         sender,
		ReceiverType:   req.ReceiverType,
		ReceiverId:     req.ReceiverId,
		ToUserIds:      req.ToUserIds,
		MessageType:    req.MessageType,
		MessageContent: req.MessageContent,
		Seq:            seq,
		SendTime:       req.SendTime,
		Status:         pb.MessageStatus_MS_NORMAL,
	}

	// Online devices of the user this copy of the message is addressed to.
	devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
	if err != nil {
		logger.Sugar.Error(err)
		return 0, err
	}

	// Online devices of the receiver named in the request.
	receiverDevices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, req.ReceiverId)
	if err != nil {
		logger.Sugar.Error(err)
		return 0, err
	}

	isOpenAppPush := svc.SysCfgGet(masterId, "is_open_app_push")
	// NOTE(review): this records the whole request on every send; it looks
	// like a debugging aid — confirm it is still wanted.
	utils.FilePutContents("SendToUser", utils.SerializeStr(map[string]interface{}{
		"devices_len":   len(devices),
		"sender":        sender,
		"toUserId":      toUserId,
		"isOpenAppPush": isOpenAppPush,
		"req":           req,
		"masterId":      masterId,
	}))

	if len(receiverDevices) == 0 &&
		sender.SenderType == pb.SenderType_ST_USER &&
		req.ReceiverType == pb.ReceiverType_RT_USER &&
		isOpenAppPush == "1" &&
		req.ReceiverId != toUserId {
		if err = MessageService.pushToReceiver(masterId, sender, req); err != nil {
			return 0, err
		}
	}

	MessageService.deliverToDevices(ctx, "SendToUser", sender, devices, message)
	return seq, nil
}

// RecallMessageSendToUser recalls a message for toUserId.
//
// It first marks the message selected by toUserId and sendTime as recalled,
// then, when req.IsPersist is set, stores the recall notice under the next
// sequence number, and finally delivers the notice to every online device of
// toUserId except the sending one.
//
// It returns the sequence number of the stored notice (0 when not persisted).
// Delivery failures to individual devices are only logged.
func (*messageService) RecallMessageSendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.RecallMessageReq, sendTime time.Time) (int64, error) {
	// NOTE(review): the second result of GetCtxMasterId is discarded; here the
	// master id is only used for the debug log.
	masterId, _ := grpclib.GetCtxMasterId(ctx)
	logger.Logger.Debug("RecallMessageSendToUser",
		zap.String("master_id", masterId),
		zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
		zap.Int64("to_user_id", toUserId))

	// 1. Flip the status of the original message.
	if _, err := repo.MessageRepo.UpdateStatus(toUserId, sendTime, int(pb.MessageStatus_MS_RECALL)); err != nil {
		return 0, err
	}

	// 2. Store the recall notice as a new message.
	var seq int64
	if req.IsPersist {
		var err error
		seq, err = MessageService.saveUserMessage(ctx, sender, toUserId, model.Message{
			ReceiverType: int32(req.ReceiverType),
			ReceiverId:   req.ReceiverId,
			ToUserIds:    model.FormatUserIds(req.ToUserIds),
			Type:         int(req.MessageType),
			Content:      req.MessageContent,
			SendTime:     util.UnunixMilliTime(req.SendTime),
		})
		if err != nil {
			return 0, err
		}
	}

	message := &pb.Message{
		Sender:         sender,
		ReceiverType:   req.ReceiverType,
		ReceiverId:     req.ReceiverId,
		ToUserIds:      req.ToUserIds,
		MessageType:    req.MessageType,
		MessageContent: req.MessageContent,
		Seq:            seq,
		SendTime:       req.SendTime,
		Status:         pb.MessageStatus_MS_NORMAL,
	}

	// 3. Deliver the notice to the user's online devices.
	devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
	if err != nil {
		logger.Sugar.Error(err)
		return 0, err
	}
	MessageService.deliverToDevices(ctx, "RecallMessageSendToUser", sender, devices, message)
	return seq, nil
}

// SendRedPackageToUser sends a red-packet message to toUserId.
//
// The message is always stored under the next sequence number of toUserId and
// then delivered to every online device of toUserId except the sending one.
// It returns that sequence number; delivery failures to individual devices are
// only logged.
func (*messageService) SendRedPackageToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendRedPacketReq) (int64, error) {
	// NOTE(review): the second result of GetCtxMasterId is discarded; here the
	// master id is only used for the debug log.
	masterId, _ := grpclib.GetCtxMasterId(ctx)
	logger.Logger.Debug("SendRedPackageToUser",
		zap.String("master_id", masterId),
		zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
		zap.Int64("to_user_id", toUserId))

	// NOTE(review): the stored Type comes from req.MessageType while the
	// delivered message below is always MT_RED_PACKAGE — confirm callers
	// always pass MT_RED_PACKAGE.
	seq, err := MessageService.saveUserMessage(ctx, sender, toUserId, model.Message{
		ReceiverType: int32(req.ReceiverType),
		ReceiverId:   req.ReceiverId,
		Type:         int(req.MessageType),
		Content:      req.MessageContent,
		SendTime:     util.UnunixMilliTime(req.SendTime),
	})
	if err != nil {
		return 0, err
	}

	message := &pb.Message{
		Sender:         sender,
		ReceiverType:   req.ReceiverType,
		ReceiverId:     req.ReceiverId,
		MessageType:    pb.MessageType_MT_RED_PACKAGE,
		MessageContent: req.MessageContent,
		Seq:            seq,
		SendTime:       req.SendTime,
		Status:         pb.MessageStatus_MS_NORMAL,
	}

	devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
	if err != nil {
		logger.Sugar.Error(err)
		return 0, err
	}
	MessageService.deliverToDevices(ctx, "SendRedPackageToUser", sender, devices, message)
	return seq, nil
}

// SendToDevice delivers message to a single device through the connect
// service, addressing the call to device.ConnAddr. A delivery error is logged
// and returned.
func (*messageService) SendToDevice(ctx context.Context, device *pb.Device, message *pb.Message) error {
	deliverReq := &pb.DeliverMessageReq{
		DeviceId:    device.DeviceId,
		MessageSend: &pb.MessageSend{Message: message},
	}
	if _, err := rpc.GetConnectIntClient().DeliverMessage(picker.ContextWithAddr(ctx, device.ConnAddr), deliverReq); err != nil {
		logger.Logger.Error("SendToDevice error", zap.Error(err))
		return err
	}

	// TODO: other push vendors.
	return nil
}

// AddSenderInfo fills the avatar, nickname and extra data of a user sender
// from the business service. It is best effort: senders of another type are
// left untouched, and on a lookup error or an empty reply the sender is left
// unchanged (the error is logged).
func (*messageService) AddSenderInfo(sender *pb.Sender) {
	if sender == nil || sender.SenderType != pb.SenderType_ST_USER {
		return
	}

	resp, err := rpc.GetBusinessIntClient().GetUser(context.TODO(), &pb.GetUserReq{UserId: sender.SenderId})
	if err != nil {
		logger.Logger.Warn("AddSenderInfo get user failed",
			zap.Int64("user_id", sender.SenderId),
			zap.Error(err))
		return
	}
	if resp == nil || resp.User == nil {
		return
	}

	sender.AvatarUrl = resp.User.AvatarUrl
	sender.Nickname = resp.User.Nickname
	sender.Extra = resp.User.Extra
}