|
- package device
-
- import (
- "context"
- "errors"
- "fmt"
- "gim/internal/business/comm/utils"
- "gim/pkg/db"
- "gim/pkg/gerrors"
- "gim/pkg/grpclib"
- "gim/pkg/logger"
- "gim/pkg/pb"
- "gim/pkg/rpc"
- "time"
-
- "go.uber.org/zap"
- )
-
// deviceService groups the device-related service operations. It has no
// fields, so it carries no state of its own.
type deviceService struct{}

// DeviceService is the shared package-level instance through which the
// service methods are called.
var DeviceService = &deviceService{}

// Settings for the sign-in rate limit applied by SignIn.
const (
	// SignInKey is the fmt pattern for the rate-limit cache key. SignIn fills
	// the two placeholders with the master id and the user id converted to a
	// string.
	// NOTE(review): the earlier comment described the second placeholder as
	// "the first letter of the key"; confirm which meaning is intended.
	SignInKey = "%s:gim_sign_in:%s"

	// SignInExpire is passed by SignIn as the expiry argument when it stores
	// the rate-limit marker.
	SignInExpire = 5 * time.Second
)
-
- // Register 注册设备
- func (*deviceService) Register(ctx context.Context, device *Device) error {
- err := DeviceDao.Save(device)
- if err != nil {
- return err
- }
-
- return nil
- }
-
- // SignIn 长连接登录
- func (*deviceService) SignIn(ctx context.Context, userId, deviceId int64, token string, connAddr string, clientAddr string) error {
- //TODO::限制操作
- masterId, _ := grpclib.GetCtxMasterId(ctx)
- cacheKey := fmt.Sprintf(SignInKey, masterId, utils.Int64ToStr(userId))
- if db.RedisUtil.Exists(cacheKey) {
- utils.FilePutContents("sign_in", utils.SerializeStr(map[string]interface{}{
- "master_id": masterId,
- "user_id": userId,
- "device_id": deviceId,
- "token": token,
- "conn_addr": connAddr,
- "client_addr": clientAddr,
- "cache_key": cacheKey,
- }))
- return errors.New("限频!!!")
- } else {
- err := db.RedisUtil.Set(cacheKey, "already", SignInExpire)
- if err != nil {
- return gerrors.WrapError(err)
- }
- }
-
- _, err := rpc.GetBusinessIntClient().Auth(ctx, &pb.AuthReq{UserId: userId, DeviceId: deviceId, Token: token})
- if err != nil {
- return err
- }
-
- // 标记用户在设备上登录
- device, err := DeviceRepo.Get(deviceId)
- if err != nil {
- return err
- }
- if device == nil {
- return nil
- }
-
- device.Online(userId, connAddr, clientAddr)
-
- err = DeviceRepo.Save(device)
- if err != nil {
- return err
- }
- return nil
- }
-
- // Auth 权限验证
- func (*deviceService) Auth(ctx context.Context, userId, deviceId int64, token string) error {
- _, err := rpc.GetBusinessIntClient().Auth(ctx, &pb.AuthReq{UserId: userId, DeviceId: deviceId, Token: token})
- if err != nil {
- return err
- }
- return nil
- }
-
- func (*deviceService) ListOnlineByUserId(ctx context.Context, userId int64) ([]*pb.Device, error) {
- devices, err := DeviceRepo.ListOnlineByUserId(userId)
- if err != nil {
- return nil, err
- }
- pbDevices := make([]*pb.Device, len(devices))
- for i := range devices {
- pbDevices[i] = devices[i].ToProto()
- }
- return pbDevices, nil
- }
-
- // ServerStop connect服务停止,需要将连接在这台connect上的设备标记为下线
- func (*deviceService) ServerStop(ctx context.Context, connAddr string) error {
- devices, err := DeviceRepo.ListOnlineByConnAddr(connAddr)
- if err != nil {
- return err
- }
-
- for i := range devices {
- // 因为是异步修改设备转台,要避免设备重连,导致状态不一致
- err = DeviceRepo.UpdateStatusOffline(devices[i])
- if err != nil {
- logger.Logger.Error("DeviceRepo.Save error", zap.Any("device", devices[i]), zap.Error(err))
- }
- time.Sleep(2 * time.Millisecond)
- }
- return nil
- }
|