golang-im聊天
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

609 regels
16 KiB

  1. package model
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "gim/internal/logic/domain/message/repo"
  7. "gim/internal/logic/proxy"
  8. "gim/pkg/gerrors"
  9. "gim/pkg/grpclib"
  10. "gim/pkg/logger"
  11. "gim/pkg/pb"
  12. "gim/pkg/rpc"
  13. "gim/pkg/util"
  14. "time"
  15. "go.uber.org/zap"
  16. "google.golang.org/protobuf/proto"
  17. )
  // Change markers stored in GroupUser.UpdateType. How the marker is consumed
  // (presumably by the persistence layer) is not visible in this file.
  const (
  	// UpdateTypeUpdate (1) marks a member that was added or modified.
  	UpdateTypeUpdate = iota + 1
  	// UpdateTypeDelete (2) marks a member that is to be removed.
  	UpdateTypeDelete
  )
  22. // Group 群组
  23. type Group struct {
  24. Id int64 // 群组id
  25. Name string // 组名
  26. AvatarUrl string // 头像
  27. Introduction string // 群简介
  28. UserNum int32 // 群组人数
  29. IsAllMemberBanned int32 // 是否全员禁言(1:是 2:否)
  30. MasterId int64 // 站长id
  31. Extra string // 附加字段
  32. CreateTime time.Time // 创建时间
  33. UpdateTime time.Time // 更新时间
  34. Members []GroupUser `gorm:"-"` // 群组成员
  35. }
  // GroupUser is one member of a group.
  type GroupUser struct {
  	// Id is the auto-increment primary key.
  	Id int64
  	// GroupId is the id of the group the member belongs to.
  	GroupId int64
  	// UserId is the id of the user.
  	UserId int64
  	// MemberType is the member's role, stored as the integer value of pb.MemberType.
  	MemberType int
  	// Remarks is a free-text remark.
  	Remarks string
  	// Extra holds additional attributes.
  	Extra string
  	// Status is the member status.
  	Status int
  	// CreateTime is the creation time.
  	CreateTime time.Time
  	// UpdateTime is the last modification time.
  	UpdateTime time.Time
  	// UpdateType is the change marker (UpdateTypeUpdate / UpdateTypeDelete);
  	// excluded from gorm mapping.
  	UpdateType int `gorm:"-"`
  }
  // GroupUserV2 mirrors GroupUser, except that Status is a nullable
  // sql.NullInt32 instead of a plain int.
  type GroupUserV2 struct {
  	// Id is the auto-increment primary key.
  	Id int64
  	// GroupId is the id of the group the member belongs to.
  	GroupId int64
  	// UserId is the id of the user.
  	UserId int64
  	// MemberType is the member's role, stored as an integer.
  	MemberType int
  	// Remarks is a free-text remark.
  	Remarks string
  	// Extra holds additional attributes.
  	Extra string
  	// Status is the member status; Valid is false when no value is set.
  	Status sql.NullInt32
  	// CreateTime is the creation time.
  	CreateTime time.Time
  	// UpdateTime is the last modification time.
  	UpdateTime time.Time
  	// UpdateType is the change marker; excluded from gorm mapping.
  	UpdateType int `gorm:"-"`
  }
  60. func (g *Group) ToProto() *pb.Group {
  61. if g == nil {
  62. return nil
  63. }
  64. return &pb.Group{
  65. GroupId: g.Id,
  66. Name: g.Name,
  67. AvatarUrl: g.AvatarUrl,
  68. Introduction: g.Introduction,
  69. UserMum: g.UserNum,
  70. IsAllMemberBanned: g.IsAllMemberBanned,
  71. Extra: g.Extra,
  72. CreateTime: g.CreateTime.Unix(),
  73. UpdateTime: g.UpdateTime.Unix(),
  74. }
  75. }
  76. func CreateGroup(userId, masterId int64, in *pb.CreateGroupReq) *Group {
  77. now := time.Now()
  78. group := &Group{
  79. Name: in.Name,
  80. AvatarUrl: in.AvatarUrl,
  81. Introduction: in.Introduction,
  82. MasterId: masterId,
  83. Extra: in.Extra,
  84. Members: make([]GroupUser, 0, len(in.MemberIds)+1),
  85. IsAllMemberBanned: 2,
  86. CreateTime: now,
  87. UpdateTime: now,
  88. }
  89. // 创建者添加为群主
  90. group.Members = append(group.Members, GroupUser{
  91. GroupId: group.Id,
  92. UserId: userId,
  93. MemberType: int(pb.MemberType_GMT_ADMIN),
  94. CreateTime: now,
  95. UpdateTime: now,
  96. UpdateType: UpdateTypeUpdate,
  97. })
  98. // 其让人添加为成员
  99. for i := range in.MemberIds {
  100. group.Members = append(group.Members, GroupUser{
  101. GroupId: group.Id,
  102. UserId: in.MemberIds[i],
  103. MemberType: int(pb.MemberType_GMT_MEMBER),
  104. CreateTime: now,
  105. UpdateTime: now,
  106. UpdateType: UpdateTypeUpdate,
  107. })
  108. }
  109. return group
  110. }
  111. func (g *Group) Update(ctx context.Context, in *pb.UpdateGroupReq) error {
  112. g.Name = in.Name
  113. g.AvatarUrl = in.AvatarUrl
  114. g.Introduction = in.Introduction
  115. g.Extra = in.Extra
  116. g.UpdateTime = time.Now()
  117. return nil
  118. }
  119. func (g *Group) PushUpdate(ctx context.Context, userId int64, isUpdateIntroduction bool) error {
  120. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  121. if err != nil {
  122. return err
  123. }
  124. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP, &pb.UpdateGroupPush{
  125. OptId: userId,
  126. OptName: userResp.User.Nickname,
  127. Name: g.Name,
  128. AvatarUrl: g.AvatarUrl,
  129. Introduction: g.Introduction,
  130. IsUpdateIntroduction: isUpdateIntroduction,
  131. Extra: g.Extra,
  132. }, true)
  133. if err != nil {
  134. return err
  135. }
  136. if isUpdateIntroduction {
  137. //_, err = g.SendMessage(ctx,
  138. // &pb.Sender{
  139. // SenderType: pb.SenderType_ST_USER,
  140. // SenderId: 0,
  141. // },
  142. // &pb.SendMessageReq{
  143. // ReceiverType: pb.ReceiverType_RT_GROUP,
  144. // ReceiverId: g.Id,
  145. // ToUserIds: nil,
  146. // MessageType: pb.MessageType_MT_COMMAND,
  147. // MessageContent: commandBuf,
  148. // SendTime: util.UnixMilliTime(time.Now()),
  149. // IsPersist: isPersist,
  150. // })
  151. //if err != nil {
  152. // return err
  153. //}
  154. }
  155. return nil
  156. }
  157. // SendMessage 消息发送至群组
  158. func (g *Group) SendMessage(ctx context.Context, sender *pb.Sender, req *pb.SendMessageReq) (int64, error) {
  159. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  160. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  161. return 0, gerrors.ErrNotInGroup
  162. }
  163. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  164. var userSeq int64
  165. var err error
  166. if sender.SenderType == pb.SenderType_ST_USER {
  167. userSeq, err = proxy.MessageProxy.SendToUser(ctx, sender, sender.SenderId, req)
  168. if err != nil {
  169. return 0, err
  170. }
  171. }
  172. go func() {
  173. defer util.RecoverPanic()
  174. // 将消息发送给群组用户,使用写扩散
  175. for _, user := range g.Members {
  176. // 前面已经发送过,这里不需要再发送
  177. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  178. continue
  179. }
  180. _, err := proxy.MessageProxy.SendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  181. if err != nil {
  182. return
  183. }
  184. }
  185. }()
  186. return userSeq, nil
  187. }
  188. // RecallSendMessage 撤回消息发送至群组
  189. func (g *Group) RecallSendMessage(ctx context.Context, sender *pb.Sender, req *pb.RecallMessageReq) (int64, error) {
  190. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  191. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  192. return 0, gerrors.ErrNotInGroup
  193. }
  194. var err error
  195. //查询到对应seq的消息
  196. msg := &pb.RECALL{}
  197. err = proto.Unmarshal(req.MessageContent, msg)
  198. if err != nil {
  199. return 0, err
  200. }
  201. message, err := repo.MessageRepo.GetMessage(sender.SenderId, msg.RecallSeq)
  202. if err != nil {
  203. return 0, err
  204. }
  205. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  206. var userSeq int64
  207. if sender.SenderType == pb.SenderType_ST_USER {
  208. userSeq, err = proxy.MessageProxy.RecallMessageSendToUser(ctx, sender, sender.SenderId, req, message.SendTime)
  209. if err != nil {
  210. return 0, err
  211. }
  212. }
  213. go func() {
  214. defer util.RecoverPanic()
  215. // 将消息发送给群组用户,使用写扩散
  216. for _, user := range g.Members {
  217. // 前面已经发送过,这里不需要再发送
  218. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  219. continue
  220. }
  221. msg.RecallSeq, err = repo.MessageRepo.GetMessageSeqForSendTime(user.UserId, message.SendTime)
  222. req.MessageContent, err = proto.Marshal(msg)
  223. if err != nil {
  224. return
  225. }
  226. _, err := proxy.MessageProxy.RecallMessageSendToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req, message.SendTime)
  227. if err != nil {
  228. return
  229. }
  230. }
  231. }()
  232. return userSeq, nil
  233. }
  234. // SendRedPackage 发送红包消息发送至群组
  235. func (g *Group) SendRedPackage(ctx context.Context, sender *pb.Sender, req *pb.SendRedPacketReq) (int64, error) {
  236. if sender.SenderType == pb.SenderType_ST_USER && !g.IsMember(sender.SenderId) {
  237. logger.Sugar.Error(ctx, sender.SenderId, req.ReceiverId, "不在群组内")
  238. return 0, gerrors.ErrNotInGroup
  239. }
  240. // 如果发送者是用户,将消息发送给发送者,获取用户seq
  241. var userSeq int64
  242. var err error
  243. if sender.SenderType == pb.SenderType_ST_USER {
  244. userSeq, err = proxy.MessageProxy.SendRedPackageToUser(ctx, sender, sender.SenderId, req)
  245. if err != nil {
  246. return 0, err
  247. }
  248. }
  249. go func() {
  250. defer util.RecoverPanic()
  251. // 将消息发送给群组用户,使用写扩散
  252. for _, user := range g.Members {
  253. // 前面已经发送过,这里不需要再发送
  254. if sender.SenderType == pb.SenderType_ST_USER && user.UserId == sender.SenderId {
  255. continue
  256. }
  257. _, err := proxy.MessageProxy.SendRedPackageToUser(grpclib.NewAndCopyRequestId(ctx), sender, user.UserId, req)
  258. if err != nil {
  259. return
  260. }
  261. }
  262. }()
  263. return userSeq, nil
  264. }
  265. func (g *Group) IsMember(userId int64) bool {
  266. for i := range g.Members {
  267. if g.Members[i].UserId == userId {
  268. return true
  269. }
  270. }
  271. return false
  272. }
  273. // PushMessage 向群组推送消息
  274. func (g *Group) PushMessage(ctx context.Context, code pb.PushCode, message proto.Message, isPersist bool) error {
  275. logger.Logger.Debug("push_to_group",
  276. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  277. zap.Int64("group_id", g.Id),
  278. zap.Int32("code", int32(code)),
  279. zap.Any("message", message))
  280. messageBuf, err := proto.Marshal(message)
  281. if err != nil {
  282. return gerrors.WrapError(err)
  283. }
  284. commandBuf, err := proto.Marshal(&pb.Command{Code: int32(code), Data: messageBuf})
  285. if err != nil {
  286. return gerrors.WrapError(err)
  287. }
  288. _, err = g.SendMessage(ctx,
  289. &pb.Sender{
  290. SenderType: pb.SenderType_ST_SYSTEM,
  291. SenderId: 0,
  292. },
  293. &pb.SendMessageReq{
  294. ReceiverType: pb.ReceiverType_RT_GROUP,
  295. ReceiverId: g.Id,
  296. ToUserIds: nil,
  297. MessageType: pb.MessageType_MT_COMMAND,
  298. MessageContent: commandBuf,
  299. SendTime: util.UnixMilliTime(time.Now()),
  300. IsPersist: isPersist,
  301. },
  302. )
  303. if err != nil {
  304. return err
  305. }
  306. return nil
  307. }
  308. // GetMembers 获取群组用户
  309. func (g *Group) GetMembers(ctx context.Context) ([]*pb.GroupMember, error) {
  310. members := g.Members
  311. userIds := make(map[int64]int32, len(members))
  312. for i := range members {
  313. userIds[members[i].UserId] = 0
  314. }
  315. resp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  316. if err != nil {
  317. return nil, err
  318. }
  319. var infos = make([]*pb.GroupMember, len(members))
  320. for i := range members {
  321. member := pb.GroupMember{
  322. UserId: members[i].UserId,
  323. MemberType: pb.MemberType(members[i].MemberType),
  324. Remarks: members[i].Remarks,
  325. Extra: members[i].Extra,
  326. Status: int32(members[i].Status),
  327. }
  328. user, ok := resp.Users[members[i].UserId]
  329. if ok {
  330. member.Nickname = user.Nickname
  331. member.Sex = user.Sex
  332. member.AvatarUrl = user.AvatarUrl
  333. member.UserExtra = user.Extra
  334. }
  335. infos[i] = &member
  336. }
  337. return infos, nil
  338. }
  339. // AddMembers 给群组添加用户
  340. func (g *Group) AddMembers(ctx context.Context, userIds []int64) ([]int64, []int64, error) {
  341. var existIds []int64
  342. var addedIds []int64
  343. now := time.Now()
  344. for i, userId := range userIds {
  345. if g.IsMember(userId) {
  346. existIds = append(existIds, userIds[i])
  347. continue
  348. }
  349. g.Members = append(g.Members, GroupUser{
  350. GroupId: g.Id,
  351. UserId: userIds[i],
  352. MemberType: int(pb.MemberType_GMT_MEMBER),
  353. CreateTime: now,
  354. UpdateTime: now,
  355. UpdateType: UpdateTypeUpdate,
  356. })
  357. addedIds = append(addedIds, userIds[i])
  358. }
  359. g.UserNum += int32(len(addedIds))
  360. return existIds, addedIds, nil
  361. }
  362. func (g *Group) PushAddMember(ctx context.Context, optUserId int64, addedIds []int64) error {
  363. var addIdMap = make(map[int64]int32, len(addedIds))
  364. for i := range addedIds {
  365. addIdMap[addedIds[i]] = 0
  366. }
  367. addIdMap[optUserId] = 0
  368. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: addIdMap})
  369. if err != nil {
  370. return err
  371. }
  372. var members []*pb.GroupMember
  373. for k, _ := range addIdMap {
  374. member, ok := usersResp.Users[k]
  375. if !ok {
  376. continue
  377. }
  378. members = append(members, &pb.GroupMember{
  379. UserId: member.UserId,
  380. Nickname: member.Nickname,
  381. Sex: member.Sex,
  382. AvatarUrl: member.AvatarUrl,
  383. UserExtra: member.Extra,
  384. Remarks: "",
  385. Extra: "",
  386. })
  387. }
  388. //
  389. optUser := usersResp.Users[optUserId]
  390. if optUserId == -1 {
  391. optUser.Nickname = "后台操作"
  392. }
  393. err = g.PushMessage(ctx, pb.PushCode_PC_ADD_GROUP_MEMBERS, &pb.AddGroupMembersPush{
  394. OptId: optUser.UserId,
  395. OptName: optUser.Nickname,
  396. Members: members,
  397. }, true)
  398. if err != nil {
  399. logger.Sugar.Error(err)
  400. }
  401. return nil
  402. }
  403. func (g *Group) GetMember(ctx context.Context, userId int64) *GroupUser {
  404. for i := range g.Members {
  405. if g.Members[i].UserId == userId {
  406. return &g.Members[i]
  407. }
  408. }
  409. return nil
  410. }
  411. // UpdateMember 更新群组成员信息
  412. func (g *Group) UpdateMember(ctx context.Context, in *pb.UpdateGroupMemberReq) error {
  413. member := g.GetMember(ctx, in.UserId)
  414. if member == nil {
  415. return nil
  416. }
  417. member.MemberType = int(in.MemberType)
  418. member.Remarks = in.Remarks
  419. member.Extra = in.Extra
  420. member.UpdateTime = time.Now()
  421. member.UpdateType = UpdateTypeUpdate
  422. return nil
  423. }
  424. // DeleteMember 删除用户群组
  425. func (g *Group) DeleteMember(ctx context.Context, userId int64) error {
  426. member := g.GetMember(ctx, userId)
  427. if member == nil {
  428. return nil
  429. }
  430. member.UpdateType = UpdateTypeDelete
  431. return nil
  432. }
  433. func (g *Group) PushUpdateMember(ctx context.Context, optId, userId int64, memberType int32) error {
  434. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  435. if err != nil {
  436. return err
  437. }
  438. if optId == -1 {
  439. userResp.User.Nickname = "后台操作"
  440. }
  441. updateUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  442. if err != nil {
  443. return err
  444. }
  445. err = g.PushMessage(ctx, pb.PushCode_PC_UPDATE_GROUP_MEMBER, &pb.UpdateMemberPush{
  446. OptId: optId,
  447. OptName: userResp.User.Nickname,
  448. UpdateUserId: userId,
  449. UpdateUserName: updateUserResp.User.Nickname,
  450. UpdateUserMemberType: memberType,
  451. }, true)
  452. if err != nil {
  453. return err
  454. }
  455. return nil
  456. }
  457. func (g *Group) PushDeleteMember(ctx context.Context, optId, userId int64) error {
  458. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  459. if err != nil {
  460. return err
  461. }
  462. if optId == -1 {
  463. userResp.User.Nickname = "后台操作"
  464. }
  465. deleteUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  466. if err != nil {
  467. return err
  468. }
  469. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_GROUP_MEMBER, &pb.RemoveGroupMemberPush{
  470. OptId: optId,
  471. OptName: userResp.User.Nickname,
  472. DeletedUserId: userId,
  473. DeletedUserName: deleteUserResp.User.Nickname,
  474. }, true)
  475. if err != nil {
  476. return err
  477. }
  478. return nil
  479. }
  480. func (g *Group) PushGroupMemberBanned(ctx context.Context, optId, userId int64, isAllMemberBanned bool) error {
  481. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  482. if err != nil {
  483. return err
  484. }
  485. if optId == -1 {
  486. userResp.User.Nickname = "后台操作"
  487. }
  488. if !isAllMemberBanned && userId > 0 {
  489. bannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  490. if err != nil {
  491. return err
  492. }
  493. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  494. OptId: optId,
  495. OptName: userResp.User.Nickname,
  496. BannedUserId: userId,
  497. BannedUserName: bannedUserResp.User.Nickname,
  498. }, true)
  499. if err != nil {
  500. return err
  501. }
  502. return nil
  503. } else {
  504. if userId == 0 {
  505. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  506. OptId: optId,
  507. OptName: userResp.User.Nickname,
  508. BannedUserId: userId,
  509. BannedUserName: fmt.Sprintf("管理员\"%s\"设置群禁言", userResp.User.Nickname),
  510. }, true)
  511. if err != nil {
  512. return err
  513. }
  514. } else {
  515. err = g.PushMessage(ctx, pb.PushCode_PC_BANNED_GROUP_MEMBER, &pb.BannedGroupMemberPush{
  516. OptId: optId,
  517. OptName: userResp.User.Nickname,
  518. BannedUserId: userId,
  519. BannedUserName: fmt.Sprintf("管理员\"%s\"取消群禁言", userResp.User.Nickname),
  520. }, true)
  521. if err != nil {
  522. return err
  523. }
  524. }
  525. return nil
  526. }
  527. }
  528. func (g *Group) PushGroupMemberRemoveBanned(ctx context.Context, optId, userId int64) error {
  529. userResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: optId})
  530. if err != nil {
  531. return err
  532. }
  533. if optId == -1 {
  534. userResp.User.Nickname = "后台操作"
  535. }
  536. removeBannedUserResp, err := rpc.GetBusinessIntClient().GetUser(ctx, &pb.GetUserReq{UserId: userId})
  537. if err != nil {
  538. return err
  539. }
  540. err = g.PushMessage(ctx, pb.PushCode_PC_REMOVE_BANNED_GROUP_MEMBER, &pb.RemoveBannedGroupMemberPush{
  541. OptId: optId,
  542. OptName: userResp.User.Nickname,
  543. RemoveBannedUserId: userId,
  544. RemoveBannedUserName: removeBannedUserResp.User.Nickname,
  545. }, true)
  546. if err != nil {
  547. return err
  548. }
  549. return nil
  550. }