Background jobs are the janitors of your application. They handle the work that doesn’t need to happen immediately: sending emails, processing uploads, generating reports, syncing data. When they work, nobody notices. When they fail, everyone notices—usually at 3 AM.
Here’s how to build jobs that let you sleep.
The Fundamentals: Idempotency First
Every background job should be safe to run twice. Network hiccups, worker crashes, queue retries—your job will execute more than once eventually.
Bad:
```python
def send_welcome_email(user_id: int):
    user = get_user(user_id)
    send_email(user.email, "Welcome!")
    # If this crashes after send_email but before completion,
    # retry will send another email
```
Good:
```python
def send_welcome_email(user_id: int):
    user = get_user(user_id)
    # Check if already sent
    if redis.get(f"welcome_email_sent:{user_id}"):
        return  # Already handled
    send_email(user.email, "Welcome!")
    # Mark as sent (with TTL for cleanup)
    redis.setex(f"welcome_email_sent:{user_id}", 86400 * 30, "1")
```
The pattern: check before acting, mark after completing. Use a persistent store (Redis, database) for the idempotency key—not in-memory state that dies with the worker.
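One caveat: the check-then-mark version leaves a small race where two workers can both read the key before either sets it. Redis can make the claim atomic with `SET NX`. A minimal sketch, using an in-memory stand-in for the Redis client so it runs anywhere (in redis-py the real call would be `redis.set(key, "1", nx=True, ex=...)`):

```python
class FakeRedis:
    """In-memory stand-in for a Redis client (illustrative only)."""
    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None  # redis-py returns None when the NX set loses
        self.store[key] = value
        return True

    def delete(self, key):
        self.store.pop(key, None)

redis = FakeRedis()
sent = []  # stands in for actual email delivery

def send_welcome_email(user_id: int):
    key = f"welcome_email_sent:{user_id}"
    # Atomically claim the key; only one worker can win the NX set
    claimed = redis.set(key, "1", nx=True, ex=86400 * 30)
    if not claimed:
        return  # Another worker already handled (or is handling) it
    try:
        sent.append(user_id)  # stands in for send_email(...)
    except Exception:
        redis.delete(key)  # Release the claim so a retry can run
        raise

send_welcome_email(42)
send_welcome_email(42)  # duplicate delivery: the claim fails, nothing is sent
```

The release-on-failure in the `except` branch matters: claiming before sending means a crash mid-send would otherwise leave the key set and the email never delivered.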
Timeouts: The Non-Negotiable
Jobs without timeouts are ticking time bombs. A hung HTTP request, a deadlocked database query, a file that never finishes uploading—without timeouts, your worker sits there forever, blocking the queue.
```python
import signal
from contextlib import contextmanager

class JobTimeout(Exception):
    pass

@contextmanager
def timeout(seconds: int):
    def handler(signum, frame):
        raise JobTimeout(f"Job exceeded {seconds}s timeout")
    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)

def process_upload(file_id: int):
    with timeout(300):  # 5 minute max
        file = download_file(file_id)
        processed = transform(file)
        upload_result(processed)
```
Set timeouts aggressively. A job that usually takes 10 seconds should timeout at 60, not 600. Fast failures are recoverable; hung workers are debugging nightmares.
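One caveat with the `signal.alarm` approach: it only works in the main thread of a Unix process, one alarm at a time. For async workers like the ones later in this post, `asyncio.wait_for` does the same job. A small sketch, where `slow_op` stands in for the real work:

```python
import asyncio

async def slow_op():
    # Stands in for a hung HTTP request or stuck download
    await asyncio.sleep(10)

async def process_upload():
    try:
        # wait_for cancels the inner task and raises TimeoutError on expiry
        await asyncio.wait_for(slow_op(), timeout=0.1)
        return "done"
    except asyncio.TimeoutError:
        return "timed out"

result = asyncio.run(process_upload())
print(result)  # timed out
```

Unlike the signal version, this also cleanly cancels the awaited task rather than interrupting it at an arbitrary point.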
Retry Strategies: Exponential Backoff + Jitter
When jobs fail, retrying immediately usually fails again. The service is down, the rate limit is hit, the database is overloaded. Hammer it harder and you make things worse.
```python
import random
import time

def calculate_backoff(attempt: int, base: float = 1.0, max_delay: float = 300.0) -> float:
    """Exponential backoff with full jitter."""
    delay = min(base * (2 ** attempt), max_delay)
    return random.uniform(0, delay)

def retry_with_backoff(job_func, max_attempts: int = 5):
    for attempt in range(max_attempts):
        try:
            return job_func()
        except RetryableError as e:
            if attempt == max_attempts - 1:
                raise  # Final attempt, propagate error
            delay = calculate_backoff(attempt)
            logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s: {e}")
            time.sleep(delay)
```
The jitter is crucial. Without it, a thousand jobs that failed at the same time will all retry at the same time, creating a thundering herd that takes down whatever they’re hitting.
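To make that concrete, here is a toy run: a thousand jobs that all failed at the same moment, each computing its third retry delay. Without jitter they collide on a single instant; with full jitter they spread across the whole window:

```python
import random

def backoff(attempt: int, base: float = 1.0, max_delay: float = 300.0) -> float:
    # Deterministic exponential backoff, before jitter is applied
    return min(base * (2 ** attempt), max_delay)

random.seed(0)  # reproducible demo
# 1000 jobs, all failing at t=0, all computing their attempt-3 delay
no_jitter = {round(backoff(3), 1) for _ in range(1000)}
jittered = {round(random.uniform(0, backoff(3)), 1) for _ in range(1000)}

print(len(no_jitter))  # 1 distinct retry instant: every job retries at t=8.0
print(len(jittered))   # dozens of distinct instants spread across [0, 8)
```

The dependency being retried sees one spike of 1000 requests in the first case, and a thin trickle over eight seconds in the second.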
Dead Letter Queues: Where Jobs Go to Be Understood
Jobs that fail repeatedly shouldn’t disappear. They should go somewhere you can examine them:
```python
import time
from datetime import datetime

MAX_RETRIES = 5

async def process_with_dlq(job: dict):
    try:
        await execute_job(job)
    except Exception as e:
        retries = job.get('_retry_count', 0)
        if retries < MAX_RETRIES:
            job['_retry_count'] = retries + 1
            job['_last_error'] = str(e)
            job['_retry_at'] = time.time() + calculate_backoff(retries)
            await queue.enqueue('retry_queue', job)
        else:
            # Move to dead letter queue for investigation
            job['_final_error'] = str(e)
            job['_failed_at'] = datetime.utcnow().isoformat()
            await queue.enqueue('dead_letter_queue', job)
            # Alert ops
            await alert(f"Job permanently failed after {MAX_RETRIES} attempts: {job['id']}")
```
Check your DLQ regularly. Each entry is either a bug to fix or an edge case to handle. The goal is an empty DLQ, not a growing one you ignore.
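A small triage helper makes "check your DLQ regularly" concrete: group entries by their recorded `_final_error` so the most common failure mode surfaces first. A sketch over job dicts shaped like the ones above (the error strings are made up for illustration):

```python
from collections import Counter

def summarize_dlq(entries: list[dict]) -> list[tuple[str, int]]:
    """Group dead-lettered jobs by final error, most common first."""
    counts = Counter(e.get('_final_error', 'unknown') for e in entries)
    return counts.most_common()

dlq = [
    {'id': 1, '_final_error': 'ConnectionError: smtp timed out'},
    {'id': 2, '_final_error': 'KeyError: template'},
    {'id': 3, '_final_error': 'ConnectionError: smtp timed out'},
]
print(summarize_dlq(dlq))
# [('ConnectionError: smtp timed out', 2), ('KeyError: template', 1)]
```

One entry in the summary is usually one bug: fix it, requeue the affected jobs, and watch the count drop to zero.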
Job Visibility: Know What's Running
Blind queues are scary queues. You should be able to answer:
- How many jobs are waiting?
- How long have they been waiting?
- Which workers are processing what?
- What’s the failure rate?
```python
import json
import time

class ObservableJob:
    def __init__(self, job_id: str, job_type: str):
        self.job_id = job_id
        self.job_type = job_type
        self.start_time = None

    async def count_running(self) -> int:
        # Size of the running-jobs hash for this job type
        return await redis.hlen(f"running_jobs:{self.job_type}")

    async def __aenter__(self):
        self.start_time = time.time()
        await redis.hset(f"running_jobs:{self.job_type}", self.job_id, json.dumps({
            'started_at': self.start_time,
            'worker': WORKER_ID,
        }))
        metrics.gauge('jobs.running', await self.count_running(), tags={'type': self.job_type})
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        duration = time.time() - self.start_time
        await redis.hdel(f"running_jobs:{self.job_type}", self.job_id)
        metrics.histogram('job.duration', duration, tags={'type': self.job_type})
        metrics.increment('job.completed', tags={
            'type': self.job_type,
            'status': 'error' if exc_type else 'success'
        })

async def process_email(job: dict):
    async with ObservableJob(job['id'], 'email'):
        await send_email(job['to'], job['subject'], job['body'])
```
Metrics let you catch problems before they become incidents. A gradually increasing queue depth is a warning; a suddenly empty queue (because workers crashed) is an emergency.
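One way to act on those metrics is a simple rule over sampled queue depths, encoding both failure modes from the paragraph above. The thresholds here are illustrative, not recommendations:

```python
def queue_depth_alerts(samples: list[int], max_depth: int = 1000,
                       window: int = 3) -> list[str]:
    """Flag a steadily growing backlog (warning) and a queue that
    suddenly empties (emergency). `samples` are periodic depth readings."""
    alerts = []
    recent = samples[-window:]
    # Monotonic growth past the limit: workers are falling behind
    if all(b > a for a, b in zip(recent, recent[1:])) and recent[-1] > max_depth:
        alerts.append('warning: queue depth growing past limit')
    # Busy queue dropping straight to zero: producers or workers died
    if len(samples) >= 2 and samples[-2] > 100 and samples[-1] == 0:
        alerts.append('emergency: queue emptied abruptly (workers dead?)')
    return alerts

print(queue_depth_alerts([200, 800, 1500]))  # backlog growing
print(queue_depth_alerts([500, 480, 0]))     # sudden empty queue
```

In practice this logic usually lives in your monitoring system as alert rules over the `jobs.running` and queue-depth gauges, not in application code.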
Graceful Shutdown: Don't Kill Mid-Job
Workers need to handle shutdown signals without corrupting state:
```python
import signal
import asyncio

class GracefulWorker:
    def __init__(self):
        self.should_stop = False
        self.current_job = None
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame):
        logger.info("Shutdown signal received, finishing current job...")
        self.should_stop = True

    async def run(self):
        while not self.should_stop:
            job = await queue.dequeue(timeout=5)  # Short timeout to check should_stop
            if job:
                self.current_job = job
                try:
                    await process_job(job)
                finally:
                    self.current_job = None
        logger.info("Worker shut down gracefully")
```
The pattern: stop accepting new work, finish current work, then exit. Kubernetes, Docker, and systemd all send SIGTERM before SIGKILL—use that grace period.
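The same stop-flag shape works in a synchronous worker. A self-contained sketch using `threading.Event` in place of the signal handler (everything here is a stand-in: `done.append` replaces `process_job`, and `stop.set()` plays the role of SIGTERM):

```python
import queue
import threading
import time

def worker(jobs: "queue.Queue", stop: threading.Event, done: list):
    # Same shape as GracefulWorker.run: poll with a short timeout so the
    # stop flag is checked between jobs; a job in hand always finishes.
    while not stop.is_set():
        try:
            job = jobs.get(timeout=0.05)  # short timeout to re-check stop
        except queue.Empty:
            continue
        done.append(job)  # stands in for process_job(job)

jobs = queue.Queue()
for i in range(3):
    jobs.put(i)

stop = threading.Event()
done: list = []
t = threading.Thread(target=worker, args=(jobs, stop, done))
t.start()
time.sleep(0.3)  # give the worker time to drain the queue
stop.set()       # "SIGTERM": finish the current job, then exit
t.join()
print(done)  # [0, 1, 2]
```

The short dequeue timeout is the load-bearing detail: a blocking dequeue with no timeout would leave the worker stuck waiting for work it will never be allowed to process.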
Priority Queues: Not Everything Is Equal
Some jobs matter more than others. User-facing actions should jump ahead of batch processing:
```python
class PriorityQueue:
    PRIORITIES = {
        'critical': 0,  # User is waiting
        'high': 1,      # User will notice soon
        'normal': 2,    # Background work
        'low': 3,       # Batch processing
    }

    async def enqueue(self, job: dict, priority: str = 'normal'):
        queue_name = f"jobs:{self.PRIORITIES[priority]}"
        await redis.lpush(queue_name, json.dumps(job))

    async def dequeue(self) -> dict | None:
        # Check queues in priority order
        for priority in sorted(self.PRIORITIES.values()):
            result = await redis.rpop(f"jobs:{priority}")
            if result:
                return json.loads(result)
        return None
Be careful with priorities. If high-priority jobs flood in, low-priority jobs starve. Consider separate worker pools for different priorities, or implement fair scheduling.
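One simple form of fair scheduling is a weighted cycle: dequeue along a repeating list of priority names, so low priority gets a guaranteed turn even while critical work keeps arriving. A minimal in-memory sketch with illustrative weights:

```python
def fair_dequeue(queues: dict[str, list], cycle: list[str], state: dict):
    """Walk a weighted cycle of priority names, skipping empty queues.
    `state['i']` remembers the position between calls."""
    for _ in range(len(cycle)):
        name = cycle[state['i'] % len(cycle)]
        state['i'] += 1
        if queues[name]:
            return queues[name].pop(0)
    return None  # every queue was empty

queues = {'critical': ['c1', 'c2', 'c3'], 'low': ['l1']}
cycle = ['critical', 'critical', 'low']  # 2:1 weighting
state = {'i': 0}

order = [fair_dequeue(queues, cycle, state) for _ in range(4)]
print(order)  # ['c1', 'c2', 'l1', 'c3']: low is served before critical drains
```

Compare with the strict-priority dequeue above, which would only serve `l1` after every critical job was gone.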
Batch Processing: Group When You Can
Individual jobs have overhead: queue operations, worker context, connection setup. When processing many similar items, batching wins:
```python
# Instead of: one job per email
for user in users:
    queue.enqueue({'type': 'send_email', 'user_id': user.id})

# Better: one job for a batch of emails
BATCH_SIZE = 100
for batch in chunks(users, BATCH_SIZE):
    queue.enqueue({
        'type': 'send_email_batch',
        'user_ids': [u.id for u in batch]
    })

async def send_email_batch(job: dict):
    users = await get_users(job['user_ids'])
    # Use batch email API if available
    await email_service.send_batch([
        {'to': u.email, 'template': 'welcome'}
        for u in users
    ])
```
Batching reduces queue operations by 100x in this example. The tradeoff: if a batch fails, you retry all of it. For truly independent items, individual jobs with idempotency might be better.
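The `chunks` helper used above isn't in the standard library; one possible implementation, built on `itertools.islice`:

```python
from itertools import islice

def chunks(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

print(list(chunks(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Working on the iterator rather than slicing a list means it also handles generators and other lazily produced sequences.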
The Checklist
Before deploying background jobs, run through everything above:
- Is every job idempotent, safe to run twice?
- Does every job have an aggressive timeout?
- Do retries use exponential backoff with jitter?
- Do permanently failed jobs land in a dead letter queue that someone actually checks?
- Can you see queue depth, job durations, and failure rates?
- Do workers finish the current job on SIGTERM before exiting?
- Can a flood of high-priority jobs starve low-priority work?
- Could groups of similar jobs be batched?
Background jobs are infrastructure. They should be as boring and reliable as your database. Build them right once, and you get to sleep through the night.