Synchronous request-response is simple: client asks, server answers, everyone waits. But some operations don’t fit that model. Sending emails, processing images, generating reports — these take time, and your users shouldn’t wait.
Message queues decouple the “request” from the “work,” letting you respond immediately while processing happens in the background.
## The Basic Pattern

```python
# API endpoint - returns immediately
@app.post("/orders")
def create_order(order_data):
    order = db.create_order(order_data)
    # Queue async work instead of doing it now
    queue.enqueue('process_order', order.id)
    return {"order_id": order.id, "status": "processing"}

# Worker - processes queue in background
def process_order(order_id):
    order = db.get_order(order_id)
    charge_payment(order)
    send_confirmation_email(order)
    notify_warehouse(order)
    update_inventory(order)
```
The user gets a response in milliseconds. The actual work happens whenever the worker gets to it.
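The shape of this pattern can be demonstrated with nothing but the standard library: a toy sketch where `queue.Queue` stands in for the broker and a daemon thread for the worker process. Names like `create_order` mirror the pseudocode above but the whole thing is illustrative.

```python
import queue
import threading
import time

tasks = queue.Queue()
processed = []

def worker():
    # Runs in the background, independent of the "request" path
    while True:
        order_id = tasks.get()
        time.sleep(0.01)  # stand-in for slow work (payment, email, ...)
        processed.append(order_id)
        tasks.task_done()

threading.Thread(target=worker, daemon=True).start()

def create_order(order_id):
    # "API endpoint": enqueue and return immediately
    tasks.put(order_id)
    return {"order_id": order_id, "status": "processing"}

resp = create_order(42)  # returns right away, before any work happens
tasks.join()             # block only so the demo can observe the result
print(resp, processed)   # {'order_id': 42, 'status': 'processing'} [42]
```

The caller never waits on the `time.sleep`; it only sees the enqueue.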
## When to Use Queues
Good candidates:
- Sending emails/SMS
- Image/video processing
- Report generation
- Third-party API calls
- Anything that can fail and retry
- Anything that takes >100ms
Bad candidates:
- Data the user needs immediately
- Simple CRUD operations
- Anything requiring synchronous feedback
## Queue Semantics

### At-Least-Once Delivery
Most queues guarantee messages are delivered at least once. If a worker crashes mid-processing, the message is re-delivered.
Your workers must be idempotent:
```python
def process_order(order_id):
    order = db.get_order(order_id)
    # Check if already processed
    if order.status == 'completed':
        return  # Idempotent - safe to re-run
    charge_payment(order)
    order.status = 'completed'
    db.save(order)
```
### At-Most-Once Delivery
Message delivered once or not at all. Simpler but you might lose messages. Rarely the right choice for important work.
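The difference comes down to the ordering of ack and process: acking before processing gives at-most-once, because a crash after the ack loses the message for good. A toy sketch, with an invented in-memory queue standing in for a real broker:

```python
class InMemoryQueue:
    """Toy queue, invented here purely to illustrate delivery semantics."""
    def __init__(self):
        self.messages = []
    def put(self, msg):
        self.messages.append(msg)
    def get(self):
        return self.messages[0] if self.messages else None
    def ack(self, msg):
        self.messages.remove(msg)

q = InMemoryQueue()
q.put({'type': 'email'})

# At-most-once: ack first, then process.
msg = q.get()
q.ack(msg)
try:
    raise RuntimeError("worker crashed mid-processing")
except RuntimeError:
    pass
print(q.messages)  # [] - the message is gone and will never be retried
```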
### Exactly-Once Delivery
The holy grail — each message processed exactly once. Extremely hard to achieve in distributed systems. Most “exactly-once” systems are actually “effectively-once” using idempotency.
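What "effectively-once" looks like in practice: stamp each message with a unique key and have consumers skip keys they have already handled. A minimal sketch; in a real system the `seen` set would live in Redis or a database column, not process memory, and the names here are made up for illustration:

```python
import uuid

seen = set()   # stand-in for a persistent deduplication store
charges = []   # the side effect we must not repeat

def handle_payment(message):
    # Skip messages whose idempotency key was already processed
    if message['idempotency_key'] in seen:
        return
    charges.append(message['amount'])
    seen.add(message['idempotency_key'])

msg = {'idempotency_key': str(uuid.uuid4()), 'amount': 100}
handle_payment(msg)
handle_payment(msg)  # redelivery: at-least-once strikes again
print(charges)       # [100] - one charge despite two deliveries
```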
## Popular Queue Technologies

### Redis (Simple)
```python
import json

import redis

r = redis.Redis()

# Producer
r.lpush('tasks', json.dumps({'type': 'email', 'to': 'user@example.com'}))

# Consumer
while True:
    _, message = r.brpop('tasks')  # Blocking pop
    task = json.loads(message)
    process_task(task)
```
Good for: Simple use cases, already using Redis
Bad for: Durability requirements, complex routing
### RabbitMQ (Flexible)
```python
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)

# Producer
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps(task),
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent
)

# Consumer
def callback(ch, method, properties, body):
    process_task(json.loads(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
```
Good for: Complex routing, multiple consumers, RPC patterns
Bad for: Extreme scale, simple use cases
### Amazon SQS (Managed)
```python
import json

import boto3

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'

# Producer
sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(task))

# Consumer
while True:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20  # Long polling
    )
    for message in response.get('Messages', []):
        process_task(json.loads(message['Body']))
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
```
Good for: AWS infrastructure, managed service, auto-scaling
Bad for: Low latency requirements, complex routing
## Worker Patterns

### Single Worker
```python
while True:
    task = queue.get()
    process(task)
    queue.ack(task)
```
Simple, but throughput limited by single process.
### Worker Pool
```python
from concurrent.futures import ThreadPoolExecutor

def worker():
    while True:
        task = queue.get()
        process(task)
        queue.ack(task)

with ThreadPoolExecutor(max_workers=10) as executor:
    for _ in range(10):
        executor.submit(worker)
```
Better throughput, but watch for shared state issues.
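One concrete shared-state hazard: a counter bumped from ten threads, since `n += 1` is a read-modify-write that threads can interleave. A lock makes the tally deterministic. A minimal sketch, independent of any queue library:

```python
import threading

processed_count = 0
lock = threading.Lock()

def record_processed(n):
    global processed_count
    for _ in range(n):
        with lock:  # without the lock, concurrent increments can be lost
            processed_count += 1

threads = [threading.Thread(target=record_processed, args=(10_000,))
           for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(processed_count)  # 100000 - always correct with the lock held
```

The same applies to any mutable structure workers share: metrics dicts, connection caches, batch buffers.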
### Process-Per-Message
```python
# Using Celery
@celery.task
def process_order(order_id):
    # Each task runs in an isolated worker process
    order = db.get_order(order_id)
    # ...

# Calling
process_order.delay(order_id)
```
Maximum isolation, but higher overhead per task.
## Dead Letter Queues
What happens when processing fails repeatedly?
```python
MAX_RETRIES = 3

def process_with_retry(task):
    try:
        process(task)
        queue.ack(task)
    except Exception as e:
        task['retry_count'] = task.get('retry_count', 0) + 1
        if task['retry_count'] >= MAX_RETRIES:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.put(task)
            queue.ack(task)  # Remove from main queue
        else:
            # Retry with exponential backoff
            delay = 2 ** task['retry_count']
            queue.put(task, delay=delay)
            queue.ack(task)
```
Dead letter queues catch poison messages that would otherwise loop forever.
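For reference, the `2 ** retry_count` backoff above yields delays of 2, 4, and 8 seconds. Production systems usually also cap the delay and add random jitter so a burst of failures doesn't retry in lockstep. A sketch of that variant (the cap and the full-jitter scheme are arbitrary choices here):

```python
import random

def backoff_delay(retry_count, base=2, cap=60):
    """Exponential delay, capped, with full jitter to spread out retries."""
    return random.uniform(0, min(cap, base ** retry_count))

# Raw (un-jittered) schedule for the three retries above
print([min(60, 2 ** n) for n in range(1, 4)])  # [2, 4, 8]
```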
## Monitoring Queues
Key metrics:
- Queue depth: Messages waiting to be processed
- Processing time: How long each message takes
- Error rate: Failed processing attempts
- Consumer lag: How far behind workers are
```python
# Expose metrics for monitoring
def process_task(task):
    start = time.time()
    try:
        do_work(task)
        metrics.increment('queue.processed', tags=['status:success'])
    except Exception:
        metrics.increment('queue.processed', tags=['status:error'])
        raise
    finally:
        metrics.timing('queue.processing_time', time.time() - start)
```
Alert on:
- Queue depth growing (workers can’t keep up)
- Processing time increasing (downstream slowness)
- Error rate spiking (bad messages or system issues)
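Depth alone can mislead: a deep queue that is draining is healthy, while a shallow one that keeps growing is not. One way to alert on the trend as well as the level; the thresholds and the "monotonic growth" rule here are placeholders, not a recommendation:

```python
def should_alert(depth_samples, max_depth=10_000):
    """Alert if the queue is deep, or shallower but consistently growing."""
    if depth_samples[-1] > max_depth:
        return True
    # Depth rising across every consecutive sample = workers not keeping up
    growing = all(b > a for a, b in zip(depth_samples, depth_samples[1:]))
    return growing and len(depth_samples) >= 5

print(should_alert([100, 90, 120, 80, 95]))      # False - noisy but stable
print(should_alert([100, 200, 400, 800, 1600]))  # True - monotonic growth
```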
## Ordering and Priority
Most queues are FIFO within a single queue. If order matters across message types, use separate queues:
```python
# High-priority queue processed first
high_priority_queue = Queue('urgent')
normal_queue = Queue('normal')

def worker():
    while True:
        # Check high priority first
        task = high_priority_queue.get(block=False)
        if not task:
            task = normal_queue.get(timeout=1)
        if task:
            process(task)
```
Or lean on queue-native features where they exist. SQS FIFO queues, for example, preserve ordering within a message group:
```python
from uuid import uuid4

# SQS FIFO with message groups for ordering
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps(task),
    MessageGroupId=f"user:{user_id}",  # Ordering per-user preserved
    MessageDeduplicationId=str(uuid4())
)
```
## Common Pitfalls
Not handling duplicates:
```python
# Bad: will charge twice if the message is redelivered
def process_payment(payment_id):
    charge_card(payment_id)

# Good: idempotent
def process_payment(payment_id):
    payment = db.get_payment(payment_id)
    if payment.status == 'charged':
        return
    charge_card(payment_id)
    payment.status = 'charged'
    db.save(payment)
```
Unbounded queues:
```python
# Bad: queue grows forever if workers can't keep up
queue.put(task)

# Good: apply backpressure
if queue.size() > MAX_QUEUE_SIZE:
    raise ServiceUnavailable("System overloaded, try again later")
queue.put(task)
```
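In Python's standard library this backpressure falls out of a bounded queue: `queue.Queue(maxsize=...)` plus `put_nowait`, which raises `queue.Full` instead of letting the backlog grow. A minimal sketch:

```python
import queue

tasks = queue.Queue(maxsize=3)  # bound chosen just for the demo

accepted, rejected = 0, 0
for i in range(5):
    try:
        tasks.put_nowait(i)  # raises queue.Full rather than blocking
        accepted += 1
    except queue.Full:
        rejected += 1        # surface to the caller as 503 / "try again later"

print(accepted, rejected)  # 3 2
```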
Ignoring poison messages:
```python
# Bad: poison message blocks queue forever
while True:
    task = queue.get()
    process(task)  # Throws every time for a bad task
    queue.ack(task)  # Never reached

# Good: move to a DLQ after retries
# (see Dead Letter Queues section above)
```
Message queues transform “do this now and make the user wait” into “acknowledge this now, do it soon.” That simple change unlocks resilience, scalability, and better user experience.
Start with Redis for simple cases. Graduate to RabbitMQ or SQS for durability and features. Always design workers to be idempotent. Monitor queue depth religiously.
The queue is a promise: “I will process this.” Make sure you keep it.