FastAPI WebSockets: building real-time features without a separate service
I was about to add Socket.IO and a separate Node.js service for real-time notifications in a FastAPI app. Then I found FastAPI's built-in WebSocket support and realized I could handle it in the same service with the same Python code. Here is the complete implementation for a multi-room notification system.
Connection manager for rooms
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
import json
import asyncio
class ConnectionManager:
    """Track WebSocket connections grouped by room and broadcast to them.

    All state is held in this object's memory, so a manager only knows about
    the connections that were registered on it.
    """

    def __init__(self):
        # room_id -> set of WebSocket connections currently in that room
        self._rooms: dict[str, set[WebSocket]] = defaultdict(set)

    async def connect(self, websocket: WebSocket, room_id: str) -> None:
        """Accept the WebSocket handshake and register it in ``room_id``."""
        await websocket.accept()
        self._rooms[room_id].add(websocket)

    def disconnect(self, websocket: WebSocket, room_id: str) -> None:
        """Remove ``websocket`` from ``room_id`` and drop the room once empty.

        Calling this for an unknown room or connection is a no-op.
        """
        self._discard(room_id, {websocket})

    async def send_to_room(self, room_id: str, message: dict) -> None:
        """Broadcast ``message`` (JSON-encoded) to every connection in a room.

        Connections whose send fails are treated as dead and removed from the
        room; a failure on one connection never prevents delivery to the rest.
        """
        room = self._rooms.get(room_id)
        if room is None:
            return
        payload = json.dumps(message)
        dead_connections = set()
        # Iterate over a snapshot: every ``await`` below yields to the event
        # loop, where another task may connect or disconnect and resize the
        # live set, which would raise "Set changed size during iteration".
        for websocket in tuple(room):
            try:
                await websocket.send_text(payload)
            except Exception:
                # Deliberate best-effort: any send failure marks the
                # connection for removal instead of aborting the broadcast.
                dead_connections.add(websocket)
        if dead_connections:
            # Look the room up again rather than indexing the defaultdict:
            # it may have been deleted while we were awaiting, and indexing
            # would silently re-create it as an empty room.
            self._discard(room_id, dead_connections)

    async def send_to_socket(self, websocket: WebSocket, message: dict) -> None:
        """Send ``message`` (JSON-encoded) to a single connection."""
        await websocket.send_text(json.dumps(message))

    def room_size(self, room_id: str) -> int:
        """Return the number of connections registered in ``room_id``."""
        return len(self._rooms.get(room_id, ()))

    def _discard(self, room_id: str, websockets: set) -> None:
        """Remove ``websockets`` from a room, deleting the room if it empties."""
        room = self._rooms.get(room_id)
        if room is None:
            return
        room.difference_update(websockets)
        if not room:
            del self._rooms[room_id]
