DengBiao пре 1 година
родитељ
комит
c9227fbc96
15 измењених фајлова са 1255 додато и 410 уклоњено
  1. +5
    -5
      README.md
  2. +1
    -1
      cmd/business/main.go
  3. +1
    -1
      cmd/logic/main.go
  4. +26
    -0
      internal/logic/api/logic_ext.go
  5. +5
    -0
      internal/logic/app/friend_app.go
  6. +10
    -0
      internal/logic/app/group_app.go
  7. +25
    -0
      internal/logic/app/message_app.go
  8. +30
    -0
      internal/logic/domain/friend/friend_service.go
  9. +36
    -1
      internal/logic/domain/group/model/group.go
  10. +79
    -0
      internal/logic/domain/message/service/message_service.go
  11. +1
    -0
      internal/logic/proxy/message_proxy.go
  12. +542
    -183
      pkg/pb/connect.ext.pb.go
  13. +437
    -219
      pkg/pb/logic.ext.pb.go
  14. +41
    -0
      pkg/proto/connect.ext.proto
  15. +16
    -0
      pkg/proto/logic.ext.proto

+ 5
- 5
README.md Прегледај датотеку

@@ -69,17 +69,17 @@ TCP的网络层使用linux的epoll实现,相比golang原生,能减少gorouti
采用读扩散,会将消息短暂的保存到Redis,长连接登录消息同步不会同步离线消息。
### 核心流程时序图
#### 长连接登录
![登录.png](https://upload-images.jianshu.io/upload_images/5760439-2e54d3c5dd0a44c1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![登录.png](https://camo.githubusercontent.com/c3bb28e0bfe068f5ba619d571d2c665adc83138d56d1f5cb76c76c98e8d3ca74/68747470733a2f2f75706c6f61642d696d616765732e6a69616e7368752e696f2f75706c6f61645f696d616765732f353736303433392d326535346433633564643061343463312e706e673f696d6167654d6f6772322f6175746f2d6f7269656e742f7374726970253743696d61676556696577322f322f772f31323430)
#### 离线消息同步
![离线消息同步.png](https://upload-images.jianshu.io/upload_images/5760439-aa513ea0de851e12.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![离线消息同步.png](https://camo.githubusercontent.com/19edb5f72f832ef38ba2152f8179f91aaa55eefc3943afd44431f68824e3387b/68747470733a2f2f75706c6f61642d696d616765732e6a69616e7368752e696f2f75706c6f61645f696d616765732f353736303433392d616135313365613064653835316531322e706e673f696d6167654d6f6772322f6175746f2d6f7269656e742f7374726970253743696d61676556696577322f322f772f31323430)
#### 心跳
![心跳.png](https://upload-images.jianshu.io/upload_images/5760439-26d491374da3843b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![心跳.png](https://camo.githubusercontent.com/f8bbad45931b4b6c14d9ac6b4156459372593a8202ee7ac3978d4e54f79818aa/68747470733a2f2f75706c6f61642d696d616765732e6a69616e7368752e696f2f75706c6f61645f696d616765732f353736303433392d323664343931333734646133383433622e706e673f696d6167654d6f6772322f6175746f2d6f7269656e742f7374726970253743696d61676556696577322f322f772f31323430)
#### 消息单发
c1.d1和c1.d2分别表示c1用户的两个设备d1和d2,c2.d3和c2.d4同理
![消息单发.png](https://upload-images.jianshu.io/upload_images/5760439-35f1a91c8d7fffa6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![消息单发.png](https://camo.githubusercontent.com/18705cdbc15e29fdabdaf473f297337ac48d06e5e86662e9f72261d910821ce4/68747470733a2f2f75706c6f61642d696d616765732e6a69616e7368752e696f2f75706c6f61645f696d616765732f353736303433392d333566316139316338643766666661362e706e673f696d6167654d6f6772322f6175746f2d6f7269656e742f7374726970253743696d61676556696577322f322f772f31323430)
#### 群组消息群发
c1,c2.c3表示一个群组中的三个用户
![消息群发.png](https://upload-images.jianshu.io/upload_images/5760439-47a87c45b899b3f9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![消息群发.png](https://camo.githubusercontent.com/b1fdc7d86b79d9c2375c7e438f13ff6379381575d5b3873adc87ceb10d642a25/68747470733a2f2f75706c6f61642d696d616765732e6a69616e7368752e696f2f75706c6f61645f696d616765732f353736303433392d343761383763343562383939623366392e706e673f696d6167654d6f6772322f6175746f2d6f7269656e742f7374726970253743696d61676556696577322f322f772f31323430)
#### APP
基于Flutter写了一个简单的客户端
GitHub地址:https://github.com/alberliu/fim


+ 1
- 1
cmd/business/main.go Прегледај датотеку

@@ -20,7 +20,7 @@ import (
)

func main() {
config.Init()
//config.Init()
db.Init()

server := grpc.NewServer(grpc.UnaryInterceptor(interceptor.NewInterceptor("business_interceptor", urlwhitelist.Business)))


+ 1
- 1
cmd/logic/main.go Прегледај датотеку

@@ -25,7 +25,7 @@ func init() {
}

func main() {
config.Init()
//config.Init()
db.Init()

server := grpc.NewServer(grpc.UnaryInterceptor(interceptor.NewInterceptor("logic_interceptor", urlwhitelist.Logic)))


+ 26
- 0
internal/logic/api/logic_ext.go Прегледај датотеку

@@ -10,6 +10,32 @@ import (

type LogicExtServer struct{}

func (s *LogicExtServer) SendRedPacket(ctx context.Context, in *pb.SendRedPacketReq) (*pb.SendRedPacketResp, error) {
userId, deviceId, err := grpclib.GetCtxData(ctx)
if err != nil {
return nil, err
}
if in.MessageContentBack != "" {
buf, err := proto.Marshal(&pb.Text{
Text: in.MessageContentBack,
})
if err != nil {
return nil, err
}
in.MessageContent = buf
}
sender := pb.Sender{
SenderType: pb.SenderType_ST_USER,
SenderId: userId,
DeviceId: deviceId,
}
seq, err := app.MessageApp.SendRedPackage(ctx, &sender, in)
if err != nil {
return nil, err
}
return &pb.SendRedPacketResp{Seq: seq}, nil
}

func (s *LogicExtServer) RecallMessage(ctx context.Context, in *pb.RecallMessageReq) (*pb.RecallMessageResp, error) {
userId, deviceId, err := grpclib.GetCtxData(ctx)
if err != nil {


+ 5
- 0
internal/logic/app/friend_app.go Прегледај датотеку

@@ -61,3 +61,8 @@ func (*friendApp) SendToFriend(ctx context.Context, sender *pb.Sender, req *pb.S
func (*friendApp) RecallMessageSendToFriend(ctx context.Context, sender *pb.Sender, req *pb.RecallMessageReq) (int64, error) {
return frienddomain.FriendService.RecallMessageSendToFriend(ctx, sender, req)
}

// RedPackageMessageSendToFriend 红包发送至好友
func (*friendApp) RedPackageMessageSendToFriend(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
return frienddomain.FriendService.RedPackageMessageSendToFriend(ctx, sender, req)
}

+ 10
- 0
internal/logic/app/group_app.go Прегледај датотеку

@@ -158,3 +158,13 @@ func (*groupApp) RecallSendMessage(ctx context.Context, sender *pb.Sender, req *

return group.RecallSendMessage(ctx, sender, req)
}

// SendRedPackage 发送红包消息
func (*groupApp) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
group, err := repo.GroupRepo.Get(req.ReceiverId)
if err != nil {
return 0, err
}

return group.SendRedPackage(ctx, sender, req)
}

+ 25
- 0
internal/logic/app/message_app.go Прегледај датотеку

@@ -22,6 +22,11 @@ func (*messageApp) RecallMessageSendToUser(ctx context.Context, sender *pb.Sende
return service.MessageService.RecallMessageSendToUser(ctx, sender, toUserId, req, isRecallMessageUser)
}

// SendRedPackageToUser 发送红包给用户
func (*messageApp) SendRedPackageToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendRedPacketReq) (int64, error) {
return service.MessageService.SendRedPackageToUser(ctx, sender, toUserId, req)
}

// PushToUser 推送消息给用户
func (*messageApp) PushToUser(ctx context.Context, userId int64, code pb.PushCode, message proto.Message, isPersist bool) error {
return service.PushService.PushToUser(ctx, userId, code, message, isPersist)
@@ -82,3 +87,23 @@ func (s *messageApp) RecallMessage(ctx context.Context, sender *pb.Sender, req *
}
return 0, nil
}

// SendRedPackage 发送红包消息
func (s *messageApp) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
// 如果发送者是用户,需要补充用户的信息
service.MessageService.AddSenderInfo(sender)
switch req.ReceiverType {
// 消息接收者为用户
case pb.ReceiverType_RT_USER:
// 发送者为用户
if sender.SenderType == pb.SenderType_ST_USER {
return FriendApp.RedPackageMessageSendToFriend(ctx, sender, req)
} else {
return s.SendRedPackageToUser(ctx, sender, req.ReceiverId, req)
}
// 消息接收者是群组
case pb.ReceiverType_RT_GROUP:
return GroupApp.SendRedPackage(ctx, sender, req)
}
return 0, nil
}

+ 30
- 0
internal/logic/domain/friend/friend_service.go Прегледај датотеку

@@ -244,3 +244,33 @@ func (*friendService) RecallMessageSendToFriend(ctx context.Context, sender *pb.

return seq, nil
}

// RedPackageMessageSendToFriend 红包发送至好友
func (*friendService) RedPackageMessageSendToFriend(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
//TODO::判断是否为好友
friend, err := FriendRepo.Get(sender.SenderId, req.ReceiverId)
if err != nil {
return 0, err
}
if friend == nil || friend.Status != FriendStatusAgree {
return 0, gerrors.ErrNotIsFriend
}

utils.FilePutContents("RedPackageMessageSendToFriend", utils.SerializeStr(map[string]interface{}{
"send": sender,
"req": req,
}))
// 发给发送者
seq, err := proxy.MessageProxy.SendRedPackageToUser(ctx, sender, sender.SenderId, req)
if err != nil {
return 0, err
}

// 发给接收者
_, err = proxy.MessageProxy.SendRedPackageToUser(ctx, sender, req.ReceiverId, req)
if err != nil {
return 0, err
}

return seq, nil
}

+ 36
- 1
internal/logic/domain/group/model/group.go Прегледај датотеку

@@ -79,7 +79,7 @@ func CreateGroup(userId int64, in *pb.CreateGroupReq) *Group {
group.Members = append(group.Members, GroupUser{
GroupId: group.Id,
UserId: userId,
MemberType: int(pb.MembertypeGmtAdmin),
MemberType: int(pb.MemberType_GMT_ADMIN),
CreateTime: now,
UpdateTime: now,
UpdateType: UpdateTypeUpdate,
@@ -196,6 +196,41 @@ func (g *Group) RecallSendMessage(ctx context.Context, sender *pb.Sender, req *p
return userSeq, nil
}

// SendRedPackage 发送红包消息发送至群组
func (g *Group) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
return 0, gerrors.ErrNotInGroup
}

// 如果发送者是用户,将消息发送给发送者,获取用户seq
var userSeq int64
var err error
if sender.SenderType == pb.SenderType_ST_USER {
userSeq, err = proxy.MessageProxy.SendRedPackageToUser(ctx, sender, sender.SenderId, req)
if err != nil {
return 0, err
}
}

go func() {
defer util.RecoverPanic()
// 将消息发送给群组用户,使用写扩散
for _, user := range g.Members {
// 前面已经发送过,这里不需要再发送
if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
continue
}
_, err := proxy.MessageProxy.SendRedPackageToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
if err != nil {
return
}
}
}()

return userSeq, nil
}

func (g *Group) IsMember(userId int64) bool {
for i := range g.Members {
if g.Members[i].UserId == userId {


+ 79
- 0
internal/logic/domain/message/service/message_service.go Прегледај датотеку

@@ -348,6 +348,85 @@ func (*messageService) RecallMessageSendToUser(ctx context.Context, sender *pb.S
return seq, nil
}

// SendRedPackageToUser 发送红包给用户
func (*messageService) SendRedPackageToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendRedPacketReq) (int64, error) {
masterId, _ := grpclib.GetCtxMasterId(ctx)
logger.Logger.Debug("SendRedPackageToUser",
zap.String("master_id", masterId),
zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
zap.Int64("to_user_id", toUserId))
var (
seq int64 = 0
err error
)

//1、发送一条新的消息
seq, err = SeqService.GetUserNext(ctx, toUserId)
if err != nil {
return 0, err
}

selfMessage := model.Message{
UserId: toUserId,
RequestId: grpclib.GetCtxRequestId(ctx),
SenderType: int32(sender.SenderType),
SenderId: sender.SenderId,
ReceiverType: int32(req.ReceiverType),
ReceiverId: req.ReceiverId,
//ToUserIds: model.FormatUserIds(req.ToUserIds),
Type: int(req.MessageType),
Content: req.MessageContent,
Seq: seq,
SendTime: util.UnunixMilliTime(req.SendTime),
Status: int32(pb.MessageStatus_MS_NORMAL),
}
err = repo.MessageRepo.Save(selfMessage)
if err != nil {
logger.Sugar.Error(err)
return 0, err
}

if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
// 用户需要增加自己的已经同步的序列号
err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
if err != nil {
return 0, err
}
}

message := pb.Message{
Sender: sender,
ReceiverType: req.ReceiverType,
ReceiverId: req.ReceiverId,
//ToUserIds: req.ToUserIds,
MessageType: pb.MessageType_MT_RED_PACKAGE,
MessageContent: req.MessageContent,
Seq: seq,
SendTime: req.SendTime,
Status: pb.MessageStatus_MS_NORMAL,
}

// 查询用户在线设备
devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
if err != nil {
logger.Sugar.Error(err)
return 0, err
}

for i := range devices {
if sender.DeviceId == devices[i].DeviceId {
// 消息不需要投递给发送消息的设备
continue
}
err = MessageService.SendToDevice(ctx, devices[i], &message)
if err != nil {
logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
}
}

return seq, nil
}

// SendToDevice 将消息发送给设备
func (*messageService) SendToDevice(ctx context.Context, device *pb.Device, message *pb.Message) error {
messageSend := pb.MessageSend{Message: message}


+ 1
- 0
internal/logic/proxy/message_proxy.go Прегледај датотеку

@@ -12,5 +12,6 @@ var MessageProxy messageProxy
type messageProxy interface {
SendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendMessageReq) (int64, error)
RecallMessageSendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.RecallMessageReq, isRecallMessageUser bool) (int64, error)
SendRedPackageToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendRedPacketReq) (int64, error)
PushToUser(ctx context.Context, userId int64, code pb.PushCode, message proto.Message, isPersist bool) error
}

+ 542
- 183
pkg/pb/connect.ext.pb.go
Разлика између датотеке није приказан због своје велике величине
Прегледај датотеку


+ 437
- 219
pkg/pb/logic.ext.pb.go
Разлика између датотеке није приказан због своје велике величине
Прегледај датотеку


+ 41
- 0
pkg/proto/connect.ext.proto Прегледај датотеку

@@ -46,6 +46,32 @@ enum MessageType {
MT_COMMAND = 7; // 指令推送
MT_CUSTOM = 8; // 自定义
MT_RECALL = 9; // 撤回消息
MT_RED_PACKAGE = 10; // 红包消息
}

// 红包类型
enum RedPacketType {
RPT_UNKNOWN = 0; // 未知
RPT_FRIEND = 1; // 好友红包
RPT_GROUP_NORMAL = 2; // 群组普通红包
RPT_GROUP_LUCK = 3; // 群组手气红包
RPT_GROUP_SPECIALLY = 4; // 群组专属红包
RPT_SYSTEM_FOR = 5; // 系统红包
}

// 红包消息类型
enum RedPacketMessageType {
RMT_UNKNOWN = 0; // 未知
RMT_SEND = 1; // 发红包
RMT_GRAB = 2; // 抢红包
}

// 红包状态类型
enum RedPacketStatusType {
RPS_NOT_DRAW = 0; // 未领取
RPS_DRAWING = 1; // 领取中
RPS_DRAW_OVER = 2; // 领取完
RPS_EXPIRE = 3; //已过期
}

// 文本消息
@@ -107,6 +133,21 @@ message RECALL {
int64 recall_seq = 1; // 撤回消息seq
}


// 红包消息
message RED_PACKAGE {
RedPacketMessageType red_message_type = 1;// 红包消息类型
RedPacketType red_packet_type = 2; // 红包类型
string red_packet_content = 3; // 红包文字内容
float red_packet_amount = 6; // 红包金额
int32 red_packet_nums = 5; // 红包数量
float red_packet_balance_amount = 7; // 红包余额
repeated int64 received_user_ids = 8; // 已领取用户id
repeated float received_user_amount = 9; // 已领取用户金额
repeated string received_user_nickname = 10; // 已领取用户昵称
RedPacketStatusType red_packet_status_type = 11; // 领取状态
}

/************************************消息体定义结束************************************/

// 上行数据


+ 16
- 0
pkg/proto/logic.ext.proto Прегледај датотеку

@@ -16,6 +16,9 @@ service LogicExt {
// 推送消息到房间
rpc PushRoom(PushRoomReq)returns(Empty);

// 发送红包
rpc SendRedPacket (SendRedPacketReq) returns (SendRedPacketResp);

// 添加好友
rpc AddFriend (AddFriendReq) returns (Empty);
// 同意添加好友
@@ -85,6 +88,19 @@ message RecallMessageResp {
int64 seq = 1; // 消息序列号
}


message SendRedPacketReq {
ReceiverType receiver_type = 1; // 接收者类型,1:user;2:group
int64 receiver_id = 2; // 用户id或者群组id
MessageType message_type = 3; // 消息类型
bytes message_content = 4; // 消息内容
int64 send_time = 5; // 消息发送时间戳,精确到毫秒
string message_content_back = 6;
}
message SendRedPacketResp {
int64 seq = 1; // 消息序列号
}

message PushRoomReq{
int64 room_id = 1; // 房间id
MessageType message_type = 2; // 消息类型


Loading…
Откажи
Сачувај