Staff Prep 19: Connection Pooling & SQLAlchemy — Pool Sizing & Leaks


April 4, 2026 · 9 min read · PART 17 / 18

Back to Part 18: Celery Task Queues.

Every Postgres connection is a dedicated OS process consuming roughly 5-10MB of RAM. At 500 connections, that is up to 5GB of RAM before your data is even touched. Pool sizing is not a guess; it is arithmetic. And connection leaks are the silent killer: they accumulate invisibly until your database suddenly refuses all new connections under load.
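The back-of-the-envelope arithmetic is worth writing down once. A trivial helper (the function name is mine, and the per-connection figure is an illustrative estimate, not a measurement — real cost varies with `work_mem` and loaded extensions):

```python
def pool_ram_mb(connections: int, mb_per_connection: float = 10.0) -> float:
    """Rough RAM footprint of idle Postgres backend processes."""
    return connections * mb_per_connection

# 500 backends at the ~10MB high end of the estimate:
print(pool_ram_mb(500))        # 5000.0 MB, i.e. ~5GB
print(pool_ram_mb(500, 5.0))   # 2500.0 MB at the low end
```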

SQLAlchemy async engine configuration

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

# Standard configuration
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost:5432/mydb",
    pool_size=20,          # keep 20 connections open permanently
    max_overflow=10,       # allow up to 10 extra under burst (total: 30)
    pool_timeout=30,       # wait up to 30s for a connection from pool
    pool_recycle=3600,     # recycle connections hourly (prevents stale connections)
    pool_pre_ping=True,    # test connection before use (detects dropped connections)
    echo=False,            # set True for SQL query logging (dev only)
)

# Session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # objects stay usable after commit (no refresh-triggering expiration)
)

# FastAPI dependency
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        # session.close() is called automatically by the context manager

Pool sizing math

The rule: each process can hold at most pool_size + max_overflow connections, and every worker process builds its own engine pool. Total Postgres connections = (pool_size + max_overflow) × workers per instance × number of app instances.

python
"""
Scenario:
- 4 Uvicorn workers per app instance
- 3 app instances (3 pods/VMs)
- Each worker creates its own engine pool (pool_size=10, max_overflow=5)

Total connections = 4 workers × 3 instances × (10 + 5) = 180 connections

Postgres max_connections default: 100
Result: CONNECTION EXHAUSTED

Fix: reduce pool_size or route through PgBouncer
- With PgBouncer (transaction mode, default_pool_size=25):
  App connects to PgBouncer with pool_size=50
  PgBouncer maintains 25 connections to Postgres
  Total Postgres connections: 25 (not 180)
"""

# With PgBouncer
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@pgbouncer:6432/mydb",  # connect to PgBouncer
    pool_size=50,
    max_overflow=0,      # no overflow — PgBouncer handles excess by queuing
    pool_recycle=1800,   # shorter recycle time (PgBouncer may close idle connections)
    connect_args={
        "prepared_statement_cache_size": 0,  # disable prepared statements (incompatible with PgBouncer transaction mode)
        "statement_cache_size": 0,
    },
)
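The scenario above reduces to a one-line formula. A small helper (names are mine, not a library API) makes capacity reviews mechanical:

```python
def max_postgres_connections(
    workers_per_instance: int,
    instances: int,
    pool_size: int,
    max_overflow: int,
) -> int:
    """Peak direct-to-Postgres connections: every worker process owns its own pool."""
    return workers_per_instance * instances * (pool_size + max_overflow)

# The scenario above: 4 workers x 3 instances x (10 + 5)
assert max_postgres_connections(4, 3, 10, 5) == 180

# 8 workers on one instance with pool_size=20, max_overflow=10
assert max_postgres_connections(8, 1, 20, 10) == 240
```

Run this against every proposed config change before it hits production: if the result exceeds Postgres max_connections minus headroom for superuser and maintenance connections, shrink the pools or put PgBouncer in front.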

pool_pre_ping: detecting stale connections

python
engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_pre_ping=True,  # before returning a connection from pool, send SELECT 1
    # If the connection is stale (Postgres restarted, network blip), it is discarded
    # and a fresh connection is established
    # Cost: one extra round-trip per connection checkout (~0.2ms)
    # Worth it in production environments where connections can go stale

    pool_recycle=3600,
    # Independently recycles connections older than 1 hour
    # Prevents issues with Postgres's connection timeout settings
    # Does NOT test the connection — use pool_pre_ping for that
)

Detecting and preventing connection leaks

python
import logging

from sqlalchemy import event

# Connection leak: session not properly closed
async def get_user_leaky(user_id: int):
    session = AsyncSessionLocal()  # WRONG: no context manager
    user = await session.get(User, user_id)
    return user  # session never closed: the connection stays checked out and never returns to the pool

# Proper: always use context manager
async def get_user_safe(user_id: int):
    async with AsyncSessionLocal() as session:
        user = await session.get(User, user_id)
        return user
    # session auto-closed here

# Detecting leaks: monitor pool status
async def log_pool_status():
    pool = engine.pool
    logging.info(
        f"Pool: size={pool.size()}, checked_out={pool.checkedout()}, "
        f"overflow={pool.overflow()}, checked_in={pool.checkedin()}"
    )

# If checkedout keeps growing over time: leak detected
# Monitor this metric in your APM/Prometheus setup

# SQLAlchemy connection pool events for debugging (note: listen on sync_engine)
@event.listens_for(engine.sync_engine, "checkin")
def receive_checkin(dbapi_connection, connection_record):
    logging.debug("Connection returned to pool: %r", connection_record)

@event.listens_for(engine.sync_engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
    logging.debug("Connection checked out: %r", connection_record)
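One way to make those checkout/checkin events actionable is to track outstanding checkouts with timestamps, so a leak report can say how long each connection has been held. A DB-free sketch (the class and method names are illustrative, not SQLAlchemy API):

```python
import time

class CheckoutTracker:
    """Record outstanding connection checkouts; report long-held ones."""

    def __init__(self) -> None:
        self._out: dict[int, float] = {}  # id(connection_record) -> checkout time

    def on_checkout(self, record_id: int) -> None:
        self._out[record_id] = time.monotonic()

    def on_checkin(self, record_id: int) -> None:
        self._out.pop(record_id, None)

    def suspected_leaks(self, held_longer_than: float) -> list[int]:
        now = time.monotonic()
        return [rid for rid, t in self._out.items() if now - t > held_longer_than]

# Wire into the pool events shown above, e.g.:
#   @event.listens_for(engine.sync_engine, "checkout")
#   def _co(dbapi_conn, record, proxy):
#       tracker.on_checkout(id(record))

tracker = CheckoutTracker()
tracker.on_checkout(1)
tracker.on_checkout(2)
tracker.on_checkin(2)
# negative threshold forces all outstanding checkouts to report (demo only)
assert tracker.suspected_leaks(held_longer_than=-1.0) == [1]
```

In production you would use a real threshold (e.g. 30s: no request should hold a connection that long) and log the stack trace captured at checkout time to find the leaking code path.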

NullPool for short-lived processes

python
from sqlalchemy.pool import NullPool

# NullPool: no pooling — create a new connection for every request
# Use case: Celery workers, CLI scripts, Alembic migrations
# Why: worker processes may run for only seconds; keeping a pool alive wastes connections

celery_engine = create_async_engine(
    "postgresql+asyncpg://...",
    poolclass=NullPool,  # create/destroy connection per use
)

# For Alembic migrations (sync engine)
from sqlalchemy import create_engine

migration_engine = create_engine(
    "postgresql://...",
    poolclass=NullPool,  # migrations run once, no need for a pool
)

Connection timeout handling

python
import asyncio

from fastapi import HTTPException
from sqlalchemy.exc import TimeoutError as SATimeoutError, OperationalError
from sqlalchemy.ext.asyncio import AsyncSession

async def execute_with_timeout(session: AsyncSession, stmt, timeout: float = 5.0):
    try:
        return await asyncio.wait_for(session.execute(stmt), timeout=timeout)
    except asyncio.TimeoutError:
        await session.rollback()
        raise HTTPException(503, "Database query timed out")
    except OperationalError as e:
        # Connection error (DB down, network issue)
        await session.rollback()
        raise HTTPException(503, "Database unavailable")
    except SATimeoutError:
        # Pool timeout: no connection available in pool_timeout seconds
        raise HTTPException(503, "Database connection pool exhausted")
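The cancellation behind asyncio.wait_for is worth seeing in isolation: on timeout the awaited coroutine is cancelled, not abandoned, which is why the handler above must roll the session back. A stdlib-only sketch (slow_query stands in for session.execute):

```python
import asyncio

async def slow_query() -> str:
    try:
        await asyncio.sleep(10)  # stands in for a slow session.execute(stmt)
        return "rows"
    except asyncio.CancelledError:
        # wait_for cancels the task on timeout; cleanup/rollback belongs here
        raise

async def main() -> str:
    try:
        return await asyncio.wait_for(slow_query(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # prints "timed out"
```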

Quiz: test your understanding

Before moving on, answer these in your head (or out loud):

  1. You have 8 Uvicorn workers, pool_size=20, max_overflow=10. How many Postgres connections can your app hold at peak? What happens if Postgres max_connections is 100?
  2. What does pool_pre_ping=True do? What is the cost? When is it essential?
  3. You notice that pool.checkedout() grows by ~10 per hour and never decreases. What is happening? How do you find which code path is leaking?
  4. Why do Celery workers use NullPool instead of a connection pool? What would go wrong with a regular pool in a worker process that restarts frequently?
  5. You switch from direct Postgres connection to PgBouncer in transaction mode. What SQLAlchemy settings must you change, and why?

Next up — Part 20: API Design at Scale. Idempotency keys, ETags, API versioning, and bulk endpoints that do not destroy your database.

← PREV
Staff Prep 18: Task Queues & Celery — Broker, Workers & Idempotency
← All Architecture Posts