golang-im聊天
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.
 
 
 
 

615 рядки
16 KiB

  1. package model
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "gim/internal/logic/domain/message/repo"
  7. "gim/internal/logic/proxy"
  8. "gim/pkg/gerrors"
  9. "gim/pkg/grpclib"
  10. "gim/pkg/logger"
  11. "gim/pkg/pb"
  12. "gim/pkg/rpc"
  13. "gim/pkg/util"
  14. "time"
  15. "go.uber.org/zap"
  16. "google.golang.org/protobuf/proto"
  17. )
  18. const (
  19. UpdateTypeUpdate = 1
  20. UpdateTypeDelete = 2
  21. )
  22. // Group 群组
  23. type Group struct {
  24. Id int64 // 群组id
  25. Name string // 组名
  26. AvatarUrl string // 头像
  27. Introduction string // 群简介
  28. UserNum int32 // 群组人数
  29. IsAllMemberBanned int32 // 是否全员禁言(1:是 2:否)
  30. MasterId int64 // 站长id
  31. Extra string // 附加字段
  32. CreateTime time.Time // 创建时间
  33. UpdateTime time.Time // 更新时间
  34. Members []GroupUser `gorm:"-"` // 群组成员
  35. }
  36. type GroupUser struct {
  37. Id int64 // 自增主键
  38. GroupId int64 // 群组id
  39. UserId int64 // 用户id
  40. MemberType int // 群组类型
  41. Remarks string // 备注
  42. Extra string // 附加属性
  43. Status int // 状态
  44. CreateTime time.Time // 创建时间
  45. UpdateTime time.Time // 更新时间
  46. UpdateType int `gorm:"-"` // 更新类型
  47. }
  48. type GroupUserV2 struct {
  49. Id int64 // 自增主键
  50. GroupId int64 // 群组id
  51. UserId int64 // 用户id
  52. MemberType int // 群组类型
  53. Remarks string // 备注
  54. Extra string // 附加属性
  55. Status sql.NullInt32 // 状态
  56. CreateTime time.Time // 创建时间
  57. UpdateTime time.Time // 更新时间
  58. UpdateType int `gorm:"-"` // 更新类型
  59. }
  60. func (g *Group) ToProto() *pb.Group {
  61. if g == nil {
  62. return nil
  63. }
  64. return &pb.Group{
  65. GroupId: g.Id,
  66. Name: g.Name,
  67. AvatarUrl: g.AvatarUrl,
  68. Introduction: g.Introduction,
  69. UserMum: g.UserNum,
  70. IsAllMemberBanned: g.IsAllMemberBanned,
  71. Extra: g.Extra,
  72. CreateTime: g.CreateTime.Unix(),
  73. UpdateTime: g.UpdateTime.Unix(),
  74. }
  75. }
  76. func CreateGroup(userId, masterId int64, in *pb.CreateGroupReq) *Group {
  77. now := time.Now()
  78. group := &Group{
  79. Name: in.Name,
  80. AvatarUrl: in.AvatarUrl,
  81. Introduction: in.Introduction,
  82. MasterId: masterId,
  83. Extra: in.Extra,
  84. Members: make([]GroupUser, 0, len(in.MemberIds)+1),
  85. IsAllMemberBanned: 2,
  86. UserNum: int32(len(in.MemberIds)) + 1,
  87. CreateTime: now,
  88. UpdateTime: now,
  89. }
  90. // 创建者添加为群主
  91. group.Members = append(group.Members, GroupUser{
  92. GroupId: group.Id,
  93. UserId: userId,
  94. MemberType: int(pb.MemberType_GMT_ADMIN),
  95. CreateTime: now,
  96. UpdateTime: now,
  97. UpdateType: UpdateTypeUpdate,
  98. })
  99. // 其让人添加为成员
  100. for i := range in.MemberIds {
  101. group.Members = append(group.Members, GroupUser{
  102. GroupId: group.Id,
  103. UserId: in.MemberIds[i],
  104. MemberType: int(pb.MemberType_GMT_MEMBER),
  105. CreateTime: now,
  106. UpdateTime: now,
  107. UpdateType: UpdateTypeUpdate,
  108. })
  109. }
  110. return group
  111. }
  112. func (g *Group) Update(ctx context.Context, in *pb.UpdateGroupReq) error {
  113. g.Name = in.Name
  114. g.AvatarUrl = in.AvatarUrl
  115. g.Introduction = in.Introduction
  116. g.Extra = in.Extra
  117. g.UpdateTime = time.Now()
  118. return nil
  119. }
  120. func (g *Group) PushUpdate(ctx context.Context, userId int64, isUpdateIntroduction bool) error {
  121. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  122. if err != nil {
  123. return err
  124. }
  125. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP, &pb.UpdateGroupPush{
  126. OptId: userId,
  127. OptName: userResp.User.Nickname,
  128. OptAvatarUrl: userResp.User.AvatarUrl,
  129. Name: g.Name,
  130. AvatarUrl: g.AvatarUrl,
  131. Introduction: g.Introduction,
  132. IsUpdateIntroduction: isUpdateIntroduction,
  133. Extra: g.Extra,
  134. }, true)
  135. if err != nil {
  136. return err
  137. }
  138. if isUpdateIntroduction {
  139. //_, err = g.SendMessage(ctx,
  140. // &pb.Sender{
  141. // SenderType: pb.SenderType_ST_USER,
  142. // SenderId: 0,
  143. // },
  144. // &pb.SendMessageReq{
  145. // ReceiverType: pb.ReceiverType_RT_GROUP,
  146. // ReceiverId: g.Id,
  147. // ToUserIds: nil,
  148. // MessageType: pb.MessageType_MT_COMMAND,
  149. // MessageContent: commandBuf,
  150. // SendTime: util.UnixMilliTime(time.Now()),
  151. // IsPersist: isPersist,
  152. // })
  153. //if err != nil {
  154. // return err
  155. //}
  156. }
  157. return nil
  158. }
  159. // SendMessage 消息发送至群组
  160. func (g *Group) SendMessage(ctx context.Context, sender *pb.Sender, req *pb.SendMessageReq) (int64, error) {
  161. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  162. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  163. return 0, gerrors.ErrNotInGroup
  164. }
  165. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  166. var userSeq int64
  167. var err error
  168. if sender.SenderType == pb.SenderType_ST_USER {
  169. userSeq, err = proxy.MessageProxy.SendToUser(ctx, sender, sender.SenderId, req)
  170. if err != nil {
  171. return 0, err
  172. }
  173. }
  174. go func() {
  175. defer util.RecoverPanic()
  176. // 将消息发送给群组用户,使用写扩散
  177. for _, user := range g.Members {
  178. // 前面已经发送过,这里不需要再发送
  179. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  180. continue
  181. }
  182. _, err := proxy.MessageProxy.SendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  183. if err != nil {
  184. return
  185. }
  186. }
  187. }()
  188. return userSeq, nil
  189. }
  190. // RecallSendMessage 撤回消息发送至群组
  191. func (g *Group) RecallSendMessage(ctx context.Context, sender *pb.Sender, req *pb.RecallMessageReq) (int64, error) {
  192. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  193. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  194. return 0, gerrors.ErrNotInGroup
  195. }
  196. var err error
  197. //查询到对应seq的消息
  198. msg := &pb.RECALL{}
  199. err = proto.Unmarshal(req.MessageContent, msg)
  200. if err != nil {
  201. return 0, err
  202. }
  203. message, err := repo.MessageRepo.GetMessage(sender.SenderId, msg.RecallSeq)
  204. if err != nil {
  205. return 0, err
  206. }
  207. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  208. var userSeq int64
  209. if sender.SenderType == pb.SenderType_ST_USER {
  210. userSeq, err = proxy.MessageProxy.RecallMessageSendToUser(ctx, sender, sender.SenderId, req, message.SendTime)
  211. if err != nil {
  212. return 0, err
  213. }
  214. }
  215. go func() {
  216. defer util.RecoverPanic()
  217. // 将消息发送给群组用户,使用写扩散
  218. for _, user := range g.Members {
  219. // 前面已经发送过,这里不需要再发送
  220. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  221. continue
  222. }
  223. msg.RecallSeq, err = repo.MessageRepo.GetMessageSeqForSendTime(user.UserId, message.SendTime)
  224. req.MessageContent, err = proto.Marshal(msg)
  225. if err != nil {
  226. return
  227. }
  228. _, err := proxy.MessageProxy.RecallMessageSendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req, message.SendTime)
  229. if err != nil {
  230. return
  231. }
  232. }
  233. }()
  234. return userSeq, nil
  235. }
  236. // SendRedPackage 发送红包消息发送至群组
  237. func (g *Group) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
  238. //TODO::发送红包,暂不过滤群组用户
  239. //if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  240. // logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  241. // return 0, gerrors.ErrNotInGroup
  242. //}
  243. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  244. var userSeq int64
  245. var err error
  246. if sender.SenderType == pb.SenderType_ST_USER {
  247. userSeq, err = proxy.MessageProxy.SendRedPackageToUser(ctx, sender, sender.SenderId, req)
  248. if err != nil {
  249. return 0, err
  250. }
  251. }
  252. go func() {
  253. defer util.RecoverPanic()
  254. // 将消息发送给群组用户,使用写扩散
  255. for _, user := range g.Members {
  256. // 前面已经发送过,这里不需要再发送
  257. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  258. continue
  259. }
  260. _, err := proxy.MessageProxy.SendRedPackageToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  261. if err != nil {
  262. return
  263. }
  264. }
  265. }()
  266. return userSeq, nil
  267. }
  268. func (g *Group) IsMember(userId int64) bool {
  269. for i := range g.Members {
  270. if g.Members[i].UserId == userId {
  271. return true
  272. }
  273. }
  274. return false
  275. }
  276. // PushMessage 向群组推送消息
  277. func (g *Group) PushMessage(ctx context.Context, code pb.PushCode, message proto.Message, isPersist bool) error {
  278. logger.Logger.Debug("push_to_group",
  279. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  280. zap.Int64("group_id", g.Id),
  281. zap.Int32("code", int32(code)),
  282. zap.Any("message", message))
  283. messageBuf, err := proto.Marshal(message)
  284. if err != nil {
  285. return gerrors.WrapError(err)
  286. }
  287. commandBuf, err := proto.Marshal(&pb.Command{Code: int32(code), Data: messageBuf})
  288. if err != nil {
  289. return gerrors.WrapError(err)
  290. }
  291. _, err = g.SendMessage(ctx,
  292. &pb.Sender{
  293. SenderType: pb.SenderType_ST_SYSTEM,
  294. SenderId: 0,
  295. },
  296. &pb.SendMessageReq{
  297. ReceiverType: pb.ReceiverType_RT_GROUP,
  298. ReceiverId: g.Id,
  299. ToUserIds: nil,
  300. MessageType: pb.MessageType_MT_COMMAND,
  301. MessageContent: commandBuf,
  302. SendTime: util.UnixMilliTime(time.Now()),
  303. IsPersist: isPersist,
  304. },
  305. )
  306. if err != nil {
  307. return err
  308. }
  309. return nil
  310. }
  311. // GetMembers 获取群组用户
  312. func (g *Group) GetMembers(ctx context.Context) ([]*pb.GroupMember, error) {
  313. members := g.Members
  314. userIds := make(map[int64]int32, len(members))
  315. for i := range members {
  316. userIds[members[i].UserId] = 0
  317. }
  318. resp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  319. if err != nil {
  320. return nil, err
  321. }
  322. var infos = make([]*pb.GroupMember, len(members))
  323. for i := range members {
  324. member := pb.GroupMember{
  325. UserId: members[i].UserId,
  326. MemberType: pb.MemberType(members[i].MemberType),
  327. Remarks: members[i].Remarks,
  328. Extra: members[i].Extra,
  329. Status: int32(members[i].Status),
  330. }
  331. user, ok := resp.Users[members[i].UserId]
  332. if ok {
  333. member.Nickname = user.Nickname
  334. member.Sex = user.Sex
  335. member.AvatarUrl = user.AvatarUrl
  336. member.UserExtra = user.Extra
  337. }
  338. infos[i] = &member
  339. }
  340. return infos, nil
  341. }
  342. // AddMembers 给群组添加用户
  343. func (g *Group) AddMembers(ctx context.Context, userIds []int64) ([]int64, []int64, error) {
  344. var existIds []int64
  345. var addedIds []int64
  346. now := time.Now()
  347. for i, userId := range userIds {
  348. if g.IsMember(userId) {
  349. existIds = append(existIds, userIds[i])
  350. continue
  351. }
  352. g.Members = append(g.Members, GroupUser{
  353. GroupId: g.Id,
  354. UserId: userIds[i],
  355. MemberType: int(pb.MemberType_GMT_MEMBER),
  356. CreateTime: now,
  357. UpdateTime: now,
  358. UpdateType: UpdateTypeUpdate,
  359. })
  360. addedIds = append(addedIds, userIds[i])
  361. }
  362. g.UserNum += int32(len(addedIds))
  363. return existIds, addedIds, nil
  364. }
  365. func (g *Group) PushAddMember(ctx context.Context, optUserId int64, addedIds []int64, isFilterGroupLeader bool) error {
  366. var addIdMap = make(map[int64]int32, len(addedIds))
  367. for i := range addedIds {
  368. addIdMap[addedIds[i]] = 0
  369. }
  370. addIdMap[optUserId] = 0
  371. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: addIdMap})
  372. if err != nil {
  373. return err
  374. }
  375. var members []*pb.GroupMember
  376. for k, _ := range addIdMap {
  377. if isFilterGroupLeader && k == optUserId {
  378. continue
  379. }
  380. member, ok := usersResp.Users[k]
  381. if !ok {
  382. continue
  383. }
  384. members = append(members, &pb.GroupMember{
  385. UserId: member.UserId,
  386. Nickname: member.Nickname,
  387. Sex: member.Sex,
  388. AvatarUrl: member.AvatarUrl,
  389. UserExtra: member.Extra,
  390. Remarks: "",
  391. Extra: "",
  392. })
  393. }
  394. //
  395. optUser := usersResp.Users[optUserId]
  396. if optUserId == -1 {
  397. optUser.Nickname = "后台操作"
  398. }
  399. err = g.PushMessage(ctx, pb.PushCode_PC_ADD_GROUP_MEMBERS, &pb.AddGroupMembersPush{
  400. OptId: optUser.UserId,
  401. OptName: optUser.Nickname,
  402. Members: members,
  403. }, true)
  404. if err != nil {
  405. logger.Sugar.Error(err)
  406. }
  407. return nil
  408. }
  409. func (g *Group) GetMember(ctx context.Context, userId int64) *GroupUser {
  410. for i := range g.Members {
  411. if g.Members[i].UserId == userId {
  412. return &g.Members[i]
  413. }
  414. }
  415. return nil
  416. }
  417. // UpdateMember 更新群组成员信息
  418. func (g *Group) UpdateMember(ctx context.Context, in *pb.UpdateGroupMemberReq) error {
  419. member := g.GetMember(ctx, in.UserId)
  420. if member == nil {
  421. return nil
  422. }
  423. member.MemberType = int(in.MemberType)
  424. member.Remarks = in.Remarks
  425. member.Extra = in.Extra
  426. member.UpdateTime = time.Now()
  427. member.UpdateType = UpdateTypeUpdate
  428. return nil
  429. }
  430. // DeleteMember 删除用户群组
  431. func (g *Group) DeleteMember(ctx context.Context, userId int64) error {
  432. member := g.GetMember(ctx, userId)
  433. if member == nil {
  434. return nil
  435. }
  436. member.UpdateType = UpdateTypeDelete
  437. return nil
  438. }
  439. func (g *Group) PushUpdateMember(ctx context.Context, optId, userId int64, memberType int32) error {
  440. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  441. if err != nil {
  442. return err
  443. }
  444. if optId == -1 {
  445. userResp.User.Nickname = "后台操作"
  446. }
  447. updateUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  448. if err != nil {
  449. return err
  450. }
  451. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP_MEMBER, &pb.UpdateMemberPush{
  452. OptId: optId,
  453. OptName: userResp.User.Nickname,
  454. UpdateUserId: userId,
  455. UpdateUserName: updateUserResp.User.Nickname,
  456. UpdateUserMemberType: memberType,
  457. }, true)
  458. if err != nil {
  459. return err
  460. }
  461. return nil
  462. }
  463. func (g *Group) PushDeleteMember(ctx context.Context, optId, userId int64) error {
  464. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  465. if err != nil {
  466. return err
  467. }
  468. if optId == -1 {
  469. userResp.User.Nickname = "后台操作"
  470. }
  471. deleteUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  472. if err != nil {
  473. return err
  474. }
  475. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_GROUP_MEMBER, &pb.RemoveGroupMemberPush{
  476. OptId: optId,
  477. OptName: userResp.User.Nickname,
  478. DeletedUserId: userId,
  479. DeletedUserName: deleteUserResp.User.Nickname,
  480. }, true)
  481. if err != nil {
  482. return err
  483. }
  484. return nil
  485. }
  486. func (g *Group) PushGroupMemberBanned(ctx context.Context, optId, userId int64, isAllMemberBanned bool) error {
  487. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  488. if err != nil {
  489. return err
  490. }
  491. if optId == -1 {
  492. userResp.User.Nickname = "后台操作"
  493. }
  494. if !isAllMemberBanned && userId > 0 {
  495. bannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  496. if err != nil {
  497. return err
  498. }
  499. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  500. OptId: optId,
  501. OptName: userResp.User.Nickname,
  502. BannedUserId: userId,
  503. BannedUserName: bannedUserResp.User.Nickname,
  504. }, true)
  505. if err != nil {
  506. return err
  507. }
  508. return nil
  509. } else {
  510. if userId == 0 {
  511. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  512. OptId: optId,
  513. OptName: userResp.User.Nickname,
  514. BannedUserId: userId,
  515. BannedUserName: fmt.Sprintf("管理员\"%s\"设置群禁言", userResp.User.Nickname),
  516. }, true)
  517. if err != nil {
  518. return err
  519. }
  520. } else {
  521. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  522. OptId: optId,
  523. OptName: userResp.User.Nickname,
  524. BannedUserId: userId,
  525. BannedUserName: fmt.Sprintf("管理员\"%s\"取消群禁言", userResp.User.Nickname),
  526. }, true)
  527. if err != nil {
  528. return err
  529. }
  530. }
  531. return nil
  532. }
  533. }
  534. func (g *Group) PushGroupMemberRemoveBanned(ctx context.Context, optId, userId int64) error {
  535. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  536. if err != nil {
  537. return err
  538. }
  539. if optId == -1 {
  540. userResp.User.Nickname = "后台操作"
  541. }
  542. removeBannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  543. if err != nil {
  544. return err
  545. }
  546. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_BANNED_GROUP_MEMBER, &pb.RemoveBannedGroupMemberPush{
  547. OptId: optId,
  548. OptName: userResp.User.Nickname,
  549. RemoveBannedUserId: userId,
  550. RemoveBannedUserName: removeBannedUserResp.User.Nickname,
  551. }, true)
  552. if err != nil {
  553. return err
  554. }
  555. return nil
  556. }