Staff Prep 20: API Design at Scale — Idempotency, ETags & Versioning
ArchitectureStaff

Staff Prep 20: API Design at Scale — Idempotency, ETags & Versioning

April 4, 2026 · 9 min read · PART 18 / 18

Back to Part 19: Connection Pooling. APIs at scale deal with flaky clients and flakier networks. Idempotency keys save you the day a customer gets charged twice. ETags save bandwidth you did not realise you were burning. And a real versioning strategy is the difference between shipping a breaking change as a quiet migration versus an angry Slack thread at 11pm. Most of what follows is boring by design, which is the whole point.

Idempotency keys: preventing duplicate mutations

A client sends a payment request. The server processes it, then the network drops before the response gets back. The client retries. Without idempotency, the payment runs twice and your support queue lights up. Keys make mutations safe to retry by giving each attempt a stable identifier the server can deduplicate on.

python
from fastapi import Header, HTTPException, Depends
from typing import Optional
import json

async def idempotency_check(
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
    redis_client = Depends(get_redis),
):
    """Dependency that replays a stored response for a repeated Idempotency-Key.

    Returns:
        None when the request carries no Idempotency-Key header, otherwise
        the key itself when nothing is stored under it yet.

    Raises:
        HTTPException: when an entry is already stored for the key. The
            exception carries the stored status code and body and adds
            the ``X-Idempotent-Replay: true`` header.

    NOTE(review): this lookup and the write performed later by the endpoint
    are separate steps, so two concurrent requests with the same key can
    both get past this check.
    """
    if not idempotency_key:
        # No header supplied: the endpoint runs without deduplication.
        return None

    stored = await redis_client.get(f"idem:{idempotency_key}")
    if not stored:
        # Nothing recorded for this key yet; hand it to the endpoint.
        return idempotency_key

    # Already processed: short-circuit the request with the stored outcome.
    replay = json.loads(stored)
    raise HTTPException(
        status_code=replay["status_code"],
        detail=replay["body"],
        headers={"X-Idempotent-Replay": "true"},
    )

# The route declares 201 explicitly so the first response carries the same
# status code that is stored for (and returned by) idempotent replays.
@app.post("/payments", status_code=201)
async def create_payment(
    payment: PaymentCreate,
    idempotency_key: Optional[str] = Depends(idempotency_check),
    db: AsyncSession = Depends(get_db),
    redis_client = Depends(get_redis),
):
    """Process a payment and record the outcome under its idempotency key.

    Args:
        payment: The payment payload to process.
        idempotency_key: Value produced by ``idempotency_check``; falsy when
            the client sent no Idempotency-Key header.
        db: Session passed through to ``process_payment``.
        redis_client: Client used to store the replayable response.

    Returns:
        Whatever ``process_payment`` returns.
    """
    result = await process_payment(payment, db)

    if idempotency_key:
        # Keep the outcome for 24 hours (86400 s) so a retry with the same
        # key can be answered from the store.
        stored_response = json.dumps({"status_code": 201, "body": result.dict()})
        await redis_client.setex(f"idem:{idempotency_key}", 86400, stored_response)

    return result

ETags: conditional requests and cache validation

python
import hashlib
from fastapi import Request, Response
from fastapi.responses import JSONResponse

def compute_etag(data: dict) -> str:
    """Return the hex MD5 digest of *data* serialized as key-sorted JSON."""
    # Sorting the keys makes the digest independent of dict insertion order.
    canonical_bytes = json.dumps(data, sort_keys=True).encode()
    digest = hashlib.md5()
    digest.update(canonical_bytes)
    return digest.hexdigest()

def _if_none_match_hits(header_value: Optional[str], etag: str) -> bool:
    """Return True when an If-None-Match header value matches *etag*.

    Handles the wildcard ``*``, comma-separated lists of entity tags, and
    weak validators (``W/"..."``); the ``W/`` prefix is ignored on both
    sides of the comparison.
    """
    if not header_value:
        return False

    def _opaque(tag: str) -> str:
        tag = tag.strip()
        return tag[2:] if tag.startswith("W/") else tag

    current = _opaque(etag)
    # A plain comma split is enough here: the tags this endpoint issues are
    # hex digests, so a fragment of a comma-containing tag cannot match.
    return any(
        candidate.strip() == "*" or _opaque(candidate) == current
        for candidate in header_value.split(",")
    )


