golang-im聊天
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

122 lines
2.9 KiB

  1. package device
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "gim/internal/business/comm/utils"
  7. "gim/pkg/db"
  8. "gim/pkg/gerrors"
  9. "gim/pkg/grpclib"
  10. "gim/pkg/logger"
  11. "gim/pkg/pb"
  12. "gim/pkg/rpc"
  13. "time"
  14. "go.uber.org/zap"
  15. )
  // deviceService groups the device-related service methods. It has no fields.
  type deviceService struct{}

  // DeviceService is the package-level deviceService instance.
  var DeviceService = &deviceService{}

  const (
  	// SignInKey is the fmt template for the sign-in throttle cache key.
  	// SignIn fills the two %s placeholders with the master ID and the
  	// user ID (as a string), in that order.
  	SignInKey = "%s:gim_sign_in:%s"
  	// SignInExpire is the duration SignIn passes when it stores the
  	// throttle marker.
  	SignInExpire = 5 * time.Second
  )
  22. // Register 注册设备
  23. func (*deviceService) Register(ctx context.Context, device *Device) error {
  24. err := DeviceDao.Save(device)
  25. if err != nil {
  26. return err
  27. }
  28. return nil
  29. }
  30. // SignIn 长连接登录
  31. func (*deviceService) SignIn(ctx context.Context, userId, deviceId int64, token string, connAddr string, clientAddr string) error {
  32. //TODO::限制操作
  33. masterId, _ := grpclib.GetCtxMasterId(ctx)
  34. cacheKey := fmt.Sprintf(SignInKey, masterId, utils.Int64ToStr(userId))
  35. if db.RedisUtil.Exists(cacheKey) {
  36. utils.FilePutContents("sign_in", utils.SerializeStr(map[string]interface{}{
  37. "master_id": masterId,
  38. "user_id": userId,
  39. "device_id": deviceId,
  40. "token": token,
  41. "conn_addr": connAddr,
  42. "client_addr": clientAddr,
  43. "cache_key": cacheKey,
  44. }))
  45. return errors.New("限频!!!")
  46. } else {
  47. err := db.RedisUtil.Set(cacheKey, "already", SignInExpire)
  48. if err != nil {
  49. return gerrors.WrapError(err)
  50. }
  51. }
  52. _, err := rpc.GetBusinessIntClient().Auth(ctx, &pb.AuthReq{UserId: userId, DeviceId: deviceId, Token: token})
  53. if err != nil {
  54. return err
  55. }
  56. // 标记用户在设备上登录
  57. device, err := DeviceRepo.Get(deviceId)
  58. if err != nil {
  59. return err
  60. }
  61. if device == nil {
  62. return nil
  63. }
  64. device.Online(userId, connAddr, clientAddr)
  65. err = DeviceRepo.Save(device)
  66. if err != nil {
  67. return err
  68. }
  69. return nil
  70. }
  71. // Auth 权限验证
  72. func (*deviceService) Auth(ctx context.Context, userId, deviceId int64, token string) error {
  73. _, err := rpc.GetBusinessIntClient().Auth(ctx, &pb.AuthReq{UserId: userId, DeviceId: deviceId, Token: token})
  74. if err != nil {
  75. return err
  76. }
  77. return nil
  78. }
  79. func (*deviceService) ListOnlineByUserId(ctx context.Context, userId int64) ([]*pb.Device, error) {
  80. devices, err := DeviceRepo.ListOnlineByUserId(userId)
  81. if err != nil {
  82. return nil, err
  83. }
  84. pbDevices := make([]*pb.Device, len(devices))
  85. for i := range devices {
  86. pbDevices[i] = devices[i].ToProto()
  87. }
  88. return pbDevices, nil
  89. }
  90. // ServerStop connect服务停止,需要将连接在这台connect上的设备标记为下线
  91. func (*deviceService) ServerStop(ctx context.Context, connAddr string) error {
  92. devices, err := DeviceRepo.ListOnlineByConnAddr(connAddr)
  93. if err != nil {
  94. return err
  95. }
  96. for i := range devices {
  97. // 因为是异步修改设备转台,要避免设备重连,导致状态不一致
  98. err = DeviceRepo.UpdateStatusOffline(devices[i])
  99. if err != nil {
  100. logger.Logger.Error("DeviceRepo.Save error", zap.Any("device", devices[i]), zap.Error(err))
  101. }
  102. time.Sleep(2 * time.Millisecond)
  103. }
  104. return nil
  105. }