golang-im聊天
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

510 line
13 KiB

  1. package model
  2. import (
  3. "context"
  4. "fmt"
  5. "gim/internal/logic/domain/message/repo"
  6. "gim/internal/logic/proxy"
  7. "gim/pkg/gerrors"
  8. "gim/pkg/grpclib"
  9. "gim/pkg/logger"
  10. "gim/pkg/pb"
  11. "gim/pkg/rpc"
  12. "gim/pkg/util"
  13. "time"
  14. "go.uber.org/zap"
  15. "google.golang.org/protobuf/proto"
  16. )
  // Values for GroupUser.UpdateType. They tag an in-memory member record with
  // the kind of change that was applied to it.
  const (
  	// UpdateTypeUpdate marks a member that was added or modified.
  	UpdateTypeUpdate = iota + 1
  	// UpdateTypeDelete marks a member that was removed from the group.
  	UpdateTypeDelete
  )
  21. // Group 群组
  22. type Group struct {
  23. Id int64 // 群组id
  24. Name string // 组名
  25. AvatarUrl string // 头像
  26. Introduction string // 群简介
  27. UserNum int32 // 群组人数
  28. IsAllMemberBanned int32 // 是否全员禁言(1:是 2:否)
  29. Extra string // 附加字段
  30. CreateTime time.Time // 创建时间
  31. UpdateTime time.Time // 更新时间
  32. Members []GroupUser `gorm:"-"` // 群组成员
  33. }
  // GroupUser is the membership record of one user in one group.
  type GroupUser struct {
  	// Id is the auto-increment primary key.
  	Id int64
  	// GroupId is the id of the group.
  	GroupId int64
  	// UserId is the id of the user.
  	UserId int64
  	// MemberType is the member's role in the group (a pb.MemberType stored as int).
  	MemberType int
  	// Remarks is a free-form note about the member.
  	Remarks string
  	// Extra is a free-form additional attribute.
  	Extra string
  	// Status is the member status.
  	Status int
  	// CreateTime is when the membership was created.
  	CreateTime time.Time
  	// UpdateTime is when the membership was last updated.
  	UpdateTime time.Time
  	// UpdateType is the pending change kind (UpdateTypeUpdate / UpdateTypeDelete);
  	// it is excluded from gorm mapping.
  	UpdateType int `gorm:"-"`
  }
  46. func (g *Group) ToProto() *pb.Group {
  47. if g == nil {
  48. return nil
  49. }
  50. return &pb.Group{
  51. GroupId: g.Id,
  52. Name: g.Name,
  53. AvatarUrl: g.AvatarUrl,
  54. Introduction: g.Introduction,
  55. UserMum: g.UserNum,
  56. IsAllMemberBanned: g.IsAllMemberBanned,
  57. Extra: g.Extra,
  58. CreateTime: g.CreateTime.Unix(),
  59. UpdateTime: g.UpdateTime.Unix(),
  60. }
  61. }
  62. func CreateGroup(userId int64, in *pb.CreateGroupReq) *Group {
  63. now := time.Now()
  64. group := &Group{
  65. Name: in.Name,
  66. AvatarUrl: in.AvatarUrl,
  67. Introduction: in.Introduction,
  68. Extra: in.Extra,
  69. Members: make([]GroupUser, 0, len(in.MemberIds)+1),
  70. CreateTime: now,
  71. UpdateTime: now,
  72. }
  73. // 创建者添加为管理员
  74. group.Members = append(group.Members, GroupUser{
  75. GroupId: group.Id,
  76. UserId: userId,
  77. MemberType: int(pb.MemberType_GMT_ADMIN),
  78. CreateTime: now,
  79. UpdateTime: now,
  80. UpdateType: UpdateTypeUpdate,
  81. })
  82. // 其让人添加为成员
  83. for i := range in.MemberIds {
  84. group.Members = append(group.Members, GroupUser{
  85. GroupId: group.Id,
  86. UserId: in.MemberIds[i],
  87. MemberType: int(pb.MemberType_GMT_MEMBER),
  88. CreateTime: now,
  89. UpdateTime: now,
  90. UpdateType: UpdateTypeUpdate,
  91. })
  92. }
  93. return group
  94. }
  95. func (g *Group) Update(ctx context.Context, in *pb.UpdateGroupReq) error {
  96. g.Name = in.Name
  97. g.AvatarUrl = in.AvatarUrl
  98. g.Introduction = in.Introduction
  99. g.Extra = in.Extra
  100. g.UpdateTime = time.Now()
  101. return nil
  102. }
  103. func (g *Group) PushUpdate(ctx context.Context, userId int64, isUpdateIntroduction bool) error {
  104. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  105. if err != nil {
  106. return err
  107. }
  108. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP, &pb.UpdateGroupPush{
  109. OptId: userId,
  110. OptName: userResp.User.Nickname,
  111. Name: g.Name,
  112. AvatarUrl: g.AvatarUrl,
  113. Introduction: g.Introduction,
  114. IsUpdateIntroduction: isUpdateIntroduction,
  115. Extra: g.Extra,
  116. }, true)
  117. if err != nil {
  118. return err
  119. }
  120. if isUpdateIntroduction {
  121. //_, err = g.SendMessage(ctx,
  122. // &pb.Sender{
  123. // SenderType: pb.SenderType_ST_USER,
  124. // SenderId: 0,
  125. // },
  126. // &pb.SendMessageReq{
  127. // ReceiverType: pb.ReceiverType_RT_GROUP,
  128. // ReceiverId: g.Id,
  129. // ToUserIds: nil,
  130. // MessageType: pb.MessageType_MT_COMMAND,
  131. // MessageContent: commandBuf,
  132. // SendTime: util.UnixMilliTime(time.Now()),
  133. // IsPersist: isPersist,
  134. // })
  135. //if err != nil {
  136. // return err
  137. //}
  138. }
  139. return nil
  140. }
  141. // SendMessage 消息发送至群组
  142. func (g *Group) SendMessage(ctx context.Context, sender *pb.Sender, req *pb.SendMessageReq) (int64, error) {
  143. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  144. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  145. return 0, gerrors.ErrNotInGroup
  146. }
  147. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  148. var userSeq int64
  149. var err error
  150. if sender.SenderType == pb.SenderType_ST_USER {
  151. userSeq, err = proxy.MessageProxy.SendToUser(ctx, sender, sender.SenderId, req)
  152. if err != nil {
  153. return 0, err
  154. }
  155. }
  156. go func() {
  157. defer util.RecoverPanic()
  158. // 将消息发送给群组用户,使用写扩散
  159. for _, user := range g.Members {
  160. // 前面已经发送过,这里不需要再发送
  161. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  162. continue
  163. }
  164. _, err := proxy.MessageProxy.SendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  165. if err != nil {
  166. return
  167. }
  168. }
  169. }()
  170. return userSeq, nil
  171. }
  172. // RecallSendMessage 撤回消息发送至群组
  173. func (g *Group) RecallSendMessage(ctx context.Context, sender *pb.Sender, req *pb.RecallMessageReq) (int64, error) {
  174. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  175. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  176. return 0, gerrors.ErrNotInGroup
  177. }
  178. var err error
  179. //查询到对应seq的消息
  180. msg := &pb.RECALL{}
  181. err = proto.Unmarshal(req.MessageContent, msg)
  182. if err != nil {
  183. return 0, err
  184. }
  185. message, err := repo.MessageRepo.GetMessage(sender.SenderId, msg.RecallSeq)
  186. if err != nil {
  187. return 0, err
  188. }
  189. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  190. var userSeq int64
  191. if sender.SenderType == pb.SenderType_ST_USER {
  192. userSeq, err = proxy.MessageProxy.RecallMessageSendToUser(ctx, sender, sender.SenderId, req, message.SendTime)
  193. if err != nil {
  194. return 0, err
  195. }
  196. }
  197. go func() {
  198. defer util.RecoverPanic()
  199. // 将消息发送给群组用户,使用写扩散
  200. for _, user := range g.Members {
  201. // 前面已经发送过,这里不需要再发送
  202. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  203. continue
  204. }
  205. _, err := proxy.MessageProxy.RecallMessageSendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req, message.SendTime)
  206. if err != nil {
  207. return
  208. }
  209. }
  210. }()
  211. return userSeq, nil
  212. }
  213. // SendRedPackage 发送红包消息发送至群组
  214. func (g *Group) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
  215. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  216. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  217. return 0, gerrors.ErrNotInGroup
  218. }
  219. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  220. var userSeq int64
  221. var err error
  222. if sender.SenderType == pb.SenderType_ST_USER {
  223. userSeq, err = proxy.MessageProxy.SendRedPackageToUser(ctx, sender, sender.SenderId, req)
  224. if err != nil {
  225. return 0, err
  226. }
  227. }
  228. go func() {
  229. defer util.RecoverPanic()
  230. // 将消息发送给群组用户,使用写扩散
  231. for _, user := range g.Members {
  232. // 前面已经发送过,这里不需要再发送
  233. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  234. continue
  235. }
  236. _, err := proxy.MessageProxy.SendRedPackageToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  237. if err != nil {
  238. return
  239. }
  240. }
  241. }()
  242. return userSeq, nil
  243. }
  244. func (g *Group) IsMember(userId int64) bool {
  245. for i := range g.Members {
  246. if g.Members[i].UserId == userId {
  247. return true
  248. }
  249. }
  250. return false
  251. }
  252. // PushMessage 向群组推送消息
  253. func (g *Group) PushMessage(ctx context.Context, code pb.PushCode, message proto.Message, isPersist bool) error {
  254. logger.Logger.Debug("push_to_group",
  255. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  256. zap.Int64("group_id", g.Id),
  257. zap.Int32("code", int32(code)),
  258. zap.Any("message", message))
  259. messageBuf, err := proto.Marshal(message)
  260. if err != nil {
  261. return gerrors.WrapError(err)
  262. }
  263. commandBuf, err := proto.Marshal(&pb.Command{Code: int32(code), Data: messageBuf})
  264. if err != nil {
  265. return gerrors.WrapError(err)
  266. }
  267. _, err = g.SendMessage(ctx,
  268. &pb.Sender{
  269. SenderType: pb.SenderType_ST_SYSTEM,
  270. SenderId: 0,
  271. },
  272. &pb.SendMessageReq{
  273. ReceiverType: pb.ReceiverType_RT_GROUP,
  274. ReceiverId: g.Id,
  275. ToUserIds: nil,
  276. MessageType: pb.MessageType_MT_COMMAND,
  277. MessageContent: commandBuf,
  278. SendTime: util.UnixMilliTime(time.Now()),
  279. IsPersist: isPersist,
  280. },
  281. )
  282. if err != nil {
  283. return err
  284. }
  285. return nil
  286. }
  287. // GetMembers 获取群组用户
  288. func (g *Group) GetMembers(ctx context.Context) ([]*pb.GroupMember, error) {
  289. members := g.Members
  290. userIds := make(map[int64]int32, len(members))
  291. for i := range members {
  292. userIds[members[i].UserId] = 0
  293. }
  294. resp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  295. if err != nil {
  296. return nil, err
  297. }
  298. var infos = make([]*pb.GroupMember, len(members))
  299. for i := range members {
  300. member := pb.GroupMember{
  301. UserId: members[i].UserId,
  302. MemberType: pb.MemberType(members[i].MemberType),
  303. Remarks: members[i].Remarks,
  304. Extra: members[i].Extra,
  305. }
  306. user, ok := resp.Users[members[i].UserId]
  307. if ok {
  308. member.Nickname = user.Nickname
  309. member.Sex = user.Sex
  310. member.AvatarUrl = user.AvatarUrl
  311. member.UserExtra = user.Extra
  312. }
  313. infos[i] = &member
  314. }
  315. return infos, nil
  316. }
  317. // AddMembers 给群组添加用户
  318. func (g *Group) AddMembers(ctx context.Context, userIds []int64) ([]int64, []int64, error) {
  319. var existIds []int64
  320. var addedIds []int64
  321. now := time.Now()
  322. for i, userId := range userIds {
  323. if g.IsMember(userId) {
  324. existIds = append(existIds, userIds[i])
  325. continue
  326. }
  327. g.Members = append(g.Members, GroupUser{
  328. GroupId: g.Id,
  329. UserId: userIds[i],
  330. MemberType: int(pb.MemberType_GMT_MEMBER),
  331. CreateTime: now,
  332. UpdateTime: now,
  333. UpdateType: UpdateTypeUpdate,
  334. })
  335. addedIds = append(addedIds, userIds[i])
  336. }
  337. g.UserNum += int32(len(addedIds))
  338. return existIds, addedIds, nil
  339. }
  340. func (g *Group) PushAddMember(ctx context.Context, optUserId int64, addedIds []int64) error {
  341. var addIdMap = make(map[int64]int32, len(addedIds))
  342. for i := range addedIds {
  343. addIdMap[addedIds[i]] = 0
  344. }
  345. addIdMap[optUserId] = 0
  346. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: addIdMap})
  347. if err != nil {
  348. return err
  349. }
  350. var members []*pb.GroupMember
  351. for k, _ := range addIdMap {
  352. member, ok := usersResp.Users[k]
  353. if !ok {
  354. continue
  355. }
  356. members = append(members, &pb.GroupMember{
  357. UserId: member.UserId,
  358. Nickname: member.Nickname,
  359. Sex: member.Sex,
  360. AvatarUrl: member.AvatarUrl,
  361. UserExtra: member.Extra,
  362. Remarks: "",
  363. Extra: "",
  364. })
  365. }
  366. optUser := usersResp.Users[optUserId]
  367. err = g.PushMessage(ctx, pb.PushCode_PC_ADD_GROUP_MEMBERS, &pb.AddGroupMembersPush{
  368. OptId: optUser.UserId,
  369. OptName: optUser.Nickname,
  370. Members: members,
  371. }, true)
  372. if err != nil {
  373. logger.Sugar.Error(err)
  374. }
  375. return nil
  376. }
  377. func (g *Group) GetMember(ctx context.Context, userId int64) *GroupUser {
  378. for i := range g.Members {
  379. if g.Members[i].UserId == userId {
  380. return &g.Members[i]
  381. }
  382. }
  383. return nil
  384. }
  385. // UpdateMember 更新群组成员信息
  386. func (g *Group) UpdateMember(ctx context.Context, in *pb.UpdateGroupMemberReq) error {
  387. member := g.GetMember(ctx, in.UserId)
  388. if member == nil {
  389. return nil
  390. }
  391. member.MemberType = int(in.MemberType)
  392. member.Remarks = in.Remarks
  393. member.Extra = in.Extra
  394. member.UpdateTime = time.Now()
  395. member.UpdateType = UpdateTypeUpdate
  396. return nil
  397. }
  398. // DeleteMember 删除用户群组
  399. func (g *Group) DeleteMember(ctx context.Context, userId int64) error {
  400. member := g.GetMember(ctx, userId)
  401. if member == nil {
  402. return nil
  403. }
  404. member.UpdateType = UpdateTypeDelete
  405. return nil
  406. }
  407. func (g *Group) PushDeleteMember(ctx context.Context, optId, userId int64) error {
  408. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  409. if err != nil {
  410. return err
  411. }
  412. deleteUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  413. if err != nil {
  414. return err
  415. }
  416. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_GROUP_MEMBER, &pb.RemoveGroupMemberPush{
  417. OptId: optId,
  418. OptName: userResp.User.Nickname,
  419. DeletedUserId: userId,
  420. DeletedUserName: deleteUserResp.User.Nickname,
  421. }, true)
  422. if err != nil {
  423. return err
  424. }
  425. return nil
  426. }
  427. func (g *Group) PushGroupMemberBanned(ctx context.Context, optId, userId int64, isAllMemberBanned bool) error {
  428. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  429. if err != nil {
  430. return err
  431. }
  432. if !isAllMemberBanned {
  433. bannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  434. if err != nil {
  435. return err
  436. }
  437. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  438. OptId: optId,
  439. OptName: userResp.User.Nickname,
  440. BannedUserId: userId,
  441. BannedUserName: bannedUserResp.User.Nickname,
  442. }, true)
  443. if err != nil {
  444. return err
  445. }
  446. return nil
  447. } else {
  448. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  449. OptId: optId,
  450. OptName: userResp.User.Nickname,
  451. BannedUserId: userId,
  452. BannedUserName: fmt.Sprintf("管理员\"%s\"设置了禁言", userResp.User.Nickname),
  453. }, true)
  454. if err != nil {
  455. return err
  456. }
  457. return nil
  458. }
  459. }