Background jobs are the janitors of your application. They handle the work that doesn’t need to happen immediately: sending emails, processing uploads, generating reports, syncing data. When they work, nobody notices. When they fail, everyone notices—usually at 3 AM.

Here’s how to build jobs that let you sleep.

The Fundamentals: Idempotency First

Every background job should be safe to run twice. Network hiccups, worker crashes, queue retries—your job will execute more than once eventually.

Bad:

def send_welcome_email(user_id: int):
    user = get_user(user_id)
    send_email(user.email, "Welcome!")
    # If this crashes after send_email but before completion,
    # retry will send another email

Good:

def send_welcome_email(user_id: int):
    user = get_user(user_id)
    
    # Check if already sent
    if redis.get(f"welcome_email_sent:{user_id}"):
        return  # Already handled
    
    send_email(user.email, "Welcome!")
    
    # Mark as sent (with TTL for cleanup)
    redis.setex(f"welcome_email_sent:{user_id}", 86400 * 30, "1")

The pattern: check before acting, mark after completing. Use a persistent store (Redis, database) for the idempotency key—not in-memory state that dies with the worker.
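If you'd rather lean on the database, a unique constraint gives you the claim atomically, with no check-then-set race. A minimal sketch using sqlite3 (the table name and key format are illustrative):

```python
import sqlite3

# In-memory DB for illustration; production would use your real database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS idempotency_keys (key TEXT PRIMARY KEY)")

def claim_key(key: str) -> bool:
    """Atomically claim an idempotency key; False means someone already did."""
    try:
        with conn:
            conn.execute("INSERT INTO idempotency_keys (key) VALUES (?)", (key,))
        return True
    except sqlite3.IntegrityError:
        return False

def send_welcome_email(user_id: int):
    if not claim_key(f"welcome_email:{user_id}"):
        return  # Already handled
    # send_email(user.email, "Welcome!") would go here
```

Note the tradeoff: this claims the key before sending, so a crash mid-send drops the email (at-most-once), while the Redis version marks after sending (at-least-once). Pick per job.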

Timeouts: The Non-Negotiable

Jobs without timeouts are ticking time bombs. A hung HTTP request, a deadlocked database query, a file that never finishes uploading—without timeouts, your worker sits there forever, blocking the queue.

import signal
from contextlib import contextmanager

class JobTimeout(Exception):
    pass

@contextmanager
def timeout(seconds: int):
    # Note: SIGALRM fires only in the main thread of a Unix process
    def handler(signum, frame):
        raise JobTimeout(f"Job exceeded {seconds}s timeout")
    
    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)

def process_upload(file_id: int):
    with timeout(300):  # 5 minute max
        file = download_file(file_id)
        processed = transform(file)
        upload_result(processed)

Set timeouts aggressively. A job that usually takes 10 seconds should time out at 60, not 600. Fast failures are recoverable; hung workers are debugging nightmares.
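Because SIGALRM only works in the main thread of a Unix process, async or threaded workers need a different tool. A sketch using asyncio.wait_for (the slow_job coroutine is a stand-in for your real work):

```python
import asyncio

class JobTimeout(Exception):
    pass

async def with_timeout(coro, seconds: float):
    """Run a coroutine, cancelling it if the deadline passes."""
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        raise JobTimeout(f"Job exceeded {seconds}s timeout") from None

async def slow_job():
    await asyncio.sleep(10)  # simulates a hung download
    return "done"
```

wait_for cancels the underlying task on timeout, so the hung coroutine doesn't keep holding connections either.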

Retry Strategies: Exponential Backoff + Jitter

When jobs fail, retrying immediately usually fails again. The service is down, the rate limit is hit, the database is overloaded. Hammer it harder and you make things worse.

import logging
import random
import time

logger = logging.getLogger(__name__)

class RetryableError(Exception):
    """Raised for failures worth retrying: timeouts, 5xx responses, rate limits."""

def calculate_backoff(attempt: int, base: float = 1.0, max_delay: float = 300.0) -> float:
    """Exponential backoff with full jitter."""
    delay = min(base * (2 ** attempt), max_delay)
    return random.uniform(0, delay)

def retry_with_backoff(job_func, max_attempts: int = 5):
    for attempt in range(max_attempts):
        try:
            return job_func()
        except RetryableError as e:
            if attempt == max_attempts - 1:
                raise  # Final attempt, propagate error
            
            delay = calculate_backoff(attempt)
            logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s: {e}")
            time.sleep(delay)

The jitter is crucial. Without it, a thousand jobs that failed at the same time will all retry at the same time, creating a thundering herd that takes down whatever they’re hitting.
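You can see the effect by sampling delays for a burst of simultaneous failures. This uses a variant of calculate_backoff with a jitter toggle added for comparison, seeded so the numbers are reproducible:

```python
import random

def calculate_backoff(attempt: int, base: float = 1.0,
                      max_delay: float = 300.0, jitter: bool = True) -> float:
    """Exponential backoff, with a toggle to turn jitter off for comparison."""
    delay = min(base * (2 ** attempt), max_delay)
    return random.uniform(0, delay) if jitter else delay

random.seed(42)  # reproducible comparison

# 1000 jobs all fail their third attempt at the same moment
without = [calculate_backoff(2, jitter=False) for _ in range(1000)]
jittered = [calculate_backoff(2) for _ in range(1000)]

# Without jitter, every retry lands at exactly t+4s: a synchronized stampede.
# With full jitter, the retries smear across the whole 0-4s window.
```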

Dead Letter Queues: Where Jobs Go to Be Understood

Jobs that fail repeatedly shouldn’t disappear. They should go somewhere you can examine them:

import time
from datetime import datetime, timezone

MAX_RETRIES = 5

async def process_with_dlq(job: dict):
    try:
        await execute_job(job)
    except Exception as e:
        retries = job.get('_retry_count', 0)
        
        if retries < MAX_RETRIES:
            job['_retry_count'] = retries + 1
            job['_last_error'] = str(e)
            job['_retry_at'] = time.time() + calculate_backoff(retries)
            await queue.enqueue('retry_queue', job)
        else:
            # Move to dead letter queue for investigation
            job['_final_error'] = str(e)
            job['_failed_at'] = datetime.now(timezone.utc).isoformat()
            await queue.enqueue('dead_letter_queue', job)
            
            # Alert ops
            await alert(f"Job permanently failed after {MAX_RETRIES} attempts: {job['id']}")

Check your DLQ regularly. Each entry is either a bug to fix or an edge case to handle. The goal is an empty DLQ, not a growing one you ignore.
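Triage tooling doesn't need to be fancy. A sketch of the inspect-and-replay logic over plain dicts (the list stands in for your real DLQ; the underscore field names follow the example above):

```python
from collections import Counter

def triage(dead_letters: list) -> Counter:
    """Group DLQ entries by final error to spot clusters at a glance."""
    return Counter(job.get('_final_error', 'unknown') for job in dead_letters)

def replay(dead_letters: list, error_substring: str) -> list:
    """After shipping a fix, pull matching jobs back out for re-enqueueing."""
    matched = [j for j in dead_letters
               if error_substring in j.get('_final_error', '')]
    for job in matched:
        job.pop('_retry_count', None)  # reset the retry budget
        job.pop('_final_error', None)
    return matched
```

Once the fix ships, feed the replayed jobs back into the normal queue and watch whether the error cluster reappears.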

Job Visibility: Know What’s Running

Blind queues are scary queues. You should be able to answer:

  • How many jobs are waiting?
  • How long have they been waiting?
  • Which workers are processing what?
  • What’s the failure rate?
import json
import time

WORKER_ID = "worker-1"  # however you identify this process

class ObservableJob:
    def __init__(self, job_id: str, job_type: str):
        self.job_id = job_id
        self.job_type = job_type
        self.start_time = None
    
    async def count_running(self) -> int:
        return await redis.hlen(f"running_jobs:{self.job_type}")
    
    async def __aenter__(self):
        self.start_time = time.time()
        await redis.hset(f"running_jobs:{self.job_type}", self.job_id, json.dumps({
            'started_at': self.start_time,
            'worker': WORKER_ID,
        }))
        metrics.gauge('jobs.running', await self.count_running(), tags={'type': self.job_type})
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        duration = time.time() - self.start_time
        await redis.hdel(f"running_jobs:{self.job_type}", self.job_id)
        
        metrics.histogram('job.duration', duration, tags={'type': self.job_type})
        metrics.increment('job.completed', tags={
            'type': self.job_type,
            'status': 'error' if exc_type else 'success'
        })

async def process_email(job: dict):
    async with ObservableJob(job['id'], 'email'):
        await send_email(job['to'], job['subject'], job['body'])

Metrics let you catch problems before they become incidents. A gradually increasing queue depth is a warning; a suddenly empty queue (because workers crashed) is an emergency.
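A crude but effective check on top of those metrics: sample queue depth on a schedule and flag sustained growth. A sketch (the window size and hard limit are illustrative; tune them to your traffic):

```python
from collections import deque

class DepthWatcher:
    """Sample queue depth periodically; flag sustained growth or hard limits."""
    def __init__(self, window: int = 6, hard_limit: int = 10_000):
        self.samples = deque(maxlen=window)
        self.hard_limit = hard_limit

    def observe(self, depth: int):
        """Record a sample; return an alert message if the queue looks unhealthy."""
        self.samples.append(depth)
        if depth > self.hard_limit:
            return f"queue depth {depth} exceeds hard limit"
        s = list(self.samples)
        if len(s) == self.samples.maxlen and all(a < b for a, b in zip(s, s[1:])):
            return "queue depth rising across the whole window"
        return None
```

Wire observe() to a cron or a metrics alert rule; the point is that "growing for N consecutive samples" fires before "queue is full" does.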

Graceful Shutdown: Don’t Kill Mid-Job

Workers need to handle shutdown signals without corrupting state:

import signal
import asyncio

class GracefulWorker:
    def __init__(self):
        self.should_stop = False
        self.current_job = None
        
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)
    
    def _handle_shutdown(self, signum, frame):
        logger.info("Shutdown signal received, finishing current job...")
        self.should_stop = True
    
    async def run(self):
        while not self.should_stop:
            job = await queue.dequeue(timeout=5)  # Short timeout to check should_stop
            if job:
                self.current_job = job
                try:
                    await process_job(job)
                finally:
                    self.current_job = None
        
        logger.info("Worker shut down gracefully")

The pattern: stop accepting new work, finish current work, then exit. Kubernetes, Docker, and systemd all send SIGTERM before SIGKILL—use that grace period.

Priority Queues: Not Everything Is Equal

Some jobs matter more than others. User-facing actions should jump ahead of batch processing:

class PriorityQueue:
    PRIORITIES = {
        'critical': 0,   # User is waiting
        'high': 1,       # User will notice soon
        'normal': 2,     # Background work
        'low': 3,        # Batch processing
    }
    
    async def enqueue(self, job: dict, priority: str = 'normal'):
        queue_name = f"jobs:{self.PRIORITIES[priority]}"
        await redis.lpush(queue_name, json.dumps(job))
    
    async def dequeue(self) -> dict | None:
        # Check queues in priority order
        for priority in sorted(self.PRIORITIES.values()):
            result = await redis.rpop(f"jobs:{priority}")
            if result:
                return json.loads(result)
        return None

Be careful with priorities. If high-priority jobs flood in, low-priority jobs starve. Consider separate worker pools for different priorities, or implement fair scheduling.
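One way to keep low-priority work moving is weighted round-robin: dequeue in a repeating schedule that favors, but never exclusively serves, the high priorities. A sketch over in-memory deques (the weights are illustrative):

```python
from collections import deque
from itertools import cycle

class FairQueue:
    """Weighted round-robin: favor high priorities without starving low ones."""
    # Per cycle: critical 4x, high 2x, normal 1x, low 1x
    SCHEDULE = ['critical'] * 4 + ['high'] * 2 + ['normal', 'low']

    def __init__(self):
        self.queues = {p: deque() for p in ('critical', 'high', 'normal', 'low')}
        self._slots = cycle(self.SCHEDULE)

    def enqueue(self, job, priority: str = 'normal'):
        self.queues[priority].append(job)

    def dequeue(self):
        # Walk at most one full schedule; skip slots whose queue is empty
        for _ in range(len(self.SCHEDULE)):
            q = self.queues[next(self._slots)]
            if q:
                return q.popleft()
        return None
```

Even under a flood of critical jobs, the low-priority slot comes around once per cycle, so batch work keeps trickling through.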

Batch Processing: Group When You Can

Individual jobs have overhead: queue operations, worker context, connection setup. When processing many similar items, batching wins:

# Instead of: one job per email
for user in users:
    queue.enqueue({'type': 'send_email', 'user_id': user.id})

# Better: one job for a batch of emails
def chunks(items, size):
    """Yield successive size-item slices of a list."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

BATCH_SIZE = 100
for batch in chunks(users, BATCH_SIZE):
    queue.enqueue({
        'type': 'send_email_batch',
        'user_ids': [u.id for u in batch]
    })

async def send_email_batch(job: dict):
    users = await get_users(job['user_ids'])
    
    # Use batch email API if available
    await email_service.send_batch([
        {'to': u.email, 'template': 'welcome'}
        for u in users
    ])

Batching reduces queue operations by 100x in this example. The tradeoff: if a batch fails, you retry all of it. For truly independent items, individual jobs with idempotency might be better.
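If your email API reports per-item results, you can split the difference: process the batch once, then re-enqueue only the failures as individual jobs. A sketch (the result format is an assumption; real batch APIs differ):

```python
def process_batch_with_fallback(user_ids, send_batch, enqueue_single):
    """Send one batch; re-enqueue only the failed items as individual jobs.

    `send_batch` is assumed to return {user_id: succeeded} -- adapt the
    result parsing to whatever your provider actually returns.
    """
    results = send_batch(user_ids)
    failed = [uid for uid, ok in results.items() if not ok]
    for uid in failed:
        enqueue_single({'type': 'send_email', 'user_id': uid})
    return len(user_ids) - len(failed), len(failed)
```

Combined with idempotent single-send jobs, this keeps the 100x batching win without re-sending to everyone when one address bounces.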

The Checklist

Before deploying background jobs:

  • Idempotent: safe to run twice
  • Timeouts: won’t hang forever
  • Retries: exponential backoff with jitter
  • Dead letter queue: failures are captured
  • Visibility: metrics on queue depth, duration, failure rate
  • Graceful shutdown: handles SIGTERM cleanly
  • Priority: critical jobs don’t wait behind batch work
  • Alerting: you know when the queue is unhealthy

Background jobs are infrastructure. They should be as boring and reliable as your database. Build them right once, and you get to sleep through the night.