Staff Prep 20: API Design at Scale — Idempotency, ETags & Versioning

April 4, 2026 · 9 min read · Part 18 / 18

Back to Part 19: Connection Pooling. APIs at scale have to handle unreliable clients, unreliable networks, and unreliable deployments. Idempotency keys prevent duplicate payments when clients retry. ETags prevent unnecessary data transfer. Versioning strategy determines whether your next breaking change is a migration or an outage.

Idempotency keys: preventing duplicate mutations

A client sends a payment request. The network drops after the server processes it but before the response arrives. The client retries. Without idempotency, the payment runs twice. Idempotency keys make mutations safe to retry.

python
from fastapi import Depends, Header
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from typing import Optional
import json

IDEMPOTENCY_TTL_SECONDS = 86400  # 24 hours

@app.post("/payments", status_code=201)
async def create_payment(
    payment: PaymentCreate,
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
    db: AsyncSession = Depends(get_db),
    redis_client = Depends(get_redis),
):
    # No key: proceed normally, without replay protection
    cache_key = f"idem:{idempotency_key}" if idempotency_key else None

    if cache_key:
        cached = await redis_client.get(cache_key)
        if cached:
            # Already processed: replay the original status and body unchanged.
            # (Raising HTTPException here would wrap the body in {"detail": ...}.)
            data = json.loads(cached)
            return JSONResponse(
                status_code=data["status_code"],
                content=data["body"],
                headers={"X-Idempotent-Replay": "true"},
            )

    # Process payment.
    # Caveat: two concurrent retries can both miss the cache above. Closing that
    # window requires reserving the key atomically (e.g. Redis SET with NX) first.
    result = await process_payment(payment, db)

    # Store result in Redis (TTL: 24 hours)
    if cache_key:
        await redis_client.setex(
            cache_key,
            IDEMPOTENCY_TTL_SECONDS,
            json.dumps({"status_code": 201, "body": jsonable_encoder(result)}),
        )

    return result
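
The check-then-store flow above leaves a window in which two concurrent retries both miss the cache and both charge the customer. One way to reason about closing it is a small state machine: reserve the key before doing any work, then attach the response once the work finishes. The sketch below is illustrative only. It uses an in-memory dict as a stand-in for Redis, the names `InMemoryIdempotencyStore`, `begin`, and `complete` are hypothetical, and the request fingerprint is an added assumption that flags a key reused with a different payload.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class IdempotencyRecord:
    fingerprint: str                 # hash of the body that first used the key
    response: Optional[dict] = None  # None while the first request is running


class InMemoryIdempotencyStore:
    """Hypothetical stand-in for Redis; not safe across processes."""

    def __init__(self) -> None:
        self._records: dict[str, IdempotencyRecord] = {}

    def begin(self, key: str, body: dict) -> tuple[str, Optional[dict]]:
        """Return (state, stored_response) for a request carrying this key."""
        fingerprint = hashlib.sha256(
            json.dumps(body, sort_keys=True).encode()
        ).hexdigest()
        record = self._records.get(key)
        if record is None:
            # First sighting: reserve the key before any work happens.
            self._records[key] = IdempotencyRecord(fingerprint)
            return "proceed", None
        if record.fingerprint != fingerprint:
            return "conflict", None      # same key, different payload
        if record.response is None:
            return "in_progress", None   # original request has not finished
        return "replay", record.response

    def complete(self, key: str, response: dict) -> None:
        """Attach the final response so later retries can replay it."""
        self._records[key].response = response


store = InMemoryIdempotencyStore()
body = {"amount": 100, "currency": "USD"}
first = store.begin("key-1", body)    # reserves the key
retry = store.begin("key-1", body)    # arrives while the first is running
store.complete("key-1", {"status_code": 201, "body": {"id": 1}})
late_retry = store.begin("key-1", body)
```

With Redis, the reservation step would typically be a single `SET` with the `NX` and `EX` options so that exactly one request wins. How to answer the `in_progress` and `conflict` states (for example with a 409 or 422) is a policy choice this sketch leaves open.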

ETags: conditional requests and cache validation

python
import hashlib
import json
from fastapi import Depends, HTTPException, Request, Response
from fastapi.responses import JSONResponse

def compute_etag(data: dict) -> str:
    content = json.dumps(data, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

@app.get("/products/{id}")
async def get_product(
    id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    product = await db.get(Product, id)
    if not product:
        raise HTTPException(404)

    etag = f'"{compute_etag(product.dict())}"'

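    # Check If-None-Match header (client's cached ETag).
    # Exact string match only: weak validators (W/"..."), comma-separated
    # lists, and "*" would need extra parsing.
    if request.headers.get("If-None-Match") == etag:
        # Not Modified: no body, but repeat the ETag so the client keeps its validator
        return Response(status_code=304, headers={"ETag": etag})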

    response = JSONResponse(content=product.dict())
    response.headers["ETag"] = etag
    response.headers["Cache-Control"] = "max-age=60, must-revalidate"
    return response

# Client flow:
# GET /products/42 → 200 with ETag: "abc123"
# GET /products/42 with If-None-Match: "abc123" → 304 if unchanged (no body sent)
# GET /products/42 with If-None-Match: "abc123" → 200 with new ETag if product changed
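
The handler above compares `If-None-Match` with plain string equality, which misses three cases the header allows: a weak validator (`W/"abc123"`), a comma-separated list of tags, and the wildcard `*`. A minimal sketch of a more tolerant comparison follows. The function name `if_none_match_matches` is hypothetical, and splitting on commas is a simplification that assumes the ETag values contain no commas themselves (true for hex digests like the MD5 used here).

```python
from typing import Optional


def _strip_weak(tag: str) -> str:
    """Drop surrounding whitespace and a leading W/ weak-validator prefix."""
    tag = tag.strip()
    return tag[2:] if tag.startswith("W/") else tag


def if_none_match_matches(header_value: Optional[str], current_etag: str) -> bool:
    """Weak comparison of an If-None-Match header against the current ETag."""
    if not header_value:
        return False
    if header_value.strip() == "*":
        return True  # wildcard matches any existing representation
    current = _strip_weak(current_etag)
    # Simplification: assumes no commas inside the quoted tag values.
    candidates = (_strip_weak(part) for part in header_value.split(","))
    return any(candidate == current for candidate in candidates)


exact = if_none_match_matches('"abc123"', '"abc123"')
weak = if_none_match_matches('W/"abc123"', '"abc123"')
listed = if_none_match_matches('"old", "abc123"', '"abc123"')
stale = if_none_match_matches('"old"', '"abc123"')
```

A handler could call this in place of the `==` check and return 304 whenever it yields `True`.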

API versioning strategies

python
from fastapi import APIRouter, FastAPI, Header, Response

app = FastAPI()

# Strategy 1: URL versioning (most common, most visible)
v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")

@v1_router.get("/users/{id}")
async def get_user_v1(id: int):
    return {"id": id, "name": "John Doe"}  # old response shape

@v2_router.get("/users/{id}")
async def get_user_v2(id: int):
    return {"id": id, "full_name": "John Doe", "avatar_url": "..."}  # new shape

app.include_router(v1_router)
app.include_router(v2_router)

# Strategy 2: Header versioning (cleaner URLs, harder to test in browser)
@app.get("/users/{id}")
async def get_user_versioned(
    id: int,
    api_version: str = Header(default="1", alias="API-Version"),
):
    if api_version == "2":
        return await get_user_v2_impl(id)
    return await get_user_v1_impl(id)

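# Strategy 3: Deprecation headers (communicate sunset dates)
# In a real app, set these headers in the existing v1 handler (registered before
# app.include_router) rather than adding a second route for the same path.
@v1_router.get("/users/{id}")
async def get_user_v1_deprecated(id: int, response: Response):
    # Set headers on the injected Response; the handler's return value is a plain dict
    response.headers["Deprecation"] = "true"
    response.headers["Sunset"] = "Fri, 01 Jan 2027 00:00:00 GMT"
    # Assumed link target: the v2 route defined above
    response.headers["Link"] = f'</v2/users/{id}>; rel="successor-version"'
    return await get_user_v1_impl(id)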
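
Strategy 2 above silently serves the v1 shape for any header value other than "2", so a client asking for a version that does not exist gets no signal. One alternative is a dispatch table that maps each supported version to a serializer and rejects everything else. This is a sketch under assumptions: the stored record keeps `full_name` as the source of truth, and `to_v1`, `to_v2`, `serialize_user`, and `UnsupportedVersion` are hypothetical names.

```python
from typing import Callable, Optional


class UnsupportedVersion(ValueError):
    """Raised when a client requests an API version that is not served."""


def to_v1(user: dict) -> dict:
    # Old shape: v1 clients still read "name"
    return {"id": user["id"], "name": user["full_name"]}


def to_v2(user: dict) -> dict:
    # New shape: "full_name" replaces "name", plus the avatar field
    return {
        "id": user["id"],
        "full_name": user["full_name"],
        "avatar_url": user.get("avatar_url"),
    }


SERIALIZERS: dict[str, Callable[[dict], dict]] = {"1": to_v1, "2": to_v2}
DEFAULT_VERSION = "1"  # clients that send no header keep the old shape


def serialize_user(user: dict, api_version: Optional[str]) -> dict:
    """Pick the response shape for the requested version, or fail loudly."""
    version = (api_version or DEFAULT_VERSION).strip()
    serializer = SERIALIZERS.get(version)
    if serializer is None:
        raise UnsupportedVersion(f"unsupported API version: {version!r}")
    return serializer(user)


record = {"id": 7, "full_name": "John Doe", "avatar_url": "https://example.com/a.png"}
v1_body = serialize_user(record, None)
v2_body = serialize_user(record, "2")
```

An endpoint could translate `UnsupportedVersion` into a 400 response. Keeping both shapes derived from one stored record means removing v1 later is a matter of deleting one entry from the table.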

Bulk endpoints: preventing N+1 at the API level

python
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import select

class BulkUserRequest(BaseModel):
    ids: list[int] = Field(min_length=1, max_length=100)

class BulkOperationResult(BaseModel):
    succeeded: list[dict]
    failed: list[dict]

@app.post("/users/batch")
async def get_users_bulk(
    request: BulkUserRequest,
    db: AsyncSession = Depends(get_db),
) -> BulkOperationResult:
    # Single query for all IDs (N+1 prevention)
    users = await db.execute(
        select(User).where(User.id.in_(request.ids))
    )
    user_map = {u.id: u for u in users.scalars().all()}

    succeeded = []
    failed = []

    for id_ in request.ids:
        if id_ in user_map:
            succeeded.append(user_map[id_].dict())
        else:
            failed.append({"id": id_, "error": "not_found"})

    return BulkOperationResult(succeeded=succeeded, failed=failed)

# Bulk create with partial success
@app.post("/orders/bulk")
async def create_orders_bulk(orders: list[OrderCreate], db: AsyncSession = Depends(get_db)):
    results = []
    async with db.begin():
        for order in orders:
            try:
                # Savepoint per order: a failure rolls back only this order's
                # writes, so earlier successes in the batch still commit
                async with db.begin_nested():
                    new_order = await create_order(order, db)
                results.append({"status": "created", "id": new_order.id})
            except ValidationError as e:
                results.append({"status": "failed", "error": str(e)})
                # Continue processing remaining orders
    return {"results": results}
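
The 100-ID cap on `BulkUserRequest.ids` means a client holding more IDs has to split its work into several batch calls. A minimal client-side sketch, assuming the cap stays at 100 and that duplicate IDs can be dropped before sending (`chunk_ids` and `MAX_BATCH_SIZE` are hypothetical names):

```python
from typing import Iterable, Iterator

MAX_BATCH_SIZE = 100  # assumed to mirror max_length on BulkUserRequest.ids


def chunk_ids(ids: Iterable[int], size: int = MAX_BATCH_SIZE) -> Iterator[list[int]]:
    """Yield de-duplicated IDs in order, in lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    unique = list(dict.fromkeys(ids))  # drops repeats, keeps first-seen order
    for start in range(0, len(unique), size):
        yield unique[start:start + size]


batches = list(chunk_ids(range(250)))
batch_sizes = [len(batch) for batch in batches]
```

Each yielded list becomes the `ids` field of one POST to `/users/batch`, so 250 IDs cost three requests instead of 250.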

Rate limit headers: communicating limits to clients

python
from fastapi import Response

@app.get("/data")
async def get_data(response: Response, user: User = Depends(get_current_user)):
    rate_info = await get_rate_limit_info(user.id)

    response.headers.update({
        "X-RateLimit-Limit": str(rate_info["limit"]),
        "X-RateLimit-Remaining": str(rate_info["remaining"]),
        "X-RateLimit-Reset": str(rate_info["reset_at"]),  # Unix timestamp
        "X-RateLimit-Window": "60",  # seconds
    })

    return await fetch_data()
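
These headers only help if clients act on them. A minimal sketch of the client side, assuming the header names used above, that `X-RateLimit-Reset` is a Unix timestamp in seconds, and that the headers arrive as a plain dict with exactly this casing (`seconds_until_retry` is a hypothetical helper):

```python
from typing import Mapping


def seconds_until_retry(headers: Mapping[str, str], now: float) -> float:
    """Return how long to wait before the next request, given response headers."""
    # Missing header: assume there is still budget rather than stalling the client
    remaining = int(headers.get("X-RateLimit-Remaining", "1"))
    if remaining > 0:
        return 0.0
    # Budget exhausted: wait until the window resets (never a negative wait)
    reset_at = float(headers.get("X-RateLimit-Reset", now))
    return max(0.0, reset_at - now)


exhausted = {"X-RateLimit-Remaining": "0", "X-RateLimit-Reset": "1700000060"}
wait = seconds_until_retry(exhausted, now=1700000000.0)
```

A caller would pass `time.time()` as `now` and sleep for the returned duration before retrying. Taking `now` as a parameter keeps the function deterministic and easy to test.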

Quiz: test your understanding

Before moving on, answer these in your head (or out loud):

  1. A mobile client sends a payment request. The server processes it but the response is lost. The client retries 3 times. Without idempotency keys, what happens? How do keys prevent it?
  2. What HTTP status code do you return when an ETag matches and the resource has not changed? What body do you send?
  3. Compare URL versioning (/v1, /v2) vs header versioning. What are the operational trade-offs for each when running two versions in parallel?
  4. A client needs data for 100 users. They call GET /users/{id} 100 times. How do you design a bulk endpoint that solves this? What are the limits and why?
  5. Your API has GET /users/{id} returning v1 format. You need to add a required field full_name that replaces name. How do you make this change without breaking existing clients?

Next up — Part 21: Postgres MVCC & Concurrency. MVCC internals, visibility rules, write skew, and the transaction isolation model in depth.
