Staff Prep 10: Background Processing — In-Process vs Celery vs Workers


April 4, 2026 · 9 min read · PART 10 / 18

Picking up from Part 09 (async vs sync): "Send the email after the request returns" sounds simple. It is not. In-process background tasks lose work on process restart. Celery adds a broker, worker processes, and retry semantics. The right choice depends on your tolerance for data loss, your ops complexity budget, and your job volume. Let's go through every option honestly.

Option 1: FastAPI BackgroundTasks — simple but fire-and-forget

BackgroundTasks run after the response is sent, in the same worker process. No broker, no queue, no retry. If the worker crashes after the response but before the task completes, the task is lost.

python
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

async def send_welcome_email(user_email: str, user_name: str):
    # This runs after the HTTP response is sent
    await email_client.send(
        to=user_email,
        subject="Welcome!",
        body=f"Hello {user_name}"
    )

@app.post("/users")
async def create_user(user_data: UserCreate, background_tasks: BackgroundTasks):
    user = await db.create_user(user_data)

    # Schedule background task (non-blocking — runs after response)
    background_tasks.add_task(send_welcome_email, user.email, user.name)

    return user  # Response sent immediately; email sends after

Use BackgroundTasks for:

  • Non-critical side effects where losing the task occasionally is acceptable
  • Audit logging, analytics events, cache invalidation
  • Tasks that take milliseconds to seconds, not minutes

Do NOT use it for anything that must complete, or anything the user will notice if it fails silently.

Option 2: Celery — reliable distributed task queue

Celery persists task state in a broker (Redis or RabbitMQ). Workers are separate processes that pick up tasks independently. Tasks survive process crashes. Built-in retry, rate limiting, priority queues, ETA scheduling.

python
from celery import Celery

# celery_app.py
celery = Celery(
    "tasks",
    broker="redis://localhost:6379/0",   # task queue
    backend="redis://localhost:6379/1",  # result storage
)

celery.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,     # ACK after task completes, not when received
    task_reject_on_worker_lost=True,  # re-queue if worker crashes mid-task
    worker_prefetch_multiplier=1,     # pull one task at a time (fair distribution)
)

@celery.task(
    bind=True,
    max_retries=3,  # manual self.retry below controls the backoff
)
def send_welcome_email(self, user_id: int, user_email: str):
    try:
        email_client.send_sync(
            to=user_email,
            subject="Welcome!",
        )
    except EmailServiceError as e:
        # Exponential backoff: 30s, 60s, 120s
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 30)

# FastAPI endpoint: enqueue task instead of running it
@app.post("/users")
async def create_user(user_data: UserCreate):
    user = await db.create_user(user_data)

    # Enqueue: send to Redis broker (non-blocking)
    send_welcome_email.delay(user.id, user.email)

    return user
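A quick sanity check on that backoff formula — plain Python, no Celery required:

```python
# countdown = 2**retries * 30, for retry attempts 0, 1, 2 (max_retries=3)
delays = [2 ** retries * 30 for retries in range(3)]
print(delays)  # [30, 60, 120]
```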

Celery: the acks_late pattern for reliability

By default, Celery ACKs (acknowledges) tasks when the worker receives them, not when they complete. If the worker crashes mid-execution, the task is lost — it was already ACKed and removed from the queue. acks_late=True fixes this: the task is ACKed only after it completes successfully.

python
celery.conf.update(
    task_acks_late=True,              # ACK after completion
    task_reject_on_worker_lost=True,  # re-queue (NACK) if worker dies mid-task
)

# With acks_late: if a worker crashes mid-task,
# the task returns to the queue and another worker picks it up

# WARNING: acks_late requires idempotent tasks
# The task may run MORE than once if a worker crashes after completing
# but before sending the ACK (rare but possible)

@celery.task(acks_late=True)
def process_payment(order_id: int, amount_cents: int):  # integer cents, not float
    # Check if already processed (idempotency)
    if payment_already_processed(order_id):
        return  # safe to call multiple times

    charge_card(order_id, amount_cents)
    mark_as_processed(order_id)
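The helpers payment_already_processed / mark_as_processed are left undefined above. A minimal sketch of the "claim" pattern behind them, using an in-memory set as a stand-in for a shared store (production would use Redis SET NX or a database unique constraint; all names here are illustrative):

```python
# In-memory stand-in for a shared store; a real deployment needs Redis or a DB
_processed: set = set()

def claim_order(order_id: int) -> bool:
    """Claim an order ID; False means it was already claimed."""
    if order_id in _processed:
        return False
    _processed.add(order_id)
    return True

def process_payment_idempotent(order_id: int, amount_cents: int) -> str:
    if not claim_order(order_id):
        return "duplicate"  # task was delivered twice; charge happens once
    # charge_card(order_id, amount_cents) would go here
    return "charged"

print(process_payment_idempotent(42, 1999))  # first delivery: charged
print(process_payment_idempotent(42, 1999))  # redelivery after a crash: duplicate
```

The in-memory set is only correct within one process; the point is the shape of the check-then-claim logic, which must be atomic in whatever shared store you use.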

Task isolation: separate queues for different priorities

python
from celery import Celery

celery = Celery("tasks", broker="redis://localhost:6379/0")

# Route different tasks to different queues
celery.conf.task_routes = {
    "tasks.send_email": {"queue": "email"},       # low priority
    "tasks.process_payment": {"queue": "critical"}, # high priority
    "tasks.generate_report": {"queue": "reports"}, # slow, resource-heavy
}

# Start workers for specific queues
# High-priority: 4 workers watching critical queue
# celery -A tasks worker --queues critical --concurrency 4

# Background reports: 1 worker, single concurrency
# celery -A tasks worker --queues reports --concurrency 1

# Task names match the task_routes keys above; routing happens there
@celery.task
def process_payment(order_id: int):
    pass

@celery.task
def send_email(order_id: int, email: str):
    pass

Option 3: arq — async-native Celery alternative

Celery uses synchronous workers. If your tasks use async code (asyncpg, aiohttp), Celery forces you to run an event loop inside each task — awkward. arq is a Redis-backed task queue built entirely on asyncio.

python
import arq
import asyncpg

async def send_email(ctx: dict, user_id: int, email: str):
    # ctx contains the worker's shared resources (DB pool, Redis, etc.)
    db: asyncpg.Pool = ctx["db"]
    user = await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    await email_client.send_async(to=email, subject=f"Hello {user['name']}")

class WorkerSettings:
    functions = [send_email]
    redis_settings = arq.connections.RedisSettings(host="localhost", port=6379)
    max_jobs = 20  # concurrent async jobs per worker

    async def on_startup(ctx):
        ctx["db"] = await asyncpg.create_pool("postgresql://...")

    async def on_shutdown(ctx):
        await ctx["db"].close()

# Enqueue from FastAPI (get_arq_redis: an app-provided dependency yielding an ArqRedis connection)
@app.post("/users")
async def create_user(user_data: UserCreate, arq_redis=Depends(get_arq_redis)):
    user = await db.create_user(user_data)
    await arq_redis.enqueue_job("send_email", user.id, user.email)
    return user

Decision matrix

Choose your background processing pattern based on these criteria:

  • Fire-and-forget, no retry needed, sub-5s: FastAPI BackgroundTasks
  • Must complete, retry on failure, sync workers: Celery + Redis/RabbitMQ
  • Must complete, retry on failure, async tasks: arq
  • Very high throughput, ordering guarantees: Kafka + dedicated consumers
  • Scheduled jobs (cron-style): Celery Beat or APScheduler
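For the cron-style row, a minimal Celery Beat schedule looks like this (a sketch; the task names tasks.generate_report and tasks.cleanup_stale_sessions are assumptions for illustration):

```python
from celery import Celery
from celery.schedules import crontab

celery = Celery("tasks", broker="redis://localhost:6379/0")

celery.conf.beat_schedule = {
    "nightly-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=2, minute=0),  # every day at 02:00
    },
    "cleanup-every-15-min": {
        "task": "tasks.cleanup_stale_sessions",
        "schedule": 15 * 60.0,  # plain seconds also works
    },
}

# Run the scheduler process alongside your workers:
# celery -A tasks beat
```

Beat only enqueues tasks on schedule; the usual workers execute them, so all the reliability settings above still apply.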

Quiz: test your understanding

Before moving on, answer these in your head (or out loud):

  1. A FastAPI BackgroundTask sends an order confirmation email. The worker process crashes 50ms after the HTTP response is sent, before the email function runs. What happens? What would you change?
  2. What is task_acks_late=True in Celery? What risk does it introduce, and how do you mitigate it?
  3. Why is worker_prefetch_multiplier=1 the recommended setting for long-running tasks? What is the default and why is it a problem?
  4. Your Celery worker crashes mid-task while processing a payment. With default settings (acks_early), what happens? With acks_late=True + reject_on_worker_lost=True?
  5. You are building a system that sends 50,000 emails per hour. A single Celery worker with concurrency=1 can send 10 emails per minute. Walk through how you would scale to meet the requirement.

Next up — Part 11: API Design — Pagination & Filtering. Cursor vs offset pagination, and why offset breaks at scale.
