Staff Prep 18: Task Queues & Celery — Broker, Workers & Idempotency

April 4, 2026 · 9 min read · PART 16 / 18

Back to Part 17: FastAPI Internals. Celery is the standard Python task queue. It handles retries, scheduling, routing, monitoring and error handling in one package. It also causes more production incidents than it prevents when you leave the defaults alone, which most teams do. This post is about the settings I wish someone had screamed at me about on day one.

Celery architecture: broker vs backend

Celery has two storage layers with different responsibilities:

  • The broker (typically Redis or RabbitMQ) holds the task queue. It is required: workers pull tasks from it, and if the broker loses data, the queued tasks are gone.
  • The backend, or result store, holds task results and status. You only need it if you poll task outcomes via AsyncResult. Skip it for fire-and-forget tasks.
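The split between the two layers can be sketched with a toy in-process model. This is not Celery's implementation: `broker`, `backend`, `send_task` and `worker_run_once` are hypothetical names, with a `queue.Queue` standing in for Redis or RabbitMQ and a dict standing in for the result store.

```python
import queue
import uuid

broker = queue.Queue()   # stand-in for the task queue (Redis/RabbitMQ)
backend = {}             # stand-in for the result store


def send_task(func, *args, ignore_result=False):
    """Enqueue a task on the broker and return its id."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args, ignore_result))
    return task_id


def worker_run_once():
    """Pull one task from the broker, run it, optionally store the result."""
    task_id, func, args, ignore_result = broker.get_nowait()
    result = func(*args)
    if not ignore_result:
        backend[task_id] = ("SUCCESS", result)


def add(x, y):
    return x + y


tracked = send_task(add, 2, 3)                          # caller will poll the outcome
untracked = send_task(add, 10, 20, ignore_result=True)  # fire-and-forget
worker_run_once()
worker_run_once()
```

Both tasks run, but only the tracked one leaves anything in the backend. That is why a fire-and-forget workload can skip the result store entirely.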
python
from celery import Celery

app = Celery(
    "myapp",
    broker="redis://redis:6379/0",          # task queue
    backend="redis://redis:6379/1",         # result store (optional)
    include=["myapp.tasks.email", "myapp.tasks.reports"],
)

app.conf.update(
    # Serialisation
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_compression="gzip",  # compress large payloads

    # Reliability
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,   # critical: pull one task at a time

    # Timeouts
    task_soft_time_limit=300,       # raises SoftTimeLimitExceeded inside the task after 5 min
    task_time_limit=360,            # hard limit: the process running the task is killed after 6 min

    # Results
    result_expires=3600,            # auto-expire results after 1 hour
    task_ignore_result=True,        # default: store no results; set ignore_result=False on tasks you poll
)
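The difference between the two limits is easier to see in miniature. The sketch below is Unix-only, uses the standard library's `signal` module, and must run in the main thread. It does not show how Celery is implemented. It only shows the same shape: the soft limit arrives as an exception the task can catch, whereas a hard limit ends the process, so no handler would run. `SoftTimeLimit` and `run_with_soft_limit` are hypothetical names.

```python
import signal
import time


class SoftTimeLimit(Exception):
    """Stand-in for celery.exceptions.SoftTimeLimitExceeded."""


def _raise_soft_limit(signum, frame):
    raise SoftTimeLimit()


def run_with_soft_limit(func, seconds):
    """Run func, raising SoftTimeLimit inside it after `seconds`."""
    previous = signal.signal(signal.SIGALRM, _raise_soft_limit)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        return func()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)   # cancel any pending timer
        signal.signal(signal.SIGALRM, previous)   # restore the old handler


events = []


def slow_task():
    try:
        time.sleep(5)                  # pretend this is the long report
        events.append("finished")
        return "complete"
    except SoftTimeLimit:
        events.append("saved partial state")   # cleanup is still possible here
        return "partial"


outcome = run_with_soft_limit(slow_task, 0.05)
```

The gap between the soft and hard limits (60 seconds in the configuration above) is the time budget for that cleanup.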

worker_prefetch_multiplier: the most important setting

The default worker_prefetch_multiplier=4 lets each worker process reserve up to four tasks ahead of time, so a worker holds 4 × concurrency messages in total. For long-running tasks this is disastrous: one slow worker sits on tasks it cannot start while your other workers starve. I have debugged this exact scenario at 2am on a launch weekend. Set it to 1 unless your tasks are genuinely short.
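A small simulation makes the starvation concrete. It is a deliberately simplified model, not Celery's scheduler: each worker runs one task at a time, keeps at most `prefetch` tasks reserved (including the one it is running), and refills from the broker whenever it finishes a task. The function name `simulate` and the task durations are illustrative assumptions.

```python
from collections import deque


def simulate(durations, workers, prefetch):
    """Return {task_index: start_time} under a simple prefetch model."""
    broker = deque(enumerate(durations))
    buffers = [deque() for _ in range(workers)]
    free_at = [0.0] * workers          # when each worker next becomes idle
    starts = {}

    def top_up(w):
        while broker and len(buffers[w]) < prefetch:
            buffers[w].append(broker.popleft())

    for w in range(workers):           # at t=0 every worker fills its buffer
        top_up(w)

    while broker or any(buffers):
        # Next event: the earliest-idle worker that can still get work.
        candidates = [w for w in range(workers) if buffers[w] or broker]
        w = min(candidates, key=lambda i: free_at[i])
        top_up(w)                      # refill before starting the next task
        task_id, duration = buffers[w].popleft()
        starts[task_id] = free_at[w]
        free_at[w] += duration
    return starts


# One 10-minute task followed by seven 1-second tasks, two workers.
tasks = [600, 1, 1, 1, 1, 1, 1, 1]
worst_wait_prefetch_4 = max(simulate(tasks, workers=2, prefetch=4).values())
worst_wait_prefetch_1 = max(simulate(tasks, workers=2, prefetch=1).values())
```

In this model the last short task starts after 602 seconds with a prefetch of 4, because three short tasks are stuck behind the slow one while the second worker sits idle. With a prefetch of 1 it starts after 6 seconds.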

python
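from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger
# ReportDataError, _generate_report_internal and save_partial_state are
# application helpers assumed to be defined elsewhere in myapp.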

logger = get_task_logger(__name__)

# Critical configuration for long-running tasks
app.conf.worker_prefetch_multiplier = 1
# Each worker pulls exactly 1 task, processes it, then pulls the next
# Fair distribution across workers regardless of task duration

# For fast, short tasks (< 100ms each):
# app.conf.worker_prefetch_multiplier = 4  # OK: overhead of pulling tasks matters more

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=30,  # 30 seconds between retries
    queue="reports",
    soft_time_limit=300,
    time_limit=360,
)
def generate_report(self, report_id: int):
    try:
        logger.info(f"Starting report {report_id}, attempt {self.request.retries + 1}")
        result = _generate_report_internal(report_id)
        return result
    except SoftTimeLimitExceeded:
        logger.warning(f"Report {report_id} hit the soft time limit, saving partial state")
        save_partial_state(report_id)
        raise  # re-raise without retrying: a retry would likely time out again
    except ReportDataError as e:
        logger.error(f"Report {report_id} data error: {e}")
        # Linear backoff: 60s, 120s, 180s across the three retries
        raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
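The countdown in the task above grows linearly. For comparison, here is a minimal sketch of both schedules as plain functions. The names `linear_delay` and `exponential_delay`, the one-hour cap and the jitter strategy are assumptions for illustration. Celery also ships task options for this (`autoretry_for`, `retry_backoff`, `retry_backoff_max`, `retry_jitter`), so check the documentation for your version before hand-rolling it.

```python
import random


def linear_delay(retries, base=60):
    """Delay used in the task above: base * (retries + 1)."""
    return base * (retries + 1)


def exponential_delay(retries, base=60, cap=3600, jitter=False):
    """Doubling delay with a cap; optional jitter spreads out retry storms."""
    delay = min(cap, base * 2 ** retries)
    return random.uniform(0, delay) if jitter else delay


linear = [linear_delay(r) for r in range(3)]
exponential = [exponential_delay(r) for r in range(3)]
```

With three retries the two schedules differ only on the last attempt (180s versus 240s). The gap widens quickly if `max_retries` is raised, which is where the cap starts to matter.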

Idempotency: the rule every task must follow

python
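# redis_client (a redis-py client), send_email, db and charge_card are
# application objects assumed to exist; logger is defined as in the previous block.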

# Non-idempotent: dangerous with acks_late
@app.task
def send_welcome_email_bad(user_id: int, email: str):
    send_email(email, "Welcome!")
    # If this task runs twice (worker crash + re-queue), user gets 2 emails

# Idempotent: safe to run multiple times
@app.task(acks_late=True)
def send_welcome_email_safe(user_id: int, email: str):
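    # Atomically claim the send. SET with nx=True succeeds for only one caller,
    # so two concurrent deliveries cannot both pass the check.
    key = f"welcome_sent:{user_id}"
    if not redis_client.set(key, "1", nx=True, ex=86400):  # claim expires after 24h
        logger.info(f"Welcome email already sent to user {user_id}, skipping")
        return

    try:
        send_email(email, "Welcome!")
    except Exception:
        redis_client.delete(key)  # release the claim so a retry can send
        raise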

# Database-backed idempotency (stronger guarantee)
@app.task(acks_late=True)
def process_payment(order_id: int, amount: float):
    with db.transaction():
        # SELECT FOR UPDATE prevents concurrent processing
        order = db.execute(
            "SELECT * FROM orders WHERE id = $1 AND status = 'pending' FOR UPDATE",
            order_id
        ).fetchone()

        if not order:
            logger.info(f"Order {order_id} already processed or not found")
            return

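        # Process payment. The row lock does not cover this external call: if the
        # worker dies after the charge but before the commit, the retry charges
        # again unless charge_card itself deduplicates (e.g. an idempotency key).
        charge_card(order["payment_method_id"], amount)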

        # Update status atomically
        db.execute(
            "UPDATE orders SET status = 'paid' WHERE id = $1", order_id
        )
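The status check alone cannot protect an external side effect that happens before the status is committed. One common way to close that gap is to pass the payment provider a stable idempotency key derived from the order. The sketch below uses an in-memory `FakePaymentProvider` and an `orders` dict as stand-ins. Both are hypothetical, as is the assumption that your real provider deduplicates on such a key.

```python
class FakePaymentProvider:
    """Hypothetical provider that deduplicates charges by idempotency key."""

    def __init__(self):
        self.charges = {}

    def charge(self, payment_method_id, amount_cents, idempotency_key):
        # A repeated key returns the original charge instead of charging again.
        if idempotency_key not in self.charges:
            self.charges[idempotency_key] = (payment_method_id, amount_cents)
        return self.charges[idempotency_key]


provider = FakePaymentProvider()
orders = {42: {"status": "pending", "payment_method_id": "pm_1", "amount_cents": 1999}}


def process_payment(order_id, crash_before_commit=False):
    order = orders[order_id]
    if order["status"] != "pending":
        return "skipped"
    provider.charge(
        order["payment_method_id"],
        order["amount_cents"],
        idempotency_key=f"order-{order_id}",   # same key on every retry
    )
    if crash_before_commit:
        raise RuntimeError("worker lost before the status update")
    order["status"] = "paid"
    return "paid"


# First delivery: the charge succeeds, then the worker "crashes".
try:
    process_payment(42, crash_before_commit=True)
except RuntimeError:
    pass

second = process_payment(42)   # redelivery: same key, so no second charge
third = process_payment(42)    # any later duplicate is skipped by the status check
```

The key has to be derived from something stable about the work (here the order id), not generated per attempt, otherwise every retry looks like a new charge.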

Queue routing and priority

python
from celery import Celery

app.conf.task_routes = {
    "myapp.tasks.email.*": {"queue": "email"},
    "myapp.tasks.reports.*": {"queue": "reports"},
    "myapp.tasks.payments.*": {"queue": "critical"},
}

# Start dedicated workers per queue
# celery -A myapp worker -Q critical --concurrency=4 --prefetch-multiplier=1
# celery -A myapp worker -Q email --concurrency=8 --prefetch-multiplier=4
# celery -A myapp worker -Q reports --concurrency=2 --prefetch-multiplier=1

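# Dynamic routing per task call
# Priority semantics depend on the broker: with the Redis transport 0 is the
# highest priority; with RabbitMQ larger numbers win (queue needs x-max-priority).
process_payment.apply_async(args=[order_id, amount], queue="critical", priority=0)
send_newsletter.apply_async(args=[campaign_id], queue="email", countdown=60)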

# ETA scheduling (timezone-aware; datetime.utcnow() is deprecated since Python 3.12)
from datetime import datetime, timedelta, timezone
send_reminder.apply_async(
    args=[user_id],
    eta=datetime.now(timezone.utc) + timedelta(hours=24)
)
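One caveat with long ETAs on the Redis broker used in this post: a task that is not acknowledged within the visibility timeout (one hour by default for Redis) is redelivered, so a 24-hour ETA can end up executing more than once. A rough sketch of sizing the timeout from the longest delay you schedule follows. The helper name `visibility_timeout_for` and the one-hour margin are assumptions, not Celery API.

```python
from datetime import timedelta

REDIS_DEFAULT_VISIBILITY_TIMEOUT = timedelta(hours=1)


def visibility_timeout_for(longest_delay, margin=timedelta(hours=1)):
    """Seconds to configure so the longest ETA/countdown fits, plus a margin."""
    return int((longest_delay + margin).total_seconds())


longest_eta = timedelta(hours=24)   # the reminder scheduled above

# True means the default timeout is too short for this ETA.
would_redeliver = longest_eta > REDIS_DEFAULT_VISIBILITY_TIMEOUT

broker_transport_options = {
    "visibility_timeout": visibility_timeout_for(longest_eta),
}
# In the Celery app this dict would be assigned as:
# app.conf.broker_transport_options = broker_transport_options
```

A longer timeout also delays redelivery of tasks lost to a crashed worker, so for very long delays it may be simpler to store the due time in your database and let a periodic task dispatch what is due.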

Celery beat: scheduled tasks

python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "daily-report": {
        "task": "myapp.tasks.reports.daily_summary",
        "schedule": crontab(hour=7, minute=0),  # 7 AM daily
        "args": [],
    },
    "cleanup-expired-sessions": {
        "task": "myapp.tasks.auth.cleanup_sessions",
        "schedule": 3600,  # every hour (seconds)
    },
    "sync-analytics": {
        "task": "myapp.tasks.analytics.sync",
        "schedule": crontab(minute="*/5"),  # every 5 minutes
    },
}

# Start beat scheduler (run ONE instance only — not per worker)
# celery -A myapp beat --loglevel=info
# celery -A myapp worker --beat  # combined (dev only)

Quiz: test your understanding

Before moving on, answer these in your head (or out loud):

  1. What is the difference between Celery's broker and backend? Which is required? What happens if the backend is unavailable?
  2. You have a task that processes video files (takes 10 minutes each). You have 4 workers, prefetch_multiplier=4. A slow task is assigned to worker 1. What happens to the other 3 tasks prefetched by worker 1? How do you fix this?
  3. A payment task with acks_late=True charges a card, then the worker crashes before setting status to "paid". The task is re-queued and runs again. The card gets charged twice. How do you prevent this?
  4. What is task_soft_time_limit vs task_time_limit? What can you do in the soft limit handler that you cannot do after the hard limit?
  5. You need to run a cleanup job every day at 3 AM UTC. How do you configure Celery Beat? What happens if you run two Celery Beat instances simultaneously?

Next up: Part 19: Connection Pooling & SQLAlchemy. pool_size math, async SQLAlchemy, and PgBouncer mode compatibility.
