golang-im聊天
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

device_service.go 2.7 KiB

2 år sedan
2 år sedan
2 år sedan
2 år sedan
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package device
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "gim/internal/business/comm/utils"
  7. "gim/pkg/db"
  8. "gim/pkg/gerrors"
  9. "gim/pkg/grpclib"
  10. "gim/pkg/logger"
  11. "gim/pkg/pb"
  12. "gim/pkg/rpc"
  13. "time"
  14. "go.uber.org/zap"
  15. )
  // deviceService groups the device-related service operations of this
  // package. It has no fields, so it carries no per-instance state.
  type deviceService struct{}

  // DeviceService is the package-level instance through which the
  // deviceService methods are called.
  var DeviceService = &deviceService{}

  // SignInKey is the fmt template for the cache key that marks a recent
  // sign-in. It takes two %s placeholders; SignIn fills them with the master id
  // taken from the request context and the user id rendered as a string.
  const SignInKey = "%s:gim_sign_in:%s"

  // SignInExpire is the lifetime given to the sign-in marker stored under a
  // SignInKey-formatted key.
  const SignInExpire = 30 * time.Second
  22. // Register 注册设备
  23. func (*deviceService) Register(ctx context.Context, device *Device) error {
  24. err := DeviceDao.Save(device)
  25. if err != nil {
  26. return err
  27. }
  28. return nil
  29. }
  30. // SignIn 长连接登录
  31. func (*deviceService) SignIn(ctx context.Context, userId, deviceId int64, token string, connAddr string, clientAddr string) error {
  32. //TODO::限制操作
  33. masterId, _ := grpclib.GetCtxMasterId(ctx)
  34. cacheKey := fmt.Sprintf(SignInKey, masterId, utils.Int64ToStr(userId))
  35. if db.RedisUtil.Exists(cacheKey) {
  36. return errors.New("限频!!!")
  37. } else {
  38. err := db.RedisUtil.Set(SignInKey, "already", SignInExpire)
  39. if err != nil {
  40. return gerrors.WrapError(err)
  41. }
  42. }
  43. _, err := rpc.GetBusinessIntClient().Auth(ctx, &pb.AuthReq{UserId: userId, DeviceId: deviceId, Token: token})
  44. if err != nil {
  45. return err
  46. }
  47. // 标记用户在设备上登录
  48. device, err := DeviceRepo.Get(deviceId)
  49. if err != nil {
  50. return err
  51. }
  52. if device == nil {
  53. return nil
  54. }
  55. device.Online(userId, connAddr, clientAddr)
  56. err = DeviceRepo.Save(device)
  57. if err != nil {
  58. return err
  59. }
  60. return nil
  61. }
  62. // Auth 权限验证
  63. func (*deviceService) Auth(ctx context.Context, userId, deviceId int64, token string) error {
  64. _, err := rpc.GetBusinessIntClient().Auth(ctx, &pb.AuthReq{UserId: userId, DeviceId: deviceId, Token: token})
  65. if err != nil {
  66. return err
  67. }
  68. return nil
  69. }
  70. func (*deviceService) ListOnlineByUserId(ctx context.Context, userId int64) ([]*pb.Device, error) {
  71. devices, err := DeviceRepo.ListOnlineByUserId(userId)
  72. if err != nil {
  73. return nil, err
  74. }
  75. pbDevices := make([]*pb.Device, len(devices))
  76. for i := range devices {
  77. pbDevices[i] = devices[i].ToProto()
  78. }
  79. return pbDevices, nil
  80. }
  81. // ServerStop connect服务停止,需要将连接在这台connect上的设备标记为下线
  82. func (*deviceService) ServerStop(ctx context.Context, connAddr string) error {
  83. devices, err := DeviceRepo.ListOnlineByConnAddr(connAddr)
  84. if err != nil {
  85. return err
  86. }
  87. for i := range devices {
  88. // 因为是异步修改设备转台,要避免设备重连,导致状态不一致
  89. err = DeviceRepo.UpdateStatusOffline(devices[i])
  90. if err != nil {
  91. logger.Logger.Error("DeviceRepo.Save error", zap.Any("device", devices[i]), zap.Error(err))
  92. }
  93. time.Sleep(2 * time.Millisecond)
  94. }
  95. return nil
  96. }