Your API is slow because it’s doing too much synchronously. Here’s when to reach for a message queue, and how to implement it without overcomplicating everything.
When You Need a Queue
Signs you need async processing:
- API response time dominated by side effects (emails, webhooks, analytics)
- Downstream service failures cascade to user-facing errors
- Traffic spikes overwhelm dependent services
- You need to retry failed operations automatically
- Work needs to happen on a schedule or with delay
Signs you don’t:
- Response time is fine
- Operations are simple and fast
- You have fewer than 100 requests/minute
- Complexity cost outweighs benefits
Don’t add queues because you might need them. Add them when you do.
The Simplest Pattern
Before reaching for RabbitMQ, try the database:
```python
# models.py
from datetime import datetime, timedelta, timezone

def utcnow():
    return datetime.now(timezone.utc)

class Task(Base):
    id = Column(UUID, primary_key=True)
    task_type = Column(String)
    payload = Column(JSON)
    status = Column(String, default="pending")
    attempts = Column(Integer, default=0)
    created_at = Column(DateTime, default=utcnow)
    run_after = Column(DateTime, default=utcnow)

# producer (in your API)
def send_welcome_email(user_id: str):
    db.add(Task(
        task_type="send_email",
        payload={"user_id": user_id, "template": "welcome"}
    ))
    db.commit()

# consumer (separate worker)
import time

while True:
    task = db.query(Task).filter(
        Task.status == "pending",
        Task.run_after <= utcnow()
    ).with_for_update(skip_locked=True).first()
    if task:
        try:
            process_task(task)
            task.status = "completed"
        except Exception:
            task.attempts += 1
            task.status = "failed" if task.attempts >= 3 else "pending"
            # exponential backoff: 2, 4, 8 minutes between attempts
            task.run_after = utcnow() + timedelta(minutes=2 ** task.attempts)
        db.commit()
    else:
        time.sleep(1)
```
This handles 90% of use cases with zero new infrastructure.
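The run_after column also gives you delayed and scheduled work for free. A minimal sketch (schedule_task and the restated utcnow helper are illustrative; the db.add/db.commit calls from the model above are omitted):

```python
from datetime import datetime, timedelta, timezone

def utcnow():
    return datetime.now(timezone.utc)

def schedule_task(task_type, payload, delay_minutes=0):
    """Build the fields for a Task row; the worker loop above will
    only pick it up once run_after has passed."""
    return {
        "task_type": task_type,
        "payload": payload,
        "status": "pending",
        "attempts": 0,
        "run_after": utcnow() + timedelta(minutes=delay_minutes),
    }

# A reminder email one hour after signup:
row = schedule_task("send_email", {"user_id": "123", "template": "reminder"},
                    delay_minutes=60)
```

The same column serves both retry backoff and deliberate delay, which is part of why the database-as-queue pattern stretches so far.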
When to Graduate
Move to a real queue when:
- Polling latency matters (need sub-second processing)
- Volume exceeds 1000 tasks/minute
- You need complex routing (topic-based, priority queues)
- Multiple consumers need the same message (fan-out)
Choosing a Queue
Redis (Simple, Fast)
Best for: High throughput, simple patterns, you already have Redis
```python
import json
import redis

r = redis.Redis()

# Producer
r.lpush("tasks", json.dumps({"type": "email", "user_id": "123"}))

# Consumer
while True:
    _, message = r.brpop("tasks")
    task = json.loads(message)
    process_task(task)
```
Pros: Fast, simple, you probably already run it
Cons: No built-in retry, persistence is optional, no routing
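If you stay on Redis, the missing retry story can be approximated with a processing list and an attempt counter carried in the message itself. A sketch under assumed key names (tasks:processing and tasks:dead are illustrative, not Redis conventions; the redis import is deferred so the pure helper works without a server):

```python
import json

MAX_ATTEMPTS = 3

def bump_attempts(raw):
    """Return (updated_message, give_up). give_up means route to a
    dead-letter list instead of retrying."""
    task = json.loads(raw)
    task["attempts"] = task.get("attempts", 0) + 1
    return json.dumps(task), task["attempts"] >= MAX_ATTEMPTS

def consume_forever():
    import redis
    r = redis.Redis()
    while True:
        # Atomically move the next task into a processing list, so a crash
        # between pop and process doesn't silently lose the message.
        raw = r.brpoplpush("tasks", "tasks:processing", timeout=5)
        if raw is None:
            continue
        try:
            process_task(json.loads(raw))
            r.lrem("tasks:processing", 1, raw)
        except Exception:
            updated, give_up = bump_attempts(raw)
            r.lrem("tasks:processing", 1, raw)
            r.lpush("tasks:dead" if give_up else "tasks", updated)
```

Once you find yourself building much more than this on top of Redis, that is usually the signal to move to a broker that does it for you.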
RabbitMQ (Flexible, Reliable)
Best for: Complex routing, guaranteed delivery, multiple consumers
```python
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare queue with durability
channel.queue_declare(queue='tasks', durable=True)

# Producer
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps({"type": "email", "user_id": "123"}),
    properties=pika.BasicProperties(delivery_mode=2)  # persistent message
)

# Consumer
def callback(ch, method, properties, body):
    task = json.loads(body)
    try:
        process_task(task)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
```
Pros: Reliable, flexible routing, built-in dead-lettering (retry is requeue-based; you track attempt counts yourself)
Cons: Operational complexity, another service to manage
SQS (Managed, Scalable)
Best for: AWS environments, zero ops overhead, massive scale
```python
import json
import boto3

sqs = boto3.client('sqs')
queue_url = "https://sqs.us-east-1.amazonaws.com/123456/tasks"

# Producer
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({"type": "email", "user_id": "123"})
)

# Consumer
while True:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20  # long polling
    )
    for message in response.get('Messages', []):
        try:
            task = json.loads(message['Body'])
            process_task(task)
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
        except Exception:
            pass  # message returns to the queue after the visibility timeout
```
Pros: Zero ops, auto-scaling, pay-per-use
Cons: AWS lock-in, 256KB message limit, no complex routing
Essential Patterns
Idempotency
Messages can be delivered more than once. Handle it:
```python
def process_email_task(task):
    # Check if already processed
    if db.query(ProcessedTask).filter_by(task_id=task['id']).first():
        return  # already done

    # Do the work
    send_email(task['user_id'], task['template'])

    # Mark as processed (a unique constraint on task_id closes the
    # check-then-act race between concurrent workers)
    db.add(ProcessedTask(task_id=task['id']))
    db.commit()
```
Or use natural idempotency keys:
```python
def charge_order(order_id):
    # Stripe idempotency key = order_id:
    # the same order_id always maps to the same charge, so replays are safe
    stripe.Charge.create(
        amount=1000,
        currency="usd",
        idempotency_key=f"order-{order_id}"
    )
```
Dead Letter Queues
Failed messages need somewhere to go:
```python
# RabbitMQ: declare a DLQ, then point the work queue at it
channel.queue_declare(queue='tasks_dlq', durable=True)
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'tasks_dlq'
    }
)
# Messages rejected with requeue=False (or expired) are routed to the DLQ.
# Cap your retries, then nack without requeue to send a message there.
```
Monitor your DLQ. Messages there need human attention.
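That triage is easier with a small drain script. A sketch (drain_dlq is a hypothetical helper built on pika's basic_get; note it acks what it pulls, so republish anything you still need):

```python
import json

def drain_dlq(channel, queue="tasks_dlq", limit=50):
    """Pull up to `limit` dead-lettered messages for inspection, acking each."""
    drained = []
    for _ in range(limit):
        method, header, body = channel.basic_get(queue=queue)
        if method is None:
            break  # DLQ is empty
        drained.append(json.loads(body))
        channel.basic_ack(delivery_tag=method.delivery_tag)
    return drained
```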
Poison Message Protection
One bad message can block your queue:
```python
def process_with_protection(message):
    attempts = get_attempt_count(message.id)
    if attempts >= 3:
        move_to_dlq(message)
        return
    try:
        process(message)
        ack(message)
    except Exception:
        increment_attempts(message.id)
        nack(message)  # return to queue
```
Backpressure
Don’t let producers overwhelm consumers:
```python
# Check queue depth before accepting work
def accept_request():
    queue_depth = get_queue_depth()
    if queue_depth > 10000:
        raise HTTPException(503, "System busy, try later")
    enqueue_task(...)
    return {"status": "accepted"}
```
Monitoring
Key Metrics
```python
# Queue depth (messages waiting)
queue_depth = channel.queue_declare(queue='tasks', passive=True).method.message_count

# Consumer lag (how far behind)
lag = newest_message_timestamp - last_processed_timestamp

# Processing rate
messages_per_second = processed_count / time_window

# Error rate
error_rate = failed_count / total_count
```
Alerts
- Queue depth growing → consumers can’t keep up
- Consumer lag increasing → processing too slow
- DLQ not empty → failures need attention
- Zero consumers → workers died
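Those alert rules can be sketched as plain threshold checks. The thresholds are illustrative, and metrics is whatever dict your metrics source produces:

```python
def check_alerts(metrics):
    """metrics: dict with queue_depth, lag_seconds, dlq_depth, consumer_count."""
    alerts = []
    if metrics["queue_depth"] > 10_000:
        alerts.append("queue depth high: consumers can't keep up")
    if metrics["lag_seconds"] > 300:
        alerts.append("consumer lag high: processing too slow")
    if metrics["dlq_depth"] > 0:
        alerts.append("DLQ not empty: failures need attention")
    if metrics["consumer_count"] == 0:
        alerts.append("zero consumers: workers died")
    return alerts
```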
Common Mistakes
1. Not Handling Duplicates
```python
# Bad: assumes exactly-once delivery
def process(task):
    charge_customer(task['amount'])  # double charge if replayed!

# Good: idempotent
def process(task):
    if not already_charged(task['order_id']):
        charge_customer(task['amount'], idempotency_key=task['order_id'])
```
2. Unbounded Retries
```python
# Bad: retry forever
except Exception:
    requeue(message)

# Good: limited retries, then DLQ
except Exception:
    if attempts < 3:
        requeue_with_delay(message)
    else:
        move_to_dlq(message)
```
3. Synchronous in Disguise
```python
# Bad: blocks waiting for the result
task_id = enqueue(task)
while not is_complete(task_id):
    time.sleep(0.1)  # this is just slow synchronous
return get_result(task_id)

# Good: true async
task_id = enqueue(task)
return {"status": "accepted", "task_id": task_id}
# client polls or gets a webhook when done
```
4. Huge Messages
```python
# Bad: 10MB payload in the queue
enqueue({"data": huge_file_contents})

# Good: a reference to the data
upload_to_s3(huge_file, key)
enqueue({"file_key": key})
```
Keep messages small. Store large data elsewhere.
Start Simple
- Start with database-as-queue — it’s probably enough
- Move to Redis — when you need speed
- Move to RabbitMQ/SQS — when you need reliability or routing
- Always handle duplicates — at-least-once is the norm
- Monitor queue depth — it tells you everything
Queues add complexity. Make sure you’re getting value for it.
The best queue is the one you don’t add until you need it. The second best is the simplest one that solves your problem.