package connect import ( "container/list" "egg_im/pkg/pb" "sync" ) var RoomsManager sync.Map // SubscribedRoom 订阅房间 func SubscribedRoom(conn *Conn, roomId int64) { if roomId == conn.RoomId { return } oldRoomId := conn.RoomId // 取消订阅 if oldRoomId != 0 { value, ok := RoomsManager.Load(oldRoomId) if !ok { return } room := value.(*Room) room.Unsubscribe(conn) if room.Conns.Front() == nil { RoomsManager.Delete(oldRoomId) } return } // 订阅 if roomId != 0 { value, ok := RoomsManager.Load(roomId) var room *Room if !ok { room = NewRoom(roomId) RoomsManager.Store(roomId, room) } else { room = value.(*Room) } room.Subscribe(conn) return } } // PushRoom 房间消息推送 func PushRoom(roomId int64, message *pb.MessageSend) { value, ok := RoomsManager.Load(roomId) if !ok { return } value.(*Room).Push(message) } type Room struct { RoomId int64 // 房间ID Conns *list.List // 订阅房间消息的连接 lock sync.RWMutex } func NewRoom(roomId int64) *Room { return &Room{ RoomId: roomId, Conns: list.New(), } } // Subscribe 订阅房间 func (r *Room) Subscribe(conn *Conn) { r.lock.Lock() defer r.lock.Unlock() conn.Element = r.Conns.PushBack(conn) conn.RoomId = r.RoomId } // Unsubscribe 取消订阅 func (r *Room) Unsubscribe(conn *Conn) { r.lock.Lock() defer r.lock.Unlock() r.Conns.Remove(conn.Element) conn.Element = nil conn.RoomId = 0 } // Push 推送消息到房间 func (r *Room) Push(message *pb.MessageSend) { r.lock.RLock() defer r.lock.RUnlock() element := r.Conns.Front() for { conn := element.Value.(*Conn) conn.Send(pb.PackageType_PT_MESSAGE, 0, message, nil) element = element.Next() if element == nil { break } } }