Some work doesn’t belong in a web request. Sending emails, processing uploads, generating reports, syncing with external APIs — these tasks are too slow, too unreliable, or too resource-intensive to run while a user waits.

Background jobs solve this by moving work out of the request cycle and into a separate processing system.

The Basic Architecture

Web App ──▶ Queue ──▶ Workers ──▶ Results
  1. Producer: Web app enqueues jobs
  2. Queue: Stores jobs until workers are ready
  3. Workers: Process jobs independently
  4. Results: Optional storage for job outcomes
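A toy version of this pipeline fits in a few lines. The sketch below keeps the queue in memory with Python's queue module purely to show the four roles; a real system puts the queue in Redis, Postgres, or a broker so it survives the web process:

```python
import queue
import threading

job_queue = queue.Queue()   # 2. the queue
results = {}                # 4. optional result storage

def enqueue(job_id, task, payload):
    """1. Producer: the web app calls this inside a request and returns immediately."""
    job_queue.put((job_id, task, payload))

def worker():
    """3. Worker: runs in its own thread/process, independent of any request."""
    while True:
        job_id, task, payload = job_queue.get()
        if task is None:    # sentinel value tells the worker to stop
            break
        results[job_id] = task(payload)

t = threading.Thread(target=worker)
t.start()

enqueue("job-1", lambda p: p["x"] * 2, {"x": 21})
job_queue.put(("stop", None, None))
t.join()
print(results["job-1"])  # 42
```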

Choosing a Queue Backend

Redis (with Sidekiq, Bull, Celery)

# Celery with Redis
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def send_email(user_id, template):
    user = get_user(user_id)
    email_service.send(user.email, template)

Pros: Fast, simple, good ecosystem
Cons: Not durable by default (can lose jobs on crash)
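If Redis durability matters to you, enabling its append-only file narrows the window for lost jobs. A redis.conf sketch (the fsync policy trades latency for durability, and jobs in flight on a crashed worker can still be lost):

```conf
# redis.conf: persist queue data to disk
appendonly yes          # enable the append-only file (AOF)
appendfsync everysec    # fsync once per second: lose at most ~1s of writes
```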

PostgreSQL (with LISTEN/NOTIFY or polling)

# Simple polling approach
def enqueue(job_type, payload):
    db.execute("""
        INSERT INTO jobs (type, payload, status, created_at)
        VALUES (%s, %s, 'pending', NOW())
    """, (job_type, json.dumps(payload)))

def fetch_job():
    return db.execute("""
        UPDATE jobs SET status = 'processing', started_at = NOW()
        WHERE id = (
            SELECT id FROM jobs 
            WHERE status = 'pending' 
            ORDER BY created_at 
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        RETURNING *
    """).fetchone()

Pros: Durable, transactional with your data, no extra infrastructure
Cons: Slower, requires careful locking
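A worker is then just a loop around fetch_job. Here is a runnable single-worker sketch of the same pattern, using sqlite3 so it is self-contained; with concurrent workers on Postgres, the FOR UPDATE SKIP LOCKED clause shown above is what stops two workers from claiming the same row:

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE jobs (
    id INTEGER PRIMARY KEY, type TEXT, payload TEXT,
    status TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP)""")

def enqueue(job_type, payload):
    db.execute("INSERT INTO jobs (type, payload, status) VALUES (?, ?, 'pending')",
               (job_type, json.dumps(payload)))

def fetch_job():
    # Single worker: claim the oldest pending job.
    # Concurrent workers need SKIP LOCKED (Postgres) to avoid double-claims.
    row = db.execute(
        "SELECT id, type, payload FROM jobs WHERE status = 'pending' "
        "ORDER BY created_at, id LIMIT 1").fetchone()
    if row is None:
        return None
    db.execute("UPDATE jobs SET status = 'processing' WHERE id = ?", (row[0],))
    return row

def run_worker_once(handlers):
    job = fetch_job()
    if job is None:
        return False
    job_id, job_type, payload = job
    handlers[job_type](json.loads(payload))   # do the actual work
    db.execute("UPDATE jobs SET status = 'done' WHERE id = ?", (job_id,))
    return True

done = []
enqueue("email", {"to": "a@example.com"})
enqueue("email", {"to": "b@example.com"})
while run_worker_once({"email": lambda p: done.append(p["to"])}):
    pass
print(done)  # ['a@example.com', 'b@example.com']
```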

Dedicated Message Queues (RabbitMQ, SQS)

# AWS SQS
import boto3

sqs = boto3.client('sqs')

def enqueue(job):
    sqs.send_message(
        QueueUrl='https://sqs.../my-queue',
        MessageBody=json.dumps(job),
        MessageGroupId='default'  # FIFO queues only; omit for standard queues
    )

def process_messages():
    while True:
        response = sqs.receive_message(
            QueueUrl='https://sqs.../my-queue',
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20  # Long polling
        )
        for message in response.get('Messages', []):
            process(json.loads(message['Body']))
            sqs.delete_message(
                QueueUrl='https://sqs.../my-queue',
                ReceiptHandle=message['ReceiptHandle']
            )

Pros: Highly durable, scales independently, managed options available
Cons: More infrastructure, eventual consistency

Essential Patterns

Idempotency

Jobs may run multiple times (retries, duplicates). Design for it:

@app.task(bind=True)
def charge_customer(self, order_id):
    order = get_order(order_id)
    
    # Check if already processed
    if order.payment_id:
        logger.info(f"Order {order_id} already charged")
        return order.payment_id
    
    # Process with idempotency key
    payment = stripe.PaymentIntent.create(
        amount=order.total,
        idempotency_key=f"order-{order_id}"
    )
    
    order.payment_id = payment.id
    order.save()
    
    return payment.id

Retry with Backoff

Failures happen. Retry intelligently:

@app.task(
    bind=True,
    autoretry_for=(ExternalAPIError,),  # Retry automatically on this error
    max_retries=5,
    retry_backoff=60,        # Exponential backoff, starting at 60s
    retry_jitter=True        # Add randomness to avoid thundering herds
)
def sync_to_external_api(self, record_id):
    try:
        record = get_record(record_id)
        external_api.sync(record)
    except PermanentError as e:
        # Don't retry, move to dead letter
        log_permanent_failure(record_id, e)
        raise

Dead Letter Queue

When retries are exhausted, don’t lose the job:

@app.task(bind=True, max_retries=3)
def process_upload(self, upload_id):
    try:
        # Process...
        pass
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue for manual review
            dead_letter_queue.send({
                'task': 'process_upload',
                'args': [upload_id],
                'error': str(e),
                'traceback': traceback.format_exc(),
                'failed_at': datetime.utcnow().isoformat()
            })
            return  # Don't raise, we've handled it
        raise self.retry(exc=e)

Job Uniqueness

Prevent duplicate jobs from being enqueued:

def enqueue_unique(job_type, payload, unique_key):
    # Use Redis SET NX for uniqueness
    lock_key = f"job_lock:{job_type}:{unique_key}"
    
    if redis.set(lock_key, "1", nx=True, ex=3600):
        # Lock acquired, safe to enqueue
        queue.enqueue(job_type, payload)
        return True
    else:
        # Job already queued
        return False

# Usage
enqueue_unique("send_welcome_email", {"user_id": 123}, unique_key="user:123")

Progress Tracking

For long-running jobs, track and expose progress:

@app.task(bind=True)
def generate_report(self, report_id):
    report = get_report(report_id)
    items = get_items_for_report(report_id)
    
    for i, item in enumerate(items):
        process_item(item)
        
        # Update progress
        progress = (i + 1) / len(items) * 100
        self.update_state(
            state='PROGRESS',
            meta={'progress': progress, 'current': i + 1, 'total': len(items)}
        )
    
    return {'status': 'complete', 'url': report.url}

# Check progress from web app
result = generate_report.AsyncResult(task_id)
if result.state == 'PROGRESS':
    print(f"Progress: {result.info['progress']}%")

Scheduling Jobs

Cron-Style Scheduling

# Celery Beat
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-every-night': {
        'task': 'tasks.cleanup_old_records',
        'schedule': crontab(hour=2, minute=0),
    },
    'sync-every-5-minutes': {
        'task': 'tasks.sync_external_data',
        'schedule': 300.0,  # Every 5 minutes
    },
}

Delayed Jobs

# Run in 1 hour
send_reminder.apply_async(
    args=[user_id],
    countdown=3600
)

# Run at specific time
send_report.apply_async(
    args=[report_id],
    eta=datetime(2024, 2, 5, 9, 0, 0)
)
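Under the hood, delayed jobs are usually a priority queue keyed by run-at time (Redis-backed brokers often use a sorted set for this). A minimal in-memory sketch with heapq (the names here are ours, not Celery's):

```python
import heapq
import time

due_jobs = []  # min-heap of (run_at, job)

def schedule(job, countdown=0, eta=None):
    """Mirror of countdown/eta: eta is an absolute time, countdown is relative."""
    run_at = eta if eta is not None else time.time() + countdown
    heapq.heappush(due_jobs, (run_at, job))

def pop_ready(now):
    """Return all jobs whose run-at time has passed."""
    ready = []
    while due_jobs and due_jobs[0][0] <= now:
        ready.append(heapq.heappop(due_jobs)[1])
    return ready

schedule("send_report", eta=100.0)          # due at t=100
schedule("send_reminder", countdown=3600)   # due an hour from the real clock

ready = pop_ready(now=100.0)  # the report is due, the reminder is not
print(ready)  # ['send_report']
```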

Rate Limiting

from celery import Celery

app = Celery()

@app.task(rate_limit='10/m')  # Max 10 per minute, per worker
def call_external_api(record_id):
    # This task will be throttled
    pass
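Note that Celery enforces rate_limit per worker, not across the cluster. For a global cap you need shared state. A minimal token-bucket sketch (in-memory, with an injectable clock so it can be tested deterministically; store the token count in Redis to share the limit across workers):

```python
import time

class TokenBucket:
    """Allow `rate` calls per second, with bursts up to `capacity`."""
    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock
        self.last = clock()

    def allow(self):
        now = self.clock()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# 10 calls/minute, burst of 2, driven by a fake clock for the demo
fake_now = [0.0]
bucket = TokenBucket(rate=10 / 60, capacity=2, clock=lambda: fake_now[0])
results = [bucket.allow() for _ in range(3)]   # burst of 3 calls at t=0
print(results)  # [True, True, False]
fake_now[0] = 6.0                              # 6s later: one token refilled
later = bucket.allow()
print(later)  # True
```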

Worker Scaling

Concurrency

# Celery: one worker with 4 child processes
celery -A tasks worker --concurrency=4

# For I/O-bound tasks, use gevent/eventlet
celery -A tasks worker --pool=gevent --concurrency=100

Priority Queues

# Define queues with priorities
app.conf.task_routes = {
    'tasks.critical_*': {'queue': 'high'},
    'tasks.batch_*': {'queue': 'low'},
}

# Run workers for specific queues
# celery -A tasks worker -Q high --concurrency=4
# celery -A tasks worker -Q low --concurrency=2

Autoscaling

# Celery autoscale: min 2, max 10 workers
celery -A tasks worker --autoscale=10,2

Monitoring

Key Metrics

  • Queue depth: How many jobs are waiting?
  • Processing time: How long do jobs take?
  • Failure rate: What percentage fail?
  • Retry rate: Are jobs succeeding on first try?
  • Worker utilization: Are workers busy or idle?
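Most of these metrics reduce to a few counters and timings recorded around each job. A sketch of the bookkeeping (the names are ours; in practice you would export these via Prometheus or StatsD rather than keep them in memory):

```python
class JobMetrics:
    def __init__(self):
        self.received = 0
        self.failed = 0
        self.retried = 0      # jobs that needed at least one retry
        self.durations = []   # processing time per job, in seconds

    def record(self, duration, failed=False, retries=0):
        self.received += 1
        self.failed += failed
        self.retried += retries > 0
        self.durations.append(duration)

    def failure_rate(self):
        return self.failed / self.received if self.received else 0.0

    def retry_rate(self):
        return self.retried / self.received if self.received else 0.0

    def avg_processing_time(self):
        return sum(self.durations) / len(self.durations) if self.durations else 0.0

m = JobMetrics()
m.record(0.5)
m.record(1.5, retries=2)    # succeeded, but only after retries
m.record(2.0, failed=True)
print(round(m.failure_rate(), 2))         # one failure out of three jobs
print(round(m.avg_processing_time(), 2))  # mean of the three durations
```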

Health Checks

# FastAPI endpoint (this `app` is the web app, not the Celery app)
@app.get("/health/workers")
def worker_health():
    # Check if workers are responsive
    inspector = app.control.inspect()
    
    active = inspector.active()
    if not active:
        raise HTTPException(503, "No active workers")
    
    # Check queue depth
    queue_length = redis.llen('celery')
    if queue_length > 10000:
        raise HTTPException(503, f"Queue backlog: {queue_length}")
    
    return {"workers": len(active), "queue_length": queue_length}

Alerting

# Alert on queue backup
- alert: QueueBacklog
  expr: celery_queue_length > 1000
  for: 5m
  labels:
    severity: warning
    
# Alert on high failure rate
- alert: HighJobFailureRate
  expr: rate(celery_task_failed_total[5m]) / rate(celery_task_received_total[5m]) > 0.1
  for: 5m
  labels:
    severity: critical

Common Pitfalls

  1. Not handling job failures: Always have retry logic and dead letter handling

  2. Blocking workers with long tasks: Use dedicated queues for slow jobs

  3. Forgetting idempotency: Jobs will run multiple times

  4. Queue as database: Don’t store important state only in the queue

  5. No visibility: If you can’t see what’s happening, you can’t fix it

  6. Unbounded queues: Set limits, apply backpressure when overwhelmed
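Pitfall 6 in code: check the depth before enqueueing and push failure back to the caller, rather than letting the backlog grow without bound. A sketch against a plain list standing in for the queue; the limit is arbitrary:

```python
MAX_QUEUE_DEPTH = 10_000

class QueueFullError(Exception):
    pass

def enqueue_with_backpressure(queue, job):
    """Reject new work when the queue is overwhelmed.

    The web app can then return 429/503 so callers slow down,
    instead of the backlog growing until workers can never catch up.
    """
    if len(queue) >= MAX_QUEUE_DEPTH:
        raise QueueFullError(f"queue depth {len(queue)} >= {MAX_QUEUE_DEPTH}")
    queue.append(job)

q = []
enqueue_with_backpressure(q, {"type": "send_email"})
print(len(q))  # 1
```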


Background jobs are how you build responsive applications that handle real-world complexity. Start simple (PostgreSQL + polling works fine), add sophistication as needed, and always design for failure — because in distributed systems, failure is the norm.