Every production system eventually needs background jobs. Email notifications, report generation, data syncing, webhook processing—the work that can’t (or shouldn’t) happen during a user request. Here’s what I’ve learned about making them reliable.
## The Naive Approach (And Why It Breaks)
Most developers start with something like this:
```python
@app.route('/signup')
def signup():
    user = create_user(request.form)
    send_welcome_email(user)  # Blocks the response
    return redirect('/dashboard')
```
This works until it doesn’t. The email service has a 5-second timeout. Now your signup page feels broken. Or the email service is down, and signups fail entirely.
## Pattern 1: Fire-and-Forget Queues
The first fix is usually a task queue:
```python
from celery import Celery

# Broker URL is illustrative; point it at your actual Redis/RabbitMQ instance
celery = Celery('app', broker='redis://localhost:6379/0')

@app.route('/signup')
def signup():
    user = create_user(request.form)
    send_welcome_email.delay(user.id)  # Returns immediately
    return redirect('/dashboard')

@celery.task
def send_welcome_email(user_id):
    user = User.get(user_id)  # Fresh data at execution time
    email_service.send(user.email, "Welcome!")
```
Key insight: Pass IDs, not objects. The user object might change between enqueue and execution. Fresh data from the database is safer.
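A toy in-memory illustration of the stale-object problem (the `User` class and `db` dict here are stand-ins invented for this sketch, not the app's real models): a queue serializes the object at enqueue time, so an object-based payload is frozen, while an ID-based payload lets the worker re-read current state.

```python
import copy
from dataclasses import dataclass

@dataclass
class User:
    id: int
    email: str

db = {1: User(1, "old@example.com")}  # stand-in for the real database

def send_by_object(user):
    return user.email             # uses the snapshot made at enqueue time

def send_by_id(user_id):
    return db[user_id].email      # re-reads current state at execution time

# Enqueueing serializes the object, i.e. effectively takes a copy of it
snapshot = copy.copy(db[1])
db[1].email = "new@example.com"   # user changes their email before the worker runs

print(send_by_object(snapshot))   # old@example.com (stale)
print(send_by_id(1))              # new@example.com (fresh)
```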
## Pattern 2: Idempotent Jobs
Jobs fail. Networks blip. Workers crash. Your job might run twice:
```python
@celery.task
def process_payment(payment_id):
    payment = Payment.get(payment_id)

    # Idempotency check
    if payment.processed_at:
        return  # Already done, safe to skip

    result = stripe.charge(payment.amount)
    payment.processed_at = datetime.now()
    payment.save()
```
The rule: Design every job so running it twice produces the same result as running it once. This usually means checking state before acting.
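The rule can be demonstrated in a few lines. This is a minimal in-memory sketch (the `charges` list stands in for calls to the payment provider; nothing here is real Stripe API): the second run sees the recorded state and becomes a no-op.

```python
from datetime import datetime

charges = []  # stand-in for side effects at the payment provider

class Payment:
    def __init__(self, amount):
        self.amount = amount
        self.processed_at = None

def process_payment(payment):
    if payment.processed_at:        # check state before acting
        return                      # already done, safe to skip
    charges.append(payment.amount)  # the side effect we must not repeat
    payment.processed_at = datetime.now()

p = Payment(100)
process_payment(p)
process_payment(p)  # a retry: the job runs twice, the charge happens once
print(len(charges))  # 1
```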
## Pattern 3: Outbox Pattern for Reliability
What if your database write succeeds but the queue message fails? Or vice versa?
```sql
-- Instead of writing to a queue, write to your database
CREATE TABLE outbox (
    id           SERIAL PRIMARY KEY,
    payload      JSONB NOT NULL,
    processed_at TIMESTAMP,
    created_at   TIMESTAMP DEFAULT NOW()
);
```
```python
import json
from time import sleep

def signup():
    with transaction():
        user = create_user(request.form)
        db.execute("""
            INSERT INTO outbox (payload)
            VALUES (%s)
        """, json.dumps({
            'type': 'welcome_email',
            'user_id': user.id
        }))
    return redirect('/dashboard')

# Separate worker polls the outbox
def outbox_worker():
    while True:
        jobs = db.query(
            "SELECT * FROM outbox WHERE processed_at IS NULL LIMIT 10"
        )
        for job in jobs:
            process_job(job)
            db.execute(
                "UPDATE outbox SET processed_at = NOW() WHERE id = %s",
                job.id
            )
        sleep(1)
```
Why this works: Your job and your data are in the same transaction. Either both happen or neither does.
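The atomicity guarantee is easy to verify with SQLite from the standard library (the schema mirrors the outbox table above; the `fail` flag simulates a crash mid-transaction): when anything inside the transaction fails, the user row and the outbox row roll back together.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, payload TEXT)")

def signup(email, fail=False):
    try:
        with conn:  # one transaction: both writes commit, or neither does
            conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
            if fail:
                raise RuntimeError("crash before the outbox write completes")
            conn.execute("INSERT INTO outbox (payload) VALUES (?)",
                         ('{"type": "welcome_email"}',))
    except RuntimeError:
        pass  # the context manager already rolled the transaction back

signup("a@example.com")             # commits user row + outbox row together
signup("b@example.com", fail=True)  # rolls back both

users = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
jobs = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(users, jobs)  # 1 1
```

Compare with enqueueing to an external broker inside the request handler: there is no transaction spanning your database and the broker, so one side can succeed while the other fails.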
## Pattern 4: Dead Letter Queues
Jobs will fail. The question is: what happens next?
```python
@celery.task(bind=True, max_retries=3)
def risky_job(self, data):
    try:
        external_api.call(data)
    except TransientError as e:
        raise self.retry(exc=e, countdown=60)  # Retry in 1 min
    except PermanentError as e:
        # Move to dead letter queue for investigation
        dead_letter_queue.put({
            'original_data': data,
            'error': str(e),
            'failed_at': datetime.now()
        })
```
Best practice:
- Retry on transient failures (network timeouts, rate limits)
- Dead-letter on permanent failures (invalid data, auth errors)
- Alert humans when the dead letter queue grows
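A fixed 60-second countdown hammers a struggling dependency at a constant rate; a common refinement is to grow the delay per attempt. A minimal sketch (the base and cap values are arbitrary choices for illustration, not Celery defaults):

```python
def backoff(retries, base=60, cap=3600):
    """Seconds to wait before the next attempt: 60, 120, 240, ... capped at 1h."""
    return min(base * (2 ** retries), cap)

print([backoff(n) for n in range(7)])  # [60, 120, 240, 480, 960, 1920, 3600]
```

Inside a bound Celery task, the current attempt number is available as `self.request.retries`, so the retry line becomes `raise self.retry(exc=e, countdown=backoff(self.request.retries))`.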
## Pattern 5: Scheduled Jobs with Cron
Some work needs to happen on a schedule:
```python
# celery beat configuration
from celery.schedules import crontab

app.conf.beat_schedule = {
    'daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=6, minute=0),  # 6 AM
    },
    'cleanup-old-sessions': {
        'task': 'tasks.cleanup_sessions',
        'schedule': crontab(minute='*/15'),  # Every 15 min
    },
}
```
Gotcha: Make these idempotent too. If your scheduler fires twice (clock skew, restart), your job should handle it gracefully.
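One way to make a double fire harmless is a run-once guard keyed by task name and period. In this sketch the in-memory set stands in for something shared and durable, e.g. a unique-keyed database table or a Redis `SETNX`:

```python
from datetime import date

completed = set()  # stand-in for a unique-keyed DB table or Redis SETNX
runs = []          # records actual report generations, for illustration

def generate_daily_report():
    key = ('daily-report', date.today())
    if key in completed:
        return           # scheduler fired twice; the second run is a no-op
    completed.add(key)
    runs.append(key)     # the actual report generation would happen here

generate_daily_report()
generate_daily_report()  # duplicate fire from clock skew or a restart
print(len(runs))  # 1
```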
## Pattern 6: Job Batching
Processing 100,000 records one job at a time is slow. Batching helps:
```python
@celery.task
def process_batch(user_ids):
    users = User.query.filter(User.id.in_(user_ids)).all()
    for user in users:
        # Process in memory, one DB query
        process_user(user)
    # Bulk update
    db.execute("""
        UPDATE users SET processed = TRUE
        WHERE id = ANY(%s)
    """, user_ids)

def chunks(items, size):
    """Yield successive size-sized slices of items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def enqueue_all_users():
    user_ids = [u.id for u in User.query.all()]
    for batch in chunks(user_ids, 100):  # 100 per job
        process_batch.delay(batch)
```
The tradeoff: Larger batches = fewer jobs = less overhead, but also longer individual job times and coarser error handling.
## Monitoring That Matters
The jobs you can’t see are the jobs that break silently:
```python
# Track job metrics
from prometheus_client import Counter, Histogram

job_duration = Histogram('job_duration_seconds', 'Job duration', ['job_name'])
job_failures = Counter('job_failures_total', 'Failed jobs', ['job_name'])

@celery.task
def instrumented_job():
    with job_duration.labels('instrumented_job').time():
        try:
            do_work()
        except Exception:
            job_failures.labels('instrumented_job').inc()
            raise
```
Alerts to set up:
- Queue depth growing faster than processing
- Job duration exceeding SLA
- Dead letter queue size > 0
- Worker count dropping
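The first alert above reduces to comparing queue depth over a sampling window. A minimal sketch of the check (the threshold is a placeholder; tune it to your traffic):

```python
def queue_depth_alert(samples, min_growth=100):
    """Alert when queue depth grew by more than min_growth over the
    sampling window, i.e. producers are outpacing the workers."""
    return len(samples) >= 2 and samples[-1] - samples[0] > min_growth

print(queue_depth_alert([10, 12, 15]))        # False: roughly steady
print(queue_depth_alert([10, 90, 250, 400]))  # True: backlog is growing
```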
## The Simplest Thing That Works
If you’re just starting: use your database as a queue (outbox pattern) with a polling worker. It’s not the most performant, but it’s transactional, debuggable, and works with tools you already have.
Graduate to Redis/RabbitMQ/SQS when:
- You need sub-second latency
- Your job volume exceeds what polling can handle
- You need advanced features (priorities, routing, delayed messages)
Background jobs are infrastructure you’ll live with for years. Make them boring, observable, and idempotent—your future self will thank you.