Synchronous request-response is simple: client asks, server answers, everyone waits. But some operations don’t fit that model. Sending emails, processing images, generating reports — these take time, and your users shouldn’t wait.

Message queues decouple the “request” from the “work,” letting you respond immediately while processing happens in the background.

The Basic Pattern

# API endpoint - returns immediately
@app.post("/orders")
def create_order(order_data):
    order = db.create_order(order_data)
    
    # Queue async work instead of doing it now
    queue.enqueue('process_order', order.id)
    
    return {"order_id": order.id, "status": "processing"}

# Worker - processes queue in background
def process_order(order_id):
    order = db.get_order(order_id)
    charge_payment(order)
    send_confirmation_email(order)
    notify_warehouse(order)
    update_inventory(order)

The user gets a response in milliseconds. The actual work happens whenever the worker gets to it.
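Clients usually learn the outcome by polling a status endpoint (or via a webhook). A self-contained sketch of that round trip, with an in-memory dict and `queue.Queue` standing in for the database and broker — `create_order`, `get_order_status`, and the worker are illustrative names, not a specific framework's API:

```python
import queue
import threading
import uuid

orders = {}                 # stand-in for the database
task_queue = queue.Queue()  # stand-in for the message queue

def create_order(order_data):
    """API handler: record the order, enqueue the work, return at once."""
    order_id = str(uuid.uuid4())
    orders[order_id] = {"data": order_data, "status": "processing"}
    task_queue.put(order_id)
    return {"order_id": order_id, "status": "processing"}

def get_order_status(order_id):
    """Status endpoint the client polls until the work is done."""
    return {"order_id": order_id, "status": orders[order_id]["status"]}

def worker():
    """Background worker: drain the queue and mark orders complete."""
    while True:
        order_id = task_queue.get()
        orders[order_id]["status"] = "completed"
        task_queue.task_done()

threading.Thread(target=worker, daemon=True).start()

resp = create_order({"item": "book"})
task_queue.join()  # demo only: wait for the background work to finish
print(get_order_status(resp["order_id"])["status"])  # completed
```

The client sees `processing` immediately and `completed` once the worker has run — the same shape as the order example above, minus the real database and broker.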

When to Use Queues

Good candidates:

  • Sending emails/SMS
  • Image/video processing
  • Report generation
  • Third-party API calls
  • Anything that can fail and retry
  • Anything that takes >100ms

Bad candidates:

  • Data the user needs immediately
  • Simple CRUD operations
  • Anything requiring synchronous feedback

Queue Semantics

At-Least-Once Delivery

Most queues guarantee messages are delivered at least once. If a worker crashes mid-processing, the message is re-delivered.

Your workers must be idempotent:

def process_order(order_id):
    order = db.get_order(order_id)
    
    # Check if already processed
    if order.status == 'completed':
        return  # Idempotent - safe to re-run
    
    charge_payment(order)
    order.status = 'completed'
    db.save(order)

At-Most-Once Delivery

Each message is delivered once or not at all. Simpler to implement, but messages can be lost. Rarely the right choice for important work.
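The difference between the two guarantees comes down to where "remove from queue" sits relative to the work. A self-contained simulation, with plain Python lists standing in for the broker:

```python
def consume_at_most_once(q, handler):
    """Pop first, then process: a crash after the pop loses the message."""
    msg = q.pop(0)   # message is already gone from the queue
    handler(msg)     # if this raises, nobody will redeliver it

def consume_at_least_once(q, handler):
    """Process first, ack (remove) only on success: a crash redelivers."""
    msg = q[0]
    handler(msg)     # if this raises, msg stays at q[0]
    q.pop(0)         # ack: safe to remove now

def crashing_handler(msg):
    raise RuntimeError("worker crashed")

q1 = ["payment-1"]
try:
    consume_at_most_once(q1, crashing_handler)
except RuntimeError:
    pass
print(q1)  # [] -- message lost

q2 = ["payment-1"]
try:
    consume_at_least_once(q2, crashing_handler)
except RuntimeError:
    pass
print(q2)  # ['payment-1'] -- message survives for redelivery
```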

Exactly-Once Delivery

The holy grail — each message processed exactly once. Extremely hard to achieve in distributed systems. Most “exactly-once” systems are actually “effectively-once” using idempotency.
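"Effectively-once" in practice means at-least-once delivery plus a deduplication check keyed on a message ID. A minimal sketch — the in-memory `processed_ids` set stands in for a database table with a unique constraint:

```python
processed_ids = set()  # in production: a DB table with a unique constraint
charges = []           # the side effect we must not repeat

def handle_payment(message):
    """At-least-once delivery + dedup key = effectively-once processing."""
    if message["id"] in processed_ids:
        return  # duplicate delivery: skip silently
    charges.append(message["amount"])
    processed_ids.add(message["id"])

msg = {"id": "msg-42", "amount": 100}
handle_payment(msg)
handle_payment(msg)  # redelivery of the same message
print(len(charges))  # 1 -- charged once despite two deliveries
```

Note the check-then-act here is not atomic; in a real system the dedup write and the side effect belong in one transaction (or behind a unique-constraint insert) so two concurrent deliveries can't both pass the check.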

Redis (Simple)

import json
import redis

r = redis.Redis()

# Producer
r.lpush('tasks', json.dumps({'type': 'email', 'to': 'user@example.com'}))

# Consumer
while True:
    _, message = r.brpop('tasks')  # Blocking pop
    task = json.loads(message)
    process_task(task)

Good for: Simple use cases, teams already using Redis
Bad for: Durability requirements, complex routing

RabbitMQ (Flexible)

import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)

# Producer
task = {'type': 'email', 'to': 'user@example.com'}
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps(task),
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent
)

# Consumer
def callback(ch, method, properties, body):
    process_task(json.loads(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

Good for: Complex routing, multiple consumers, RPC patterns
Bad for: Extreme scale, simple use cases

Amazon SQS (Managed)

import json
import boto3

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'

# Producer
task = {'type': 'email', 'to': 'user@example.com'}
sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(task))

# Consumer
while True:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20  # Long polling
    )
    for message in response.get('Messages', []):
        process_task(json.loads(message['Body']))
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

Good for: AWS infrastructure, managed service, auto-scaling
Bad for: Low latency requirements, complex routing

Worker Patterns

Single Worker

while True:
    task = queue.get()
    process(task)
    queue.ack(task)

Simple, but throughput limited by single process.
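That ceiling is easy to estimate: a worker taking `service_time` seconds per task handles `1 / service_time` tasks per second, so keeping up requires roughly `arrival_rate * service_time` workers — a back-of-the-envelope form of Little's law. The numbers below are illustrative:

```python
import math

def workers_needed(arrival_rate, service_time_s, utilization=0.8):
    """Minimum workers to keep queue depth stable, with headroom.

    arrival_rate: tasks arriving per second
    service_time_s: seconds of work per task
    utilization: target busy fraction (< 1.0 leaves slack for bursts)
    """
    return math.ceil(arrival_rate * service_time_s / utilization)

# 50 tasks/s at 200 ms each: one worker manages only 5/s,
# so the pool needs 50 * 0.2 / 0.8 = 12.5 -> 13 workers
print(workers_needed(50, 0.2))  # 13
```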

Worker Pool

from concurrent.futures import ThreadPoolExecutor

def worker():
    while True:
        task = queue.get()
        process(task)
        queue.ack(task)

# Note: the with-block blocks here while the workers loop
with ThreadPoolExecutor(max_workers=10) as executor:
    for _ in range(10):
        executor.submit(worker)

Better throughput, but watch for shared state issues.
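"Shared state issues" means anything the workers mutate concurrently — counters, caches, connections. A sketch of the classic fix, guarding the shared mutation with a lock (the counter is illustrative; the same pattern applies to any shared structure):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

processed_count = 0
count_lock = threading.Lock()

def process_and_count(task):
    # ... real work would go here ...
    global processed_count
    with count_lock:  # serialize the shared mutation
        processed_count += 1

# The with-block waits for all submitted tasks before exiting
with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(1000):
        executor.submit(process_and_count, i)

print(processed_count)  # 1000
```

Without the lock, `processed_count += 1` is a read-modify-write that two threads can interleave, silently losing increments.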

Process-Per-Message

# Using Celery
@celery.task
def process_order(order_id):
    # Runs in a separate worker process (Celery prefork pool)
    order = db.get_order(order_id)
    # ...

# Calling
process_order.delay(order_id)

Maximum isolation, but higher overhead per task.

Dead Letter Queues

What happens when processing fails repeatedly?

MAX_RETRIES = 3

def process_with_retry(task):
    try:
        process(task)
        queue.ack(task)
    except Exception:
        task['retry_count'] = task.get('retry_count', 0) + 1
        
        if task['retry_count'] >= MAX_RETRIES:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.put(task)
            queue.ack(task)  # Remove from main queue
        else:
            # Retry with exponential backoff
            delay = 2 ** task['retry_count']
            queue.put(task, delay=delay)
            queue.ack(task)

Dead letter queues catch poison messages that would otherwise loop forever.
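Messages in the DLQ aren't garbage: inspect them, fix the underlying bug or data, then re-drive the repaired ones into the main queue. A self-contained sketch using `queue.Queue` — the `is_fixed` predicate is a placeholder for whatever check applies in your system:

```python
import queue

main_queue = queue.Queue()
dead_letter_queue = queue.Queue()

# Two poison messages parked earlier by the retry logic
dead_letter_queue.put({"order_id": 1, "retry_count": 3})
dead_letter_queue.put({"order_id": 2, "retry_count": 3})

def redrive(dlq, target, is_fixed):
    """Move repaired messages back to the main queue; keep the rest parked."""
    moved, kept = 0, []
    while not dlq.empty():
        task = dlq.get()
        if is_fixed(task):
            task["retry_count"] = 0  # fresh retry budget
            target.put(task)
            moved += 1
        else:
            kept.append(task)
    for task in kept:  # put unrepaired messages back in the DLQ
        dlq.put(task)
    return moved

moved = redrive(dead_letter_queue, main_queue,
                is_fixed=lambda t: t["order_id"] == 1)
print(moved, main_queue.qsize(), dead_letter_queue.qsize())  # 1 1 1
```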

Monitoring Queues

Key metrics:

  • Queue depth: Messages waiting to be processed
  • Processing time: How long each message takes
  • Error rate: Failed processing attempts
  • Consumer lag: How far behind workers are
import time

# Expose metrics for monitoring
def process_task(task):
    start = time.time()
    try:
        do_work(task)
        metrics.increment('queue.processed', tags=['status:success'])
    except Exception:
        metrics.increment('queue.processed', tags=['status:error'])
        raise
    finally:
        metrics.timing('queue.processing_time', time.time() - start)

Alert on:

  • Queue depth growing (workers can’t keep up)
  • Processing time increasing (downstream slowness)
  • Error rate spiking (bad messages or system issues)
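"Queue depth growing" is a trend, not a threshold — a depth of 10,000 that is draining is healthier than a depth of 100 that doubles every minute. A hedged sketch of a trend check over sampled depths (the sample values and window size are illustrative):

```python
def depth_is_growing(samples, window=5):
    """Alert if queue depth rose across each of the last `window` samples.

    samples: queue depths, oldest first, taken at a fixed interval
    """
    recent = samples[-window:]
    if len(recent) < window:
        return False  # not enough data yet
    return all(b > a for a, b in zip(recent, recent[1:]))

print(depth_is_growing([100, 120, 150, 200, 280]))        # True: sustained growth
print(depth_is_growing([10000, 9000, 8000, 7000, 6500]))  # False: draining
```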

Ordering and Priority

Most queues are FIFO within a single queue. If order matters across message types, use separate queues:

12
# High priority queue processed first
high_priority_queue = Queue('urgent')
normal_queue = Queue('normal')

def worker():
    while True:
        # Check high priority first
        task = high_priority_queue.get(block=False)
        if not task:
            task = normal_queue.get(timeout=1)
        if task:
            process(task)

Or lean on queue-native features — SQS FIFO queues, for example, preserve ordering within a message group:

# SQS FIFO with message groups for ordering
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps(task),
    MessageGroupId=f"user:{user_id}",  # Orders per-user preserved
    MessageDeduplicationId=str(uuid4())
)

Common Pitfalls

Not handling duplicates:

# Bad: will charge twice if message redelivered
def process_payment(payment_id):
    charge_card(payment_id)

# Good: idempotent
def process_payment(payment_id):
    payment = db.get_payment(payment_id)
    if payment.status == 'charged':
        return
    charge_card(payment_id)
    payment.status = 'charged'
    db.save(payment)

Unbounded queues:

# Bad: queue grows forever if workers can't keep up
queue.put(task)

# Good: apply backpressure
if queue.size() > MAX_QUEUE_SIZE:
    raise ServiceUnavailable("System overloaded, try again later")
queue.put(task)

Ignoring poison messages:

# Bad: poison message blocks queue forever
while True:
    task = queue.get()
    process(task)  # Throws every time for bad task
    queue.ack(task)  # Never reached

# Good: move to DLQ after retries
# (see Dead Letter Queue section above)

Message queues transform “do this now and make the user wait” into “acknowledge this now, do it soon.” That simple change unlocks resilience, scalability, and better user experience.

Start with Redis for simple cases. Graduate to RabbitMQ or SQS for durability and features. Always design workers to be idempotent. Monitor queue depth religiously.

The queue is a promise: “I will process this.” Make sure you keep it.