golang-im聊天
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

group.go 14 KiB

2 år sedan
1 år sedan
1 år sedan
1 år sedan
2 år sedan
1 år sedan
2 år sedan
1 år sedan
1 år sedan
2 år sedan
1 år sedan
2 år sedan
1 år sedan
2 år sedan
1 år sedan
2 år sedan
1 år sedan
2 år sedan
1 år sedan
2 år sedan
1 år sedan
2 år sedan
1 år sedan
1 år sedan
1 år sedan
1 år sedan
2 år sedan
1 år sedan
2 år sedan
2 år sedan
2 år sedan
1 år sedan
1 år sedan
1 år sedan
1 år sedan

  1. package model
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "gim/internal/logic/domain/message/repo"
  7. "gim/internal/logic/proxy"
  8. "gim/pkg/gerrors"
  9. "gim/pkg/grpclib"
  10. "gim/pkg/logger"
  11. "gim/pkg/pb"
  12. "gim/pkg/rpc"
  13. "gim/pkg/util"
  14. "time"
  15. "go.uber.org/zap"
  16. "google.golang.org/protobuf/proto"
  17. )
  18. const (
  19. UpdateTypeUpdate = 1
  20. UpdateTypeDelete = 2
  21. )
  22. // Group 群组
  23. type Group struct {
  24. Id int64 // 群组id
  25. Name string // 组名
  26. AvatarUrl string // 头像
  27. Introduction string // 群简介
  28. UserNum int32 // 群组人数
  29. IsAllMemberBanned int32 // 是否全员禁言(1:是 2:否)
  30. Extra string // 附加字段
  31. CreateTime time.Time // 创建时间
  32. UpdateTime time.Time // 更新时间
  33. Members []GroupUser `gorm:"-"` // 群组成员
  34. }
  35. type GroupUser struct {
  36. Id int64 // 自增主键
  37. GroupId int64 // 群组id
  38. UserId int64 // 用户id
  39. MemberType int // 群组类型
  40. Remarks string // 备注
  41. Extra string // 附加属性
  42. Status int // 状态
  43. CreateTime time.Time // 创建时间
  44. UpdateTime time.Time // 更新时间
  45. UpdateType int `gorm:"-"` // 更新类型
  46. }
  47. type GroupUserV2 struct {
  48. Id int64 // 自增主键
  49. GroupId int64 // 群组id
  50. UserId int64 // 用户id
  51. MemberType int // 群组类型
  52. Remarks string // 备注
  53. Extra string // 附加属性
  54. Status sql.NullInt32 // 状态
  55. CreateTime time.Time // 创建时间
  56. UpdateTime time.Time // 更新时间
  57. UpdateType int `gorm:"-"` // 更新类型
  58. }
  59. func (g *Group) ToProto() *pb.Group {
  60. if g == nil {
  61. return nil
  62. }
  63. return &pb.Group{
  64. GroupId: g.Id,
  65. Name: g.Name,
  66. AvatarUrl: g.AvatarUrl,
  67. Introduction: g.Introduction,
  68. UserMum: g.UserNum,
  69. IsAllMemberBanned: g.IsAllMemberBanned,
  70. Extra: g.Extra,
  71. CreateTime: g.CreateTime.Unix(),
  72. UpdateTime: g.UpdateTime.Unix(),
  73. }
  74. }
  75. func CreateGroup(userId int64, in *pb.CreateGroupReq) *Group {
  76. now := time.Now()
  77. group := &Group{
  78. Name: in.Name,
  79. AvatarUrl: in.AvatarUrl,
  80. Introduction: in.Introduction,
  81. Extra: in.Extra,
  82. Members: make([]GroupUser, 0, len(in.MemberIds)+1),
  83. CreateTime: now,
  84. UpdateTime: now,
  85. }
  86. // 创建者添加为管理员
  87. group.Members = append(group.Members, GroupUser{
  88. GroupId: group.Id,
  89. UserId: userId,
  90. MemberType: int(pb.MemberType_GMT_ADMIN),
  91. CreateTime: now,
  92. UpdateTime: now,
  93. UpdateType: UpdateTypeUpdate,
  94. })
  95. // 其让人添加为成员
  96. for i := range in.MemberIds {
  97. group.Members = append(group.Members, GroupUser{
  98. GroupId: group.Id,
  99. UserId: in.MemberIds[i],
  100. MemberType: int(pb.MemberType_GMT_MEMBER),
  101. CreateTime: now,
  102. UpdateTime: now,
  103. UpdateType: UpdateTypeUpdate,
  104. })
  105. }
  106. return group
  107. }
  108. func (g *Group) Update(ctx context.Context, in *pb.UpdateGroupReq) error {
  109. g.Name = in.Name
  110. g.AvatarUrl = in.AvatarUrl
  111. g.Introduction = in.Introduction
  112. g.Extra = in.Extra
  113. g.UpdateTime = time.Now()
  114. return nil
  115. }
  116. func (g *Group) PushUpdate(ctx context.Context, userId int64, isUpdateIntroduction bool) error {
  117. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  118. if err != nil {
  119. return err
  120. }
  121. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP, &pb.UpdateGroupPush{
  122. OptId: userId,
  123. OptName: userResp.User.Nickname,
  124. Name: g.Name,
  125. AvatarUrl: g.AvatarUrl,
  126. Introduction: g.Introduction,
  127. IsUpdateIntroduction: isUpdateIntroduction,
  128. Extra: g.Extra,
  129. }, true)
  130. if err != nil {
  131. return err
  132. }
  133. if isUpdateIntroduction {
  134. //_, err = g.SendMessage(ctx,
  135. // &pb.Sender{
  136. // SenderType: pb.SenderType_ST_USER,
  137. // SenderId: 0,
  138. // },
  139. // &pb.SendMessageReq{
  140. // ReceiverType: pb.ReceiverType_RT_GROUP,
  141. // ReceiverId: g.Id,
  142. // ToUserIds: nil,
  143. // MessageType: pb.MessageType_MT_COMMAND,
  144. // MessageContent: commandBuf,
  145. // SendTime: util.UnixMilliTime(time.Now()),
  146. // IsPersist: isPersist,
  147. // })
  148. //if err != nil {
  149. // return err
  150. //}
  151. }
  152. return nil
  153. }
  154. // SendMessage 消息发送至群组
  155. func (g *Group) SendMessage(ctx context.Context, sender *pb.Sender, req *pb.SendMessageReq) (int64, error) {
  156. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  157. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  158. return 0, gerrors.ErrNotInGroup
  159. }
  160. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  161. var userSeq int64
  162. var err error
  163. if sender.SenderType == pb.SenderType_ST_USER {
  164. userSeq, err = proxy.MessageProxy.SendToUser(ctx, sender, sender.SenderId, req)
  165. if err != nil {
  166. return 0, err
  167. }
  168. }
  169. go func() {
  170. defer util.RecoverPanic()
  171. // 将消息发送给群组用户,使用写扩散
  172. for _, user := range g.Members {
  173. // 前面已经发送过,这里不需要再发送
  174. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  175. continue
  176. }
  177. _, err := proxy.MessageProxy.SendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  178. if err != nil {
  179. return
  180. }
  181. }
  182. }()
  183. return userSeq, nil
  184. }
  185. // RecallSendMessage 撤回消息发送至群组
  186. func (g *Group) RecallSendMessage(ctx context.Context, sender *pb.Sender, req *pb.RecallMessageReq) (int64, error) {
  187. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  188. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  189. return 0, gerrors.ErrNotInGroup
  190. }
  191. var err error
  192. //查询到对应seq的消息
  193. msg := &pb.RECALL{}
  194. err = proto.Unmarshal(req.MessageContent, msg)
  195. if err != nil {
  196. return 0, err
  197. }
  198. message, err := repo.MessageRepo.GetMessage(sender.SenderId, msg.RecallSeq)
  199. if err != nil {
  200. return 0, err
  201. }
  202. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  203. var userSeq int64
  204. if sender.SenderType == pb.SenderType_ST_USER {
  205. userSeq, err = proxy.MessageProxy.RecallMessageSendToUser(ctx, sender, sender.SenderId, req, message.SendTime)
  206. if err != nil {
  207. return 0, err
  208. }
  209. }
  210. go func() {
  211. defer util.RecoverPanic()
  212. // 将消息发送给群组用户,使用写扩散
  213. for _, user := range g.Members {
  214. // 前面已经发送过,这里不需要再发送
  215. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  216. continue
  217. }
  218. _, err := proxy.MessageProxy.RecallMessageSendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req, message.SendTime)
  219. if err != nil {
  220. return
  221. }
  222. }
  223. }()
  224. return userSeq, nil
  225. }
  226. // SendRedPackage 发送红包消息发送至群组
  227. func (g *Group) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
  228. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  229. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  230. return 0, gerrors.ErrNotInGroup
  231. }
  232. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  233. var userSeq int64
  234. var err error
  235. if sender.SenderType == pb.SenderType_ST_USER {
  236. userSeq, err = proxy.MessageProxy.SendRedPackageToUser(ctx, sender, sender.SenderId, req)
  237. if err != nil {
  238. return 0, err
  239. }
  240. }
  241. go func() {
  242. defer util.RecoverPanic()
  243. // 将消息发送给群组用户,使用写扩散
  244. for _, user := range g.Members {
  245. // 前面已经发送过,这里不需要再发送
  246. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  247. continue
  248. }
  249. _, err := proxy.MessageProxy.SendRedPackageToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  250. if err != nil {
  251. return
  252. }
  253. }
  254. }()
  255. return userSeq, nil
  256. }
  257. func (g *Group) IsMember(userId int64) bool {
  258. for i := range g.Members {
  259. if g.Members[i].UserId == userId {
  260. return true
  261. }
  262. }
  263. return false
  264. }
  265. // PushMessage 向群组推送消息
  266. func (g *Group) PushMessage(ctx context.Context, code pb.PushCode, message proto.Message, isPersist bool) error {
  267. logger.Logger.Debug("push_to_group",
  268. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  269. zap.Int64("group_id", g.Id),
  270. zap.Int32("code", int32(code)),
  271. zap.Any("message", message))
  272. messageBuf, err := proto.Marshal(message)
  273. if err != nil {
  274. return gerrors.WrapError(err)
  275. }
  276. commandBuf, err := proto.Marshal(&pb.Command{Code: int32(code), Data: messageBuf})
  277. if err != nil {
  278. return gerrors.WrapError(err)
  279. }
  280. _, err = g.SendMessage(ctx,
  281. &pb.Sender{
  282. SenderType: pb.SenderType_ST_SYSTEM,
  283. SenderId: 0,
  284. },
  285. &pb.SendMessageReq{
  286. ReceiverType: pb.ReceiverType_RT_GROUP,
  287. ReceiverId: g.Id,
  288. ToUserIds: nil,
  289. MessageType: pb.MessageType_MT_COMMAND,
  290. MessageContent: commandBuf,
  291. SendTime: util.UnixMilliTime(time.Now()),
  292. IsPersist: isPersist,
  293. },
  294. )
  295. if err != nil {
  296. return err
  297. }
  298. return nil
  299. }
  300. // GetMembers 获取群组用户
  301. func (g *Group) GetMembers(ctx context.Context) ([]*pb.GroupMember, error) {
  302. members := g.Members
  303. userIds := make(map[int64]int32, len(members))
  304. for i := range members {
  305. userIds[members[i].UserId] = 0
  306. }
  307. resp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  308. if err != nil {
  309. return nil, err
  310. }
  311. var infos = make([]*pb.GroupMember, len(members))
  312. for i := range members {
  313. member := pb.GroupMember{
  314. UserId: members[i].UserId,
  315. MemberType: pb.MemberType(members[i].MemberType),
  316. Remarks: members[i].Remarks,
  317. Extra: members[i].Extra,
  318. Status: int32(members[i].Status),
  319. }
  320. user, ok := resp.Users[members[i].UserId]
  321. if ok {
  322. member.Nickname = user.Nickname
  323. member.Sex = user.Sex
  324. member.AvatarUrl = user.AvatarUrl
  325. member.UserExtra = user.Extra
  326. }
  327. infos[i] = &member
  328. }
  329. return infos, nil
  330. }
  331. // AddMembers 给群组添加用户
  332. func (g *Group) AddMembers(ctx context.Context, userIds []int64) ([]int64, []int64, error) {
  333. var existIds []int64
  334. var addedIds []int64
  335. now := time.Now()
  336. for i, userId := range userIds {
  337. if g.IsMember(userId) {
  338. existIds = append(existIds, userIds[i])
  339. continue
  340. }
  341. g.Members = append(g.Members, GroupUser{
  342. GroupId: g.Id,
  343. UserId: userIds[i],
  344. MemberType: int(pb.MemberType_GMT_MEMBER),
  345. CreateTime: now,
  346. UpdateTime: now,
  347. UpdateType: UpdateTypeUpdate,
  348. })
  349. addedIds = append(addedIds, userIds[i])
  350. }
  351. g.UserNum += int32(len(addedIds))
  352. return existIds, addedIds, nil
  353. }
  354. func (g *Group) PushAddMember(ctx context.Context, optUserId int64, addedIds []int64) error {
  355. var addIdMap = make(map[int64]int32, len(addedIds))
  356. for i := range addedIds {
  357. addIdMap[addedIds[i]] = 0
  358. }
  359. addIdMap[optUserId] = 0
  360. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: addIdMap})
  361. if err != nil {
  362. return err
  363. }
  364. var members []*pb.GroupMember
  365. for k, _ := range addIdMap {
  366. member, ok := usersResp.Users[k]
  367. if !ok {
  368. continue
  369. }
  370. members = append(members, &pb.GroupMember{
  371. UserId: member.UserId,
  372. Nickname: member.Nickname,
  373. Sex: member.Sex,
  374. AvatarUrl: member.AvatarUrl,
  375. UserExtra: member.Extra,
  376. Remarks: "",
  377. Extra: "",
  378. })
  379. }
  380. optUser := usersResp.Users[optUserId]
  381. err = g.PushMessage(ctx, pb.PushCode_PC_ADD_GROUP_MEMBERS, &pb.AddGroupMembersPush{
  382. OptId: optUser.UserId,
  383. OptName: optUser.Nickname,
  384. Members: members,
  385. }, true)
  386. if err != nil {
  387. logger.Sugar.Error(err)
  388. }
  389. return nil
  390. }
  391. func (g *Group) GetMember(ctx context.Context, userId int64) *GroupUser {
  392. for i := range g.Members {
  393. if g.Members[i].UserId == userId {
  394. return &g.Members[i]
  395. }
  396. }
  397. return nil
  398. }
  399. // UpdateMember 更新群组成员信息
  400. func (g *Group) UpdateMember(ctx context.Context, in *pb.UpdateGroupMemberReq) error {
  401. member := g.GetMember(ctx, in.UserId)
  402. if member == nil {
  403. return nil
  404. }
  405. member.MemberType = int(in.MemberType)
  406. member.Remarks = in.Remarks
  407. member.Extra = in.Extra
  408. member.UpdateTime = time.Now()
  409. member.UpdateType = UpdateTypeUpdate
  410. return nil
  411. }
  412. // DeleteMember 删除用户群组
  413. func (g *Group) DeleteMember(ctx context.Context, userId int64) error {
  414. member := g.GetMember(ctx, userId)
  415. if member == nil {
  416. return nil
  417. }
  418. member.UpdateType = UpdateTypeDelete
  419. return nil
  420. }
  421. func (g *Group) PushDeleteMember(ctx context.Context, optId, userId int64) error {
  422. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  423. if err != nil {
  424. return err
  425. }
  426. deleteUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  427. if err != nil {
  428. return err
  429. }
  430. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_GROUP_MEMBER, &pb.RemoveGroupMemberPush{
  431. OptId: optId,
  432. OptName: userResp.User.Nickname,
  433. DeletedUserId: userId,
  434. DeletedUserName: deleteUserResp.User.Nickname,
  435. }, true)
  436. if err != nil {
  437. return err
  438. }
  439. return nil
  440. }
  441. func (g *Group) PushGroupMemberBanned(ctx context.Context, optId, userId int64, isAllMemberBanned bool) error {
  442. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  443. if err != nil {
  444. return err
  445. }
  446. if !isAllMemberBanned {
  447. bannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  448. if err != nil {
  449. return err
  450. }
  451. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  452. OptId: optId,
  453. OptName: userResp.User.Nickname,
  454. BannedUserId: userId,
  455. BannedUserName: bannedUserResp.User.Nickname,
  456. }, true)
  457. if err != nil {
  458. return err
  459. }
  460. return nil
  461. } else {
  462. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  463. OptId: optId,
  464. OptName: userResp.User.Nickname,
  465. BannedUserId: userId,
  466. BannedUserName: fmt.Sprintf("管理员\"%s\"设置了禁言", userResp.User.Nickname),
  467. }, true)
  468. if err != nil {
  469. return err
  470. }
  471. return nil
  472. }
  473. }
  474. func (g *Group) PushGroupMemberRemoveBanned(ctx context.Context, optId, userId int64) error {
  475. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  476. if err != nil {
  477. return err
  478. }
  479. removeBannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  480. if err != nil {
  481. return err
  482. }
  483. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_BANNED_GROUP_MEMBER, &pb.RemoveBannedGroupMemberPush{
  484. OptId: optId,
  485. OptName: userResp.User.Nickname,
  486. RemoveBannedUserId: userId,
  487. RemoveBannedUserName: removeBannedUserResp.User.Nickname,
  488. }, true)
  489. if err != nil {
  490. return err
  491. }
  492. return nil
  493. }