Synchronous processing is a lie. At some point, your request-response cycle will hit a wall: sending emails, processing images, charging credit cards, generating reports. The solution: message queues. Here’s how to use them without creating distributed system nightmares.

Why Queues?

Without a queue: API → [SendEmail] → [ProcessImage] → [ChargeCard] → Response (3+ seconds)
With a queue:    API → [Enqueue tasks] → Response (200ms); workers process asynchronously

Benefits:

  • Faster responses: Return immediately, process later
  • Resilience: If a service is down, retry later
  • Scaling: Add more workers when backlogged
  • Decoupling: Services don’t need to know about each other

Queue Architecture

Producer (API) → Queue (Redis/RabbitMQ) → Consumer (Worker)

  • Producer: creates messages (your API)
  • Queue: stores messages until processed
  • Consumer: processes messages (worker processes)
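
These three roles can be sketched in-process with the standard library's queue module, before any broker is involved:

```python
import queue
import threading

tasks = queue.Queue()   # the "broker": holds messages until processed
results = []

def worker():
    # Consumer: pull messages until a None sentinel says to stop
    while True:
        item = tasks.get()
        if item is None:
            break
        results.append(item.upper())  # stand-in for real work
        tasks.task_done()

t = threading.Thread(target=worker)
t.start()

# Producer: enqueue work and move on
for msg in ["send email", "resize image"]:
    tasks.put(msg)
tasks.put(None)  # shut the worker down
t.join()
# results == ['SEND EMAIL', 'RESIZE IMAGE']
```

A real broker adds persistence, delivery across processes and machines, and acknowledgements, but the producer/queue/consumer shape is the same.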

Redis Queues (Simple)

With Python RQ

# tasks.py
from redis import Redis
from rq import Queue

redis_conn = Redis(host='localhost', port=6379)
queue = Queue(connection=redis_conn)

def send_welcome_email(user_id):
    # `db` and `email` stand in for your app's own data and mail helpers
    user = db.get_user(user_id)
    email.send(
        to=user.email,
        subject="Welcome!",
        body=render_template("welcome.html", user=user)
    )

# app.py — producer: enqueue the task from a Flask route
from tasks import queue, send_welcome_email

@app.route('/signup', methods=['POST'])
def signup():
    user = create_user(request.json)
    queue.enqueue(send_welcome_email, user.id)
    return jsonify({"status": "created"}), 201
# Start worker
rq worker --url redis://localhost:6379

With Bull (Node.js)

// queue.js
const Queue = require('bull');
const emailQueue = new Queue('email', 'redis://localhost:6379');

// Define processor
emailQueue.process(async (job) => {
  const { userId, template } = job.data;
  const user = await db.getUser(userId);
  await sendEmail(user.email, template);
});

// Producer
app.post('/signup', async (req, res) => {
  const user = await createUser(req.body);
  await emailQueue.add({ userId: user.id, template: 'welcome' });
  res.status(201).json({ status: 'created' });
});

RabbitMQ (Robust)

Setup

import json
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare queue (idempotent)
channel.queue_declare(queue='tasks', durable=True)

Producer

def publish_task(task_type, data):
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=json.dumps({'type': task_type, 'data': data}),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
            content_type='application/json'
        )
    )

Consumer

def callback(ch, method, properties, body):
    try:
        message = json.loads(body)
        process_task(message)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Reject without requeue: the message is dropped, or dead-lettered
        # if the queue has a DLQ configured
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_qos(prefetch_count=1)  # One message at a time
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

Exchange Patterns

# Direct: Route to specific queue
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='error', body=message)

# Fanout: Broadcast to all queues
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')
channel.basic_publish(exchange='broadcast', routing_key='', body=message)

# Topic: Pattern matching
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.basic_publish(exchange='topic_logs', routing_key='user.created', body=message)
# Consumers can bind to 'user.*' or '*.created'
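
Those binding patterns follow RabbitMQ's topic-matching rules: `*` matches exactly one dot-separated word, `#` matches zero or more. A pure-Python sketch of the semantics (illustration only, not pika API):

```python
def topic_matches(pattern, routing_key):
    """RabbitMQ-style topic match: '*' = one word, '#' = zero or more words."""
    def match(p, k):
        if not p:
            return not k                     # pattern exhausted: key must be too
        if p[0] == '#':
            # '#' can absorb any number of remaining words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if k and (p[0] == '*' or p[0] == k[0]):
            return match(p[1:], k[1:])       # consume one word
        return False
    return match(pattern.split('.'), routing_key.split('.'))

topic_matches('user.*', 'user.created')           # True
topic_matches('*.created', 'order.created')       # True
topic_matches('user.*', 'user.profile.updated')   # False: '*' is one word
topic_matches('user.#', 'user.profile.updated')   # True
```

This is why `user.#` is the safer binding when you expect multi-word routing keys under a prefix.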

Celery (Python Standard)

Setup

# celery_app.py
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

Tasks

# tasks.py
from celery_app import app

@app.task(bind=True, max_retries=3)
def send_email(self, user_id, template):
    try:
        user = db.get_user(user_id)
        email.send(user.email, template)
    except ConnectionError as e:
        raise self.retry(exc=e, countdown=60)  # Retry in 60s

@app.task
def process_image(image_id):
    image = storage.get(image_id)
    thumbnail = resize(image, 200, 200)
    storage.put(f"{image_id}_thumb", thumbnail)

Calling Tasks

# Async call
send_email.delay(user_id, 'welcome')

# With options
send_email.apply_async(
    args=[user_id, 'welcome'],
    countdown=300,  # Delay 5 minutes
    expires=3600,   # Expire after 1 hour
)

# Chain tasks
from celery import chain
result = chain(
    download_image.s(url),
    process_image.s(),
    upload_to_cdn.s()
).apply_async()

# Group (parallel)
from celery import group
result = group(
    send_email.s(user_id, 'welcome')
    for user_id in user_ids
).apply_async()

Worker

celery -A celery_app worker --loglevel=info --concurrency=4

Message Design

Good Message Structure

{
  "id": "msg_abc123",
  "type": "user.welcome_email",
  "version": "1.0",
  "timestamp": "2024-03-12T15:30:00Z",
  "data": {
    "user_id": "usr_456",
    "email": "alice@example.com"
  },
  "metadata": {
    "correlation_id": "req_789",
    "retry_count": 0
  }
}

Include:

  • ID: For deduplication
  • Type: What kind of task
  • Version: Schema version for evolution
  • Timestamp: When created
  • Correlation ID: Trace across services
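
A hypothetical stdlib-only helper that builds this envelope (`make_message` and its defaults are illustrative; adapt the fields to your own schema):

```python
import json
import uuid
from datetime import datetime, timezone

def make_message(msg_type, data, correlation_id=None, version="1.0"):
    """Wrap task data in a versioned envelope with an ID and timestamp."""
    return json.dumps({
        "id": f"msg_{uuid.uuid4().hex[:12]}",          # for deduplication
        "type": msg_type,                              # what kind of task
        "version": version,                            # schema version
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "metadata": {
            "correlation_id": correlation_id,          # trace across services
            "retry_count": 0,
        },
    })

body = make_message("user.welcome_email", {"user_id": "usr_456"},
                    correlation_id="req_789")
```

The `body` string can then be published as-is, e.g. as the `body=` argument to `basic_publish` above.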

Idempotency

Messages may be delivered more than once. Design for it.

def process_payment(message):
    payment_id = message['data']['payment_id']
    
    # Check if already processed. For full safety under concurrent
    # consumers, also enforce a unique constraint on payment_id so two
    # workers can't both pass this check and double-charge.
    if db.payment_exists(payment_id):
        return  # Already done
    
    # Process with transaction
    with db.transaction():
        charge_card(message['data'])
        db.mark_payment_complete(payment_id)
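
The effect is easiest to see in miniature: deliver the same message twice and the side effect still happens once. An in-process sketch (no broker, hypothetical names):

```python
processed = set()   # stand-in for the "already processed?" lookup
charges = []        # stand-in for the real side effect

def process_payment(message):
    payment_id = message["data"]["payment_id"]
    if payment_id in processed:           # dedup check
        return "duplicate"
    charges.append(message["data"]["amount"])  # charge exactly once
    processed.add(payment_id)
    return "charged"

msg = {"data": {"payment_id": "pay_1", "amount": 500}}
first = process_payment(msg)    # "charged"
second = process_payment(msg)   # "duplicate" — redelivery is now harmless
```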

Error Handling

Retry with Backoff

@app.task(bind=True, max_retries=5)
def unreliable_task(self, data):
    try:
        external_api.call(data)
    except APIError as e:
        # Exponential backoff: 1m, 2m, 4m, 8m, 16m
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=e, countdown=countdown)
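
One refinement worth adding to that schedule: a cap and jitter, so hundreds of tasks that fail at the same moment don't all retry in lockstep. A stdlib sketch (`backoff_delay` is an illustrative helper, not a Celery API):

```python
import random

def backoff_delay(retry, base=60, cap=3600, jitter=True):
    """Exponential delay base * 2**retry, capped, with optional full jitter."""
    delay = min(cap, base * (2 ** retry))
    # Full jitter: pick uniformly in [0, delay] to spread retries out
    return random.uniform(0, delay) if jitter else delay

[backoff_delay(r, jitter=False) for r in range(5)]  # [60, 120, 240, 480, 960]
```

The computed value plugs straight into the `countdown=` argument of `self.retry`.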

Dead Letter Queues

# RabbitMQ DLQ setup
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'tasks_dlq'
    }
)

channel.queue_declare(queue='tasks_dlq', durable=True)

Failed messages go to DLQ for inspection/replay.

Circuit Breaker

from circuitbreaker import circuit, CircuitBreakerError

@circuit(failure_threshold=5, recovery_timeout=60)
def call_external_api(data):
    return requests.post('https://api.example.com', json=data)

@app.task(bind=True)
def process_with_breaker(self, data):
    try:
        call_external_api(data)
    except CircuitBreakerError:
        # Circuit is open; retry once it has had time to recover
        raise self.retry(countdown=120)
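
Under the hood a circuit breaker is a small state machine; here is a minimal stdlib sketch of the idea (the `circuitbreaker` package above manages this state per decorated function for you):

```python
import time

class CircuitOpen(Exception):
    pass

class Breaker:
    """Open after `threshold` consecutive failures; probe after `recovery` s."""
    def __init__(self, threshold=5, recovery=60):
        self.threshold, self.recovery = threshold, recovery
        self.failures, self.opened_at = 0, None

    def call(self, fn, *args):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery:
                raise CircuitOpen("circuit open, failing fast")
            self.opened_at = None          # half-open: allow one probe call
        try:
            result = fn(*args)
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0                  # success closes the circuit
        return result
```

While the circuit is open, calls fail in microseconds instead of waiting out a timeout against a dead dependency.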

Monitoring

Key Metrics

# Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge

tasks_total = Counter('tasks_total', 'Total tasks', ['type', 'status'])
task_duration = Histogram('task_duration_seconds', 'Task duration', ['type'])
queue_size = Gauge('queue_size', 'Messages in queue', ['queue'])

@app.task
def monitored_task(data):
    with task_duration.labels(type='email').time():
        try:
            send_email(data)
            tasks_total.labels(type='email', status='success').inc()
        except Exception:
            tasks_total.labels(type='email', status='failure').inc()
            raise

Celery Flower

pip install flower
celery -A celery_app flower --port=5555

Web UI showing:

  • Active workers
  • Task success/failure rates
  • Queue depths
  • Task history

Alerts

# Prometheus alerts
groups:
  - name: queues
    rules:
      - alert: QueueBacklog
        expr: queue_size > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue backlog exceeds 10k messages"
      
      - alert: HighFailureRate
        expr: |
          rate(tasks_total{status="failure"}[5m]) 
          / rate(tasks_total[5m]) > 0.1
        for: 5m
        labels:
          severity: critical

Scaling Workers

Horizontal Scaling

# docker-compose.yml
services:
  worker:
    image: myapp:latest
    command: celery -A celery_app worker --concurrency=4
    deploy:
      replicas: 3
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

Auto-scaling on Queue Depth

# Kubernetes HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: rabbitmq_queue_messages
          selector:
            matchLabels:
              queue: tasks
        target:
          type: AverageValue
          averageValue: 100  # 100 messages per worker

Queue Selection Guide

  Need                 | Use
  ---------------------|------------------------
  Simple, low volume   | Redis + RQ
  Python ecosystem     | Celery + Redis/RabbitMQ
  Complex routing      | RabbitMQ
  High throughput      | Kafka
  AWS native           | SQS
  Guaranteed delivery  | RabbitMQ or SQS

The Checklist

  • Messages are idempotent
  • Dead letter queue configured
  • Retry with exponential backoff
  • Monitoring on queue depth
  • Alerts on failure rates
  • Workers auto-scale
  • Correlation IDs for tracing

Start Here

  1. Today: Move one slow operation to a queue
  2. This week: Add retry logic and DLQ
  3. This month: Implement monitoring and alerts
  4. This quarter: Auto-scale workers based on load

Queues turn “please wait” into an instant response, with the work finishing in the background. Your users will thank you.


The best queue is invisible. Users just experience responsiveness.