FastAPI WebSockets: building real-time features without a separate service
← Back
November 5, 2026Python7 min read

FastAPI WebSockets: building real-time features without a separate service

Published November 5, 20267 min read

I was about to add Socket.IO and a separate Node.js service for real-time notifications in a FastAPI app. Then I found FastAPI's built-in WebSocket support and realized I could handle it in the same service with the same Python code. Here is the complete implementation for a multi-room notification system.

Connection manager for rooms

python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
import json
import asyncio

class ConnectionManager:
    """Manages WebSocket connections grouped by room."""
    
    def __init__(self):
        # room_id -> set of WebSocket connections
        self._rooms: dict[str, set[WebSocket]] = defaultdict(set)
    
    async def connect(self, websocket: WebSocket, room_id: str) -> None:
        await websocket.accept()
        self._rooms[room_id].add(websocket)
    
    def disconnect(self, websocket: WebSocket, room_id: str) -> None:
        self._rooms[room_id].discard(websocket)
        # Cleanup empty rooms
        if not self._rooms[room_id]:
            del self._rooms[room_id]
    
    async def send_to_room(self, room_id: str, message: dict) -> None:
        """Broadcast to all connections in a room."""
        if room_id not in self._rooms:
            return
        
        dead_connections = set()
        payload = json.dumps(message)
        
        for websocket in self._rooms[room_id]:
            try:
                await websocket.send_text(payload)
            except Exception:
                dead_connections.add(websocket)
        
        # Clean up dead connections
        for ws in dead_connections:
            self._rooms[room_id].discard(ws)
    
    async def send_to_socket(self, websocket: WebSocket, message: dict) -> None:
        """Send to a specific connection."""
        await websocket.send_text(json.dumps(message))
    
    def room_size(self, room_id: str) -> int:
        return len(self._rooms.get(room_id, set()))

manager = ConnectionManager()
app = FastAPI()

WebSocket endpoint with auth

python
from fastapi import WebSocket, WebSocketDisconnect, Query
from jose import jwt, JWTError

@app.websocket("/ws/{room_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    room_id: str,
    token: str = Query(...),  # Auth token from query param
):
    # Validate token before accepting connection
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        user_id = payload["sub"]
    except JWTError:
        await websocket.close(code=4001)  # Custom close code for auth failure
        return
    
    await manager.connect(websocket, room_id)
    
    # Notify room of new connection
    await manager.send_to_room(room_id, {
        "type": "user_joined",
        "user_id": user_id,
        "room_size": manager.room_size(room_id),
    })
    
    try:
        while True:
            # Receive messages from this client
            raw = await websocket.receive_text()
            
            try:
                message = json.loads(raw)
            except json.JSONDecodeError:
                await manager.send_to_socket(websocket, {"type": "error", "message": "Invalid JSON"})
                continue
            
            # Handle different message types
            if message.get("type") == "chat":
                await manager.send_to_room(room_id, {
                    "type": "chat",
                    "user_id": user_id,
                    "text": message.get("text", "")[:500],  # Limit message length
                })
            elif message.get("type") == "ping":
                await manager.send_to_socket(websocket, {"type": "pong"})
    
    except WebSocketDisconnect:
        manager.disconnect(websocket, room_id)
        await manager.send_to_room(room_id, {
            "type": "user_left",
            "user_id": user_id,
        })

Sending from HTTP endpoints to WebSocket clients

python
@app.post("/orders/{order_id}/status")
async def update_order_status(
    order_id: str,
    status: OrderStatus,
    db: AsyncSession = Depends(get_db),
):
    order = await db.orders.update(order_id, status=status)
    
    # Notify the user's room in real-time
    room_id = f"user_{order.user_id}"
    await manager.send_to_room(room_id, {
        "type": "order_update",
        "order_id": order_id,
        "status": status,
    })
    
    return {"order_id": order_id, "status": status}

Client-side reconnection

typescript
class ReconnectingWebSocket {
  private ws: WebSocket | null = null;
  private reconnectDelay = 1000;
  private maxDelay = 30000;
  
  constructor(
    private url: string,
    private onMessage: (data: unknown) => void,
  ) {
    this.connect();
  }
  
  private connect() {
    this.ws = new WebSocket(this.url);
    
    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.reconnectDelay = 1000; // Reset delay on successful connection
    };
    
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.onMessage(data);
    };
    
    this.ws.onclose = (event) => {
      if (event.code !== 4001) { // Don't reconnect on auth failure
        console.log(`Reconnecting in ${this.reconnectDelay}ms...`);
        setTimeout(() => this.connect(), this.reconnectDelay);
        this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxDelay);
      }
    };
  }
  
  send(data: unknown) {
    this.ws?.send(JSON.stringify(data));
  }
}

This pattern handles everything a real production WebSocket system needs: authentication, rooms, broadcasting, dead connection cleanup, and graceful disconnection. The same FastAPI worker process handles both HTTP requests and WebSocket connections — no additional service needed. For very high scale (50,000+ concurrent connections), you would need a Redis pub/sub layer so that messages can be broadcast across multiple worker processes. At moderate scale, this single-process approach works fine and is dramatically simpler to operate.

Share this
← All Posts7 min read