Staff Prep 18: Task Queues & Celery — Broker, Workers & Idempotency
Back to Part 17: FastAPI Internals. Celery is the standard Python task queue for a reason: it handles retries, scheduling, routing, monitoring, and error handling in one package. But misconfigured Celery causes more production incidents than it prevents. This is the complete guide to configuring it correctly.
Celery architecture: broker vs backend
Celery has two storage layers with different responsibilities:
- Broker: Stores the task queue. Workers pull tasks from here. Redis or RabbitMQ. Must be reliable — if the broker loses data, tasks are lost.
- Backend (Result Store): Stores task results and status. Only needed if you use `AsyncResult` to poll task outcomes. Optional — skip it if you do not need results.
from celery import Celery

# Two storage layers: the broker holds the task queue the workers pull
# from; the backend stores task results/status and is optional.
app = Celery(
    "myapp",
    broker="redis://redis:6379/0",   # task queue
    backend="redis://redis:6379/1",  # result store (optional)
    include=["myapp.tasks.email", "myapp.tasks.reports"],
)

app.conf.update(
    # Serialisation — JSON only; gzip keeps large payloads small on the wire.
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_compression="gzip",
    # Reliability — ack only after the task finishes, re-queue on worker
    # loss, and pull exactly one task at a time (critical for fairness).
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,
    # Timeouts — SoftTimeLimitExceeded is raised at 5 min so the task can
    # clean up; the hard limit kills the worker process at 6 min.
    task_soft_time_limit=300,
    task_time_limit=360,
    # Results — expire stored results after 1 hour; skip result storage
    # entirely for fire-and-forget tasks.
    result_expires=3600,
    task_ignore_result=True,
)
worker_prefetch_multiplier: the most important setting
The default worker_prefetch_multiplier=4 means each worker reserves 4 tasks per concurrency slot at once.
For long-running tasks, this is catastrophic: one slow worker holds 4 tasks that other workers
could have processed. Fast workers sit idle waiting for more tasks.
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

# Critical for long-running tasks: each worker pulls exactly one task,
# processes it, then pulls the next — fair distribution across workers
# regardless of how long individual tasks run.
app.conf.worker_prefetch_multiplier = 1

# For fast, short tasks (< 100ms each) the overhead of pulling tasks
# dominates, so a larger prefetch is fine:
# app.conf.worker_prefetch_multiplier = 4
from celery.exceptions import SoftTimeLimitExceeded  # raised at the soft time limit


@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=30,  # 30 seconds between retries
    queue="reports",
    soft_time_limit=300,
    time_limit=360,
)
def generate_report(self, report_id: int):
    """Generate report `report_id`, retrying data errors with linear backoff.

    On SoftTimeLimitExceeded, saves partial state and re-raises (no retry);
    on ReportDataError, retries up to max_retries with growing countdown.
    """
    try:
        logger.info(f"Starting report {report_id}, attempt {self.request.retries + 1}")
        result = _generate_report_internal(report_id)
        return result
    except SoftTimeLimitExceeded:
        # The soft limit still gives us Python-level control — use it to
        # persist partial progress before the hard limit kills the worker.
        logger.warning(f"Report {report_id} timed out — saving partial state")
        save_partial_state(report_id)
        raise  # do not retry time limit exceeded
    except ReportDataError as e:
        logger.error(f"Report {report_id} data error: {e}")
        # Linear backoff: 60s, 120s, 180s across the three retries.
        raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
Idempotency: the rule every task must follow
import hashlib
# Non-idempotent: dangerous with acks_late
# Non-idempotent: dangerous with acks_late.
@app.task
def send_welcome_email_bad(user_id: int, email: str):
    """Naive send — no dedup guard, unsafe to re-run."""
    # If this task runs twice (worker crash + re-queue), user gets 2 emails.
    send_email(email, "Welcome!")
