package hub import ( "net/http" "sync" "github.com/gorilla/websocket" "websocket-relay/internal/logging" "websocket-relay/internal/metrics" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } type Hub struct { clients map[*websocket.Conn]bool broadcast chan []byte register chan *websocket.Conn unregister chan *websocket.Conn stop chan struct{} mu sync.RWMutex logger *logging.Logger } func New(logger *logging.Logger) *Hub { return &Hub{ clients: make(map[*websocket.Conn]bool), broadcast: make(chan []byte), register: make(chan *websocket.Conn), unregister: make(chan *websocket.Conn), stop: make(chan struct{}), logger: logger, } } func (h *Hub) Run() { for { select { case <-h.stop: h.mu.Lock() for conn := range h.clients { conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, "server shutting down")) conn.Close() delete(h.clients, conn) } h.mu.Unlock() metrics.ConnectedClients.Set(0) h.logger.Info("Hub stopped, all clients disconnected") return case conn := <-h.register: h.mu.Lock() h.clients[conn] = true h.mu.Unlock() metrics.ConnectedClients.Set(float64(len(h.clients))) metrics.ConnectionsTotal.Inc() h.logger.Infof("Client connected. Total: %d", len(h.clients)) case conn := <-h.unregister: h.mu.Lock() if _, ok := h.clients[conn]; ok { delete(h.clients, conn) conn.Close() } h.mu.Unlock() metrics.ConnectedClients.Set(float64(len(h.clients))) metrics.DisconnectionsTotal.Inc() h.logger.Infof("Client disconnected. Total: %d", len(h.clients)) case message := <-h.broadcast: metrics.MessagesTotal.Inc() h.mu.RLock() var failed []*websocket.Conn for conn := range h.clients { if err := conn.WriteMessage(websocket.TextMessage, message); err != nil { failed = append(failed, conn) } } h.mu.RUnlock() // Remove failed clients properly so metrics stay consistent for _, conn := range failed { h.mu.Lock() if _, ok := h.clients[conn]; ok { delete(h.clients, conn) conn.Close() metrics.ConnectedClients.Set(float64(len(h.clients))) metrics.DisconnectionsTotal.Inc() h.logger.Warnf("Client disconnected (write error). Total: %d", len(h.clients)) } h.mu.Unlock() } } } } // Shutdown gracefully stops the hub, closing all client connections. func (h *Hub) Shutdown() { close(h.stop) } func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { h.logger.Errorf("WebSocket upgrade error: %v", err) return } h.register <- conn go func() { defer func() { h.unregister <- conn }() for { _, message, err := conn.ReadMessage() if err != nil { break } h.broadcast <- message } }() } func (h *Hub) ClientCount() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) }