You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

262 lines
5.9 KiB

  1. package connect
  2. import (
  3. "container/list"
  4. "context"
  5. "egg_im/config"
  6. "egg_im/pkg/grpclib"
  7. "egg_im/pkg/logger"
  8. "egg_im/pkg/pb"
  9. "egg_im/pkg/rpc"
  10. "sync"
  11. "time"
  12. "go.uber.org/zap"
  13. "google.golang.org/grpc/status"
  14. "google.golang.org/protobuf/proto"
  15. "github.com/alberliu/gn"
  16. "github.com/gorilla/websocket"
  17. )
  // Transport kinds stored in Conn.CoonType.
  //
  // NOTE: "Coon" in CoonTypeTCP is a misspelling of "Conn". The identifier is
  // exported and used by the methods of Conn, so the spelling is kept as-is.
  const (
  	CoonTypeTCP int8 = iota + 1 // TCP connection (1)
  	ConnTypeWS                  // WebSocket connection (2)
  )
  22. type Conn struct {
  23. CoonType int8 // 连接类型
  24. TCP *gn.Conn // tcp连接
  25. WSMutex sync.Mutex // WS写锁
  26. WS *websocket.Conn // websocket连接
  27. UserId int64 // 用户ID
  28. DeviceId int64 // 设备ID
  29. RoomId int64 // 订阅的房间ID
  30. Element *list.Element // 链表节点
  31. }
  32. // Write 写入数据
  33. func (c *Conn) Write(bytes []byte) error {
  34. if c.CoonType == CoonTypeTCP {
  35. return c.TCP.WriteWithEncoder(bytes)
  36. } else if c.CoonType == ConnTypeWS {
  37. return c.WriteToWS(bytes)
  38. }
  39. logger.Logger.Error("unknown conn type", zap.Any("conn", c))
  40. return nil
  41. }
  42. // WriteToWS 消息写入WebSocket
  43. func (c *Conn) WriteToWS(bytes []byte) error {
  44. c.WSMutex.Lock()
  45. defer c.WSMutex.Unlock()
  46. err := c.WS.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  47. if err != nil {
  48. return err
  49. }
  50. return c.WS.WriteMessage(websocket.BinaryMessage, bytes)
  51. }
  52. // Close 关闭
  53. func (c *Conn) Close() error {
  54. // 取消设备和连接的对应关系
  55. if c.DeviceId != 0 {
  56. DeleteConn(c.DeviceId)
  57. }
  58. // 取消订阅,需要异步出去,防止重复加锁造成死锁
  59. go func() {
  60. SubscribedRoom(c, 0)
  61. }()
  62. if c.DeviceId != 0 {
  63. _, _ = rpc.GetLogicIntClient().Offline(context.TODO(), &pb.OfflineReq{
  64. UserId: c.UserId,
  65. DeviceId: c.DeviceId,
  66. ClientAddr: c.GetAddr(),
  67. })
  68. }
  69. if c.CoonType == CoonTypeTCP {
  70. c.TCP.Close()
  71. } else if c.CoonType == ConnTypeWS {
  72. return c.WS.Close()
  73. }
  74. return nil
  75. }
  76. func (c *Conn) GetAddr() string {
  77. if c.CoonType == CoonTypeTCP {
  78. return c.TCP.GetAddr()
  79. } else if c.CoonType == ConnTypeWS {
  80. return c.WS.RemoteAddr().String()
  81. }
  82. return ""
  83. }
  84. // HandleMessage 消息处理
  85. func (c *Conn) HandleMessage(bytes []byte) {
  86. var input = new(pb.Input)
  87. err := proto.Unmarshal(bytes, input)
  88. if err != nil {
  89. logger.Logger.Error("unmarshal error", zap.Error(err))
  90. return
  91. }
  92. logger.Logger.Debug("HandleMessage", zap.Any("input", input))
  93. // 对未登录的用户进行拦截
  94. if input.Type != pb.PackageType_PT_SIGN_IN && c.UserId == 0 {
  95. // 应该告诉用户没有登录
  96. return
  97. }
  98. switch input.Type {
  99. case pb.PackageType_PT_SIGN_IN:
  100. c.SignIn(input)
  101. case pb.PackageType_PT_SYNC:
  102. c.Sync(input)
  103. case pb.PackageType_PT_HEARTBEAT:
  104. c.Heartbeat(input)
  105. case pb.PackageType_PT_MESSAGE:
  106. c.MessageACK(input)
  107. case pb.PackageType_PT_SUBSCRIBE_ROOM:
  108. c.SubscribedRoom(input)
  109. default:
  110. logger.Logger.Error("handler switch other")
  111. }
  112. }
  113. // Send 下发消息
  114. func (c *Conn) Send(pt pb.PackageType, requestId int64, message proto.Message, err error) {
  115. var output = pb.Output{
  116. Type: pt,
  117. RequestId: requestId,
  118. }
  119. if err != nil {
  120. status, _ := status.FromError(err)
  121. output.Code = int32(status.Code())
  122. output.Message = status.Message()
  123. }
  124. if message != nil {
  125. msgBytes, err := proto.Marshal(message)
  126. if err != nil {
  127. logger.Sugar.Error(err)
  128. return
  129. }
  130. output.Data = msgBytes
  131. }
  132. outputBytes, err := proto.Marshal(&output)
  133. if err != nil {
  134. logger.Sugar.Error(err)
  135. return
  136. }
  137. err = c.Write(outputBytes)
  138. if err != nil {
  139. logger.Sugar.Error(err)
  140. c.Close()
  141. return
  142. }
  143. }
  144. // SignIn 登录
  145. func (c *Conn) SignIn(input *pb.Input) {
  146. var signIn pb.SignInInput
  147. err := proto.Unmarshal(input.Data, &signIn)
  148. if err != nil {
  149. logger.Sugar.Error(err)
  150. return
  151. }
  152. _, err = rpc.GetLogicIntClient().ConnSignIn(grpclib.ContextWithRequestId(context.TODO(), input.RequestId), &pb.ConnSignInReq{
  153. UserId: signIn.UserId,
  154. DeviceId: signIn.DeviceId,
  155. Token: signIn.Token,
  156. ConnAddr: config.LocalAddr,
  157. ClientAddr: c.GetAddr(),
  158. })
  159. c.Send(pb.PackageType_PT_SIGN_IN, input.RequestId, nil, err)
  160. if err != nil {
  161. return
  162. }
  163. c.UserId = signIn.UserId
  164. c.DeviceId = signIn.DeviceId
  165. SetConn(signIn.DeviceId, c)
  166. }
  167. // Sync 消息同步
  168. func (c *Conn) Sync(input *pb.Input) {
  169. var sync pb.SyncInput
  170. err := proto.Unmarshal(input.Data, &sync)
  171. if err != nil {
  172. logger.Sugar.Error(err)
  173. return
  174. }
  175. resp, err := rpc.GetLogicIntClient().Sync(grpclib.ContextWithRequestId(context.TODO(), input.RequestId), &pb.SyncReq{
  176. UserId: c.UserId,
  177. DeviceId: c.DeviceId,
  178. Seq: sync.Seq,
  179. })
  180. var message proto.Message
  181. if err == nil {
  182. message = &pb.SyncOutput{Messages: resp.Messages, HasMore: resp.HasMore}
  183. }
  184. c.Send(pb.PackageType_PT_SYNC, input.RequestId, message, err)
  185. }
  186. // Heartbeat 心跳
  187. func (c *Conn) Heartbeat(input *pb.Input) {
  188. c.Send(pb.PackageType_PT_HEARTBEAT, input.RequestId, nil, nil)
  189. logger.Sugar.Infow("heartbeat", "device_id", c.DeviceId, "user_id", c.UserId)
  190. }
  191. // MessageACK 消息收到回执
  192. func (c *Conn) MessageACK(input *pb.Input) {
  193. var messageACK pb.MessageACK
  194. err := proto.Unmarshal(input.Data, &messageACK)
  195. if err != nil {
  196. logger.Sugar.Error(err)
  197. return
  198. }
  199. _, _ = rpc.GetLogicIntClient().MessageACK(grpclib.ContextWithRequestId(context.TODO(), input.RequestId), &pb.MessageACKReq{
  200. UserId: c.UserId,
  201. DeviceId: c.DeviceId,
  202. DeviceAck: messageACK.DeviceAck,
  203. ReceiveTime: messageACK.ReceiveTime,
  204. })
  205. }
  206. // SubscribedRoom 订阅房间
  207. func (c *Conn) SubscribedRoom(input *pb.Input) {
  208. var subscribeRoom pb.SubscribeRoomInput
  209. err := proto.Unmarshal(input.Data, &subscribeRoom)
  210. if err != nil {
  211. logger.Sugar.Error(err)
  212. return
  213. }
  214. SubscribedRoom(c, subscribeRoom.RoomId)
  215. c.Send(pb.PackageType_PT_SUBSCRIBE_ROOM, input.RequestId, nil, nil)
  216. _, err = rpc.GetLogicIntClient().SubscribeRoom(context.TODO(), &pb.SubscribeRoomReq{
  217. UserId: c.UserId,
  218. DeviceId: c.DeviceId,
  219. RoomId: subscribeRoom.RoomId,
  220. Seq: subscribeRoom.Seq,
  221. ConnAddr: config.LocalAddr,
  222. })
  223. if err != nil {
  224. logger.Logger.Error("SubscribedRoom error", zap.Error(err))
  225. }
  226. }