golang-im聊天
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

message_service.go 13 KiB

2 년 전
2 년 전
1 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
2 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
2 년 전
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. package service
  2. import (
  3. "context"
  4. "gim/internal/business/comm/db"
  5. svc "gim/internal/business/comm/service"
  6. "gim/internal/business/comm/utils"
  7. repo2 "gim/internal/business/domain/user/repo"
  8. "gim/internal/logic/domain/message/md"
  9. "gim/internal/logic/domain/message/model"
  10. "gim/internal/logic/domain/message/repo"
  11. "gim/internal/logic/proxy"
  12. "gim/pkg/grpclib"
  13. "gim/pkg/grpclib/picker"
  14. "gim/pkg/logger"
  15. "gim/pkg/pb"
  16. "gim/pkg/rpc"
  17. "gim/pkg/util"
  18. "go.uber.org/zap"
  19. "google.golang.org/protobuf/proto"
  20. "strconv"
  21. "time"
  22. )
  23. const MessageLimit = 50 // 最大消息同步数量
  24. const MaxSyncBufLen = 65536 // 最大字节数组长度
  25. type messageService struct{}
  26. var MessageService = new(messageService)
  27. // Sync 消息同步
  28. func (*messageService) Sync(ctx context.Context, userId, seq int64) (*pb.SyncResp, error) {
  29. messages, hasMore, err := MessageService.ListByUserIdAndSeq(ctx, userId, seq)
  30. if err != nil {
  31. return nil, err
  32. }
  33. pbMessages := model.MessagesToPB(messages)
  34. length := len(pbMessages)
  35. resp := &pb.SyncResp{Messages: pbMessages, HasMore: hasMore}
  36. bytes, err := proto.Marshal(resp)
  37. if err != nil {
  38. return nil, err
  39. }
  40. // 如果字节数组大于一个包的长度,需要减少字节数组
  41. for len(bytes) > MaxSyncBufLen {
  42. length = length * 2 / 3
  43. resp = &pb.SyncResp{Messages: pbMessages[0:length], HasMore: true}
  44. bytes, err = proto.Marshal(resp)
  45. if err != nil {
  46. return nil, err
  47. }
  48. }
  49. var userIds = make(map[int64]int32, len(resp.Messages))
  50. for i := range resp.Messages {
  51. if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER {
  52. userIds[resp.Messages[i].Sender.SenderId] = 0
  53. }
  54. }
  55. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  56. if err != nil {
  57. return nil, err
  58. }
  59. for i := range resp.Messages {
  60. if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER {
  61. user, ok := usersResp.Users[resp.Messages[i].Sender.SenderId]
  62. if ok {
  63. resp.Messages[i].Sender.Nickname = user.Nickname
  64. resp.Messages[i].Sender.AvatarUrl = user.AvatarUrl
  65. resp.Messages[i].Sender.Extra = user.Extra
  66. } else {
  67. logger.Logger.Warn("get user failed", zap.Int64("user_id", resp.Messages[i].Sender.SenderId))
  68. }
  69. }
  70. }
  71. return resp, nil
  72. }
  73. // ListByUserIdAndSeq 查询消息
  74. func (*messageService) ListByUserIdAndSeq(ctx context.Context, userId, seq int64) ([]model.Message, bool, error) {
  75. var err error
  76. if seq == 0 {
  77. seq, err = DeviceAckService.GetMaxByUserId(ctx, userId)
  78. if err != nil {
  79. return nil, false, err
  80. }
  81. }
  82. return repo.MessageRepo.ListBySeq(userId, seq, MessageLimit)
  83. }
  84. // SendToUser 将消息发送给用户
  85. func (*messageService) SendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendMessageReq) (int64, error) {
  86. masterId, _ := grpclib.GetCtxMasterId(ctx)
  87. logger.Logger.Debug("SendToUser",
  88. zap.String("master_id", masterId),
  89. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  90. zap.Int64("to_user_id", toUserId))
  91. var (
  92. seq int64 = 0
  93. err error
  94. )
  95. if req.IsPersist {
  96. seq, err = SeqService.GetUserNext(ctx, toUserId)
  97. if err != nil {
  98. return 0, err
  99. }
  100. selfMessage := model.Message{
  101. UserId: toUserId,
  102. RequestId: grpclib.GetCtxRequestId(ctx),
  103. SenderType: int32(sender.SenderType),
  104. SenderId: sender.SenderId,
  105. ReceiverType: int32(req.ReceiverType),
  106. ReceiverId: req.ReceiverId,
  107. ToUserIds: model.FormatUserIds(req.ToUserIds),
  108. Type: int(req.MessageType),
  109. Content: req.MessageContent,
  110. Seq: seq,
  111. SendTime: util.UnunixMilliTime(req.SendTime),
  112. Status: int32(pb.MessageStatus_MS_NORMAL),
  113. }
  114. err = repo.MessageRepo.Save(selfMessage)
  115. if err != nil {
  116. logger.Sugar.Error(err)
  117. return 0, err
  118. }
  119. if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
  120. // 用户需要增加自己的已经同步的序列号
  121. err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
  122. if err != nil {
  123. return 0, err
  124. }
  125. }
  126. }
  127. message := pb.Message{
  128. Sender: sender,
  129. ReceiverType: req.ReceiverType,
  130. ReceiverId: req.ReceiverId,
  131. ToUserIds: req.ToUserIds,
  132. MessageType: req.MessageType,
  133. MessageContent: req.MessageContent,
  134. Seq: seq,
  135. SendTime: req.SendTime,
  136. Status: pb.MessageStatus_MS_NORMAL,
  137. }
  138. // 查询用户在线设备
  139. devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
  140. if err != nil {
  141. logger.Sugar.Error(err)
  142. return 0, err
  143. }
  144. // 查询接受者用户在线设备
  145. receiverDevices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, req.ReceiverId)
  146. if err != nil {
  147. logger.Sugar.Error(err)
  148. return 0, err
  149. }
  150. isOpenAppPush := "0"
  151. if masterId != "" && masterId != "0" {
  152. isOpenAppPush = svc.SysCfgGet(masterId, "is_open_app_push")
  153. }
  154. utils.FilePutContents("SendToUser", utils.SerializeStr(map[string]interface{}{
  155. "devices_len": len(devices),
  156. "sender": sender,
  157. "toUserId": toUserId,
  158. "isOpenAppPush": isOpenAppPush,
  159. "req": req,
  160. "masterId": masterId,
  161. }))
  162. if len(receiverDevices) <= 0 {
  163. //isOpenAppPush := svc.SysCfgGet(masterId, "is_open_app_push")
  164. if sender.SenderType == pb.SenderType_ST_USER && req.ReceiverType == pb.ReceiverType_RT_USER && isOpenAppPush == "1" && req.ReceiverId != toUserId {
  165. uid := strconv.FormatInt(req.ReceiverId, 10)
  166. alia := db.DbUserPushForJg.UserPushForJgGetWithDb(masterId, uid)
  167. if alia != "" {
  168. if sender.Nickname == "" {
  169. sendUser, err := repo2.UserRepo.Get(sender.SenderId)
  170. if err != nil {
  171. logger.Sugar.Error(err)
  172. return 0, err
  173. }
  174. sender.Nickname = sendUser.Nickname
  175. sender.AvatarUrl = sendUser.AvatarUrl
  176. }
  177. var pushContent string
  178. if req.MessageType != pb.MessageType_MT_TEXT {
  179. switch req.MessageType {
  180. case pb.MessageType_MT_UNKNOWN:
  181. pushContent = "您收到一条\"未知\"消息"
  182. break
  183. case pb.MessageType_MT_FACE:
  184. pushContent = "您收到一条表情消息"
  185. break
  186. case pb.MessageType_MT_VOICE:
  187. pushContent = "您收到一条语音消息"
  188. break
  189. case pb.MessageType_MT_IMAGE:
  190. pushContent = "您收到一条图片消息"
  191. break
  192. case pb.MessageType_MT_FILE:
  193. pushContent = "您收到一条视频消息"
  194. break
  195. case pb.MessageType_MT_LOCATION:
  196. pushContent = "您收到一条地理位置消息"
  197. break
  198. case pb.MessageType_MT_COMMAND:
  199. pushContent = "您收到一条指令推送消息"
  200. break
  201. case pb.MessageType_MT_CUSTOM:
  202. pushContent = "您收到一条\"自定义\"推送消息"
  203. break
  204. }
  205. } else {
  206. pushContent = string(req.MessageContent)
  207. }
  208. //TODO::接收者类型为`user`, 进行极光推送
  209. CommAddPush(md.PushParams{
  210. MasterId: masterId,
  211. Uid: uid,
  212. PushAlia: alia,
  213. Title: sender.Nickname,
  214. Content: pushContent,
  215. //Content: "您收到一条新消息,来自会员\"[消息发送者-会员昵称]\",发送时间\"[时间]\"",
  216. //Content: "您收到一条新消息,来自会员\"[消息发送者-会员昵称]\",发送时间\"[时间]\",消息类型\"[消息类型]\",备注\"[备注]\"",
  217. PushType: "zhi_ying_gim",
  218. MessageType: req.MessageType.String(),
  219. SendUserNickname: sender.Nickname,
  220. SendUserAvatarUrl: sender.AvatarUrl,
  221. Memo: sender.SenderType.String(),
  222. Times: time.Now().Format("2006-01-02 15:04:05"),
  223. })
  224. }
  225. }
  226. }
  227. for i := range devices {
  228. if sender.DeviceId == devices[i].DeviceId {
  229. // 消息不需要投递给发送消息的设备
  230. continue
  231. }
  232. err = MessageService.SendToDevice(ctx, devices[i], &message)
  233. if err != nil {
  234. logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
  235. }
  236. }
  237. return seq, nil
  238. }
  239. // RecallMessageSendToUser 撤回消息用户
  240. func (*messageService) RecallMessageSendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.RecallMessageReq, sendTime time.Time) (int64, error) {
  241. masterId, _ := grpclib.GetCtxMasterId(ctx)
  242. logger.Logger.Debug("SendToUser",
  243. zap.String("master_id", masterId),
  244. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  245. zap.Int64("to_user_id", toUserId))
  246. var (
  247. seq int64 = 0
  248. err error
  249. )
  250. //1、改变消息状态
  251. _, err = repo.MessageRepo.UpdateStatus(toUserId, sendTime, int(pb.MessageStatus_MS_RECALL))
  252. if err != nil {
  253. return 0, err
  254. }
  255. //3、发送一条新的消息
  256. if req.IsPersist {
  257. seq, err = SeqService.GetUserNext(ctx, toUserId)
  258. if err != nil {
  259. return 0, err
  260. }
  261. selfMessage := model.Message{
  262. UserId: toUserId,
  263. RequestId: grpclib.GetCtxRequestId(ctx),
  264. SenderType: int32(sender.SenderType),
  265. SenderId: sender.SenderId,
  266. ReceiverType: int32(req.ReceiverType),
  267. ReceiverId: req.ReceiverId,
  268. ToUserIds: model.FormatUserIds(req.ToUserIds),
  269. Type: int(req.MessageType),
  270. Content: req.MessageContent,
  271. Seq: seq,
  272. SendTime: util.UnunixMilliTime(req.SendTime),
  273. Status: int32(pb.MessageStatus_MS_NORMAL),
  274. }
  275. err = repo.MessageRepo.Save(selfMessage)
  276. if err != nil {
  277. logger.Sugar.Error(err)
  278. return 0, err
  279. }
  280. if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
  281. // 用户需要增加自己的已经同步的序列号
  282. err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
  283. if err != nil {
  284. return 0, err
  285. }
  286. }
  287. }
  288. message := pb.Message{
  289. Sender: sender,
  290. ReceiverType: req.ReceiverType,
  291. ReceiverId: req.ReceiverId,
  292. ToUserIds: req.ToUserIds,
  293. MessageType: req.MessageType,
  294. MessageContent: req.MessageContent,
  295. Seq: seq,
  296. SendTime: req.SendTime,
  297. Status: pb.MessageStatus_MS_NORMAL,
  298. }
  299. // 查询用户在线设备
  300. devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
  301. if err != nil {
  302. logger.Sugar.Error(err)
  303. return 0, err
  304. }
  305. for i := range devices {
  306. if sender.DeviceId == devices[i].DeviceId {
  307. // 消息不需要投递给发送消息的设备
  308. continue
  309. }
  310. err = MessageService.SendToDevice(ctx, devices[i], &message)
  311. if err != nil {
  312. logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
  313. }
  314. }
  315. return seq, nil
  316. }
  317. // SendRedPackageToUser 发送红包给用户
  318. func (*messageService) SendRedPackageToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendRedPacketReq) (int64, error) {
  319. masterId, _ := grpclib.GetCtxMasterId(ctx)
  320. logger.Logger.Debug("SendRedPackageToUser",
  321. zap.String("master_id", masterId),
  322. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  323. zap.Int64("to_user_id", toUserId))
  324. var (
  325. seq int64 = 0
  326. err error
  327. )
  328. //1、发送一条新的消息
  329. seq, err = SeqService.GetUserNext(ctx, toUserId)
  330. if err != nil {
  331. return 0, err
  332. }
  333. selfMessage := model.Message{
  334. UserId: toUserId,
  335. RequestId: grpclib.GetCtxRequestId(ctx),
  336. SenderType: int32(sender.SenderType),
  337. SenderId: sender.SenderId,
  338. ReceiverType: int32(req.ReceiverType),
  339. ReceiverId: req.ReceiverId,
  340. //ToUserIds: model.FormatUserIds(req.ToUserIds),
  341. Type: int(req.MessageType),
  342. Content: req.MessageContent,
  343. Seq: seq,
  344. SendTime: util.UnunixMilliTime(req.SendTime),
  345. Status: int32(pb.MessageStatus_MS_NORMAL),
  346. }
  347. err = repo.MessageRepo.Save(selfMessage)
  348. if err != nil {
  349. logger.Sugar.Error(err)
  350. return 0, err
  351. }
  352. if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
  353. // 用户需要增加自己的已经同步的序列号
  354. err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
  355. if err != nil {
  356. return 0, err
  357. }
  358. }
  359. message := pb.Message{
  360. Sender: sender,
  361. ReceiverType: req.ReceiverType,
  362. ReceiverId: req.ReceiverId,
  363. //ToUserIds: req.ToUserIds,
  364. MessageType: pb.MessageType_MT_RED_PACKAGE,
  365. MessageContent: req.MessageContent,
  366. Seq: seq,
  367. SendTime: req.SendTime,
  368. Status: pb.MessageStatus_MS_NORMAL,
  369. }
  370. // 查询用户在线设备
  371. devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
  372. if err != nil {
  373. logger.Sugar.Error(err)
  374. return 0, err
  375. }
  376. for i := range devices {
  377. if sender.DeviceId == devices[i].DeviceId {
  378. // 消息不需要投递给发送消息的设备
  379. continue
  380. }
  381. err = MessageService.SendToDevice(ctx, devices[i], &message)
  382. if err != nil {
  383. logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
  384. }
  385. }
  386. return seq, nil
  387. }
  388. // SendToDevice 将消息发送给设备
  389. func (*messageService) SendToDevice(ctx context.Context, device *pb.Device, message *pb.Message) error {
  390. messageSend := pb.MessageSend{Message: message}
  391. _, err := rpc.GetConnectIntClient().DeliverMessage(picker.ContextWithAddr(ctx, device.ConnAddr), &pb.DeliverMessageReq{
  392. DeviceId: device.DeviceId,
  393. MessageSend: &messageSend,
  394. })
  395. if err != nil {
  396. logger.Logger.Error("SendToDevice error", zap.Error(err))
  397. return err
  398. }
  399. // todo 其他推送厂商
  400. return nil
  401. }
  402. func (*messageService) AddSenderInfo(sender *pb.Sender) {
  403. if sender.SenderType == pb.SenderType_ST_USER {
  404. user, err := rpc.GetBusinessIntClient().GetUser(context.TODO(), &pb.GetUserReq{UserId: sender.SenderId})
  405. if err == nil && user != nil {
  406. sender.AvatarUrl = user.User.AvatarUrl
  407. sender.Nickname = user.User.Nickname
  408. sender.Extra = user.User.Extra
  409. }
  410. }
  411. }