golang-im聊天
Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 

630 linhas
17 KiB

  1. package model
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "gim/internal/logic/domain/message/repo"
  7. "gim/internal/logic/proxy"
  8. "gim/pkg/gerrors"
  9. "gim/pkg/grpclib"
  10. "gim/pkg/logger"
  11. "gim/pkg/pb"
  12. "gim/pkg/rpc"
  13. "gim/pkg/util"
  14. "strings"
  15. "time"
  16. "go.uber.org/zap"
  17. "google.golang.org/protobuf/proto"
  18. )
  19. const (
  20. UpdateTypeUpdate = 1
  21. UpdateTypeDelete = 2
  22. )
  23. // Group 群组
  24. type Group struct {
  25. Id int64 // 群组id
  26. Name string // 组名
  27. AvatarUrl string // 头像
  28. Introduction string // 群简介
  29. UserNum int32 // 群组人数
  30. IsAllMemberBanned int32 // 是否全员禁言(1:是 2:否)
  31. IsAllAddFriend int32 // 是否允许加好友(1:是 2:否)
  32. MasterId int64 // 站长id
  33. Extra string // 附加字段
  34. CreateTime time.Time // 创建时间
  35. UpdateTime time.Time // 更新时间
  36. Members []GroupUser `gorm:"-"` // 群组成员
  37. }
  38. type GroupUser struct {
  39. Id int64 // 自增主键
  40. GroupId int64 // 群组id
  41. UserId int64 // 用户id
  42. MemberType int // 群组类型
  43. Remarks string // 备注
  44. Extra string // 附加属性
  45. Status int // 状态
  46. CreateTime time.Time // 创建时间
  47. UpdateTime time.Time // 更新时间
  48. UpdateType int `gorm:"-"` // 更新类型
  49. }
  50. type GroupUserV2 struct {
  51. Id int64 // 自增主键
  52. GroupId int64 // 群组id
  53. UserId int64 // 用户id
  54. MemberType int // 群组类型
  55. Remarks string // 备注
  56. Extra string // 附加属性
  57. Status sql.NullInt32 // 状态
  58. CreateTime time.Time // 创建时间
  59. UpdateTime time.Time // 更新时间
  60. UpdateType int `gorm:"-"` // 更新类型
  61. }
  62. func (g *Group) ToProto() *pb.Group {
  63. if g == nil {
  64. return nil
  65. }
  66. return &pb.Group{
  67. GroupId: g.Id,
  68. Name: g.Name,
  69. AvatarUrl: g.AvatarUrl,
  70. Introduction: g.Introduction,
  71. UserMum: g.UserNum,
  72. IsAllMemberBanned: g.IsAllMemberBanned,
  73. IsAllAddFriend: g.IsAllAddFriend,
  74. Extra: g.Extra,
  75. CreateTime: g.CreateTime.Unix(),
  76. UpdateTime: g.UpdateTime.Unix(),
  77. }
  78. }
  79. func CreateGroup(userId, masterId int64, in *pb.CreateGroupReq) *Group {
  80. now := time.Now()
  81. group := &Group{
  82. Name: in.Name,
  83. AvatarUrl: in.AvatarUrl,
  84. Introduction: in.Introduction,
  85. MasterId: masterId,
  86. Extra: in.Extra,
  87. Members: make([]GroupUser, 0, len(in.MemberIds)+1),
  88. IsAllMemberBanned: 2,
  89. IsAllAddFriend: 2,
  90. UserNum: int32(len(in.MemberIds)) + 1,
  91. CreateTime: now,
  92. UpdateTime: now,
  93. }
  94. // 创建者添加为群主
  95. group.Members = append(group.Members, GroupUser{
  96. GroupId: group.Id,
  97. UserId: userId,
  98. MemberType: int(pb.MemberType_GMT_ADMIN),
  99. CreateTime: now,
  100. UpdateTime: now,
  101. UpdateType: UpdateTypeUpdate,
  102. })
  103. // 其让人添加为成员
  104. for i := range in.MemberIds {
  105. group.Members = append(group.Members, GroupUser{
  106. GroupId: group.Id,
  107. UserId: in.MemberIds[i],
  108. MemberType: int(pb.MemberType_GMT_MEMBER),
  109. CreateTime: now,
  110. UpdateTime: now,
  111. UpdateType: UpdateTypeUpdate,
  112. })
  113. }
  114. return group
  115. }
  116. func (g *Group) Update(ctx context.Context, in *pb.UpdateGroupReq) error {
  117. g.Name = in.Name
  118. g.AvatarUrl = in.AvatarUrl
  119. g.Introduction = in.Introduction
  120. g.Extra = in.Extra
  121. g.UpdateTime = time.Now()
  122. return nil
  123. }
  124. func (g *Group) PushUpdate(ctx context.Context, userId int64, isUpdateIntroduction bool) error {
  125. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  126. if err != nil {
  127. return err
  128. }
  129. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP, &pb.UpdateGroupPush{
  130. OptId: userId,
  131. OptName: userResp.User.Nickname,
  132. OptAvatarUrl: userResp.User.AvatarUrl,
  133. Name: g.Name,
  134. AvatarUrl: g.AvatarUrl,
  135. Introduction: g.Introduction,
  136. IsUpdateIntroduction: isUpdateIntroduction,
  137. Extra: g.Extra,
  138. }, true)
  139. if err != nil {
  140. return err
  141. }
  142. if isUpdateIntroduction {
  143. //_, err = g.SendMessage(ctx,
  144. // &pb.Sender{
  145. // SenderType: pb.SenderType_ST_USER,
  146. // SenderId: 0,
  147. // },
  148. // &pb.SendMessageReq{
  149. // ReceiverType: pb.ReceiverType_RT_GROUP,
  150. // ReceiverId: g.Id,
  151. // ToUserIds: nil,
  152. // MessageType: pb.MessageType_MT_COMMAND,
  153. // MessageContent: commandBuf,
  154. // SendTime: util.UnixMilliTime(time.Now()),
  155. // IsPersist: isPersist,
  156. // })
  157. //if err != nil {
  158. // return err
  159. //}
  160. }
  161. return nil
  162. }
  163. // SendMessage 消息发送至群组
  164. func (g *Group) SendMessage(ctx context.Context, sender *pb.Sender, req *pb.SendMessageReq) (int64, error) {
  165. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  166. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  167. return 0, gerrors.ErrNotInGroup
  168. }
  169. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  170. var userSeq int64
  171. var err error
  172. if sender.SenderType == pb.SenderType_ST_USER {
  173. userSeq, err = proxy.MessageProxy.SendToUser(ctx, sender, sender.SenderId, req)
  174. if err != nil {
  175. return 0, err
  176. }
  177. }
  178. go func() {
  179. defer util.RecoverPanic()
  180. // 将消息发送给群组用户,使用写扩散
  181. for _, user := range g.Members {
  182. // 前面已经发送过,这里不需要再发送
  183. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  184. continue
  185. }
  186. _, err := proxy.MessageProxy.SendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  187. if err != nil {
  188. return
  189. }
  190. }
  191. }()
  192. return userSeq, nil
  193. }
  194. // RecallSendMessage 撤回消息发送至群组
  195. func (g *Group) RecallSendMessage(ctx context.Context, sender *pb.Sender, req *pb.RecallMessageReq) (int64, error) {
  196. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  197. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  198. return 0, gerrors.ErrNotInGroup
  199. }
  200. var err error
  201. //查询到对应seq的消息
  202. msg := &pb.RECALL{}
  203. err = proto.Unmarshal(req.MessageContent, msg)
  204. if err != nil {
  205. return 0, err
  206. }
  207. message, err := repo.MessageRepo.GetMessage(sender.SenderId, msg.RecallSeq)
  208. if err != nil {
  209. return 0, err
  210. }
  211. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  212. var userSeq int64
  213. if sender.SenderType == pb.SenderType_ST_USER {
  214. userSeq, err = proxy.MessageProxy.RecallMessageSendToUser(ctx, sender, sender.SenderId, req, message.SendTime)
  215. if err != nil {
  216. return 0, err
  217. }
  218. }
  219. go func() {
  220. defer util.RecoverPanic()
  221. // 将消息发送给群组用户,使用写扩散
  222. for _, user := range g.Members {
  223. // 前面已经发送过,这里不需要再发送
  224. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  225. continue
  226. }
  227. msg.RecallSeq, err = repo.MessageRepo.GetMessageSeqForSendTime(user.UserId, message.SendTime)
  228. req.MessageContent, err = proto.Marshal(msg)
  229. if err != nil {
  230. return
  231. }
  232. _, err := proxy.MessageProxy.RecallMessageSendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req, message.SendTime)
  233. if err != nil {
  234. return
  235. }
  236. }
  237. }()
  238. return userSeq, nil
  239. }
  240. // SendRedPackage 发送红包消息发送至群组
  241. func (g *Group) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
  242. //TODO::发送红包,暂不过滤群组用户
  243. //if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  244. // logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  245. // return 0, gerrors.ErrNotInGroup
  246. //}
  247. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  248. var userSeq int64
  249. var err error
  250. if sender.SenderType == pb.SenderType_ST_USER {
  251. userSeq, err = proxy.MessageProxy.SendRedPackageToUser(ctx, sender, sender.SenderId, req)
  252. if err != nil {
  253. return 0, err
  254. }
  255. }
  256. go func() {
  257. defer util.RecoverPanic()
  258. // 将消息发送给群组用户,使用写扩散
  259. for _, user := range g.Members {
  260. // 前面已经发送过,这里不需要再发送
  261. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  262. continue
  263. }
  264. _, err := proxy.MessageProxy.SendRedPackageToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  265. if err != nil {
  266. return
  267. }
  268. }
  269. }()
  270. return userSeq, nil
  271. }
  272. func (g *Group) IsMember(userId int64) bool {
  273. for i := range g.Members {
  274. if g.Members[i].UserId == userId {
  275. return true
  276. }
  277. }
  278. return false
  279. }
  280. // PushMessage 向群组推送消息
  281. func (g *Group) PushMessage(ctx context.Context, code pb.PushCode, message proto.Message, isPersist bool) error {
  282. logger.Logger.Debug("push_to_group",
  283. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  284. zap.Int64("group_id", g.Id),
  285. zap.Int32("code", int32(code)),
  286. zap.Any("message", message))
  287. messageBuf, err := proto.Marshal(message)
  288. if err != nil {
  289. return gerrors.WrapError(err)
  290. }
  291. commandBuf, err := proto.Marshal(&pb.Command{Code: int32(code), Data: messageBuf})
  292. if err != nil {
  293. return gerrors.WrapError(err)
  294. }
  295. _, err = g.SendMessage(ctx,
  296. &pb.Sender{
  297. SenderType: pb.SenderType_ST_SYSTEM,
  298. SenderId: 0,
  299. },
  300. &pb.SendMessageReq{
  301. ReceiverType: pb.ReceiverType_RT_GROUP,
  302. ReceiverId: g.Id,
  303. ToUserIds: nil,
  304. MessageType: pb.MessageType_MT_COMMAND,
  305. MessageContent: commandBuf,
  306. SendTime: util.UnixMilliTime(time.Now()),
  307. IsPersist: isPersist,
  308. },
  309. )
  310. if err != nil {
  311. return err
  312. }
  313. return nil
  314. }
  315. // GetMembers 获取群组用户
  316. func (g *Group) GetMembers(ctx context.Context) ([]*pb.GroupMember, error) {
  317. members := g.Members
  318. userIds := make(map[int64]int32, len(members))
  319. for i := range members {
  320. userIds[members[i].UserId] = 0
  321. }
  322. resp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  323. if err != nil {
  324. return nil, err
  325. }
  326. var infos = make([]*pb.GroupMember, len(members))
  327. for i := range members {
  328. member := pb.GroupMember{
  329. UserId: members[i].UserId,
  330. MemberType: pb.MemberType(members[i].MemberType),
  331. Remarks: members[i].Remarks,
  332. Extra: members[i].Extra,
  333. Status: int32(members[i].Status),
  334. }
  335. user, ok := resp.Users[members[i].UserId]
  336. if ok {
  337. member.Nickname = user.Nickname
  338. member.Sex = user.Sex
  339. member.AvatarUrl = user.AvatarUrl
  340. member.UserExtra = user.Extra
  341. }
  342. //TODO::返回缩略图
  343. // 检查URL中是否已存在'?'
  344. if strings.Contains(member.AvatarUrl, "?") {
  345. // 如果存在'?',则分割URL和查询参数,并替换查询参数
  346. parts := strings.Split(member.AvatarUrl, "?")
  347. member.AvatarUrl = parts[0] + "?imageView2/1/w/200/h/200"
  348. } else {
  349. // 如果不存在'?',则直接添加新的查询参数
  350. member.AvatarUrl += "?imageView2/1/w/200/h/200"
  351. }
  352. infos[i] = &member
  353. }
  354. return infos, nil
  355. }
  356. // AddMembers 给群组添加用户
  357. func (g *Group) AddMembers(ctx context.Context, userIds []int64) ([]int64, []int64, error) {
  358. var existIds []int64
  359. var addedIds []int64
  360. now := time.Now()
  361. for i, userId := range userIds {
  362. if g.IsMember(userId) {
  363. existIds = append(existIds, userIds[i])
  364. continue
  365. }
  366. g.Members = append(g.Members, GroupUser{
  367. GroupId: g.Id,
  368. UserId: userIds[i],
  369. MemberType: int(pb.MemberType_GMT_MEMBER),
  370. CreateTime: now,
  371. UpdateTime: now,
  372. UpdateType: UpdateTypeUpdate,
  373. })
  374. addedIds = append(addedIds, userIds[i])
  375. }
  376. g.UserNum += int32(len(addedIds))
  377. return existIds, addedIds, nil
  378. }
  379. func (g *Group) PushAddMember(ctx context.Context, optUserId int64, addedIds []int64, isFilterGroupLeader bool) error {
  380. var addIdMap = make(map[int64]int32, len(addedIds))
  381. for i := range addedIds {
  382. addIdMap[addedIds[i]] = 0
  383. }
  384. addIdMap[optUserId] = 0
  385. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: addIdMap})
  386. if err != nil {
  387. return err
  388. }
  389. var members []*pb.GroupMember
  390. for k, _ := range addIdMap {
  391. if isFilterGroupLeader && k == optUserId {
  392. continue
  393. }
  394. member, ok := usersResp.Users[k]
  395. if !ok {
  396. continue
  397. }
  398. members = append(members, &pb.GroupMember{
  399. UserId: member.UserId,
  400. Nickname: member.Nickname,
  401. Sex: member.Sex,
  402. AvatarUrl: member.AvatarUrl,
  403. UserExtra: member.Extra,
  404. Remarks: "",
  405. Extra: "",
  406. })
  407. }
  408. //
  409. optUser := usersResp.Users[optUserId]
  410. if optUserId == -1 {
  411. optUser.Nickname = "后台操作"
  412. }
  413. err = g.PushMessage(ctx, pb.PushCode_PC_ADD_GROUP_MEMBERS, &pb.AddGroupMembersPush{
  414. OptId: optUser.UserId,
  415. OptName: optUser.Nickname,
  416. Members: members,
  417. }, true)
  418. if err != nil {
  419. logger.Sugar.Error(err)
  420. }
  421. return nil
  422. }
  423. func (g *Group) GetMember(ctx context.Context, userId int64) *GroupUser {
  424. for i := range g.Members {
  425. if g.Members[i].UserId == userId {
  426. return &g.Members[i]
  427. }
  428. }
  429. return nil
  430. }
  431. // UpdateMember 更新群组成员信息
  432. func (g *Group) UpdateMember(ctx context.Context, in *pb.UpdateGroupMemberReq) error {
  433. member := g.GetMember(ctx, in.UserId)
  434. if member == nil {
  435. return nil
  436. }
  437. member.MemberType = int(in.MemberType)
  438. member.Remarks = in.Remarks
  439. member.Extra = in.Extra
  440. member.UpdateTime = time.Now()
  441. member.UpdateType = UpdateTypeUpdate
  442. return nil
  443. }
  444. // DeleteMember 删除用户群组
  445. func (g *Group) DeleteMember(ctx context.Context, userId int64) error {
  446. member := g.GetMember(ctx, userId)
  447. if member == nil {
  448. return nil
  449. }
  450. member.UpdateType = UpdateTypeDelete
  451. return nil
  452. }
  453. func (g *Group) PushUpdateMember(ctx context.Context, optId, userId int64, memberType int32) error {
  454. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  455. if err != nil {
  456. return err
  457. }
  458. if optId == -1 {
  459. userResp.User.Nickname = "后台操作"
  460. }
  461. updateUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  462. if err != nil {
  463. return err
  464. }
  465. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP_MEMBER, &pb.UpdateMemberPush{
  466. OptId: optId,
  467. OptName: userResp.User.Nickname,
  468. UpdateUserId: userId,
  469. UpdateUserName: updateUserResp.User.Nickname,
  470. UpdateUserMemberType: memberType,
  471. }, true)
  472. if err != nil {
  473. return err
  474. }
  475. return nil
  476. }
  477. func (g *Group) PushDeleteMember(ctx context.Context, optId, userId int64) error {
  478. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  479. if err != nil {
  480. return err
  481. }
  482. if optId == -1 {
  483. userResp.User.Nickname = "后台操作"
  484. }
  485. deleteUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  486. if err != nil {
  487. return err
  488. }
  489. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_GROUP_MEMBER, &pb.RemoveGroupMemberPush{
  490. OptId: optId,
  491. OptName: userResp.User.Nickname,
  492. DeletedUserId: userId,
  493. DeletedUserName: deleteUserResp.User.Nickname,
  494. }, true)
  495. if err != nil {
  496. return err
  497. }
  498. return nil
  499. }
  500. func (g *Group) PushGroupMemberBanned(ctx context.Context, optId, userId int64, isAllMemberBanned bool) error {
  501. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  502. if err != nil {
  503. return err
  504. }
  505. if optId == -1 {
  506. userResp.User.Nickname = "后台操作"
  507. }
  508. if !isAllMemberBanned && userId > 0 {
  509. bannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  510. if err != nil {
  511. return err
  512. }
  513. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  514. OptId: optId,
  515. OptName: userResp.User.Nickname,
  516. BannedUserId: userId,
  517. BannedUserName: bannedUserResp.User.Nickname,
  518. }, true)
  519. if err != nil {
  520. return err
  521. }
  522. return nil
  523. } else {
  524. if userId == 0 {
  525. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  526. OptId: optId,
  527. OptName: userResp.User.Nickname,
  528. BannedUserId: userId,
  529. BannedUserName: fmt.Sprintf("管理员\"%s\"设置群禁言", userResp.User.Nickname),
  530. }, true)
  531. if err != nil {
  532. return err
  533. }
  534. } else {
  535. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  536. OptId: optId,
  537. OptName: userResp.User.Nickname,
  538. BannedUserId: userId,
  539. BannedUserName: fmt.Sprintf("管理员\"%s\"取消群禁言", userResp.User.Nickname),
  540. }, true)
  541. if err != nil {
  542. return err
  543. }
  544. }
  545. return nil
  546. }
  547. }
  548. func (g *Group) PushGroupMemberRemoveBanned(ctx context.Context, optId, userId int64) error {
  549. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  550. if err != nil {
  551. return err
  552. }
  553. if optId == -1 {
  554. userResp.User.Nickname = "后台操作"
  555. }
  556. removeBannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  557. if err != nil {
  558. return err
  559. }
  560. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_BANNED_GROUP_MEMBER, &pb.RemoveBannedGroupMemberPush{
  561. OptId: optId,
  562. OptName: userResp.User.Nickname,
  563. RemoveBannedUserId: userId,
  564. RemoveBannedUserName: removeBannedUserResp.User.Nickname,
  565. }, true)
  566. if err != nil {
  567. return err
  568. }
  569. return nil
  570. }