You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

message_service.go 12 KiB

1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package service
  2. import (
  3. "context"
  4. "egg-im/internal/business/comm/db"
  5. svc "egg-im/internal/business/comm/service"
  6. "egg-im/internal/business/comm/utils"
  7. repo2 "egg-im/internal/business/domain/user/repo"
  8. "egg-im/internal/logic/domain/message/md"
  9. "egg-im/internal/logic/domain/message/model"
  10. "egg-im/internal/logic/domain/message/repo"
  11. "egg-im/internal/logic/proxy"
  12. "egg-im/pkg/grpclib"
  13. "egg-im/pkg/grpclib/picker"
  14. "egg-im/pkg/logger"
  15. "egg-im/pkg/pb"
  16. "egg-im/pkg/rpc"
  17. "egg-im/pkg/util"
  18. "go.uber.org/zap"
  19. "google.golang.org/protobuf/proto"
  20. "strconv"
  21. "time"
  22. )
  23. const MessageLimit = 50 // 最大消息同步数量
  24. const MaxSyncBufLen = 65536 // 最大字节数组长度
  25. type messageService struct{}
  26. var MessageService = new(messageService)
  27. // Sync 消息同步
  28. func (*messageService) Sync(ctx context.Context, userId, seq int64) (*pb.SyncResp, error) {
  29. messages, hasMore, err := MessageService.ListByUserIdAndSeq(ctx, userId, seq)
  30. if err != nil {
  31. return nil, err
  32. }
  33. pbMessages := model.MessagesToPB(messages)
  34. length := len(pbMessages)
  35. resp := &pb.SyncResp{Messages: pbMessages, HasMore: hasMore}
  36. bytes, err := proto.Marshal(resp)
  37. if err != nil {
  38. return nil, err
  39. }
  40. // 如果字节数组大于一个包的长度,需要减少字节数组
  41. for len(bytes) > MaxSyncBufLen {
  42. length = length * 2 / 3
  43. resp = &pb.SyncResp{Messages: pbMessages[0:length], HasMore: true}
  44. bytes, err = proto.Marshal(resp)
  45. if err != nil {
  46. return nil, err
  47. }
  48. }
  49. var userIds = make(map[int64]int32, len(resp.Messages))
  50. for i := range resp.Messages {
  51. if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER {
  52. userIds[resp.Messages[i].Sender.SenderId] = 0
  53. }
  54. }
  55. usersResp, err := rpc.GetBusinessIntClient().GetUsers(ctx, &pb.GetUsersReq{UserIds: userIds})
  56. if err != nil {
  57. return nil, err
  58. }
  59. for i := range resp.Messages {
  60. if resp.Messages[i].Sender.SenderType == pb.SenderType_ST_USER {
  61. user, ok := usersResp.Users[resp.Messages[i].Sender.SenderId]
  62. if ok {
  63. resp.Messages[i].Sender.Nickname = user.Nickname
  64. resp.Messages[i].Sender.AvatarUrl = user.AvatarUrl
  65. resp.Messages[i].Sender.Extra = user.Extra
  66. } else {
  67. logger.Logger.Warn("get user failed", zap.Int64("user_id", resp.Messages[i].Sender.SenderId))
  68. }
  69. }
  70. }
  71. return resp, nil
  72. }
  73. // ListByUserIdAndSeq 查询消息
  74. func (*messageService) ListByUserIdAndSeq(ctx context.Context, userId, seq int64) ([]model.Message, bool, error) {
  75. var err error
  76. if seq == 0 {
  77. seq, err = DeviceAckService.GetMaxByUserId(ctx, userId)
  78. if err != nil {
  79. return nil, false, err
  80. }
  81. }
  82. return repo.MessageRepo.ListBySeq(userId, seq, MessageLimit)
  83. }
  84. // SendToUser 将消息发送给用户
  85. func (*messageService) SendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendMessageReq) (int64, error) {
  86. logger.Logger.Debug("SendToUser",
  87. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  88. zap.Int64("to_user_id", toUserId))
  89. var (
  90. seq int64 = 0
  91. err error
  92. )
  93. if req.IsPersist {
  94. seq, err = SeqService.GetUserNext(ctx, toUserId)
  95. if err != nil {
  96. return 0, err
  97. }
  98. selfMessage := model.Message{
  99. UserId: toUserId,
  100. RequestId: grpclib.GetCtxRequestId(ctx),
  101. SenderType: int32(sender.SenderType),
  102. SenderId: sender.SenderId,
  103. ReceiverType: int32(req.ReceiverType),
  104. ReceiverId: req.ReceiverId,
  105. ToUserIds: model.FormatUserIds(req.ToUserIds),
  106. Type: int(req.MessageType),
  107. Content: req.MessageContent,
  108. Seq: seq,
  109. SendTime: util.UnunixMilliTime(req.SendTime),
  110. Status: int32(pb.MessageStatus_MS_NORMAL),
  111. }
  112. err = repo.MessageRepo.Save(selfMessage)
  113. if err != nil {
  114. logger.Sugar.Error(err)
  115. return 0, err
  116. }
  117. if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
  118. // 用户需要增加自己的已经同步的序列号
  119. err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
  120. if err != nil {
  121. return 0, err
  122. }
  123. }
  124. }
  125. message := pb.Message{
  126. Sender: sender,
  127. ReceiverType: req.ReceiverType,
  128. ReceiverId: req.ReceiverId,
  129. ToUserIds: req.ToUserIds,
  130. MessageType: req.MessageType,
  131. MessageContent: req.MessageContent,
  132. Seq: seq,
  133. SendTime: req.SendTime,
  134. Status: pb.MessageStatus_MS_NORMAL,
  135. }
  136. // 查询用户在线设备
  137. devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
  138. if err != nil {
  139. logger.Sugar.Error(err)
  140. return 0, err
  141. }
  142. // 查询接受者用户在线设备
  143. receiverDevices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, req.ReceiverId)
  144. if err != nil {
  145. logger.Sugar.Error(err)
  146. return 0, err
  147. }
  148. isOpenAppPush := svc.SysCfgGet("is_open_app_push")
  149. utils.FilePutContents("SendToUser", utils.SerializeStr(map[string]interface{}{
  150. "devices_len": len(devices),
  151. "sender": sender,
  152. "toUserId": toUserId,
  153. "isOpenAppPush": isOpenAppPush,
  154. "req": req,
  155. }))
  156. if len(receiverDevices) <= 0 {
  157. if sender.SenderType == pb.SenderType_ST_USER && req.ReceiverType == pb.ReceiverType_RT_USER && isOpenAppPush == "1" && req.ReceiverId != toUserId {
  158. uid := strconv.FormatInt(req.ReceiverId, 10)
  159. alia := db.DbUserPushForJg.UserPushForJgGetWithDb(uid)
  160. if alia != "" {
  161. if sender.Nickname == "" {
  162. sendUser, err := repo2.UserRepo.Get(sender.SenderId)
  163. if err != nil {
  164. logger.Sugar.Error(err)
  165. return 0, err
  166. }
  167. sender.Nickname = sendUser.Nickname
  168. sender.AvatarUrl = sendUser.AvatarUrl
  169. }
  170. var pushContent string
  171. if req.MessageType != pb.MessageType_MT_TEXT {
  172. switch req.MessageType {
  173. case pb.MessageType_MT_UNKNOWN:
  174. pushContent = "您收到一条\"未知\"消息"
  175. break
  176. case pb.MessageType_MT_FACE:
  177. pushContent = "您收到一条表情消息"
  178. break
  179. case pb.MessageType_MT_VOICE:
  180. pushContent = "您收到一条语音消息"
  181. break
  182. case pb.MessageType_MT_IMAGE:
  183. pushContent = "您收到一条图片消息"
  184. break
  185. case pb.MessageType_MT_FILE:
  186. pushContent = "您收到一条视频消息"
  187. break
  188. case pb.MessageType_MT_LOCATION:
  189. pushContent = "您收到一条地理位置消息"
  190. break
  191. case pb.MessageType_MT_COMMAND:
  192. pushContent = "您收到一条指令推送消息"
  193. break
  194. case pb.MessageType_MT_CUSTOM:
  195. pushContent = "您收到一条\"自定义\"推送消息"
  196. break
  197. }
  198. } else {
  199. pushContent = string(req.MessageContent)
  200. }
  201. //TODO::接收者类型为`user`, 进行极光推送
  202. CommAddPush(md.PushParams{
  203. Uid: uid,
  204. PushAlia: alia,
  205. Title: sender.Nickname,
  206. Content: pushContent,
  207. PushType: "egg-im",
  208. MessageType: req.MessageType.String(),
  209. SendUserNickname: sender.Nickname,
  210. SendUserAvatarUrl: sender.AvatarUrl,
  211. Memo: sender.SenderType.String(),
  212. Times: time.Now().Format("2006-01-02 15:04:05"),
  213. })
  214. }
  215. }
  216. }
  217. for i := range devices {
  218. if sender.DeviceId == devices[i].DeviceId {
  219. // 消息不需要投递给发送消息的设备
  220. continue
  221. }
  222. err = MessageService.SendToDevice(ctx, devices[i], &message)
  223. if err != nil {
  224. logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
  225. }
  226. }
  227. return seq, nil
  228. }
  229. // RecallMessageSendToUser 撤回消息用户
  230. func (*messageService) RecallMessageSendToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.RecallMessageReq, sendTime time.Time) (int64, error) {
  231. logger.Logger.Debug("SendToUser",
  232. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  233. zap.Int64("to_user_id", toUserId))
  234. var (
  235. seq int64 = 0
  236. err error
  237. )
  238. //1、改变消息状态
  239. _, err = repo.MessageRepo.UpdateStatus(toUserId, sendTime, int(pb.MessageStatus_MS_RECALL))
  240. if err != nil {
  241. return 0, err
  242. }
  243. //3、发送一条新的消息
  244. if req.IsPersist {
  245. seq, err = SeqService.GetUserNext(ctx, toUserId)
  246. if err != nil {
  247. return 0, err
  248. }
  249. selfMessage := model.Message{
  250. UserId: toUserId,
  251. RequestId: grpclib.GetCtxRequestId(ctx),
  252. SenderType: int32(sender.SenderType),
  253. SenderId: sender.SenderId,
  254. ReceiverType: int32(req.ReceiverType),
  255. ReceiverId: req.ReceiverId,
  256. ToUserIds: model.FormatUserIds(req.ToUserIds),
  257. Type: int(req.MessageType),
  258. Content: req.MessageContent,
  259. Seq: seq,
  260. SendTime: util.UnunixMilliTime(req.SendTime),
  261. Status: int32(pb.MessageStatus_MS_NORMAL),
  262. }
  263. err = repo.MessageRepo.Save(selfMessage)
  264. if err != nil {
  265. logger.Sugar.Error(err)
  266. return 0, err
  267. }
  268. if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
  269. // 用户需要增加自己的已经同步的序列号
  270. err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
  271. if err != nil {
  272. return 0, err
  273. }
  274. }
  275. }
  276. message := pb.Message{
  277. Sender: sender,
  278. ReceiverType: req.ReceiverType,
  279. ReceiverId: req.ReceiverId,
  280. ToUserIds: req.ToUserIds,
  281. MessageType: req.MessageType,
  282. MessageContent: req.MessageContent,
  283. Seq: seq,
  284. SendTime: req.SendTime,
  285. Status: pb.MessageStatus_MS_NORMAL,
  286. }
  287. // 查询用户在线设备
  288. devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
  289. if err != nil {
  290. logger.Sugar.Error(err)
  291. return 0, err
  292. }
  293. for i := range devices {
  294. if sender.DeviceId == devices[i].DeviceId {
  295. // 消息不需要投递给发送消息的设备
  296. continue
  297. }
  298. err = MessageService.SendToDevice(ctx, devices[i], &message)
  299. if err != nil {
  300. logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
  301. }
  302. }
  303. return seq, nil
  304. }
  305. // SendRedPackageToUser 发送红包给用户
  306. func (*messageService) SendRedPackageToUser(ctx context.Context, sender *pb.Sender, toUserId int64, req *pb.SendRedPacketReq) (int64, error) {
  307. logger.Logger.Debug("SendRedPackageToUser",
  308. zap.Int64("request_id", grpclib.GetCtxRequestId(ctx)),
  309. zap.Int64("to_user_id", toUserId))
  310. var (
  311. seq int64 = 0
  312. err error
  313. )
  314. //1、发送一条新的消息
  315. seq, err = SeqService.GetUserNext(ctx, toUserId)
  316. if err != nil {
  317. return 0, err
  318. }
  319. selfMessage := model.Message{
  320. UserId: toUserId,
  321. RequestId: grpclib.GetCtxRequestId(ctx),
  322. SenderType: int32(sender.SenderType),
  323. SenderId: sender.SenderId,
  324. ReceiverType: int32(req.ReceiverType),
  325. ReceiverId: req.ReceiverId,
  326. //ToUserIds: model.FormatUserIds(req.ToUserIds),
  327. Type: int(req.MessageType),
  328. Content: req.MessageContent,
  329. Seq: seq,
  330. SendTime: util.UnunixMilliTime(req.SendTime),
  331. Status: int32(pb.MessageStatus_MS_NORMAL),
  332. }
  333. err = repo.MessageRepo.Save(selfMessage)
  334. if err != nil {
  335. logger.Sugar.Error(err)
  336. return 0, err
  337. }
  338. if sender.SenderType == pb.SenderType_ST_USER && sender.SenderId == toUserId {
  339. // 用户需要增加自己的已经同步的序列号
  340. err = repo.DeviceACKRepo.Set(sender.SenderId, sender.DeviceId, seq)
  341. if err != nil {
  342. return 0, err
  343. }
  344. }
  345. message := pb.Message{
  346. Sender: sender,
  347. ReceiverType: req.ReceiverType,
  348. ReceiverId: req.ReceiverId,
  349. //ToUserIds: req.ToUserIds,
  350. MessageType: pb.MessageType_MT_RED_PACKAGE,
  351. MessageContent: req.MessageContent,
  352. Seq: seq,
  353. SendTime: req.SendTime,
  354. Status: pb.MessageStatus_MS_NORMAL,
  355. }
  356. // 查询用户在线设备
  357. devices, err := proxy.DeviceProxy.ListOnlineByUserId(ctx, toUserId)
  358. if err != nil {
  359. logger.Sugar.Error(err)
  360. return 0, err
  361. }
  362. for i := range devices {
  363. if sender.DeviceId == devices[i].DeviceId {
  364. // 消息不需要投递给发送消息的设备
  365. continue
  366. }
  367. err = MessageService.SendToDevice(ctx, devices[i], &message)
  368. if err != nil {
  369. logger.Sugar.Error(err, zap.Any("SendToUser error", devices[i]), zap.Error(err))
  370. }
  371. }
  372. return seq, nil
  373. }
  374. // SendToDevice 将消息发送给设备
  375. func (*messageService) SendToDevice(ctx context.Context, device *pb.Device, message *pb.Message) error {
  376. messageSend := pb.MessageSend{Message: message}
  377. _, err := rpc.GetConnectIntClient().DeliverMessage(picker.ContextWithAddr(ctx, device.ConnAddr), &pb.DeliverMessageReq{
  378. DeviceId: device.DeviceId,
  379. MessageSend: &messageSend,
  380. })
  381. if err != nil {
  382. logger.Logger.Error("SendToDevice error", zap.Error(err))
  383. return err
  384. }
  385. // todo 其他推送厂商
  386. return nil
  387. }
  388. func (*messageService) AddSenderInfo(sender *pb.Sender) {
  389. if sender.SenderType == pb.SenderType_ST_USER {
  390. user, err := rpc.GetBusinessIntClient().GetUser(context.TODO(), &pb.GetUserReq{UserId: sender.SenderId})
  391. if err == nil && user != nil {
  392. sender.AvatarUrl = user.User.AvatarUrl
  393. sender.Nickname = user.User.Nickname
  394. sender.Extra = user.User.Extra
  395. }
  396. }
  397. }