FastAPI 103: API Design — Pagination, Filtering & Error Handling at Scale
FastAPI · Production

March 31, 2026 · 11 min read · PART 03 / 18

In Part 2 we covered the async vs sync model — what actually runs on the event loop and what gets offloaded to a thread pool. Now we're one layer higher: how do you design the API surface itself to be correct, consistent, and survivable at scale? Pagination, filtering, and error handling are the three places where teams make decisions early that become impossible to change later. This is Part 3.

The pagination trap: why offset breaks

Offset-based pagination is the first thing everyone reaches for. It maps cleanly to UI concepts ("page 1", "page 2") and is dead simple to implement.

@app.get("/orders")
async def list_orders_bad(page: int = 1, page_size: int = 50, db: AsyncSession = Depends(get_db)):
    offset = (page - 1) * page_size
    result = await db.execute(
        select(Order).order_by(Order.created_at.desc()).offset(offset).limit(page_size)
    )
    return result.scalars().all()

This works fine for the first few months. Then two problems emerge simultaneously:

Problem 1 — Performance. OFFSET 1000 doesn't skip 1000 rows — it reads and discards 1000 rows. At OFFSET 50000, Postgres is scanning 50,000 rows to throw them away. On a 1M row table with real traffic, your p99 latency for late pages hits seconds.

EXPLAIN ANALYZE
SELECT * FROM orders ORDER BY created_at DESC OFFSET 50000 LIMIT 50;

Limit  (cost=4821.00..4825.83 rows=50 width=128)
       (actual time=1243.821..1244.109 rows=50)
  ->  Sort  (cost=4321.00..4571.00 rows=100000 width=128)
            (actual time=981.423..1191.734 rows=50050)  ← scanned 50,050 rows
        Sort Key: created_at DESC
        Sort Method: external merge  Disk: 12384kB
Planning Time: 0.4 ms
Execution Time: 1244.3 ms  ← 1.2 seconds for page 1001
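The shape of this problem is easy to reproduce with stdlib sqlite3 (hypothetical one-table schema). The OFFSET query and its keyset equivalent, which we'll build properly below, return exactly the same page, but only the keyset predicate lets the engine seek directly through the index instead of walking past 50,000 rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")
conn.executemany(
    "INSERT INTO orders (id, amount) VALUES (?, ?)",
    [(i, float(i)) for i in range(1, 100_001)],
)

# Offset: the engine must produce and discard the first 50,000 rows.
offset_page = conn.execute(
    "SELECT id FROM orders ORDER BY id LIMIT 50 OFFSET 50000"
).fetchall()

# Keyset: jump straight to the index entry after the last-seen id.
keyset_page = conn.execute(
    "SELECT id FROM orders WHERE id > ? ORDER BY id LIMIT 50", (50000,)
).fetchall()

assert offset_page == keyset_page  # same rows, very different work
```

Same 50 rows either way; the difference is how much work the database does to find them, and the gap grows linearly with page depth.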

Problem 2 — Consistency. If a new order is inserted while a user is paginating, every subsequent row shifts. Page 2 now returns a row the user already saw on page 1. Page 3 silently drops a row entirely. No error, no warning — just wrong data.

Initial state:          After new insert:
Page 1: rows 1-50       Page 1: rows 1-50 (new row 0 pushed in)
Page 2: rows 51-100     Page 2: rows 50-99  ← row 50 repeated!
                                              row 100 dropped
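The drift is visible in a few lines of plain Python, treating the table as a newest-first list of ids (a minimal simulation, not FastAPI code):

```python
def offset_page(rows, page, page_size=50):
    # Same arithmetic as the handler above: skip (page - 1) * page_size rows.
    start = (page - 1) * page_size
    return rows[start:start + page_size]

rows = list(range(1, 101))      # ids 1..100, newest first
page1 = offset_page(rows, 1)    # client reads page 1: ids 1..50
rows.insert(0, 0)               # a new order arrives at the top
page2 = offset_page(rows, 2)    # client reads page 2: ids 50..99

assert page1[-1] == 50
assert page2[0] == 50               # row 50 served twice
assert 100 not in page1 + page2     # row 100 silently dropped
```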

Cursor pagination: how to do it right

Cursor pagination replaces "skip N rows" with "start after this specific row." The cursor is an opaque token the client passes back — typically a base64-encoded value of the last row's sort key.

import base64, json
from typing import Optional
from fastapi import Depends, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

def encode_cursor(id: int) -> str:
    return base64.urlsafe_b64encode(json.dumps({"id": id}).encode()).decode()

def decode_cursor(cursor: str) -> int:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))["id"]

@app.get("/orders")
async def list_orders(
    after: Optional[str] = None,           # opaque cursor
    limit: int = Query(default=50, ge=1, le=500),  # always bound limit
    db: AsyncSession = Depends(get_db),
):
    query = select(Order).order_by(Order.id.asc())

    if after:
        last_id = decode_cursor(after)
        query = query.where(Order.id > last_id)

    query = query.limit(limit + 1)  # fetch one extra to check if next page exists

    rows = (await db.execute(query)).scalars().all()
    has_more = len(rows) > limit
    items = rows[:limit]

    return {
        "items": items,
        "next_cursor": encode_cursor(items[-1].id) if has_more else None,
        "has_more": has_more,
    }

The + 1 trick is important: instead of running a COUNT query to check if there are more pages, just fetch one extra row. If you get limit + 1 rows back, there's a next page. Slice to limit before returning.
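End to end, a client just follows next_cursor until has_more is false. Here is a stdlib-only simulation of the same endpoint logic over an in-memory list (hypothetical data, same encode/decode helpers as above):

```python
import base64, json

DATA = list(range(1, 124))  # 123 order ids

def encode_cursor(id: int) -> str:
    return base64.urlsafe_b64encode(json.dumps({"id": id}).encode()).decode()

def decode_cursor(cursor: str) -> int:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))["id"]

def list_orders(after=None, limit=50):
    last = decode_cursor(after) if after else 0
    rows = [r for r in DATA if r > last][:limit + 1]  # the "+1" probe row
    has_more = len(rows) > limit
    items = rows[:limit]
    return {
        "items": items,
        "next_cursor": encode_cursor(items[-1]) if has_more else None,
        "has_more": has_more,
    }

# Client loop: follow next_cursor until has_more is false.
seen, cursor = [], None
while True:
    page = list_orders(after=cursor)
    seen += page["items"]
    if not page["has_more"]:
        break
    cursor = page["next_cursor"]

assert seen == DATA  # every row exactly once, in order
```

Three requests (50 + 50 + 23 rows), no COUNT query, and inserts at the head of the list can't duplicate or drop rows mid-pagination.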

The non-unique cursor problem

Using created_at as a cursor breaks when two rows share the same timestamp. The query WHERE created_at > '2024-01-15 12:00:00' will skip all rows with that exact timestamp, not just the one you already returned.

Fix: use a composite cursor — sort key plus unique tiebreaker:

from datetime import datetime
from sqlalchemy import and_, or_

def encode_cursor(created_at: datetime, id: int) -> str:
    return base64.urlsafe_b64encode(
        json.dumps({"created_at": created_at.isoformat(), "id": id}).encode()
    ).decode()

# Row-value comparison (created_at, id) > (cursor_created_at, cursor_id),
# spelled out with or_/and_ for portability:
query = query.where(
    or_(
        Order.created_at > cursor_created_at,
        and_(Order.created_at == cursor_created_at, Order.id > cursor_id)
    )
).order_by(Order.created_at.asc(), Order.id.asc())

Now rows with identical timestamps are ordered by ID, and the cursor is always unambiguous.
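The decode side isn't shown above; a stdlib sketch of the composite decode, plus a check that Python's tuple ordering mirrors the SQL predicate (hypothetical rows, two sharing a timestamp):

```python
import base64, json
from datetime import datetime

def encode_cursor(created_at: datetime, id: int) -> str:
    return base64.urlsafe_b64encode(
        json.dumps({"created_at": created_at.isoformat(), "id": id}).encode()
    ).decode()

def decode_cursor(cursor: str):
    d = json.loads(base64.urlsafe_b64decode(cursor.encode()))
    return datetime.fromisoformat(d["created_at"]), d["id"]

t = datetime(2024, 1, 15, 12, 0)
rows = [(t, 1), (t, 2), (datetime(2024, 1, 15, 12, 1), 3)]

cursor = encode_cursor(*rows[0])           # client has seen (t, id=1)
last = decode_cursor(cursor)               # round-trips exactly
next_rows = [r for r in rows if r > last]  # tuple > == the keyset predicate

assert last == rows[0]
assert next_rows[0] == (t, 2)  # same timestamp, but NOT skipped
```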

Filtering: Pydantic query params done right

Don't parse query parameters manually. Declare a Pydantic model and inject it as a dependency — FastAPI will parse, validate, and coerce types before your handler runs.

from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
from enum import Enum

class OrderStatus(str, Enum):
    pending = "pending"
    processing = "processing"
    completed = "completed"
    cancelled = "cancelled"

class OrderFilters(BaseModel):
    status: Optional[OrderStatus] = None     # validated enum — rejects "PENDING"
    user_id: Optional[int] = None
    created_after: Optional[datetime] = None
    created_before: Optional[datetime] = None
    min_amount: Optional[float] = Field(None, ge=0)
    after: Optional[str] = None              # cursor
    limit: int = Field(default=50, ge=1, le=500)

@app.get("/orders")
async def list_orders(
    filters: OrderFilters = Depends(),
    db: AsyncSession = Depends(get_db),
):
    query = select(Order).order_by(Order.id.asc())

    if filters.status is not None:
        query = query.where(Order.status == filters.status)
    if filters.user_id is not None:
        query = query.where(Order.user_id == filters.user_id)
    if filters.created_after:
        query = query.where(Order.created_at >= filters.created_after)
    if filters.created_before:
        query = query.where(Order.created_at <= filters.created_before)
    if filters.min_amount is not None:
        query = query.where(Order.amount >= filters.min_amount)
    if filters.after:
        query = query.where(Order.id > decode_cursor(filters.after))

    query = query.limit(filters.limit + 1)
    # ... rest of pagination logic

Adding a new filter like max_amount next month is one line in OrderFilters and two lines in the handler. Existing callers who don't send it get None — the filter simply doesn't apply. Zero breaking changes.
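The same conditional composition works outside SQLAlchemy too. A stdlib sqlite3 sketch (hypothetical schema; each non-None filter contributes one clause, and values stay as bound parameters, so the pattern is injection-safe):

```python
import sqlite3

def build_order_query(status=None, user_id=None, min_amount=None, limit=50):
    # Mirror of the handler above in raw SQL: collect clauses for the
    # filters that were actually supplied, skip the rest.
    clauses, params = [], []
    if status is not None:
        clauses.append("status = ?")
        params.append(status)
    if user_id is not None:
        clauses.append("user_id = ?")
        params.append(user_id)
    if min_amount is not None:
        clauses.append("amount >= ?")
        params.append(min_amount)
    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    return f"SELECT id FROM orders{where} ORDER BY id LIMIT ?", params + [limit]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT,"
    " user_id INTEGER, amount REAL)"
)
conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", [
    (1, "pending", 7, 10.0),
    (2, "completed", 7, 99.0),
    (3, "completed", 8, 5.0),
])

sql, params = build_order_query(status="completed", min_amount=50)
result = [row[0] for row in conn.execute(sql, params)]

assert result == [2]
```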

Error handling: one shape everywhere

FastAPI's default error format for validation errors looks like this:

{
  "detail": [
    {
      "loc": ["query", "limit"],
      "msg": "value is not a valid integer",
      "type": "type_error.integer"
    }
  ]
}

Your HTTP 404s probably look like {"detail": "Not found"}. Your 500s might return a raw Python exception string. Your custom business logic errors return something else entirely. Every client has to handle all of these differently.

The fix: override the exception handlers once at app startup to enforce a single response shape.

from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
import logging

logger = logging.getLogger(__name__)

app = FastAPI()

# ── Custom application error ────────────────────────────────────────────────
class AppError(Exception):
    def __init__(self, code: str, message: str, status: int = 400):
        self.code = code
        self.message = message
        self.status = status

# ── Handlers ────────────────────────────────────────────────────────────────
@app.exception_handler(RequestValidationError)
async def validation_handler(request: Request, exc: RequestValidationError):
    return JSONResponse(
        status_code=422,
        content={
            "error": "validation_error",
            "message": "Request validation failed",
            "details": exc.errors(),
        },
    )

@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    return JSONResponse(
        status_code=exc.status,
        content={"error": exc.code, "message": exc.message},
    )

@app.exception_handler(Exception)
async def unhandled_handler(request: Request, exc: Exception):
    logger.exception("Unhandled exception on %s %s", request.method, request.url)
    return JSONResponse(
        status_code=500,
        content={
            "error": "internal_error",
            "message": "Something went wrong",
            # Never: "message": str(exc) — leaks stack traces and internals
        },
    )

Now every error in the system, from validation failures to database exceptions to business logic errors, returns the same shape. Frontend can write one error handler instead of five.

Raising AppError from anywhere

@app.get("/orders/{order_id}")
async def get_order(order_id: int, db: AsyncSession = Depends(get_db),
                    user: User = Depends(get_current_user)):
    order = await db.get(Order, order_id)
    if not order:
        raise AppError("order_not_found", f"No order with ID {order_id}", status=404)
    if order.user_id != user.id:
        raise AppError("forbidden", "You don't have access to this order", status=403)
    return order

Clean, readable handlers. No try/except scattered everywhere. One place to log, monitor, and adjust error responses across the entire API.

Putting it together: a complete endpoint

from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel, Field
from typing import Optional, Any
import base64, json

app = FastAPI()

class OrderFilters(BaseModel):
    status: Optional[str] = None
    user_id: Optional[int] = None
    limit: int = Field(default=50, ge=1, le=500)
    after: Optional[str] = None

class CursorPage(BaseModel):
    items: list[Any]
    next_cursor: Optional[str]
    has_more: bool
    count: int  # count of items in this page (not total — total requires a COUNT query)

@app.get("/orders", response_model=CursorPage)
async def list_orders(
    filters: OrderFilters = Depends(),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_auth),
):
    query = select(Order).where(Order.tenant_id == user.tenant_id)  # always scope by tenant

    if filters.status is not None:
        query = query.where(Order.status == filters.status)
    if filters.user_id is not None:
        query = query.where(Order.user_id == filters.user_id)
    if filters.after:
        query = query.where(Order.id > decode_cursor(filters.after))

    query = query.order_by(Order.id.asc()).limit(filters.limit + 1)

    rows = (await db.execute(query)).scalars().all()
    has_more = len(rows) > filters.limit
    items = rows[:filters.limit]

    return CursorPage(
        items=items,
        next_cursor=encode_cursor(items[-1].id) if has_more else None,
        has_more=has_more,
        count=len(items),
    )

Common mistakes summary

  • Offset pagination at scale. You'll discover the problem at 1M rows. Too late to change without a migration + client updates.
  • No upper bound on limit. ?limit=100000 is a valid URL. Without le=500 in your Pydantic model, one bad actor (or one bug) takes down your DB.
  • Returning str(exc) to clients. Exposes file paths, SQL queries, internal service names. OWASP calls this "Security Misconfiguration."
  • Different error shapes per route. Every new shape is a new branch in every frontend client's error handling code.
  • Cursor on a non-unique field. Two rows with identical created_at → skipped results. Always use a unique tiebreaker in the sort.
  • No has_more in response. Clients have to make an extra request to discover they've reached the end, rather than stopping when has_more: false.

Part 3 done. Next up — Part 4: Rate Limiting & Throttling. Token bucket vs sliding window, per-user vs global limits, and where to enforce them — at the gateway, in middleware, or in application code.
