You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

645 lines
17 KiB

  1. package model
  2. import (
  3. "context"
  4. "database/sql"
  5. "egg-im/internal/logic/domain/message/repo"
  6. "egg-im/internal/logic/proxy"
  7. "egg-im/pkg/gerrors"
  8. "egg-im/pkg/grpclib"
  9. "egg-im/pkg/logger"
  10. "egg-im/pkg/pb"
  11. "egg-im/pkg/rpc"
  12. "egg-im/pkg/util"
  13. "fmt"
  14. "strings"
  15. "time"
  16. "go.uber.org/zap"
  17. "google.golang.org/protobuf/proto"
  18. )
  19. const (
  20. UpdateTypeUpdate = 1
  21. UpdateTypeDelete = 2
  22. )
  23. // Group 群组
  24. type Group struct {
  25. Id int64 // 群组id
  26. Name string // 组名
  27. AvatarUrl string // 头像
  28. Introduction string // 群简介
  29. UserNum int32 // 群组人数
  30. IsAllMemberBanned int32 // 是否全员禁言(1:是 2:否)
  31. IsAllAddFriend int32 // 是否允许加好友(1:是 2:否)
  32. Extra string // 附加字段
  33. Kind int32 //群类型(1:用户创建 2:系统创建)
  34. CreateTime time.Time // 创建时间
  35. UpdateTime time.Time // 更新时间
  36. Members []GroupUser `gorm:"-"` // 群组成员
  37. }
  38. type GroupUser struct {
  39. Id int64 // 自增主键
  40. GroupId int64 // 群组id
  41. UserId int64 // 用户id
  42. MemberType int // 群组类型
  43. Remarks string // 备注
  44. Extra string // 附加属性
  45. Status int // 状态
  46. CreateTime time.Time // 创建时间
  47. UpdateTime time.Time // 更新时间
  48. UpdateType int `gorm:"-"` // 更新类型
  49. }
  50. type GroupUserV2 struct {
  51. Id int64 // 自增主键
  52. GroupId int64 // 群组id
  53. UserId int64 // 用户id
  54. MemberType int // 群组类型
  55. Remarks string // 备注
  56. Extra string // 附加属性
  57. Status sql.NullInt32 // 状态
  58. CreateTime time.Time // 创建时间
  59. UpdateTime time.Time // 更新时间
  60. UpdateType int `gorm:"-"` // 更新类型
  61. }
  62. func (g *Group) ToProto() *pb.Group {
  63. if g == nil {
  64. return nil
  65. }
  66. return &pb.Group{
  67. GroupId: g.Id,
  68. Name: g.Name,
  69. AvatarUrl: g.AvatarUrl,
  70. Introduction: g.Introduction,
  71. UserMum: g.UserNum,
  72. IsAllMemberBanned: g.IsAllMemberBanned,
  73. IsAllAddFriend: g.IsAllAddFriend,
  74. Extra: g.Extra,
  75. Kind: pb.GroupKindType(g.Kind),
  76. CreateTime: g.CreateTime.Unix(),
  77. UpdateTime: g.UpdateTime.Unix(),
  78. }
  79. }
  80. func CreateGroup(userId int64, in *pb.CreateGroupReq) *Group {
  81. now := time.Now()
  82. group := &Group{
  83. Name: in.Name,
  84. AvatarUrl: in.AvatarUrl,
  85. Introduction: in.Introduction,
  86. Extra: in.Extra,
  87. Kind: int32(in.Kind),
  88. Members: make([]GroupUser, 0, len(in.MemberIds)+1),
  89. IsAllMemberBanned: 2,
  90. IsAllAddFriend: 2,
  91. UserNum: int32(len(in.MemberIds)) + 1,
  92. CreateTime: now,
  93. UpdateTime: now,
  94. }
  95. // 创建者添加为群主
  96. group.Members = append(group.Members, GroupUser{
  97. GroupId: group.Id,
  98. UserId: userId,
  99. MemberType: int(pb.MemberType_GMT_ADMIN),
  100. CreateTime: now,
  101. UpdateTime: now,
  102. UpdateType: UpdateTypeUpdate,
  103. })
  104. // 其让人添加为成员
  105. for i := range in.MemberIds {
  106. group.Members = append(group.Members, GroupUser{
  107. GroupId: group.Id,
  108. UserId: in.MemberIds[i],
  109. MemberType: int(pb.MemberType_GMT_MEMBER),
  110. CreateTime: now,
  111. UpdateTime: now,
  112. UpdateType: UpdateTypeUpdate,
  113. })
  114. }
  115. return group
  116. }
  117. func (g *Group) Update(ctx context.Context, in *pb.UpdateGroupReq) error {
  118. g.Name = in.Name
  119. g.AvatarUrl = in.AvatarUrl
  120. g.Introduction = in.Introduction
  121. g.Extra = in.Extra
  122. g.UpdateTime = time.Now()
  123. return nil
  124. }
  125. func (g *Group) PushUpdate(ctx context.Context, userId int64, isUpdateIntroduction bool) error {
  126. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  127. if err != nil {
  128. return err
  129. }
  130. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP, &pb.UpdateGroupPush{
  131. OptId: userId,
  132. OptName: userResp.User.Nickname,
  133. OptAvatarUrl: userResp.User.AvatarUrl,
  134. Name: g.Name,
  135. AvatarUrl: g.AvatarUrl,
  136. Introduction: g.Introduction,
  137. IsUpdateIntroduction: isUpdateIntroduction,
  138. Extra: g.Extra,
  139. }, true)
  140. if err != nil {
  141. return err
  142. }
  143. if isUpdateIntroduction {
  144. //_, err = g.SendMessage(ctx,
  145. // &pb.Sender{
  146. // SenderType: pb.SenderType_ST_USER,
  147. // SenderId: 0,
  148. // },
  149. // &pb.SendMessageReq{
  150. // ReceiverType: pb.ReceiverType_RT_GROUP,
  151. // ReceiverId: g.Id,
  152. // ToUserIds: nil,
  153. // MessageType: pb.MessageType_MT_COMMAND,
  154. // MessageContent: commandBuf,
  155. // SendTime: util.UnixMilliTime(time.Now()),
  156. // IsPersist: isPersist,
  157. // })
  158. //if err != nil {
  159. // return err
  160. //}
  161. }
  162. return nil
  163. }
  164. // SendMessage 消息发送至群组
  165. func (g *Group) SendMessage(ctx context.Context, sender *pb.Sender, req *pb.SendMessageReq) (int64, error) {
  166. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  167. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  168. return 0, gerrors.ErrNotInGroup
  169. }
  170. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  171. var userSeq int64
  172. var err error
  173. if sender.SenderType == pb.SenderType_ST_USER {
  174. userSeq, err = proxy.MessageProxy.SendToUser(ctx, sender, sender.SenderId, req)
  175. if err != nil {
  176. return 0, err
  177. }
  178. }
  179. go func() {
  180. defer util.RecoverPanic()
  181. // 将消息发送给群组用户,使用写扩散
  182. for _, user := range g.Members {
  183. // 前面已经发送过,这里不需要再发送
  184. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  185. continue
  186. }
  187. _, err := proxy.MessageProxy.SendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  188. if err != nil {
  189. return
  190. }
  191. }
  192. }()
  193. return userSeq, nil
  194. }
  195. // RecallSendMessage 撤回消息发送至群组
  196. func (g *Group) RecallSendMessage(ctx context.Context, sender *pb.Sender, req *pb.RecallMessageReq) (int64, error) {
  197. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  198. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  199. return 0, gerrors.ErrNotInGroup
  200. }
  201. var err error
  202. //查询到对应seq的消息
  203. msg := &pb.RECALL{}
  204. err = proto.Unmarshal(req.MessageContent, msg)
  205. if err != nil {
  206. return 0, err
  207. }
  208. message, err := repo.MessageRepo.GetMessage(sender.SenderId, msg.RecallSeq)
  209. if err != nil {
  210. return 0, err
  211. }
  212. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  213. var userSeq int64
  214. if sender.SenderType == pb.SenderType_ST_USER {
  215. userSeq, err = proxy.MessageProxy.RecallMessageSendToUser(ctx, sender, sender.SenderId, req, message.SendTime)
  216. if err != nil {
  217. return 0, err
  218. }
  219. }
  220. go func() {
  221. defer util.RecoverPanic()
  222. // 将消息发送给群组用户,使用写扩散
  223. for _, user := range g.Members {
  224. // 前面已经发送过,这里不需要再发送
  225. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  226. continue
  227. }
  228. msg.RecallSeq, err = repo.MessageRepo.GetMessageSeqForSendTime(user.UserId, message.SendTime)
  229. req.MessageContent, err = proto.Marshal(msg)
  230. if err != nil {
  231. return
  232. }
  233. _, err := proxy.MessageProxy.RecallMessageSendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req, message.SendTime)
  234. if err != nil {
  235. return
  236. }
  237. }
  238. }()
  239. return userSeq, nil
  240. }
  241. // SendRedPackage 发送红包消息发送至群组
  242. func (g *Group) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
  243. //TODO::发送红包,暂不过滤群组用户
  244. //if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  245. // logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  246. // return 0, gerrors.ErrNotInGroup
  247. //}
  248. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  249. var userSeq int64
  250. var err error
  251. if sender.SenderType == pb.SenderType_ST_USER {
  252. userSeq, err = proxy.MessageProxy.SendRedPackageToUser(ctx, sender, sender.SenderId, req)
  253. if err != nil {
  254. return 0, err
  255. }
  256. }
  257. go func() {
  258. defer util.RecoverPanic()
  259. // 将消息发送给群组用户,使用写扩散
  260. for _, user := range g.Members {
  261. // 前面已经发送过,这里不需要再发送
  262. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  263. continue
  264. }
  265. _, err := proxy.MessageProxy.SendRedPackageToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  266. if err != nil {
  267. return
  268. }
  269. }
  270. }()
  271. return userSeq, nil
  272. }
  273. func (g *Group) IsMember(userId int64) bool {
  274. for i := range g.Members {
  275. if g.Members[i].UserId == userId {
  276. return true
  277. }
  278. }
  279. return false
  280. }
  281. // PushMessage 向群组推送消息
  282. func (g *Group) PushMessage(ctx context.Context, code pb.PushCode, message proto.Message, isPersist bool) error {
  283. logger.Logger.Debug("push_to_group",
  284. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  285. zap.Int64("group_id", g.Id),
  286. zap.Int32("code", int32(code)),
  287. zap.Any("message", message))
  288. messageBuf, err := proto.Marshal(message)
  289. if err != nil {
  290. return gerrors.WrapError(err)
  291. }
  292. commandBuf, err := proto.Marshal(&pb.Command{Code: int32(code), Data: messageBuf})
  293. if err != nil {
  294. return gerrors.WrapError(err)
  295. }
  296. _, err = g.SendMessage(ctx,
  297. &pb.Sender{
  298. SenderType: pb.SenderType_ST_SYSTEM,
  299. SenderId: 0,
  300. },
  301. &pb.SendMessageReq{
  302. ReceiverType: pb.ReceiverType_RT_GROUP,
  303. ReceiverId: g.Id,
  304. ToUserIds: nil,
  305. MessageType: pb.MessageType_MT_COMMAND,
  306. MessageContent: commandBuf,
  307. SendTime: util.UnixMilliTime(time.Now()),
  308. IsPersist: isPersist,
  309. },
  310. )
  311. if err != nil {
  312. return err
  313. }
  314. return nil
  315. }
  316. // GetMembers 获取群组用户
  317. func (g *Group) GetMembers(ctx context.Context) ([]*pb.GroupMember, error) {
  318. members := g.Members
  319. userIds := make(map[int64]int32, len(members))
  320. for i := range members {
  321. userIds[members[i].UserId] = 0
  322. }
  323. resp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  324. if err != nil {
  325. return nil, err
  326. }
  327. var infos = make([]*pb.GroupMember, len(members))
  328. for i := range members {
  329. member := pb.GroupMember{
  330. UserId: members[i].UserId,
  331. MemberType: pb.MemberType(members[i].MemberType),
  332. Remarks: members[i].Remarks,
  333. Extra: members[i].Extra,
  334. Status: int32(members[i].Status),
  335. }
  336. user, ok := resp.Users[members[i].UserId]
  337. if ok {
  338. member.Nickname = user.Nickname
  339. member.Sex = user.Sex
  340. member.AvatarUrl = user.AvatarUrl
  341. member.UserExtra = user.Extra
  342. }
  343. //TODO::返回缩略图
  344. // 检查URL中是否已存在'?'
  345. if strings.Contains(member.AvatarUrl, "?") {
  346. // 如果存在'?',则分割URL和查询参数,并替换查询参数
  347. parts := strings.Split(member.AvatarUrl, "?")
  348. member.AvatarUrl = parts[0] + "?imageView2/1/w/200/h/200"
  349. } else {
  350. // 如果不存在'?',则直接添加新的查询参数
  351. member.AvatarUrl += "?imageView2/1/w/200/h/200"
  352. }
  353. infos[i] = &member
  354. }
  355. return infos, nil
  356. }
  357. // AddMembers 给群组添加用户
  358. func (g *Group) AddMembers(ctx context.Context, userIds []int64) ([]int64, []int64, error) {
  359. var existIds []int64
  360. var addedIds []int64
  361. now := time.Now()
  362. for i, userId := range userIds {
  363. if g.IsMember(userId) {
  364. existIds = append(existIds, userIds[i])
  365. continue
  366. }
  367. g.Members = append(g.Members, GroupUser{
  368. GroupId: g.Id,
  369. UserId: userIds[i],
  370. MemberType: int(pb.MemberType_GMT_MEMBER),
  371. CreateTime: now,
  372. UpdateTime: now,
  373. UpdateType: UpdateTypeUpdate,
  374. })
  375. addedIds = append(addedIds, userIds[i])
  376. }
  377. g.UserNum += int32(len(addedIds))
  378. return existIds, addedIds, nil
  379. }
  380. func (g *Group) PushAddMember(ctx context.Context, optUserId int64, addedIds []int64, isFilterGroupLeader bool) error {
  381. var addIdMap = make(map[int64]int32, len(addedIds))
  382. for i := range addedIds {
  383. addIdMap[addedIds[i]] = 0
  384. }
  385. addIdMap[optUserId] = 0
  386. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: addIdMap})
  387. if err != nil {
  388. return err
  389. }
  390. var members []*pb.GroupMember
  391. for k, _ := range addIdMap {
  392. if isFilterGroupLeader && k == optUserId {
  393. continue
  394. }
  395. member, ok := usersResp.Users[k]
  396. if !ok {
  397. continue
  398. }
  399. members = append(members, &pb.GroupMember{
  400. UserId: member.UserId,
  401. Nickname: member.Nickname,
  402. Sex: member.Sex,
  403. AvatarUrl: member.AvatarUrl,
  404. UserExtra: member.Extra,
  405. Remarks: "",
  406. Extra: "",
  407. })
  408. }
  409. //
  410. optUser := usersResp.Users[optUserId]
  411. if optUserId == -1 {
  412. if optUser == nil {
  413. optUser = &pb.User{
  414. UserId: optUserId,
  415. }
  416. optUser = &pb.User{}
  417. }
  418. optUser.Nickname = "后台操作"
  419. }
  420. if optUserId == -2 {
  421. if optUser == nil {
  422. optUser = &pb.User{
  423. UserId: optUserId,
  424. }
  425. }
  426. optUser.Nickname = "扫码加入"
  427. }
  428. err = g.PushMessage(ctx, pb.PushCode_PC_ADD_GROUP_MEMBERS, &pb.AddGroupMembersPush{
  429. OptId: optUser.UserId,
  430. OptName: optUser.Nickname,
  431. Members: members,
  432. }, true)
  433. if err != nil {
  434. logger.Sugar.Error(err)
  435. }
  436. return nil
  437. }
  438. func (g *Group) GetMember(ctx context.Context, userId int64) *GroupUser {
  439. for i := range g.Members {
  440. if g.Members[i].UserId == userId {
  441. return &g.Members[i]
  442. }
  443. }
  444. return nil
  445. }
  446. // UpdateMember 更新群组成员信息
  447. func (g *Group) UpdateMember(ctx context.Context, in *pb.UpdateGroupMemberReq) error {
  448. member := g.GetMember(ctx, in.UserId)
  449. if member == nil {
  450. return nil
  451. }
  452. member.MemberType = int(in.MemberType)
  453. member.Remarks = in.Remarks
  454. member.Extra = in.Extra
  455. member.UpdateTime = time.Now()
  456. member.UpdateType = UpdateTypeUpdate
  457. return nil
  458. }
  459. // DeleteMember 删除用户群组
  460. func (g *Group) DeleteMember(ctx context.Context, userId int64) error {
  461. member := g.GetMember(ctx, userId)
  462. if member == nil {
  463. return nil
  464. }
  465. member.UpdateType = UpdateTypeDelete
  466. return nil
  467. }
  468. func (g *Group) PushUpdateMember(ctx context.Context, optId, userId int64, memberType int32) error {
  469. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  470. if err != nil {
  471. return err
  472. }
  473. if optId == -1 {
  474. userResp.User.Nickname = "后台操作"
  475. }
  476. updateUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  477. if err != nil {
  478. return err
  479. }
  480. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP_MEMBER, &pb.UpdateMemberPush{
  481. OptId: optId,
  482. OptName: userResp.User.Nickname,
  483. UpdateUserId: userId,
  484. UpdateUserName: updateUserResp.User.Nickname,
  485. UpdateUserMemberType: memberType,
  486. }, true)
  487. if err != nil {
  488. return err
  489. }
  490. return nil
  491. }
  492. func (g *Group) PushDeleteMember(ctx context.Context, optId, userId int64) error {
  493. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  494. if err != nil {
  495. return err
  496. }
  497. if optId == -1 {
  498. userResp.User.Nickname = "后台操作"
  499. }
  500. deleteUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  501. if err != nil {
  502. return err
  503. }
  504. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_GROUP_MEMBER, &pb.RemoveGroupMemberPush{
  505. OptId: optId,
  506. OptName: userResp.User.Nickname,
  507. DeletedUserId: userId,
  508. DeletedUserName: deleteUserResp.User.Nickname,
  509. }, true)
  510. if err != nil {
  511. return err
  512. }
  513. return nil
  514. }
  515. func (g *Group) PushGroupMemberBanned(ctx context.Context, optId, userId int64, isAllMemberBanned bool) error {
  516. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  517. if err != nil {
  518. return err
  519. }
  520. if optId == -1 {
  521. userResp.User.Nickname = "后台操作"
  522. }
  523. if !isAllMemberBanned && userId > 0 {
  524. bannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  525. if err != nil {
  526. return err
  527. }
  528. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  529. OptId: optId,
  530. OptName: userResp.User.Nickname,
  531. BannedUserId: userId,
  532. BannedUserName: bannedUserResp.User.Nickname,
  533. }, true)
  534. if err != nil {
  535. return err
  536. }
  537. return nil
  538. } else {
  539. if userId == 0 {
  540. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  541. OptId: optId,
  542. OptName: userResp.User.Nickname,
  543. BannedUserId: userId,
  544. BannedUserName: fmt.Sprintf("管理员\"%s\"设置群禁言", userResp.User.Nickname),
  545. }, true)
  546. if err != nil {
  547. return err
  548. }
  549. } else {
  550. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  551. OptId: optId,
  552. OptName: userResp.User.Nickname,
  553. BannedUserId: userId,
  554. BannedUserName: fmt.Sprintf("管理员\"%s\"取消群禁言", userResp.User.Nickname),
  555. }, true)
  556. if err != nil {
  557. return err
  558. }
  559. }
  560. return nil
  561. }
  562. }
  563. func (g *Group) PushGroupMemberRemoveBanned(ctx context.Context, optId, userId int64) error {
  564. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  565. if err != nil {
  566. return err
  567. }
  568. if optId == -1 {
  569. userResp.User.Nickname = "后台操作"
  570. }
  571. removeBannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  572. if err != nil {
  573. return err
  574. }
  575. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_BANNED_GROUP_MEMBER, &pb.RemoveBannedGroupMemberPush{
  576. OptId: optId,
  577. OptName: userResp.User.Nickname,
  578. RemoveBannedUserId: userId,
  579. RemoveBannedUserName: removeBannedUserResp.User.Nickname,
  580. }, true)
  581. if err != nil {
  582. return err
  583. }
  584. return nil
  585. }