# Module-level connection registry; the WebSocket endpoint and the HTTP
# handlers in this file all broadcast through this one instance.
manager = ConnectionManager()
# Application object that the route decorators in this file register on.
app = FastAPI()
WebSocket endpoint with auth
from fastapi import WebSocket, WebSocketDisconnect, Query
from jose import jwt, JWTError
@app.websocket("/ws/{room_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    room_id: str,
    token: str = Query(...),  # Auth token from query param
):
    """Authenticate a client, join it to ``room_id`` and relay its messages.

    The JWT in the ``token`` query parameter must decode with the configured
    secret and carry a ``sub`` claim, which is used as the user id. On auth
    failure the socket is closed with the custom code 4001.

    Incoming frames must be JSON objects. ``{"type": "chat", "text": ...}``
    is broadcast to the room (text truncated to 500 characters) and
    ``{"type": "ping"}`` is answered with ``{"type": "pong"}``. Malformed
    frames get an ``error`` reply and the connection stays open.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        # A validly signed token may still lack "sub"; treat that as an auth
        # failure rather than letting KeyError escape the handler.
        user_id = payload["sub"]
    except (JWTError, KeyError):
        # A close sent before accept() is delivered as an HTTP 403 handshake
        # rejection, so the client never sees the 4001 code. Complete the
        # handshake first so the custom close code actually reaches it.
        await websocket.accept()
        await websocket.close(code=4001)  # Custom close code for auth failure
        return

    await manager.connect(websocket, room_id)
    try:
        # Notify room of new connection
        await manager.send_to_room(room_id, {
            "type": "user_joined",
            "user_id": user_id,
            "room_size": manager.room_size(room_id),
        })
        while True:
            # Receive messages from this client
            raw = await websocket.receive_text()
            try:
                message = json.loads(raw)
            except json.JSONDecodeError:
                await manager.send_to_socket(
                    websocket, {"type": "error", "message": "Invalid JSON"}
                )
                continue
            # Valid JSON is not necessarily an object ("[]", "1", "null"...);
            # reject anything we cannot call .get() on.
            if not isinstance(message, dict):
                await manager.send_to_socket(
                    websocket,
                    {"type": "error", "message": "Expected a JSON object"},
                )
                continue

            # Handle different message types
            message_type = message.get("type")
            if message_type == "chat":
                text = message.get("text", "")
                # Slicing only makes sense for strings; a number or null here
                # would otherwise raise and tear down the connection.
                if not isinstance(text, str):
                    await manager.send_to_socket(
                        websocket,
                        {"type": "error", "message": "text must be a string"},
                    )
                    continue
                await manager.send_to_room(room_id, {
                    "type": "chat",
                    "user_id": user_id,
                    "text": text[:500],  # Limit message length
                })
            elif message_type == "ping":
                await manager.send_to_socket(websocket, {"type": "pong"})
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    finally:
        # Run cleanup on every exit path, not only on WebSocketDisconnect,
        # so an unexpected error cannot leave a stale socket in the room.
        manager.disconnect(websocket, room_id)
        await manager.send_to_room(room_id, {
            "type": "user_left",
            "user_id": user_id,
        })
Sending from HTTP endpoints to WebSocket clients
@app.post("/orders/{order_id}/status")
async def update_order_status(
    order_id: str,
    status: OrderStatus,
    db: AsyncSession = Depends(get_db),
):
    """Persist a new order status and push it to the owner's WebSocket room.

    The update is broadcast to the room named ``user_<user_id>`` and the new
    status is echoed back in the HTTP response.
    """
    # NOTE(review): presumably returns the updated order; confirm what happens
    # when ``order_id`` does not exist (``.user_id`` below would fail on None).
    updated_order = await db.orders.update(order_id, status=status)

    # Notify the user's room in real-time
    target_room = f"user_{updated_order.user_id}"
    # NOTE(review): ``status`` is JSON-encoded by the manager -- confirm that
    # OrderStatus is JSON-serializable.
    notification = {
        "type": "order_update",
        "order_id": order_id,
        "status": status,
    }
    await manager.send_to_room(target_room, notification)

    return {"order_id": order_id, "status": status}
Client-side reconnection
/**
 * WebSocket wrapper that reconnects with exponential backoff.
 *
 * Incoming frames are parsed as JSON and handed to `onMessage`. The socket
 * reconnects after every close except an auth failure (close code 4001) or
 * an explicit call to `close()`.
 */
class ReconnectingWebSocket {
  private ws: WebSocket | null = null;
  private reconnectDelay = 1000;
  private maxDelay = 30000;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private closedByCaller = false;

  constructor(
    private url: string,
    private onMessage: (data: unknown) => void,
  ) {
    this.connect();
  }

  /** Open a new socket and wire up its handlers. */
  private connect() {
    this.reconnectTimer = null;
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.reconnectDelay = 1000; // Reset delay on successful connection
    };

    this.ws.onmessage = (event) => {
      let data: unknown;
      try {
        data = JSON.parse(event.data);
      } catch (err) {
        // A non-JSON frame must not throw out of the event handler.
        console.error('Ignoring non-JSON WebSocket message', err);
        return;
      }
      this.onMessage(data);
    };

    this.ws.onclose = (event) => {
      // Don't reconnect on auth failure or after an intentional close().
      if (this.closedByCaller || event.code === 4001) {
        return;
      }
      console.log(`Reconnecting in ${this.reconnectDelay}ms...`);
      this.reconnectTimer = setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxDelay);
    };
  }

  /**
   * Send `data` as JSON if the socket is open.
   *
   * WebSocket.send() throws InvalidStateError while the socket is still
   * CONNECTING (e.g. during a reconnect), so the message is dropped with a
   * warning instead of throwing at the caller.
   */
  send(data: unknown) {
    const socket = this.ws;
    if (socket === null || socket.readyState !== WebSocket.OPEN) {
      console.warn('WebSocket is not open; message dropped');
      return;
    }
    socket.send(JSON.stringify(data));
  }

  /** Close the connection permanently and cancel any pending reconnect. */
  close(code = 1000, reason?: string) {
    this.closedByCaller = true;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.ws?.close(code, reason);
  }
}
This pattern covers the core of what a production WebSocket system needs: authentication, rooms, broadcasting, dead-connection cleanup, and graceful disconnection. The same FastAPI worker process handles both HTTP requests and WebSocket connections — no additional service needed. One caveat: the connection manager lives in a single process's memory, so a broadcast only reaches clients connected to that same process. As soon as you run more than one worker process or server instance — which you will certainly do at very high scale (50,000+ concurrent connections) — you need a Redis pub/sub layer so that messages can be broadcast across processes. At moderate scale, with a single process, this approach works fine and is dramatically simpler to operate.