@app.get("/products/{id}")
async def get_product(
    id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Return a product, answering conditional requests with 304.

    Args:
        id: Primary key of the product to load.
        request: Incoming request; only its If-None-Match header is read.
        db: Session used to load the product.

    Returns:
        A 304 response without a body when the client's If-None-Match
        matches the current ETag, otherwise a JSON response with the
        product. Both carry the ETag and Cache-Control headers.

    Raises:
        HTTPException: 404 when no product with this id exists.
    """
    product = await db.get(Product, id)
    if not product:
        raise HTTPException(404)

    # Serialize once; the same payload feeds both the ETag and the body.
    payload = product.dict()
    etag = f'"{compute_etag(payload)}"'
    validators = {
        "ETag": etag,
        "Cache-Control": "max-age=60, must-revalidate",
    }

    if _if_none_match_hits(request.headers.get("If-None-Match"), etag):
        # Not Modified: the client reuses its cached copy. The validators
        # are repeated so the cache can refresh its stored metadata.
        return Response(status_code=304, headers=validators)

    return JSONResponse(content=payload, headers=validators)

# Client flow:
# GET /products/42 → 200 with ETag: "abc123"
# GET /products/42 with If-None-Match: "abc123" → 304 if unchanged (no body sent)
# GET /products/42 with If-None-Match: "abc123" → 200 with new ETag if product changed

API versioning strategies

python
from fastapi import FastAPI, APIRouter

# Application instance that the versioned routers below are mounted on.
app = FastAPI()

# Strategy 1: URL versioning (most common, most visible)
# Each router carries its version as a path prefix, so a handler registered
# as "/users/{id}" is served under "/v1/users/{id}" or "/v2/users/{id}".
v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")

@v1_router.get("/users/{id}")
async def get_user_v1(id: int):
    """Return the user in the v1 response shape (``id`` and ``name``)."""
    # old response shape
    payload = {"id": id, "name": "John Doe"}
    return payload

@v2_router.get("/users/{id}")
async def get_user_v2(id: int):
    """Return the user in the v2 response shape (``full_name``, ``avatar_url``)."""
    # new shape
    payload = {
        "id": id,
        "full_name": "John Doe",
        "avatar_url": "...",
    }
    return payload

# Mount both versions on the app so /v1 and /v2 are served side by side.
# NOTE(review): include_router registers the routes a router holds at the
# time of the call; handlers added to a router after this point are
# presumably not mounted — confirm against the FastAPI version in use.
app.include_router(v1_router)
app.include_router(v2_router)

# Strategy 2: Header versioning (cleaner URLs, harder to test in browser)
@app.get("/users/{id}")
async def get_user_versioned(
    id: int,
    api_version: str = Header(default="1", alias="API-Version"),
):
    """Serve a user in the shape selected by the API-Version header.

    The value "2" selects the v2 implementation; any other value, including
    the default "1", is served by the v1 implementation.
    """
    # Only the selected implementation is looked up and awaited.
    handler = get_user_v2_impl if api_version == "2" else get_user_v1_impl
    return await handler(id)

# Strategy 3: Deprecation headers (communicate sunset dates)
@v1_router.get("/users/{id}")
async def get_user_v1_deprecated(id: int):
    """Serve the v1 user response with deprecation metadata attached.

    Adds Deprecation and Sunset headers plus a Link header that points the
    client at the v2 route for the same user.

    Assumes ``get_user_v1_impl`` returns a response object exposing a
    mutable ``headers`` mapping — TODO confirm.
    """
    response = await get_user_v1_impl(id)
    # NOTE(review): RFC 9745 defines Deprecation as a date ("@<unix-ts>");
    # "true" follows the earlier draft syntax — confirm what clients expect.
    response.headers["Deprecation"] = "true"
    # 1 January 2027 falls on a Friday; an HTTP-date with a mismatched
    # weekday is malformed and may be rejected by strict parsers.
    response.headers["Sunset"] = "Fri, 01 Jan 2027 00:00:00 GMT"
    # A Link header value must start with its <URI> target; point it at the
    # v2 route for this user.
    response.headers["Link"] = f'</v2/users/{id}>; rel="successor-version"'
    return response

Bulk endpoints: preventing N+1 at the API level

python
from pydantic import BaseModel, Field

class BulkUserRequest(BaseModel):
    """Request body for a bulk user lookup."""

    # IDs to fetch; the field constraints require between 1 and 100 entries,
    # which bounds the size of the single lookup query.
    ids: list[int] = Field(min_length=1, max_length=100)

class BulkOperationResult(BaseModel):
    """Outcome of a bulk operation, split into per-item successes and failures."""

    # One dict per item that was processed successfully.
    succeeded: list[dict]
    # One dict per item that could not be processed.
    failed: list[dict]

@app.post("/users/batch")
async def get_users_bulk(
    request: BulkUserRequest,
    db: AsyncSession = Depends(get_db),
) -> BulkOperationResult:
    """Look up many users with one query and report hits and misses.

    Args:
        request: Body holding the user IDs to fetch.
        db: Session used to run the lookup.

    Returns:
        A result whose ``succeeded`` list holds the found users, in request
        order, and whose ``failed`` list holds one ``not_found`` entry per
        requested ID that has no matching row.
    """
    # One IN (...) query replaces a per-ID round trip (N+1 prevention).
    query = select(User).where(User.id.in_(request.ids))
    rows = (await db.execute(query)).scalars().all()
    found = {row.id: row for row in rows}

    succeeded = [found[uid].dict() for uid in request.ids if uid in found]
    failed = [
        {"id": uid, "error": "not_found"}
        for uid in request.ids
        if uid not in found
    ]

    return BulkOperationResult(succeeded=succeeded, failed=failed)

# Bulk create with partial success
@app.post("/orders/bulk")
async def create_orders_bulk(orders: list[OrderCreate], db: AsyncSession = Depends(get_db)):
    """Create several orders in one request, reporting each outcome.

    Args:
        orders: Order payloads to create, processed in the given order.
        db: Session the orders are written through.

    Returns:
        ``{"results": [...]}`` with one entry per input order, in input
        order: ``{"status": "created", "id": ...}`` on success or
        ``{"status": "failed", "error": ...}`` when the order raised
        ``ValidationError``.
    """
    results = []
    async with db.begin():
        for order in orders:
            try:
                # Each order runs in its own savepoint so that anything a
                # failing order already wrote is rolled back without
                # discarding the orders created before it.
                async with db.begin_nested():
                    new_order = await create_order(order, db)
            except ValidationError as e:
                results.append({"status": "failed", "error": str(e)})
                # Continue processing remaining orders
            else:
                results.append({"status": "created", "id": new_order.id})
    return {"results": results}

Rate limit headers: communicating limits to clients

python
from fastapi import Response

@app.get("/data")
async def get_data(user: User = Depends(get_current_user), response: Response = None):
    """Return data while advertising the caller's rate-limit state in headers.

    Args:
        user: The authenticated user; its ``id`` keys the rate-limit lookup.
        response: Response object whose headers are populated here.

    Returns:
        Whatever ``fetch_data`` returns.
    """
    limits = await get_rate_limit_info(user.id)

    rate_headers = {
        "X-RateLimit-Limit": str(limits["limit"]),
        "X-RateLimit-Remaining": str(limits["remaining"]),
        "X-RateLimit-Reset": str(limits["reset_at"]),  # Unix timestamp
        "X-RateLimit-Window": "60",  # seconds
    }
    for header_name, header_value in rate_headers.items():
        response.headers[header_name] = header_value

    payload = await fetch_data()
    return payload

Quiz: test your understanding

Before moving on, answer these in your head (or out loud):

  1. A mobile client sends a payment request. The server processes it but the response is lost. The client retries 3 times. Without idempotency keys, what happens? How do keys prevent it?
  2. What HTTP status code do you return when an ETag matches and the resource has not changed? What body do you send?
  3. Compare URL versioning (/v1, /v2) vs header versioning. What are the operational trade-offs for each when running two versions in parallel?
  4. A client needs data for 100 users. They call GET /users/{id} 100 times. How do you design a bulk endpoint that solves this? What are the limits and why?
  5. Your API has GET /users/{id} returning v1 format. You need to add a required field full_name that replaces name. How do you make this change without breaking existing clients?

Next up: Part 21: Postgres MVCC & Concurrency. MVCC internals, visibility rules, write skew, and the transaction isolation model in depth.

← PREV
Staff Prep 19: Connection Pooling & SQLAlchemy — Pool Sizing & Leaks
← All Architecture Posts