Staff Prep 19: Connection Pooling & SQLAlchemy — Pool Sizing & Leaks

April 4, 2026 · 9 min read · Part 17 / 18

Back to Part 18: Celery Task Queues. Every Postgres connection is a dedicated OS process on the server, consuming roughly 5-10MB of RAM. At 500 connections you have burned 2.5-5GB before a single row is read. Pool sizing is arithmetic, not vibes. Connection leaks are worse: they accumulate invisibly for weeks, and then your database refuses every new connection at the exact moment you least want it to. (I learned this during a Black Friday I would rather not relive.)

SQLAlchemy async engine configuration

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool

# Standard configuration
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost:5432/mydb",
    pool_size=20,          # keep up to 20 connections open persistently (opened on demand)
    max_overflow=10,       # allow up to 10 extra under burst (total: 30)
    pool_timeout=30,       # wait up to 30s for a connection from pool
    pool_recycle=3600,     # recycle connections hourly (prevents stale connections)
    pool_pre_ping=True,    # test connection before use (detects dropped connections)
    echo=False,            # set True for SQL query logging (dev only)
)

# Session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # keep loaded attributes usable after commit (avoids implicit lazy-load I/O, which fails under asyncio)
)

# FastAPI dependency
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        # session.close() is called automatically by the context manager

Pool sizing math

The rule: max connections per process = pool_size + max_overflow. Each worker process builds its own engine and therefore its own pool, so total connections = (pool_size + max_overflow) × workers per instance × number of app instances.

python
"""
Scenario:
- 4 Uvicorn workers per app instance
- 3 app instances (3 pods/VMs)
- Each worker creates its own engine pool (pool_size=10, max_overflow=5)

Total connections = 4 workers × 3 instances × (10 + 5) = 180 connections

Postgres max_connections default: 100
Result: CONNECTION EXHAUSTED

Fix: reduce pool_size or route through PgBouncer
- With PgBouncer (transaction mode, default_pool_size=25):
  App connects to PgBouncer with pool_size=50
  PgBouncer maintains 25 connections to Postgres
  Total Postgres connections: 25 (not 180)
"""

# With PgBouncer
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@pgbouncer:6432/mydb",  # connect to PgBouncer
    pool_size=50,
    max_overflow=0,      # no overflow — PgBouncer handles excess by queuing
    pool_recycle=1800,   # shorter recycle time (PgBouncer may close idle connections)
    connect_args={
        # Prepared statements are incompatible with PgBouncer transaction mode,
        # so both caches are switched off:
        "prepared_statement_cache_size": 0,  # SQLAlchemy's asyncpg dialect cache
        "statement_cache_size": 0,           # asyncpg's own statement cache
    },
)
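
The sizing rule can be checked mechanically before a deploy. The following is a minimal sketch, not a library API: `PoolPlan`, `fits`, and the `reserved` parameter are hypothetical names introduced for illustration. The `reserved` default of 3 mirrors Postgres's default `superuser_reserved_connections`; adjust it if your server is configured differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolPlan:
    """Hypothetical helper describing one deployment's pool settings."""

    workers_per_instance: int
    instances: int
    pool_size: int
    max_overflow: int

    @property
    def per_process_max(self) -> int:
        # Each worker process owns one engine, hence one pool.
        return self.pool_size + self.max_overflow

    @property
    def total_max(self) -> int:
        # Peak connections the whole fleet can hold open at once.
        return self.workers_per_instance * self.instances * self.per_process_max


def fits(plan: PoolPlan, max_connections: int = 100, reserved: int = 3) -> bool:
    """True if peak demand fits under max_connections minus reserved slots."""
    return plan.total_max <= max_connections - reserved


# The scenario from the docstring above: 4 workers x 3 instances x (10 + 5).
scenario = PoolPlan(workers_per_instance=4, instances=3, pool_size=10, max_overflow=5)
print(scenario.total_max)  # 180
print(fits(scenario))      # False

# Largest per-process budget (pool_size + max_overflow) that still fits.
budget = (100 - 3) // (scenario.workers_per_instance * scenario.instances)
print(budget)              # 8
```

With direct connections and the default `max_connections`, each of the 12 worker processes can afford only 8 connections in total, which is why the PgBouncer route is usually the more practical fix.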

pool_pre_ping: detecting stale connections

python
engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_pre_ping=True,  # on each checkout, test the connection with a lightweight ping (conceptually a SELECT 1)
    # If the connection is stale (Postgres restarted, network blip), it is discarded
    # and a fresh connection is established
    # Cost: one extra round-trip per connection checkout (roughly 0.2ms on a local network)
    # Worth it in production environments where connections can go stale

    pool_recycle=3600,
    # Independently replaces connections older than 1 hour at their next checkout
    # Guards against idle timeouts imposed by firewalls, load balancers, or proxies
    # Does NOT test the connection: use pool_pre_ping for that
)
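
The difference between the two settings is easiest to see in a toy model. The sketch below is not SQLAlchemy's implementation: `FakeConnection` and `ToyPool` are hypothetical stand-ins that only mimic the checkout decision, with an injectable clock so the behavior can be exercised without waiting.

```python
import time
from collections import deque
from typing import Callable, Deque


class FakeConnection:
    """Stand-in for a DBAPI connection; `alive` simulates server-side state."""

    def __init__(self, now: float) -> None:
        self.created_at = now
        self.alive = True

    def ping(self) -> bool:
        # A real pool would issue a lightweight round-trip here.
        return self.alive


class ToyPool:
    """Toy model of checkout with recycle and pre-ping (illustration only)."""

    def __init__(self, recycle: float, pre_ping: bool,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.recycle = recycle
        self.pre_ping = pre_ping
        self.clock = clock
        self.idle: Deque[FakeConnection] = deque()

    def checkout(self) -> FakeConnection:
        now = self.clock()
        while self.idle:
            conn = self.idle.popleft()
            if now - conn.created_at > self.recycle:
                continue  # too old: discarded without being tested
            if self.pre_ping and not conn.ping():
                continue  # failed the liveness test: discarded
            return conn
        return FakeConnection(now)  # nothing usable: open a fresh one

    def checkin(self, conn: FakeConnection) -> None:
        self.idle.append(conn)


fake_now = [0.0]
pool = ToyPool(recycle=3600, pre_ping=True, clock=lambda: fake_now[0])
conn = pool.checkout()
pool.checkin(conn)
conn.alive = False                 # simulate a Postgres restart
fresh = pool.checkout()
print(fresh is conn)               # False: pre-ping caught the dead connection

no_ping = ToyPool(recycle=3600, pre_ping=False, clock=lambda: fake_now[0])
dead = no_ping.checkout()
no_ping.checkin(dead)
dead.alive = False
print(no_ping.checkout() is dead)  # True: recycle alone hands out a dead connection
```

In this model, recycling is purely age-based and pre-ping is purely liveness-based, which is why the two are complementary rather than interchangeable.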

Detecting and preventing connection leaks

python
from sqlalchemy import event
import asyncio
import logging

# Connection leak: session not properly closed
async def get_user_leaky(user_id: int):
    session = AsyncSessionLocal()  # WRONG: no context manager
    user = await session.get(User, user_id)
    return user  # session never closed: its connection stays checked out and is never returned to the pool

# Proper: always use context manager
async def get_user_safe(user_id: int):
    async with AsyncSessionLocal() as session:
        user = await session.get(User, user_id)
        return user
    # session auto-closed here

# Detecting leaks: monitor pool status
async def log_pool_status():
    pool = engine.pool
    logging.info(
        f"Pool: size={pool.size()}, checked_out={pool.checkedout()}, "
        f"overflow={pool.overflow()}, checked_in={pool.checkedin()}"
    )

# If checkedout keeps growing over time: leak detected
# Monitor this metric in your APM/Prometheus setup

# SQLAlchemy connection pool events for debugging
@event.listens_for(engine.sync_engine, "checkin")
def receive_checkin(dbapi_connection, connection_record):
    logging.debug("checkin: connection returned to pool")

@event.listens_for(engine.sync_engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
    logging.debug("checkout: connection handed out from pool")
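
"Keeps growing over time" can be turned into a simple alert. The sketch below is one possible heuristic, not a SQLAlchemy feature: `window_floors` and `looks_like_leak` are hypothetical helpers, and the `window` and `min_growth` defaults are arbitrary. It assumes you already collect periodic readings of `pool.checkedout()`. Instead of comparing raw readings, which swing with traffic, it compares the minimum of each window: a leak raises the floor that the count never drops below.

```python
from typing import List, Sequence


def window_floors(samples: Sequence[int], window: int) -> List[int]:
    """Minimum checked-out count in each full, non-overlapping window."""
    return [
        min(samples[i:i + window])
        for i in range(0, len(samples) - window + 1, window)
    ]


def looks_like_leak(samples: Sequence[int], window: int = 4,
                    min_growth: int = 3) -> bool:
    """Heuristic: the per-window floor never falls and rises by min_growth."""
    floors = window_floors(samples, window)
    if len(floors) < 2:
        return False  # not enough data to compare windows
    never_falls = all(b >= a for a, b in zip(floors, floors[1:]))
    return never_falls and floors[-1] - floors[0] >= min_growth


# Bursty but healthy: the count keeps returning to roughly the same floor.
healthy = [2, 9, 1, 7, 3, 8, 0, 6, 2, 9, 1, 5]
# Leaky: bursts ride on top of a floor that only climbs.
leaky = [2, 9, 3, 7, 5, 12, 6, 9, 9, 14, 10, 12]

print(window_floors(healthy, 4))  # [1, 0, 1]
print(window_floors(leaky, 4))    # [2, 5, 9]
print(looks_like_leak(healthy))   # False
print(looks_like_leak(leaky))     # True
```

Once the alert fires, one way to narrow down the code path is to record a stack trace inside the `checkout` listener and compare it against connections that are never checked back in.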

NullPool for short-lived processes

python
from sqlalchemy.pool import NullPool

# NullPool: no pooling — create a new connection for every request
# Use case: Celery workers, CLI scripts, Alembic migrations
# Why: worker processes may run for only seconds; keeping a pool alive wastes connections

celery_engine = create_async_engine(
    "postgresql+asyncpg://...",
    poolclass=NullPool,  # create/destroy connection per use
)

# For Alembic migrations (sync engine)
from sqlalchemy import create_engine

migration_engine = create_engine(
    "postgresql://...",
    poolclass=NullPool,  # migrations run once, no need for a pool
)

Connection timeout handling

python
import asyncio

from fastapi import HTTPException
from sqlalchemy.exc import TimeoutError as SATimeoutError, OperationalError
from sqlalchemy.ext.asyncio import AsyncSession

async def execute_with_timeout(session: AsyncSession, stmt, timeout: float = 5.0):
    try:
        # Client-side timeout: cancels the awaiting task after `timeout` seconds
        return await asyncio.wait_for(session.execute(stmt), timeout=timeout)
    except asyncio.TimeoutError:
        await session.rollback()
        raise HTTPException(503, "Database query timed out")
    except OperationalError:
        # Connection error (DB down, network issue)
        await session.rollback()
        raise HTTPException(503, "Database unavailable")
    except SATimeoutError:
        # Pool timeout: no connection available in pool_timeout seconds
        raise HTTPException(503, "Database connection pool exhausted")

Quiz: test your understanding

Before moving on, answer these in your head (or out loud):

  1. You have 8 Uvicorn workers, pool_size=20, max_overflow=10. How many Postgres connections can your app hold at peak? What happens if Postgres max_connections is 100?
  2. What does pool_pre_ping=True do? What is the cost? When is it essential?
  3. You notice that pool.checkedout() grows by ~10 per hour and never decreases. What is happening? How do you find which code path is leaking?
  4. Why do Celery workers use NullPool instead of a connection pool? What would go wrong with a regular pool in a worker process that restarts frequently?
  5. You switch from direct Postgres connection to PgBouncer in transaction mode. What SQLAlchemy settings must you change, and why?

Next up: Part 20: API Design at Scale. Idempotency keys, ETags, API versioning, and bulk endpoints that do not destroy your database.
