Staff Prep 18: Task Queues & Celery — Broker, Workers & Idempotency


April 4, 2026 · 9 min read · Part 18

Celery is the standard Python task queue for a reason: it handles retries, scheduling, routing, monitoring, and error handling in one package. But misconfigured Celery causes more production incidents than it prevents. This is the complete guide to configuring it correctly.

Celery architecture: broker vs backend

Celery has two storage layers with different responsibilities:

  • Broker: Stores the task queue. Workers pull tasks from here. Redis or RabbitMQ. Must be reliable — if the broker loses data, tasks are lost.
  • Backend (Result Store): Stores task results and status. Only needed if you read results back (AsyncResult polling, chord callbacks). Optional: skip it if you do not need results.
python
from celery import Celery

app = Celery(
    "myapp",
    broker="redis://redis:6379/0",          # task queue
    backend="redis://redis:6379/1",         # result store (optional)
    include=["myapp.tasks.email", "myapp.tasks.reports"],
)

app.conf.update(
    # Serialisation
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_compression="gzip",  # compress large payloads

    # Reliability
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,   # critical: pull one task at a time

    # Timeouts
    task_soft_time_limit=300,       # sends SoftTimeLimitExceeded after 5 min
    task_time_limit=360,            # kills worker after 6 min (hard limit)

    # Results
    result_expires=3600,            # auto-expire results after 1 hour
    task_ignore_result=True,        # ignore results by default; opt back in per task with ignore_result=False
)

worker_prefetch_multiplier: the most important setting

The default worker_prefetch_multiplier=4 means each worker reserves 4 tasks per concurrency slot before it needs them. For long-running tasks this is catastrophic: one slow worker sits on prefetched tasks that other workers could have processed, while fast workers go idle waiting for the queue to refill.

python
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

# Critical configuration for long-running tasks
app.conf.worker_prefetch_multiplier = 1
# Each worker pulls exactly 1 task, processes it, then pulls the next
# Fair distribution across workers regardless of task duration

# For fast, short tasks (< 100ms each):
# app.conf.worker_prefetch_multiplier = 4  # OK: overhead of pulling tasks matters more

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=30,  # 30 seconds between retries
    queue="reports",
    soft_time_limit=300,
    time_limit=360,
)
def generate_report(self, report_id: int):
    try:
        logger.info(f"Starting report {report_id}, attempt {self.request.retries + 1}")
        result = _generate_report_internal(report_id)
        return result
    except SoftTimeLimitExceeded:
        logger.warning(f"Report {report_id} timed out — saving partial state")
        save_partial_state(report_id)
        raise  # do not retry time limit exceeded
    except ReportDataError as e:
        logger.error(f"Report {report_id} data error: {e}")
        raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
        # Linear backoff: 60s, 120s, 180s
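
The fairness argument can be sketched as a toy simulation (plain Python, no Celery; the chunked round-robin hand-out is a deliberate simplification of how prefetched tasks pile up behind a slow one):

```python
import heapq

def completion_times(durations, workers, prefetch):
    """Return the completion time of each task, in input order."""
    done = [0.0] * len(durations)
    if prefetch > 1:
        # Static round-robin hand-out of prefetched chunks.
        finish = [0.0] * workers
        for i, d in enumerate(durations):
            w = (i // prefetch) % workers
            finish[w] += d
            done[i] = finish[w]
    else:
        # prefetch=1: the next free worker pulls the next task.
        heap = [(0.0, w) for w in range(workers)]
        heapq.heapify(heap)
        for i, d in enumerate(durations):
            t, w = heapq.heappop(heap)
            done[i] = t + d
            heapq.heappush(heap, (done[i], w))
    return done

tasks = [600] + [1] * 15   # one slow task, fifteen fast ones
for p in (4, 1):
    done = completion_times(tasks, workers=4, prefetch=p)
    print(f"prefetch={p}: last fast task finishes at {max(done[1:]):.0f}s")
# prefetch=4: the three fast tasks prefetched by the slow worker wait 603s
# prefetch=1: all fast tasks drain within 5s while the slow one runs
```

The slow task takes 600s either way; the difference is whether unrelated fast work gets stuck behind it.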

Idempotency: the rule every task must follow

With task_acks_late=True, delivery is at-least-once: a task whose worker dies mid-run is re-queued and executed again, so every task body must be safe to run more than once.

python
# Non-idempotent: dangerous with acks_late
@app.task
def send_welcome_email_bad(user_id: int, email: str):
    send_email(email, "Welcome!")
    # If this task runs twice (worker crash + re-queue), user gets 2 emails

# Idempotent: safe to run multiple times
@app.task(acks_late=True)
def send_welcome_email_safe(user_id: int, email: str):
    # Check if already sent. GET-then-SET is racy if two duplicates run
    # concurrently; redis_client.set(key, "1", nx=True, ex=86400) is atomic.
    key = f"welcome_sent:{user_id}"
    if redis_client.get(key):
        logger.info(f"Welcome email already sent to user {user_id}, skipping")
        return

    send_email(email, "Welcome!")

    # Mark as sent (with TTL to handle edge cases)
    redis_client.setex(key, 86400, "1")  # expires after 24h

# Database-backed idempotency (stronger guarantee)
@app.task(acks_late=True)
def process_payment(order_id: int, amount: float):
    with db.transaction():
        # SELECT FOR UPDATE prevents concurrent processing
        order = db.execute(
            "SELECT * FROM orders WHERE id = $1 AND status = 'pending' FOR UPDATE",
            order_id
        ).fetchone()

        if not order:
            logger.info(f"Order {order_id} already processed or not found")
            return

        # Process payment
        charge_card(order["payment_method_id"], amount)

        # Update status atomically
        db.execute(
            "UPDATE orders SET status = 'paid' WHERE id = $1", order_id
        )
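
When a task has no natural flag like welcome_sent:{user_id}, a deterministic key can be derived from the task name and arguments. A minimal sketch (make_idempotency_key is a hypothetical helper, not part of Celery):

```python
import hashlib
import json

def make_idempotency_key(task_name: str, *args, **kwargs) -> str:
    """Stable key for a (task, arguments) pair.

    sort_keys=True makes the JSON serialisation deterministic, so the
    same logical call always maps to the same Redis key.
    """
    payload = json.dumps([task_name, args, kwargs], sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
    return f"idempotency:{task_name}:{digest}"

key = make_idempotency_key("send_welcome_email", 42, email="a@example.com")
# Same inputs always produce the same key; use it with SET NX as above.
```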

Queue routing and priority

Route each task family to its own queue so slow report jobs cannot starve payment-critical work:

python
from celery import Celery

app.conf.task_routes = {
    "myapp.tasks.email.*": {"queue": "email"},
    "myapp.tasks.reports.*": {"queue": "reports"},
    "myapp.tasks.payments.*": {"queue": "critical"},
}

# Start dedicated workers per queue
# celery -A myapp worker -Q critical --concurrency=4 --prefetch-multiplier=1
# celery -A myapp worker -Q email --concurrency=8 --prefetch-multiplier=4
# celery -A myapp worker -Q reports --concurrency=2 --prefetch-multiplier=1

# Dynamic routing per task call
process_payment.apply_async(args=[order_id], queue="critical", priority=9)
send_newsletter.apply_async(args=[campaign_id], queue="email", countdown=60)

# ETA scheduling
from datetime import datetime, timedelta, timezone
send_reminder.apply_async(
    args=[user_id],
    eta=datetime.now(timezone.utc) + timedelta(hours=24)  # utcnow() is deprecated
)

Celery beat: scheduled tasks

Beat is a separate scheduler process that enqueues tasks on a schedule; exactly one instance must run, or every entry fires once per instance.

python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "daily-report": {
        "task": "myapp.tasks.reports.daily_summary",
        "schedule": crontab(hour=7, minute=0),  # 7 AM daily
        "args": [],
    },
    "cleanup-expired-sessions": {
        "task": "myapp.tasks.auth.cleanup_sessions",
        "schedule": 3600,  # every hour (seconds)
    },
    "sync-analytics": {
        "task": "myapp.tasks.analytics.sync",
        "schedule": crontab(minute="*/5"),  # every 5 minutes
    },
}

# Start beat scheduler (run ONE instance only — not per worker)
# celery -A myapp beat --loglevel=info
# celery -A myapp worker --beat  # combined (dev only)
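
For a quick sanity check of when a crontab entry will next fire, the next-run arithmetic can be sketched in plain Python (no Celery; next_daily_run is a hypothetical helper, not Beat's implementation):

```python
from datetime import datetime, timedelta

def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next occurrence of hour:minute strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

# The daily-report entry (crontab(hour=7, minute=0)) seen from 09:30:
print(next_daily_run(datetime(2026, 4, 4, 9, 30), hour=7))
# 2026-04-05 07:00:00 (7 AM already passed today, so it fires tomorrow)
```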

Quiz: test your understanding

Before moving on, answer these in your head (or out loud):

  1. What is the difference between Celery's broker and backend? Which is required? What happens if the backend is unavailable?
  2. You have a task that processes video files (takes 10 minutes each). You have 4 workers, prefetch_multiplier=4. A slow task is assigned to worker 1. What happens to the other 3 tasks prefetched by worker 1? How do you fix this?
  3. A payment task with acks_late=True charges a card, then the worker crashes before setting status to "paid". The task is re-queued and runs again. The card gets charged twice. How do you prevent this?
  4. What is task_soft_time_limit vs task_time_limit? What can you do in the soft limit handler that you cannot do after the hard limit?
  5. You need to run a cleanup job every day at 3 AM UTC. How do you configure Celery Beat? What happens if you run two Celery Beat instances simultaneously?

Next up — Part 19: Connection Pooling & SQLAlchemy. pool_size math, async SQLAlchemy, and PgBouncer mode compatibility.

← PREV
Staff Prep 17: FastAPI Internals — Routing, DI, and Performance Tuning