From 905c241daaa777e12c7d76b452e8ed7ddb9c7883 Mon Sep 17 00:00:00 2001 From: savinmax Date: Thu, 11 Jun 2026 19:14:19 +0200 Subject: [PATCH] Improve reliability, testing, and documentation - Fix metrics: change MessagesTotal, ConnectionsTotal, DisconnectionsTotal from Gauge to Counter with proper _total naming convention - Fix broadcast write-error handling: failed clients now get properly removed with accurate metrics updates - Add graceful shutdown: SIGINT/SIGTERM handling with 10s timeout, CloseGoingAway frame sent to clients before disconnect - Add integration tests: 11 tests using real WebSocket connections covering connect, broadcast, disconnect, concurrency, and shutdown - Fix example client port: changed from 8000 to 8443 to match config - Rewrite README.md to reflect current features and usage - Add AGENTS.md and .agents/summary/ documentation for AI assistants --- .agents/summary/.last_commit | 1 + .agents/summary/architecture.md | 134 ++++++++++ .agents/summary/codebase_info.md | 74 ++++++ .agents/summary/components.md | 147 +++++++++++ .agents/summary/data_models.md | 142 +++++++++++ .agents/summary/dependencies.md | 104 ++++++++ .agents/summary/index.md | 112 +++++++++ .agents/summary/interfaces.md | 147 +++++++++++ .agents/summary/review_notes.md | 75 ++++++ .agents/summary/workflows.md | 140 +++++++++++ AGENTS.md | 202 +++++++++++++++ README.md | 123 ++++++++-- example/index.html | 4 +- internal/hub/hub.go | 39 ++- internal/hub/hub_integration_test.go | 355 +++++++++++++++++++++++++++ internal/hub/hub_test.go | 29 ++- internal/metrics/metrics.go | 18 +- main.go | 70 +++++- 18 files changed, 1875 insertions(+), 41 deletions(-) create mode 100644 .agents/summary/.last_commit create mode 100644 .agents/summary/architecture.md create mode 100644 .agents/summary/codebase_info.md create mode 100644 .agents/summary/components.md create mode 100644 .agents/summary/data_models.md create mode 100644 .agents/summary/dependencies.md create mode 100644 
.agents/summary/index.md create mode 100644 .agents/summary/interfaces.md create mode 100644 .agents/summary/review_notes.md create mode 100644 .agents/summary/workflows.md create mode 100644 AGENTS.md create mode 100644 internal/hub/hub_integration_test.go diff --git a/.agents/summary/.last_commit b/.agents/summary/.last_commit new file mode 100644 index 0000000..ef0474f --- /dev/null +++ b/.agents/summary/.last_commit @@ -0,0 +1 @@ +f69355d69d25687624c22c441aaf9fd12e20140b diff --git a/.agents/summary/architecture.md b/.agents/summary/architecture.md new file mode 100644 index 0000000..6cdf0de --- /dev/null +++ b/.agents/summary/architecture.md @@ -0,0 +1,134 @@ +# Architecture + +## System Architecture Overview + +```mermaid +graph TB + subgraph Clients + C1[WebSocket Client 1] + C2[WebSocket Client 2] + C3[WebSocket Client N] + end + + subgraph "WebSocket Relay Server" + EP[HTTP/TLS Endpoint] + HUB[Hub - Connection Manager] + BC[Broadcast Channel] + MET[Prometheus Metrics] + end + + subgraph Monitoring + PROM[Prometheus Scraper] + end + + C1 -->|ws/wss| EP + C2 -->|ws/wss| EP + C3 -->|ws/wss| EP + EP --> HUB + HUB --> BC + BC -->|relay to all| C1 + BC -->|relay to all| C2 + BC -->|relay to all| C3 + HUB --> MET + MET -->|:9090/metrics| PROM +``` + +## Design Pattern: Hub-and-Spoke + +The application uses a **Hub-and-Spoke** (fan-out) pattern where: + +1. **Hub** is the central coordinator managing all WebSocket connections +2. **Spokes** are individual WebSocket client connections +3. 
Every message received from any client is **broadcast to all connected clients** + +```mermaid +graph LR + subgraph Hub + REG[Register Channel] + UNREG[Unregister Channel] + BCAST[Broadcast Channel] + CLIENTS[Client Map] + end + + CONN[New Connection] --> REG + REG --> CLIENTS + DISC[Disconnection] --> UNREG + UNREG --> CLIENTS + MSG[Incoming Message] --> BCAST + BCAST --> CLIENTS +``` + +## Concurrency Model + +The server uses Go's CSP (Communicating Sequential Processes) concurrency model: + +```mermaid +sequenceDiagram + participant Client + participant Handler as HTTP Handler + participant Hub as Hub.Run() goroutine + participant Reader as ReadMessage goroutine + + Client->>Handler: HTTP Upgrade Request + Handler->>Hub: register <- conn + Hub->>Hub: Add to clients map + Handler->>Reader: Start goroutine + + loop Read Messages + Client->>Reader: WebSocket Frame + Reader->>Hub: broadcast <- message + Hub->>Hub: Iterate clients map + Hub->>Client: WriteMessage (fan-out) + end + + Reader->>Hub: unregister <- conn (on error/close) + Hub->>Hub: Remove from clients map +``` + +### Goroutine Lifecycle + +| Goroutine | Purpose | Lifetime | +|-----------|---------|----------| +| `main` | HTTP server, accepts connections | Application lifetime | +| `Hub.Run()` | Processes register/unregister/broadcast channels | Application lifetime | +| Per-client reader | Reads messages from a single client | Client connection lifetime | +| Metrics server | Serves `/metrics` endpoint | Application lifetime (if enabled) | + +## Configuration Architecture + +```mermaid +graph LR + CLI[CLI Flag: --config-file] --> LOAD[config.Load] + LOAD --> YAML[YAML Parser] + YAML --> CFG[Config Struct] + CFG --> SRV[Server Setup] + CFG --> TLS[TLS Config] + CFG --> MET[Metrics Setup] +``` + +## Security Model + +- **TLS Support**: Optional TLS via cert/key PEM files +- **Origin Check**: `CheckOrigin` allows all origins (permissive for relay use case) +- **No Authentication**: The relay is designed as a 
transparent message forwarder +- **No Authorization**: All connected clients can send/receive all messages + +## Deployment Architecture + +```mermaid +graph TB + subgraph "Build Pipeline" + SRC[Source Code] --> CI[Gitea CI] + CI --> TEST[go test] + CI --> LINT[golangci-lint] + TAG[Git Tag v*] --> REL[Release Pipeline] + REL --> BIN_L[Linux amd64 Binary] + REL --> BIN_M[macOS arm64 Binary] + end + + subgraph "Runtime" + BIN[Binary] --> CFG[config.yaml] + CFG --> SERVER[WebSocket Server :8443] + CFG --> METRICS[Metrics Server :9090] + end +``` diff --git a/.agents/summary/codebase_info.md b/.agents/summary/codebase_info.md new file mode 100644 index 0000000..73e5c45 --- /dev/null +++ b/.agents/summary/codebase_info.md @@ -0,0 +1,74 @@ +# Codebase Information + +## Project Overview + +| Field | Value | +|-------|-------| +| **Name** | websocket-relay | +| **Language** | Go 1.21 | +| **Type** | WebSocket relay server | +| **License** | Not specified | +| **Repository** | Gitea-hosted | + +## Directory Structure + +``` +websocket-relay/ +├── main.go # Application entry point +├── go.mod # Go module definition +├── go.sum # Dependency checksums +├── config.yaml # Runtime configuration +├── config.example.yaml # Example configuration with TLS enabled +├── Makefile # Build, test, release commands +├── cert.pem # TLS certificate (local dev) +├── key.pem # TLS private key (local dev) +├── README.md # Project readme +├── .gitignore # Git ignore rules +├── example/ +│ └── index.html # Browser-based P2P chat demo +├── internal/ +│ ├── config/ +│ │ ├── config.go # YAML configuration loader +│ │ └── config_test.go # Config loader tests +│ ├── hub/ +│ │ ├── hub.go # WebSocket hub (connection management + broadcast) +│ │ └── hub_test.go # Hub unit tests +│ └── metrics/ +│ └── metrics.go # Prometheus metrics definitions +└── .gitea/ + └── workflows/ + ├── ci.yml # CI pipeline (test + lint) + └── release.yml # Release pipeline (build + publish) +``` + +## Technology Stack + +| 
Category | Technology | Version | +|----------|-----------|---------| +| Runtime | Go | 1.21 | +| WebSocket | gorilla/websocket | 1.5.1 | +| Metrics | prometheus/client_golang | 1.17.0 | +| Configuration | gopkg.in/yaml.v3 | 3.0.1 | +| CI/CD | Gitea Actions | — | +| Linting | golangci-lint | latest | + +## Build Targets + +| Command | Description | +|---------|-------------| +| `make build` | Build binary to `build/websocket-relay` | +| `make test` | Run all tests with verbose output | +| `make release` | Cross-compile for linux/amd64 and darwin/arm64 | +| `make clean` | Remove build artifacts | +| `make run` | Run from source | +| `make deps` | Tidy Go modules | + +## Key Metrics + +| Metric | Value | +|--------|-------| +| Total Go files | 5 (+ 2 test files) | +| Packages | 4 (`main`, `config`, `hub`, `metrics`) | +| Test files | 2 | +| CI Pipelines | 2 (CI + Release) | +| External dependencies | 3 direct, 9 indirect | diff --git a/.agents/summary/components.md b/.agents/summary/components.md new file mode 100644 index 0000000..123567a --- /dev/null +++ b/.agents/summary/components.md @@ -0,0 +1,147 @@ +# Components + +## Component Overview + +```mermaid +graph TB + subgraph "main (Entry Point)" + MAIN[main.go] + end + + subgraph "internal/config" + CFG[Config Loader] + end + + subgraph "internal/hub" + HUB[Hub Manager] + WS[WebSocket Handler] + end + + subgraph "internal/metrics" + MET[Prometheus Metrics] + end + + MAIN --> CFG + MAIN --> HUB + MAIN --> MET + HUB --> WS + HUB --> MET +``` + +## Package: `main` + +**File:** `main.go` + +**Responsibility:** Application entry point and server initialization. + +**Behavior:** +1. Parses CLI flags (`--config-file`) +2. Loads YAML configuration +3. Creates and starts the Hub +4. Optionally starts the metrics HTTP server on a separate port +5. 
Starts the WebSocket HTTP/TLS server + +**Dependencies:** `internal/config`, `internal/hub`, `prometheus/client_golang` + +--- + +## Package: `internal/hub` + +**File:** `internal/hub/hub.go` + +**Responsibility:** WebSocket connection lifecycle management and message broadcasting. + +### Struct: `Hub` + +| Field | Type | Purpose | +|-------|------|---------| +| `clients` | `map[*websocket.Conn]bool` | Set of active connections | +| `broadcast` | `chan []byte` | Channel for messages to relay | +| `register` | `chan *websocket.Conn` | Channel for new connections | +| `unregister` | `chan *websocket.Conn` | Channel for disconnections | +| `mu` | `sync.RWMutex` | Protects the clients map | + +### Methods + +| Method | Signature | Description | +|--------|-----------|-------------| +| `New` | `func New() *Hub` | Constructor, initializes all channels and map | +| `Run` | `func (h *Hub) Run()` | Main event loop processing channels (blocking) | +| `HandleWebSocket` | `func (h *Hub) HandleWebSocket(w, r)` | HTTP handler — upgrades connection and starts reader | +| `ClientCount` | `func (h *Hub) ClientCount() int` | Returns current connected client count (thread-safe) | + +### Connection Flow + +```mermaid +stateDiagram-v2 + [*] --> HTTPRequest: Client connects + HTTPRequest --> Upgraded: WebSocket upgrade + Upgraded --> Registered: register channel + Registered --> Reading: goroutine loop + Reading --> Broadcasting: message received + Broadcasting --> Reading: continue + Reading --> Unregistered: error/close + Unregistered --> [*]: connection cleaned up +``` + +--- + +## Package: `internal/config` + +**File:** `internal/config/config.go` + +**Responsibility:** YAML configuration file loading and parsing. 
+ +### Struct: `Config` + +```go +type Config struct { + Server struct { + Port int + TLS struct { + Enabled bool + CertFile string + KeyFile string + } + } + Metrics struct { + Enabled bool + Port int + } +} +``` + +### Functions + +| Function | Signature | Description | +|----------|-----------|-------------| +| `Load` | `func Load(filename string) (*Config, error)` | Reads and parses YAML config file | + +--- + +## Package: `internal/metrics` + +**File:** `internal/metrics/metrics.go` + +**Responsibility:** Prometheus metrics registration and exposure. + +### Metrics Defined + +| Variable | Prometheus Type | Metric Name | Description | +|----------|----------------|-------------|-------------| +| `ConnectedClients` | Gauge | `websocket_connected_clients` | Current number of connected clients | +| `MessagesTotal` | Gauge | `websocket_message` | Total messages processed | +| `ConnectionsTotal` | Gauge | `websocket_connection` | Total connections established | +| `DisconnectionsTotal` | Gauge | `websocket_disconnection` | Total disconnections | + +> **Note:** All metrics use `promauto.NewGauge` for auto-registration. The "total" metrics use Gauge instead of Counter even though they only ever increase; Counter is the type that `rate()`/`increase()` expect and that compensates for the reset to zero on process restart, so these three should be Counters with a `_total` name suffix. 
+ +--- + +## Test Coverage + +| Package | Test File | Tests | +|---------|-----------|-------| +| `internal/hub` | `hub_test.go` | `TestNew`, `TestClientCount`, `TestBroadcastChannel` | +| `internal/config` | `config_test.go` | `TestLoad`, `TestLoadFileNotFound` | +| `internal/metrics` | — | No dedicated tests | diff --git a/.agents/summary/data_models.md b/.agents/summary/data_models.md new file mode 100644 index 0000000..fb87452 --- /dev/null +++ b/.agents/summary/data_models.md @@ -0,0 +1,142 @@ +# Data Models + +## Configuration Model + +```mermaid +classDiagram + class Config { + +Server ServerConfig + +Metrics MetricsConfig + } + class ServerConfig { + +int Port + +TLSConfig TLS + } + class TLSConfig { + +bool Enabled + +string CertFile + +string KeyFile + } + class MetricsConfig { + +bool Enabled + +int Port + } + + Config --> ServerConfig + Config --> MetricsConfig + ServerConfig --> TLSConfig +``` + +### Config Struct Definition + +```go +type Config struct { + Server struct { + Port int `yaml:"port"` + TLS struct { + Enabled bool `yaml:"enabled"` + CertFile string `yaml:"cert_file"` + KeyFile string `yaml:"key_file"` + } `yaml:"tls"` + } `yaml:"server"` + Metrics struct { + Enabled bool `yaml:"enabled"` + Port int `yaml:"port"` + } `yaml:"metrics"` +} +``` + +### Default Configuration Values + +| Field | Default | Notes | +|-------|---------|-------| +| `server.port` | 8443 | Standard alternate HTTPS port | +| `server.tls.enabled` | false (config.yaml) / true (example) | Toggle TLS | +| `server.tls.cert_file` | `cert.pem` | Relative to working directory | +| `server.tls.key_file` | `key.pem` | Relative to working directory | +| `metrics.enabled` | true | Prometheus metrics | +| `metrics.port` | 9090 | Standard Prometheus port | + +--- + +## Hub State Model + +```mermaid +classDiagram + class Hub { + -map~*websocket.Conn, bool~ clients + -chan []byte broadcast + -chan *websocket.Conn register + -chan *websocket.Conn unregister + -sync.RWMutex mu + +New() 
Hub + +Run() + +HandleWebSocket(w, r) + +ClientCount() int + } +``` + +### Channel Types + +| Channel | Direction | Payload | Buffer | +|---------|-----------|---------|--------| +| `register` | Handler → Hub | `*websocket.Conn` | Unbuffered | +| `unregister` | Reader → Hub | `*websocket.Conn` | Unbuffered | +| `broadcast` | Reader → Hub | `[]byte` | Unbuffered | + +--- + +## Message Model + +The relay server does **not** impose any message structure. Messages are raw `[]byte` payloads passed through as WebSocket text frames. + +```mermaid +graph LR + A[Client A sends bytes] --> B[Hub broadcast channel] + B --> C[Written as TextMessage to all clients] +``` + +The example HTML client uses an informal format: +``` +{name}
{message_text} +``` + +But this is purely client-side convention — the server is format-agnostic. + +--- + +## Metrics Model + +```mermaid +classDiagram + class PrometheusMetrics { + +Gauge ConnectedClients + +Gauge MessagesTotal + +Gauge ConnectionsTotal + +Gauge DisconnectionsTotal + } +``` + +| Metric | Update Trigger | +|--------|---------------| +| `ConnectedClients` | Set on register/unregister (absolute count) | +| `MessagesTotal` | Incremented on each broadcast | +| `ConnectionsTotal` | Incremented on register | +| `DisconnectionsTotal` | Incremented on unregister | + +--- + +## Connection State Machine + +```mermaid +stateDiagram-v2 + [*] --> Connecting: HTTP request to / + Connecting --> Connected: WebSocket upgrade success + Connecting --> Failed: Upgrade error + Connected --> Active: Registered in Hub + Active --> Active: Sending/Receiving messages + Active --> Disconnecting: Read error or client close + Disconnecting --> Closed: Unregistered from Hub + Failed --> [*] + Closed --> [*] +``` diff --git a/.agents/summary/dependencies.md b/.agents/summary/dependencies.md new file mode 100644 index 0000000..de2a1c0 --- /dev/null +++ b/.agents/summary/dependencies.md @@ -0,0 +1,104 @@ +# Dependencies + +## Direct Dependencies + +| Package | Version | Purpose | Usage Location | +|---------|---------|---------|---------------| +| `github.com/gorilla/websocket` | v1.5.1 | WebSocket protocol implementation | `internal/hub/hub.go` | +| `github.com/prometheus/client_golang` | v1.17.0 | Prometheus metrics client library | `internal/metrics/metrics.go`, `main.go` | +| `gopkg.in/yaml.v3` | v3.0.1 | YAML configuration parsing | `internal/config/config.go` | + +## Dependency Graph + +```mermaid +graph TD + subgraph "Application" + MAIN[main.go] + HUB[internal/hub] + CFG[internal/config] + MET[internal/metrics] + end + + subgraph "Direct Dependencies" + GWS[gorilla/websocket v1.5.1] + PROM[prometheus/client_golang v1.17.0] + YAML[gopkg.in/yaml.v3 v3.0.1] + end + + 
subgraph "Transitive Dependencies" + NET[golang.org/x/net v0.17.0] + PROTO[google.golang.org/protobuf v1.31.0] + PROMMOD[prometheus/client_model v0.4.1] + PROMCOM[prometheus/common v0.44.0] + PROMPROC[prometheus/procfs v0.11.1] + end + + HUB --> GWS + HUB --> MET + MET --> PROM + CFG --> YAML + MAIN --> PROM + GWS --> NET + PROM --> PROTO + PROM --> PROMMOD + PROM --> PROMCOM + PROM --> PROMPROC +``` + +## Indirect (Transitive) Dependencies + +| Package | Version | Required By | +|---------|---------|-------------| +| `github.com/beorn7/perks` | v1.0.1 | prometheus/client_golang | +| `github.com/cespare/xxhash/v2` | v2.2.0 | prometheus/client_golang | +| `github.com/golang/protobuf` | v1.5.3 | prometheus/client_golang | +| `github.com/kr/text` | v0.2.0 | prometheus (testing) | +| `github.com/matttproud/golang_protobuf_extensions` | v1.0.4 | prometheus/client_golang | +| `github.com/prometheus/client_model` | v0.4.1 | prometheus/client_golang | +| `github.com/prometheus/common` | v0.44.0 | prometheus/client_golang | +| `github.com/prometheus/procfs` | v0.11.1 | prometheus/client_golang | +| `golang.org/x/net` | v0.17.0 | gorilla/websocket | +| `golang.org/x/sys` | v0.13.0 | prometheus/procfs | +| `google.golang.org/protobuf` | v1.31.0 | prometheus/client_golang | + +## Dependency Usage Details + +### gorilla/websocket + +- **Used for:** WebSocket protocol handling (upgrade, read, write) +- **Key APIs used:** + - `websocket.Upgrader` — HTTP to WebSocket upgrade + - `websocket.Conn.ReadMessage()` — Read frames from client + - `websocket.Conn.WriteMessage()` — Write frames to client + - `websocket.TextMessage` — Message type constant + +### prometheus/client_golang + +- **Used for:** Application observability metrics +- **Key APIs used:** + - `promauto.NewGauge()` — Auto-registering gauge metrics + - `prometheus.GaugeOpts` — Metric configuration + - `promhttp.Handler()` — HTTP handler for `/metrics` endpoint + +### gopkg.in/yaml.v3 + +- **Used for:** Configuration file 
parsing +- **Key APIs used:** + - `yaml.Unmarshal()` — Deserialize YAML into Go structs + +## Build & CI Dependencies + +| Tool | Purpose | Used In | +|------|---------|---------| +| Go 1.21 | Compiler and runtime | CI, Release | +| golangci-lint | Static analysis / linting | CI | +| make | Build automation | Local dev, CI | +| Gitea Actions | CI/CD pipeline runner | `.gitea/workflows/` | + +## Security Considerations + +| Dependency | Known Issues | Notes | +|-----------|--------------|-------| +| `golang.org/x/net` | v0.17.0 | Check for CVEs periodically | +| `gorilla/websocket` | Archived repository | Consider migration to `nhooyr.io/websocket` or `coder/websocket` long-term | +| TLS certificates | Local dev certs in repo | Not for production use | diff --git a/.agents/summary/index.md b/.agents/summary/index.md new file mode 100644 index 0000000..b161385 --- /dev/null +++ b/.agents/summary/index.md @@ -0,0 +1,112 @@ +# Documentation Index — WebSocket Relay + +> **Purpose:** This file serves as the primary knowledge base entry point for AI assistants working with this codebase. Read this file first to understand where to find detailed information. + +## How to Use This Documentation + +1. **Start here** — this index contains summaries of every documentation file +2. **Check the summary tables** below to determine which file has the information you need +3. **Only load additional files** when you need deeper detail on a specific topic +4. 
**Cross-references** are provided to help navigate between related topics + +--- + +## Project Quick Reference + +| Fact | Value | +|------|-------| +| **Project** | WebSocket Relay Server | +| **Language** | Go 1.21 | +| **Purpose** | Broadcast WebSocket messages to all connected clients (P2P relay) | +| **Entry Point** | `main.go` | +| **Config** | `config.yaml` (YAML) | +| **WebSocket Port** | 8443 (configurable) | +| **Metrics Port** | 9090 (configurable, optional) | +| **Build** | `make build` / `make release` | +| **Test** | `make test` / `go test ./...` | +| **Architecture** | Hub-and-Spoke broadcast pattern using Go channels | + +--- + +## Documentation Files + +### 📋 codebase_info.md +**What it contains:** Project metadata, directory tree, technology stack, build targets, and codebase statistics. +**When to consult:** When you need to understand the project layout, find a file, or check what tools/languages are used. +**Key topics:** Directory structure, Go module info, Makefile targets, dependency counts. + +--- + +### 🏗️ architecture.md +**What it contains:** System design, Hub-and-Spoke pattern explanation, concurrency model, goroutine lifecycle, security model, and deployment architecture. +**When to consult:** When you need to understand HOW the system works at a high level, the threading model, or how components interact. +**Key topics:** CSP concurrency, fan-out broadcasting, TLS configuration, CI/CD pipeline structure. +**Cross-references:** → `components.md` for implementation details, → `workflows.md` for sequence flows. + +--- + +### 🧩 components.md +**What it contains:** Detailed description of each Go package — structs, methods, fields, and their responsibilities. +**When to consult:** When you need to modify a specific package, understand a struct's fields, or find where functionality lives. +**Key topics:** Hub struct and methods, Config struct, metrics variables, test coverage map. 
+**Cross-references:** → `architecture.md` for design rationale, → `interfaces.md` for external contracts. + +--- + +### 🔌 interfaces.md +**What it contains:** All external interfaces — WebSocket endpoint behavior, metrics endpoint, CLI flags, configuration schema, and integration points. +**When to consult:** When you need to understand how clients interact with the server, what the API contract is, or how to configure the service. +**Key topics:** WebSocket message protocol, Prometheus metric names, CLI usage, YAML config schema. +**Cross-references:** → `data_models.md` for config struct details, → `components.md` for handler implementation. + +--- + +### 📊 data_models.md +**What it contains:** All data structures — Config struct, Hub state, message format, metrics model, and connection state machine. +**When to consult:** When you need to understand data shapes, struct definitions, channel types, or state transitions. +**Key topics:** Config YAML mapping, Hub channels and their payloads, connection lifecycle states. +**Cross-references:** → `interfaces.md` for how models are exposed externally, → `components.md` for methods operating on these models. + +--- + +### 🔄 workflows.md +**What it contains:** Step-by-step process flows — startup, connection, broadcast, build/release, development, and error handling. +**When to consult:** When you need to understand the sequence of operations, debug a flow, or add a new feature that hooks into an existing workflow. +**Key topics:** Application startup sequence, client lifecycle, CI/CD pipeline steps, error handling paths. +**Cross-references:** → `architecture.md` for the concurrency model underlying workflows, → `components.md` for method details. + +--- + +### 📦 dependencies.md +**What it contains:** Complete dependency inventory — direct and transitive deps, their versions, usage locations, security considerations, and build tools. 
+**When to consult:** When updating dependencies, evaluating security, understanding what libraries provide, or considering alternatives. +**Key topics:** gorilla/websocket APIs used, Prometheus client usage, gopkg.in/yaml.v3 usage, transitive dependency tree. +**Cross-references:** → `components.md` for where dependencies are imported. + +--- + +### 📝 review_notes.md +**What it contains:** Documentation quality assessment — consistency issues, completeness gaps, bugs found during analysis, and prioritized recommendations. +**When to consult:** When looking for known issues, potential bugs, or areas needing improvement. +**Key topics:** Metrics type bug, port mismatch in example, missing features (graceful shutdown, rate limiting), test coverage gaps. +**Cross-references:** All other files (identifies issues across the entire codebase). + +--- + +## Quick Lookup: Common Questions + +| Question | File to Consult | +|----------|----------------| +| "What does this project do?" | This file (index.md) | +| "Where is X implemented?" | `codebase_info.md` → directory tree | +| "How does the Hub work?" | `components.md` → Hub section | +| "What's the WebSocket message format?" | `interfaces.md` → WebSocket Endpoint | +| "How do I build/test?" | `codebase_info.md` → Build Targets | +| "What metrics are exposed?" | `interfaces.md` → Metrics Endpoint | +| "What are the config options?" | `interfaces.md` → Configuration Interface | +| "Are there known bugs?" | `review_notes.md` → Inconsistencies | +| "What should I improve?" | `review_notes.md` → Recommendations | +| "What dependencies does it use?" | `dependencies.md` | +| "How does startup work?" | `workflows.md` → Application Startup | +| "How are connections handled?" | `workflows.md` → Client Connection Workflow | +| "What's the threading model?" 
| `architecture.md` → Concurrency Model | diff --git a/.agents/summary/interfaces.md b/.agents/summary/interfaces.md new file mode 100644 index 0000000..51b0a66 --- /dev/null +++ b/.agents/summary/interfaces.md @@ -0,0 +1,147 @@ +# Interfaces + +## HTTP Endpoints + +### WebSocket Endpoint + +| Property | Value | +|----------|-------| +| **Path** | `/` | +| **Protocol** | WebSocket (ws:// or wss://) | +| **Port** | Configurable (default: 8443) | +| **Handler** | `hub.HandleWebSocket` | + +**Upgrade Headers:** +- Standard WebSocket upgrade +- `CheckOrigin` accepts all origins + +**Message Protocol:** +- Type: `TextMessage` (opcode 1) +- Format: Raw bytes (no structured format imposed) +- Direction: Bidirectional — any message sent is broadcast to all connected clients + +```mermaid +sequenceDiagram + participant A as Client A + participant S as Relay Server + participant B as Client B + participant C as Client C + + A->>S: Connect (ws upgrade) + B->>S: Connect (ws upgrade) + C->>S: Connect (ws upgrade) + A->>S: Send "Hello" + S->>A: Relay "Hello" + S->>B: Relay "Hello" + S->>C: Relay "Hello" +``` + +> **Note:** The sender also receives their own message back (no sender filtering). 
+ +--- + +### Metrics Endpoint + +| Property | Value | +|----------|-------| +| **Path** | `/metrics` | +| **Protocol** | HTTP | +| **Port** | Configurable (default: 9090) | +| **Format** | Prometheus text exposition format | +| **Condition** | Only available when `metrics.enabled: true` | + +**Available Metrics:** + +``` +# HELP websocket_connected_clients Number of currently connected WebSocket clients +# TYPE websocket_connected_clients gauge +websocket_connected_clients 0 + +# HELP websocket_message Number of WebSocket messages processed +# TYPE websocket_message gauge +websocket_message 0 + +# HELP websocket_connection Number of WebSocket connections established +# TYPE websocket_connection gauge +websocket_connection 0 + +# HELP websocket_disconnection Number of WebSocket disconnections +# TYPE websocket_disconnection gauge +websocket_disconnection 0 +``` + +--- + +## CLI Interface + +``` +Usage: websocket-relay [flags] + +Flags: + --config-file string Path to configuration file (default "config.yaml") +``` + +--- + +## Configuration Interface (YAML) + +```yaml +server: + port: 8443 # Server listen port + tls: + enabled: true # Enable TLS (wss://) + cert_file: cert.pem # Path to TLS certificate + key_file: key.pem # Path to TLS private key + +metrics: + enabled: true # Enable Prometheus metrics endpoint + port: 9090 # Metrics server port +``` + +--- + +## Internal Go Interfaces (Implicit) + +The codebase doesn't define explicit Go interfaces but uses the following implicit contracts: + +### Hub Contract + +```go +// Hub manages WebSocket connections and broadcasts messages +type Hub interface { + Run() // Start event loop + HandleWebSocket(http.ResponseWriter, *http.Request) // HTTP handler + ClientCount() int // Connected client count +} +``` + +### Config Contract + +```go +// Config loading +type ConfigLoader interface { + Load(filename string) (*Config, error) +} +``` + +--- + +## Integration Points + +```mermaid +graph LR + subgraph External + 
PROM[Prometheus] + BROWSERS[Browser Clients] + APPS[Application Clients] + end + + subgraph "WebSocket Relay" + WS[WebSocket :8443] + MET[Metrics :9090] + end + + BROWSERS -->|ws/wss| WS + APPS -->|ws/wss| WS + PROM -->|HTTP GET /metrics| MET +``` diff --git a/.agents/summary/review_notes.md b/.agents/summary/review_notes.md new file mode 100644 index 0000000..0ee7505 --- /dev/null +++ b/.agents/summary/review_notes.md @@ -0,0 +1,75 @@ +# Documentation Review Notes + +## Consistency Check Results + +### ✅ Consistent + +- Configuration documented in `data_models.md` matches actual `config.go` struct +- Hub methods documented in `components.md` match actual implementation +- CLI flags documented in `interfaces.md` match `main.go` +- Build commands in `codebase_info.md` match `Makefile` + +### ⚠️ Inconsistencies Found + +| Issue | Location | Details | +|-------|----------|---------| +| **Port mismatch in example client** | `example/index.html` | Uses `ws://localhost:8000/` but config default is port `8443` | +| **Metrics type mismatch** | `internal/metrics/metrics.go` | `MessagesTotal`, `ConnectionsTotal`, `DisconnectionsTotal` are defined as `Gauge` but semantically represent counters (monotonically increasing values). Should be `Counter` type. | +| **Silent client removal** | `internal/hub/hub.go` | During broadcast, write errors cause client removal without going through the `unregister` channel, meaning `DisconnectionsTotal` and `ConnectedClients` metrics won't be updated correctly. 
| +| **README TLS default mismatch** | `README.md` | Mentions TLS is enabled by default, but `config.yaml` has `tls.enabled: false` | + +--- + +## Completeness Check Results + +### ✅ Well-Documented Areas + +- Core WebSocket relay logic +- Configuration structure and loading +- Build and deployment pipeline +- Metrics definitions + +### ❌ Documentation Gaps + +| Gap | Severity | Recommendation | +|-----|----------|----------------| +| **No graceful shutdown** | Medium | Document that the server lacks graceful shutdown — connections are terminated abruptly on SIGTERM | +| **No rate limiting** | Medium | Document absence of rate limiting and implications for production use | +| **No message size limits** | Medium | No `ReadLimit` set on WebSocket connections — potential DoS vector | +| **No health check endpoint** | Low | No `/health` or `/ready` endpoint for orchestrators | +| **No connection limits** | Medium | No max client count — server could be overwhelmed | +| **No logging configuration** | Low | Uses default `log` package with no level control | +| **No deployment docs** | Medium | No systemd unit file, Docker instructions, or k8s manifests | +| **Missing test coverage** | Medium | `internal/metrics` has no tests; hub integration tests (actual WebSocket connections) missing | + +--- + +## Language Support + +| Aspect | Support Level | Notes | +|--------|--------------|-------| +| Go source analysis | ✅ Full | All Go code fully analyzed | +| HTML/JS (example) | ✅ Full | Simple single-file client analyzed | +| YAML configs | ✅ Full | Configuration fully documented | +| Makefile | ✅ Full | All targets documented | +| Gitea Actions YAML | ✅ Full | CI/CD pipelines documented | + +--- + +## Recommendations + +### High Priority +1. **Fix metrics types**: Change `MessagesTotal`, `ConnectionsTotal`, `DisconnectionsTotal` from `Gauge` to `Counter` +2. 
**Fix broadcast disconnect handling**: Route write-error disconnections through the unregister channel to maintain accurate metrics +3. **Add message size limits**: Set `conn.SetReadLimit()` to prevent memory exhaustion + +### Medium Priority +4. **Add graceful shutdown**: Use `context.Context` and `http.Server.Shutdown()` +5. **Add health endpoint**: Simple `/health` returning 200 OK +6. **Add integration tests**: Test actual WebSocket connections end-to-end +7. **Fix example port**: Update `example/index.html` to use port 8443 + +### Low Priority +8. **Add structured logging**: Replace `log` with `slog` or `zerolog` +9. **Add connection limits**: Max concurrent connections configuration +10. **Add Docker support**: Dockerfile and docker-compose for easy deployment diff --git a/.agents/summary/workflows.md b/.agents/summary/workflows.md new file mode 100644 index 0000000..c4e347b --- /dev/null +++ b/.agents/summary/workflows.md @@ -0,0 +1,140 @@ +# Workflows + +## Application Startup + +```mermaid +flowchart TD + START[Application Start] --> PARSE[Parse CLI flags] + PARSE --> LOAD[Load config.yaml] + LOAD -->|Error| FATAL[log.Fatal - exit] + LOAD -->|Success| CREATE[Create Hub] + CREATE --> RUN[Start Hub.Run goroutine] + RUN --> METRICS{Metrics enabled?} + METRICS -->|Yes| METSRV[Start metrics server goroutine on :9090] + METRICS -->|No| SKIP[Skip metrics] + METSRV --> TLS{TLS enabled?} + SKIP --> TLS + TLS -->|Yes| TLSSERVE[ListenAndServeTLS on :8443] + TLS -->|No| HTTPSERVE[ListenAndServe on :8443] +``` + +--- + +## Client Connection Workflow + +```mermaid +sequenceDiagram + participant Client + participant HTTP as HTTP Server + participant Upgrader as WebSocket Upgrader + participant Hub as Hub.Run() + participant Metrics + + Client->>HTTP: GET / (Upgrade: websocket) + HTTP->>Upgrader: CheckOrigin (always true) + Upgrader->>HTTP: Upgrade response + HTTP->>Hub: register <- conn + Hub->>Metrics: ConnectedClients.Set(n) + Hub->>Metrics: ConnectionsTotal.Inc() + 
Note over HTTP: Spawn reader goroutine + + loop Message Loop + Client->>HTTP: WebSocket frame + HTTP->>Hub: broadcast <- message + Hub->>Metrics: MessagesTotal.Inc() + Hub->>Client: WriteMessage to all clients + end + + Note over HTTP: Read error or client disconnect + HTTP->>Hub: unregister <- conn + Hub->>Metrics: ConnectedClients.Set(n-1) + Hub->>Metrics: DisconnectionsTotal.Inc() + Hub->>Hub: Close connection, remove from map +``` + +--- + +## Build and Release Workflow + +```mermaid +flowchart LR + subgraph Development + CODE[Write Code] --> PUSH[Push to main/develop] + end + + subgraph "CI Pipeline" + PUSH --> TEST[go test -v ./...] + PUSH --> LINT[golangci-lint] + TEST --> BUILD[make build] + end + + subgraph "Release Pipeline" + TAG[Push v* tag] --> REL_BUILD[Cross-compile] + REL_BUILD --> LINUX[linux/amd64 binary] + REL_BUILD --> MACOS[darwin/arm64 binary] + LINUX --> RELEASE[Gitea Release] + MACOS --> RELEASE + end +``` + +--- + +## Development Workflow + +```mermaid +flowchart TD + START[Clone repo] --> DEPS[make deps / go mod tidy] + DEPS --> CONFIG[Edit config.yaml] + CONFIG --> RUN[make run] + RUN --> TEST_LOCAL[Test with example/index.html] + TEST_LOCAL --> WRITE[Write code changes] + WRITE --> UNIT[make test] + UNIT -->|Pass| COMMIT[git commit] + UNIT -->|Fail| WRITE + COMMIT --> PUSH[git push] + PUSH --> CI[CI runs tests + lint] +``` + +--- + +## Message Broadcast Workflow + +```mermaid +flowchart TD + MSG[Client sends message] --> CHAN[broadcast channel receives []byte] + CHAN --> INC[MessagesTotal.Inc] + INC --> LOCK[RLock clients map] + LOCK --> ITER{For each client} + ITER -->|Next client| WRITE[WriteMessage] + WRITE -->|Success| ITER + WRITE -->|Error| REMOVE[Remove client, close conn] + REMOVE --> ITER + ITER -->|Done| UNLOCK[RUnlock] +``` + +--- + +## Error Handling Workflows + +### Connection Upgrade Failure + +```mermaid +flowchart LR + REQ[HTTP Request] --> UPG{Upgrade succeeds?} + UPG -->|No| LOG[Log error] + LOG --> RETURN[Return - no 
cleanup needed] + UPG -->|Yes| REGISTER[Continue with registration] +``` + +### Write Error During Broadcast + +```mermaid +flowchart LR + WRITE[WriteMessage] --> ERR{Error?} + ERR -->|No| NEXT[Continue to next client] + ERR -->|Yes| DEL[Delete from clients map] + DEL --> CLOSE[Close connection] + CLOSE --> NEXT +``` + +> **Note:** Write errors during broadcast remove the failing client directly in the Hub loop, without going through the unregister channel. The removal still updates the `ConnectedClients` gauge and increments the `DisconnectionsTotal` counter, so metrics stay consistent for these removals. diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..1490be4 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,202 @@ +# AGENTS.md — AI Assistant Guide for websocket-relay + +> This file provides context for AI coding assistants working on this project. It focuses on information not found in README.md and is optimized for quick comprehension. + +## Project Identity + +**websocket-relay** is a minimal Go WebSocket relay server that broadcasts every incoming message to all connected clients (hub-and-spoke / fan-out pattern). It supports TLS, Prometheus metrics, and graceful shutdown. 
+ +--- + +## Directory Structure + +``` +websocket-relay/ +├── main.go # Entry point: config loading, signal handling, graceful shutdown +├── internal/ +│ ├── config/config.go # YAML config loader (server port, TLS, metrics) +│ ├── hub/hub.go # Core logic: WebSocket hub, connection mgmt, broadcast +│ └── metrics/metrics.go # Prometheus counter/gauge definitions +├── example/index.html # Browser P2P chat demo client +├── config.yaml # Runtime config (edit for local dev) +├── config.example.yaml # Reference config with TLS enabled +├── Makefile # build, test, release, run, deps, clean +├── .gitea/workflows/ +│ ├── ci.yml # Push/PR → test + lint +│ └── release.yml # Tag v* → cross-compile + Gitea release +└── .agents/summary/ # Generated documentation (see index.md) +``` + +--- + +## Architecture at a Glance + +```mermaid +graph LR + C1[Client] -->|ws| HUB[Hub goroutine] + C2[Client] -->|ws| HUB + HUB -->|broadcast| C1 + HUB -->|broadcast| C2 + HUB --> MET[Prometheus :9090] +``` + +- **Single Hub goroutine** runs a `select` loop on 4 channels: `register`, `unregister`, `broadcast`, `stop` +- **Per-client reader goroutine** reads messages and pushes to `broadcast` channel +- **No write goroutine** — writes happen inline during broadcast (under RLock) +- **Thread safety** via `sync.RWMutex` on the clients map +- **Graceful shutdown** via SIGINT/SIGTERM → HTTP server shutdown → Hub shutdown → clean exit + +--- + +## Coding Patterns + +### Package Layout +- All internal packages live under `internal/` (Go internal package convention — cannot be imported externally) +- Flat package structure — each package has one primary `.go` file +- Tests use `_test.go` suffix in the same package (white-box testing) + +### Naming Conventions +- Exported functions/types: `PascalCase` (e.g., `New`, `Run`, `HandleWebSocket`, `Shutdown`) +- Config struct uses nested anonymous structs with `yaml` tags +- Metrics use package-level `var` block with `promauto` for auto-registration + +### Error 
Handling +- Fatal errors at startup → `log.Fatal()` +- WebSocket upgrade errors → logged and returned (no panic) +- Write errors during broadcast → client removed with proper metrics update +- Config load errors → fatal (server won't start without valid config) +- Shutdown errors → logged but not fatal (best-effort cleanup) + +### Concurrency Pattern +- CSP via channels (not mutexes for coordination) +- The Hub `select` loop is the single coordination point +- RWMutex used additionally for broadcast iteration safety +- `stop` channel (closed on shutdown) signals the Hub to terminate + +### Graceful Shutdown Pattern +- `main()` listens for SIGINT/SIGTERM via `os/signal` +- On signal: stops accepting new HTTP connections → shuts down Hub → exits +- 10-second timeout ensures the process doesn't hang indefinitely +- Clients receive a WebSocket Close frame (`CloseGoingAway`) before disconnection + +--- + +## How to Write & Run Tests + +### Running Tests +```bash +make test # Runs: go test -v ./... 
+go test ./internal/hub/ # Single package +go test -run TestNew ./internal/hub/ # Single test +``` + +### Test Conventions +- Test files: `*_test.go` in same package +- Use standard `testing.T` — no test framework +- Table-driven tests not yet adopted (tests are simple) +- Temp files for config tests (`os.CreateTemp`) +- Hub tests start `go h.Run()` and use `defer h.Shutdown()` for cleanup +- Integration tests use `httptest.Server` + real WebSocket dials + +### Adding New Tests +```go +// File: internal/hub/feature_test.go +package hub + +import "testing" + +func TestFeature(t *testing.T) { + // Setup + h := New() + go h.Run() + defer h.Shutdown() + + // Assert + if h.ClientCount() != 0 { + t.Errorf("expected 0, got %d", h.ClientCount()) + } +} +``` + +### Missing Test Coverage +- `internal/metrics` — no tests (metrics are auto-registered, mostly testing Prometheus library) +- No benchmarks + +--- + +## Configuration + +Config is loaded from YAML (default: `config.yaml`, override with `--config-file` flag): + +```yaml +server: + port: 8443 + tls: + enabled: false # Set true + provide cert/key for wss:// + cert_file: cert.pem + key_file: key.pem +metrics: + enabled: true + port: 9090 +``` + +--- + +## Build & Deployment + +```bash +make build # → build/websocket-relay +make release # → build/websocket-relay-linux-amd64, build/websocket-relay-darwin-arm64 +make run # → go run . +make deps # → go mod tidy +make clean # → rm build artifacts +``` + +### Release Process +1. Tag with `v*` prefix (e.g., `git tag v1.2.0`) +2. Push tag → Gitea Actions builds linux/amd64 + darwin/arm64 +3. 
Binaries uploaded as Gitea Release assets + +--- + +## Known Issues & Technical Debt + +| Issue | Severity | Location | +|-------|----------|----------| +| No message size limits (`ReadLimit`) | Security | `internal/hub/hub.go` | +| No connection count limits | Security | `internal/hub/hub.go` | +| `gorilla/websocket` is archived | Debt | `go.mod` | + +--- + +## Adding Features — Quick Guide + +### Adding a new config field +1. Add field to `Config` struct in `internal/config/config.go` with `yaml` tag +2. Add to `config.yaml` and `config.example.yaml` +3. Use in `main.go` via `cfg.YourSection.YourField` + +### Adding a new metric +1. Add `var` to `internal/metrics/metrics.go` using `promauto.NewGauge/Counter/Histogram` +2. Call `metrics.YourMetric.Inc()` (or `.Set()`, `.Observe()`) where needed + +### Adding a new HTTP endpoint +1. Add handler method to Hub or create new handler +2. Register in `main.go`: `mux.HandleFunc("/path", handler)` + +### Adding a new internal package +1. Create `internal/yourpkg/yourpkg.go` +2. Import as `websocket-relay/internal/yourpkg` + +--- + +## Detailed Documentation + +For deeper analysis, see `.agents/summary/index.md` which provides a complete knowledge base with: +- Architecture diagrams and concurrency model +- Component-level documentation with all structs/methods +- Complete interface specifications +- Data model definitions and state machines +- Workflow sequence diagrams +- Dependency analysis and security notes +- Prioritized improvement recommendations diff --git a/README.md b/README.md index ca70391..288ee85 100644 --- a/README.md +++ b/README.md @@ -1,33 +1,122 @@ # WebSocket Relay Server -A minimal Go WebSocket relay server with SSL support for P2P connections. +A minimal Go WebSocket relay server that broadcasts every incoming message to all connected clients. Supports TLS, Prometheus metrics, and graceful shutdown. 
-## Setup +## Features + +- **Fan-out broadcasting** — every message is relayed to all connected clients +- **TLS support** — optional `wss://` via cert/key PEM files +- **Prometheus metrics** — connection counts, message totals, disconnections +- **Graceful shutdown** — clean exit on SIGINT/SIGTERM with client notification +- **Zero dependencies at runtime** — single static binary + +## Quick Start ```bash +# Install dependencies go mod tidy -# Configure via config.yaml (see config.yaml for options) -go run main.go --config-file=./config.yaml + +# Run the server (defaults to ws://localhost:8443) +make run + +# Or with a custom config +go run . --config-file=./config.yaml ``` +Open `example/index.html` in multiple browser tabs to test the P2P chat demo. + ## Configuration -Edit `config.yaml` to configure: -- **Server port and TLS settings** -- **SSL certificate paths** +Edit `config.yaml`: + +```yaml +server: + port: 8443 + tls: + enabled: false # Set true for wss:// + cert_file: cert.pem + key_file: key.pem + +metrics: + enabled: true + port: 9090 # Prometheus metrics at :9090/metrics +``` + +Override the config file path with `--config-file`: + +```bash +./websocket-relay --config-file=/etc/relay/config.yaml +``` ## Usage -- WebSocket endpoint: `/` -- All WebSocket messages are relayed to all connected clients +Connect any WebSocket client to the server: + +```javascript +const ws = new WebSocket('ws://localhost:8443/'); + +ws.onmessage = (event) => console.log('Received:', event.data); +ws.onopen = () => ws.send('Hello from client!'); +``` + +With TLS enabled: + +```javascript +const ws = new WebSocket('wss://localhost:8443/'); +``` + +All messages sent by any client are broadcast to every connected client (including the sender). 
+ +## Build + +```bash +make build # Build binary → build/websocket-relay +make release # Cross-compile linux/amd64 + darwin/arm64 +make clean # Remove build artifacts +``` ## Testing -```javascript -// For TLS enabled (default config) -const ws = new WebSocket('wss://localhost:8443/'); -// For HTTP only -// const ws = new WebSocket('ws://localhost:8443/'); -ws.onmessage = (event) => console.log('Received:', event.data); -ws.send('Hello from client!'); -``` \ No newline at end of file +```bash +make test # Run all tests (unit + integration) +``` + +## Metrics + +When `metrics.enabled` is `true`, Prometheus metrics are exposed at `http://localhost:9090/metrics`: + +| Metric | Type | Description | +|--------|------|-------------| +| `websocket_connected_clients` | Gauge | Currently connected clients | +| `websocket_messages_total` | Counter | Total messages relayed | +| `websocket_connections_total` | Counter | Total connections established | +| `websocket_disconnections_total` | Counter | Total disconnections | + +## Graceful Shutdown + +The server handles `SIGINT` and `SIGTERM` signals: + +1. Stops accepting new connections +2. Sends WebSocket `CloseGoingAway` frame to all connected clients +3. Closes all connections and exits cleanly + +Shutdown timeout is 10 seconds. + +## Project Structure + +``` +websocket-relay/ +├── main.go # Entry point, signal handling, graceful shutdown +├── internal/ +│ ├── config/config.go # YAML config loader +│ ├── hub/hub.go # WebSocket hub, connection management, broadcast +│ └── metrics/metrics.go # Prometheus metric definitions +├── example/index.html # Browser P2P chat demo +├── config.yaml # Runtime configuration +├── config.example.yaml # Example config with TLS enabled +└── Makefile # Build, test, release commands +``` + +## License + +See repository for license details. 
diff --git a/example/index.html b/example/index.html index f2c23b1..c519a7f 100644 --- a/example/index.html +++ b/example/index.html @@ -49,7 +49,7 @@ let ws; function connect() { - ws = new WebSocket('ws://localhost:8000/'); + ws = new WebSocket('ws://localhost:8443/'); ws.onmessage = (event) => { console.log('Received:', event.data); @@ -83,4 +83,4 @@ - \ No newline at end of file + diff --git a/internal/hub/hub.go b/internal/hub/hub.go index 88c1d0c..23993a6 100644 --- a/internal/hub/hub.go +++ b/internal/hub/hub.go @@ -18,6 +18,7 @@ type Hub struct { broadcast chan []byte register chan *websocket.Conn unregister chan *websocket.Conn + stop chan struct{} mu sync.RWMutex } @@ -27,12 +28,26 @@ func New() *Hub { broadcast: make(chan []byte), register: make(chan *websocket.Conn), unregister: make(chan *websocket.Conn), + stop: make(chan struct{}), } } func (h *Hub) Run() { for { select { + case <-h.stop: + h.mu.Lock() + for conn := range h.clients { + conn.WriteMessage(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseGoingAway, "server shutting down")) + conn.Close() + delete(h.clients, conn) + } + h.mu.Unlock() + metrics.ConnectedClients.Set(0) + log.Printf("Hub stopped, all clients disconnected") + return + case conn := <-h.register: h.mu.Lock() h.clients[conn] = true @@ -55,17 +70,35 @@ func (h *Hub) Run() { case message := <-h.broadcast: metrics.MessagesTotal.Inc() h.mu.RLock() + var failed []*websocket.Conn for conn := range h.clients { if err := conn.WriteMessage(websocket.TextMessage, message); err != nil { - delete(h.clients, conn) - conn.Close() + failed = append(failed, conn) } } h.mu.RUnlock() + + // Remove failed clients properly so metrics stay consistent + for _, conn := range failed { + h.mu.Lock() + if _, ok := h.clients[conn]; ok { + delete(h.clients, conn) + conn.Close() + metrics.ConnectedClients.Set(float64(len(h.clients))) + metrics.DisconnectionsTotal.Inc() + log.Printf("Client disconnected (write error). 
Total: %d", len(h.clients)) + } + h.mu.Unlock() + } } } } +// Shutdown gracefully stops the hub, closing all client connections. +func (h *Hub) Shutdown() { + close(h.stop) +} + func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -94,4 +127,4 @@ func (h *Hub) ClientCount() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) -} \ No newline at end of file +} diff --git a/internal/hub/hub_integration_test.go b/internal/hub/hub_integration_test.go new file mode 100644 index 0000000..d466689 --- /dev/null +++ b/internal/hub/hub_integration_test.go @@ -0,0 +1,355 @@ +package hub + +import ( + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" +) + +// helper: start a test server with a running Hub, return the server and hub +func setupTestServer(t *testing.T) (*httptest.Server, *Hub) { + t.Helper() + h := New() + go h.Run() + + server := httptest.NewServer(http.HandlerFunc(h.HandleWebSocket)) + return server, h +} + +// helper: dial a WebSocket connection to the test server +func dialWS(t *testing.T, server *httptest.Server) *websocket.Conn { + t.Helper() + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Failed to dial WebSocket: %v", err) + } + return conn +} + +// helper: wait until hub reaches expected client count or timeout +func waitForClients(t *testing.T, h *Hub, expected int, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if h.ClientCount() == expected { + return + } + time.Sleep(5 * time.Millisecond) + } + t.Fatalf("Timed out waiting for %d clients, got %d", expected, h.ClientCount()) +} + +func TestIntegration_SingleClientConnect(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + conn := dialWS(t, server) + 
defer conn.Close() + + waitForClients(t, h, 1, time.Second) + + if count := h.ClientCount(); count != 1 { + t.Errorf("Expected 1 client, got %d", count) + } +} + +func TestIntegration_MultipleClientsConnect(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + const numClients = 5 + conns := make([]*websocket.Conn, numClients) + for i := 0; i < numClients; i++ { + conns[i] = dialWS(t, server) + defer conns[i].Close() + } + + waitForClients(t, h, numClients, time.Second) + + if count := h.ClientCount(); count != numClients { + t.Errorf("Expected %d clients, got %d", numClients, count) + } +} + +func TestIntegration_BroadcastMessage(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + // Connect two clients + conn1 := dialWS(t, server) + defer conn1.Close() + conn2 := dialWS(t, server) + defer conn2.Close() + + waitForClients(t, h, 2, time.Second) + + // Send a message from client 1 + testMsg := "hello from client 1" + if err := conn1.WriteMessage(websocket.TextMessage, []byte(testMsg)); err != nil { + t.Fatalf("Failed to send message: %v", err) + } + + // Both clients should receive the broadcast + for i, conn := range []*websocket.Conn{conn1, conn2} { + conn.SetReadDeadline(time.Now().Add(time.Second)) + _, msg, err := conn.ReadMessage() + if err != nil { + t.Fatalf("Client %d failed to read message: %v", i+1, err) + } + if string(msg) != testMsg { + t.Errorf("Client %d expected %q, got %q", i+1, testMsg, string(msg)) + } + } +} + +func TestIntegration_BroadcastToManyClients(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + const numClients = 10 + conns := make([]*websocket.Conn, numClients) + for i := 0; i < numClients; i++ { + conns[i] = dialWS(t, server) + defer conns[i].Close() + } + + waitForClients(t, h, numClients, time.Second) + + // Send from first client + testMsg := "broadcast to all" + if err := 
conns[0].WriteMessage(websocket.TextMessage, []byte(testMsg)); err != nil { + t.Fatalf("Failed to send message: %v", err) + } + + // All clients should receive it + for i, conn := range conns { + conn.SetReadDeadline(time.Now().Add(time.Second)) + _, msg, err := conn.ReadMessage() + if err != nil { + t.Fatalf("Client %d failed to read: %v", i, err) + } + if string(msg) != testMsg { + t.Errorf("Client %d expected %q, got %q", i, testMsg, string(msg)) + } + } +} + +func TestIntegration_ClientDisconnect(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + conn1 := dialWS(t, server) + conn2 := dialWS(t, server) + defer conn2.Close() + + waitForClients(t, h, 2, time.Second) + + // Disconnect client 1 + conn1.Close() + + waitForClients(t, h, 1, time.Second) + + if count := h.ClientCount(); count != 1 { + t.Errorf("Expected 1 client after disconnect, got %d", count) + } +} + +func TestIntegration_MessageAfterDisconnect(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + conn1 := dialWS(t, server) + conn2 := dialWS(t, server) + defer conn2.Close() + + waitForClients(t, h, 2, time.Second) + + // Disconnect client 1 + conn1.Close() + waitForClients(t, h, 1, time.Second) + + // Send a message from client 2 — should still work + testMsg := "after disconnect" + if err := conn2.WriteMessage(websocket.TextMessage, []byte(testMsg)); err != nil { + t.Fatalf("Failed to send message: %v", err) + } + + // Client 2 should receive its own message back + conn2.SetReadDeadline(time.Now().Add(time.Second)) + _, msg, err := conn2.ReadMessage() + if err != nil { + t.Fatalf("Failed to read message: %v", err) + } + if string(msg) != testMsg { + t.Errorf("Expected %q, got %q", testMsg, string(msg)) + } +} + +func TestIntegration_MultipleMessages(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + conn1 := dialWS(t, server) + defer conn1.Close() + conn2 := 
dialWS(t, server) + defer conn2.Close() + + waitForClients(t, h, 2, time.Second) + + messages := []string{"first", "second", "third"} + + for _, msg := range messages { + if err := conn1.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil { + t.Fatalf("Failed to send %q: %v", msg, err) + } + } + + // Client 2 should receive all messages in order + for _, expected := range messages { + conn2.SetReadDeadline(time.Now().Add(time.Second)) + _, msg, err := conn2.ReadMessage() + if err != nil { + t.Fatalf("Failed to read message: %v", err) + } + if string(msg) != expected { + t.Errorf("Expected %q, got %q", expected, string(msg)) + } + } +} + +func TestIntegration_ConcurrentSenders(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + const numClients = 5 + conns := make([]*websocket.Conn, numClients) + for i := 0; i < numClients; i++ { + conns[i] = dialWS(t, server) + defer conns[i].Close() + } + + waitForClients(t, h, numClients, time.Second) + + // Each client sends one message concurrently + var wg sync.WaitGroup + for i := 0; i < numClients; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + msg := []byte(strings.Repeat("x", idx+1)) // unique length per sender + conns[idx].WriteMessage(websocket.TextMessage, msg) + }(i) + } + wg.Wait() + + // Each client should receive exactly numClients messages (one from each sender) + for i, conn := range conns { + received := 0 + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + for received < numClients { + _, _, err := conn.ReadMessage() + if err != nil { + t.Fatalf("Client %d: read error after %d messages: %v", i, received, err) + } + received++ + } + } +} + +func TestIntegration_GracefulShutdownClosesClients(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + + conn := dialWS(t, server) + defer conn.Close() + + waitForClients(t, h, 1, time.Second) + + // Trigger shutdown + h.Shutdown() + + // Client should receive a close frame + 
conn.SetReadDeadline(time.Now().Add(time.Second)) + _, _, err := conn.ReadMessage() + if err == nil { + t.Fatal("Expected error after shutdown, got nil") + } + + // Verify it's a close error with GoingAway code + if closeErr, ok := err.(*websocket.CloseError); ok { + if closeErr.Code != websocket.CloseGoingAway { + t.Errorf("Expected CloseGoingAway (%d), got %d", websocket.CloseGoingAway, closeErr.Code) + } + } + // Any error is acceptable — the key is the connection is no longer usable +} + +func TestIntegration_EmptyMessage(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + conn1 := dialWS(t, server) + defer conn1.Close() + conn2 := dialWS(t, server) + defer conn2.Close() + + waitForClients(t, h, 2, time.Second) + + // Send an empty message + if err := conn1.WriteMessage(websocket.TextMessage, []byte("")); err != nil { + t.Fatalf("Failed to send empty message: %v", err) + } + + // Client 2 should receive the empty message + conn2.SetReadDeadline(time.Now().Add(time.Second)) + _, msg, err := conn2.ReadMessage() + if err != nil { + t.Fatalf("Failed to read message: %v", err) + } + if string(msg) != "" { + t.Errorf("Expected empty message, got %q", string(msg)) + } +} + +func TestIntegration_LargeMessage(t *testing.T) { + server, h := setupTestServer(t) + defer server.Close() + defer h.Shutdown() + + conn1 := dialWS(t, server) + defer conn1.Close() + conn2 := dialWS(t, server) + defer conn2.Close() + + waitForClients(t, h, 2, time.Second) + + // Send a 64KB message + largeMsg := strings.Repeat("A", 64*1024) + if err := conn1.WriteMessage(websocket.TextMessage, []byte(largeMsg)); err != nil { + t.Fatalf("Failed to send large message: %v", err) + } + + conn2.SetReadDeadline(time.Now().Add(2 * time.Second)) + _, msg, err := conn2.ReadMessage() + if err != nil { + t.Fatalf("Failed to read large message: %v", err) + } + if len(msg) != 64*1024 { + t.Errorf("Expected message length %d, got %d", 64*1024, len(msg)) + } +} diff 
--git a/internal/hub/hub_test.go b/internal/hub/hub_test.go index e979bdb..59e721f 100644 --- a/internal/hub/hub_test.go +++ b/internal/hub/hub_test.go @@ -16,11 +16,15 @@ func TestNew(t *testing.T) { if h.broadcast == nil { t.Error("broadcast channel not initialized") } + if h.stop == nil { + t.Error("stop channel not initialized") + } } func TestClientCount(t *testing.T) { h := New() go h.Run() + defer h.Shutdown() if count := h.ClientCount(); count != 0 { t.Errorf("Expected 0 clients, got %d", count) @@ -30,6 +34,7 @@ func TestClientCount(t *testing.T) { func TestBroadcastChannel(t *testing.T) { h := New() go h.Run() + defer h.Shutdown() select { case h.broadcast <- []byte("test"): @@ -37,4 +42,26 @@ func TestBroadcastChannel(t *testing.T) { case <-time.After(100 * time.Millisecond): t.Error("broadcast channel blocked") } -} \ No newline at end of file +} + +func TestShutdown(t *testing.T) { + h := New() + + done := make(chan struct{}) + go func() { + h.Run() + close(done) + }() + + // Ensure Run is processing before shutdown + time.Sleep(10 * time.Millisecond) + + h.Shutdown() + + select { + case <-done: + // Hub.Run() returned successfully + case <-time.After(1 * time.Second): + t.Fatal("Hub.Run() did not return after Shutdown") + } +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 92c0a4e..d73bd0d 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -11,18 +11,18 @@ var ( Help: "Number of currently connected WebSocket clients", }) - MessagesTotal = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "websocket_message", - Help: "Number of WebSocket messages processed", + MessagesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "websocket_messages_total", + Help: "Total number of WebSocket messages processed", }) - ConnectionsTotal = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "websocket_connection", - Help: "Number of WebSocket connections established", + ConnectionsTotal = 
promauto.NewCounter(prometheus.CounterOpts{ + Name: "websocket_connections_total", + Help: "Total number of WebSocket connections established", }) - DisconnectionsTotal = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "websocket_disconnection", - Help: "Number of WebSocket disconnections", + DisconnectionsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "websocket_disconnections_total", + Help: "Total number of WebSocket disconnections", }) ) diff --git a/main.go b/main.go index 8ead97f..e0526d6 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,15 @@ package main import ( + "context" "flag" "fmt" "log" "net/http" + "os" + "os/signal" + "syscall" + "time" "websocket-relay/internal/config" "websocket-relay/internal/hub" @@ -25,12 +30,20 @@ func main() { go h.Run() // Start metrics server if enabled + var metricsServer *http.Server if cfg.Metrics.Enabled { + metricsMux := http.NewServeMux() + metricsMux.Handle("/metrics", promhttp.Handler()) + metricsAddr := fmt.Sprintf(":%d", cfg.Metrics.Port) + metricsServer = &http.Server{ + Addr: metricsAddr, + Handler: metricsMux, + } go func() { - metricsAddr := fmt.Sprintf(":%d", cfg.Metrics.Port) log.Printf("Metrics server starting on %s", metricsAddr) - http.Handle("/metrics", promhttp.Handler()) - log.Fatal(http.ListenAndServe(metricsAddr, nil)) + if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Printf("Metrics server error: %v", err) + } }() } @@ -38,11 +51,50 @@ func main() { mux.HandleFunc("/", h.HandleWebSocket) addr := fmt.Sprintf(":%d", cfg.Server.Port) - if cfg.Server.TLS.Enabled { - log.Printf("WebSocket relay server starting on %s (TLS)", addr) - log.Fatal(http.ListenAndServeTLS(addr, cfg.Server.TLS.CertFile, cfg.Server.TLS.KeyFile, mux)) - } else { - log.Printf("WebSocket relay server starting on %s (HTTP)", addr) - log.Fatal(http.ListenAndServe(addr, mux)) + server := &http.Server{ + Addr: addr, + Handler: mux, } + + // Start the main server in a goroutine + go 
func() { + if cfg.Server.TLS.Enabled { + log.Printf("WebSocket relay server starting on %s (TLS)", addr) + if err := server.ListenAndServeTLS(cfg.Server.TLS.CertFile, cfg.Server.TLS.KeyFile); err != nil && err != http.ErrServerClosed { + log.Fatalf("Server error: %v", err) + } + } else { + log.Printf("WebSocket relay server starting on %s (HTTP)", addr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Server error: %v", err) + } + } + }() + + // Wait for interrupt signal + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + sig := <-quit + log.Printf("Received signal %v, shutting down gracefully...", sig) + + // Create a deadline for the shutdown + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Shut down the main HTTP server (stops accepting new connections) + if err := server.Shutdown(ctx); err != nil { + log.Printf("HTTP server shutdown error: %v", err) + } + + // Shut down the metrics server + if metricsServer != nil { + if err := metricsServer.Shutdown(ctx); err != nil { + log.Printf("Metrics server shutdown error: %v", err) + } + } + + // Stop the hub and close all WebSocket connections + h.Shutdown() + + log.Printf("Server stopped") }