# Idempotent: safe to run multiple times
@app.task(acks_late=True)
def send_welcome_email_safe(user_id: int, email: str):
# Check if already sent
key = f"welcome_sent:{user_id}"
if redis_client.get(key):
logger.info(f"Welcome email already sent to user {user_id}, skipping")
return
send_email(email, "Welcome!")
# Mark as sent (with TTL to handle edge cases)
redis_client.setex(key, 86400, "1") # expires after 24h
# Database-backed idempotency (stronger guarantee)
# Database-backed idempotency (stronger guarantee).
@app.task(acks_late=True)
def process_payment(order_id: int, amount: float):
    """Charge a pending order exactly once, guarded by a row lock."""
    with db.transaction():
        # SELECT FOR UPDATE prevents concurrent processing: a second worker
        # blocks here, then sees status != 'pending' and bails out.
        order = db.execute(
            "SELECT * FROM orders WHERE id = $1 AND status = 'pending' FOR UPDATE",
            order_id
        ).fetchone()
        if not order:
            logger.info(f"Order {order_id} already processed or not found")
            return
        # NOTE(review): charge_card is an external call inside the DB
        # transaction — if the worker crashes after charging but before
        # commit, a retry charges again. Pass a provider-side idempotency
        # key (e.g. derived from order_id) to make the charge itself
        # idempotent — confirm the payment provider supports this.
        charge_card(order["payment_method_id"], amount)
        # Update status atomically while the row lock is still held.
        db.execute(
            "UPDATE orders SET status = 'paid' WHERE id = $1", order_id
        )
Queue routing and priority
from celery import Celery
from datetime import datetime, timedelta, timezone

# Static routing: direct tasks to dedicated queues by module path.
app.conf.task_routes = {
    "myapp.tasks.email.*": {"queue": "email"},
    "myapp.tasks.reports.*": {"queue": "reports"},
    "myapp.tasks.payments.*": {"queue": "critical"},
}

# Start dedicated workers per queue:
# celery -A myapp worker -Q critical --concurrency=4 --prefetch-multiplier=1
# celery -A myapp worker -Q email --concurrency=8 --prefetch-multiplier=4
# celery -A myapp worker -Q reports --concurrency=2 --prefetch-multiplier=1

# Dynamic routing per task call
process_payment.apply_async(args=[order_id], queue="critical", priority=9)
send_newsletter.apply_async(args=[campaign_id], queue="email", countdown=60)

# ETA scheduling. Use a timezone-aware UTC datetime: datetime.utcnow() is
# naive (and deprecated since Python 3.12), so Celery must guess its zone
# from enable_utc/timezone settings — an easy off-by-timezone bug.
send_reminder.apply_async(
    args=[user_id],
    eta=datetime.now(timezone.utc) + timedelta(hours=24)
)
Celery beat: scheduled tasks
from celery.schedules import crontab

# Periodic tasks. A schedule entry takes either a crontab expression or a
# plain number of seconds between runs.
app.conf.beat_schedule = {
    "daily-report": {
        "task": "myapp.tasks.reports.daily_summary",
        "schedule": crontab(hour=7, minute=0),  # 7 AM daily
        "args": [],
    },
    "cleanup-expired-sessions": {
        "task": "myapp.tasks.auth.cleanup_sessions",
        "schedule": 3600,  # every hour (seconds)
    },
    "sync-analytics": {
        "task": "myapp.tasks.analytics.sync",
        "schedule": crontab(minute="*/5"),  # every 5 minutes
    },
}

# Start the beat scheduler — run ONE instance only, never one per worker:
# celery -A myapp beat --loglevel=info
# celery -A myapp worker --beat   # combined (dev only)
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- What is the difference between Celery's broker and backend? Which is required? What happens if the backend is unavailable?
- You have a task that processes video files (takes 10 minutes each). You have 4 workers, prefetch_multiplier=4. A slow task is assigned to worker 1. What happens to the other 3 tasks prefetched by worker 1? How do you fix this?
- A payment task with acks_late=True charges a card, then the worker crashes before setting status to "paid". The task is re-queued and runs again. The card gets charged twice. How do you prevent this?
- What is task_soft_time_limit vs task_time_limit? What can you do in the soft limit handler that you cannot do after the hard limit?
- You need to run a cleanup job every day at 3 AM UTC. How do you configure Celery Beat? What happens if you run two Celery Beat instances simultaneously?
Next up — Part 19: Connection Pooling & SQLAlchemy. pool_size math, async SQLAlchemy, and PgBouncer mode compatibility.