Your API is slow because it’s doing too much synchronously. Here’s when to reach for a message queue, and how to implement it without overcomplicating everything.

When You Need a Queue

Signs you need async processing:

  • API response time dominated by side effects (emails, webhooks, analytics)
  • Downstream service failures cascade to user-facing errors
  • Traffic spikes overwhelm dependent services
  • You need to retry failed operations automatically
  • Work needs to happen on a schedule or with delay

Signs you don’t:

  • Response time is fine
  • Operations are simple and fast
  • You have fewer than 100 requests/minute
  • Complexity cost outweighs benefits

Don’t add queues because you might need them. Add them when you do.

The Simplest Pattern

Before reaching for RabbitMQ, try the database:

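# models.py
import time
import uuid
from datetime import datetime, timedelta, timezone

from sqlalchemy import JSON, Column, DateTime, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

def utcnow():
    return datetime.now(timezone.utc)

class Task(Base):
    __tablename__ = "tasks"

    # default=uuid.uuid4 so producers don't have to supply an id
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    task_type = Column(String, nullable=False)
    payload = Column(JSON)
    status = Column(String, default="pending")
    attempts = Column(Integer, default=0)
    created_at = Column(DateTime(timezone=True), default=utcnow)
    run_after = Column(DateTime(timezone=True), default=utcnow)

# producer (in your API); db is your SQLAlchemy session
def send_welcome_email(user_id: str):
    db.add(Task(
        task_type="send_email",
        payload={"user_id": user_id, "template": "welcome"}
    ))
    db.commit()

# consumer (separate worker)
while True:
    # SKIP LOCKED lets several workers poll without claiming the same row
    task = db.query(Task).filter(
        Task.status == "pending",
        Task.run_after <= utcnow()
    ).order_by(Task.run_after).with_for_update(skip_locked=True).first()

    if task:
        try:
            process_task(task)
            task.status = "completed"
        except Exception:
            task.attempts += 1
            task.status = "failed" if task.attempts >= 3 else "pending"
            # exponential backoff: retry after 2 minutes, then 4
            task.run_after = utcnow() + timedelta(minutes=2 ** task.attempts)
        db.commit()
    else:
        time.sleep(1)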

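This covers most background-job needs with zero new infrastructure. It relies on SKIP LOCKED (PostgreSQL 9.5+, MySQL 8.0+), which lets several workers poll the same table without picking up the same task.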
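
If your database lacks SKIP LOCKED (SQLite, for example), a worker can still claim a row safely with a conditional UPDATE and a check on the affected row count. The following is only a sketch of that idea: the table layout, the `running` status, and the `claim_next` helper are assumptions made for illustration, and SQLite stands in for the real database.

```python
import sqlite3
import time


def make_db() -> sqlite3.Connection:
    """In-memory stand-in for the tasks table (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks ("
        " id INTEGER PRIMARY KEY,"
        " task_type TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'pending',"
        " run_after REAL NOT NULL)"
    )
    return conn


def claim_next(conn: sqlite3.Connection, now: float):
    """Claim one due task, or return None if nothing could be claimed."""
    row = conn.execute(
        "SELECT id, task_type FROM tasks"
        " WHERE status = 'pending' AND run_after <= ?"
        " ORDER BY run_after LIMIT 1",
        (now,),
    ).fetchone()
    if row is None:
        return None
    # The status check in WHERE makes the claim atomic: if another worker
    # got there first, rowcount is 0 and nothing is claimed.
    cur = conn.execute(
        "UPDATE tasks SET status = 'running'"
        " WHERE id = ? AND status = 'pending'",
        (row[0],),
    )
    conn.commit()
    return row if cur.rowcount == 1 else None


conn = make_db()
now = time.time()
conn.execute(
    "INSERT INTO tasks (task_type, run_after) VALUES ('send_email', ?)",
    (now - 1,),
)
conn.execute(
    "INSERT INTO tasks (task_type, run_after) VALUES ('send_report', ?)",
    (now + 3600,),
)
conn.commit()

first = claim_next(conn, now)   # the email task, which is already due
second = claim_next(conn, now)  # None: the report is scheduled for later
```

The trade-off is that a worker that crashes leaves its task stuck in `running`, so this variant needs a periodic sweep that resets stale rows.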

When to Graduate

Move to a real queue when:

  • Polling latency matters (need sub-second processing)
  • Volume exceeds 1000 tasks/minute
  • You need complex routing (topic-based, priority queues)
  • Multiple consumers need the same message (fan-out)

Choosing a Queue

Redis (Simple, Fast)

Best for: High throughput, simple patterns, you already have Redis

import json
import redis

r = redis.Redis()

# Producer
r.lpush("tasks", json.dumps({"type": "email", "user_id": "123"}))

# Consumer
while True:
    _, message = r.brpop("tasks")
    task = json.loads(message)
    process_task(task)

Pros: Fast, simple, you probably already run it
Cons: No built-in retry, persistence is optional, no routing
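
The BRPOP loop above loses a task if the worker crashes between popping and finishing it. A common mitigation is to move each task atomically into a processing list and remove it only after success. The sketch below assumes a redis-py style client that exposes `lmove` and `lrem` (LMOVE needs Redis 6.2 or later); the list names and the `handle_one` helper are invented for illustration, and the client is passed in so the code does not depend on a running server.

```python
import json
from typing import Any, Callable, Optional


def handle_one(client: Any, handler: Callable[[dict], None],
               queue: str = "tasks",
               processing: str = "tasks:processing") -> Optional[bool]:
    """Process one task.

    Returns None if the queue is empty, True on success,
    False if the handler raised.
    """
    # Atomically pop from the tail of `queue` and push onto `processing`,
    # so the task is never held only in worker memory.
    raw = client.lmove(queue, processing, "RIGHT", "LEFT")
    if raw is None:
        return None
    try:
        handler(json.loads(raw))
    except Exception:
        # Leave the task in `processing`; a separate sweeper (not shown)
        # can move stale entries back onto `queue`.
        return False
    client.lrem(processing, 1, raw)  # done: drop it from the processing list
    return True
```

This still gives you no retry limit or delay. It only guarantees that a crashed worker leaves evidence behind.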

RabbitMQ (Flexible, Reliable)

Best for: Complex routing, guaranteed delivery, multiple consumers

import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare queue with durability
channel.queue_declare(queue='tasks', durable=True)

# Producer
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps({"type": "email", "user_id": "123"}),
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent
)

# Consumer
def callback(ch, method, properties, body):
    task = json.loads(body)
    try:
        process_task(task)
        ch.basic_ack(delivery_tag=method.delivery_tag)
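    except Exception:
        # requeue=True redelivers immediately and without limit; cap retries
        # or use requeue=False with a dead-letter exchange in production
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)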

channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

Pros: Reliable, flexible routing, acknowledgements with redelivery, dead-letter exchanges
Cons: Operational complexity, another service to manage

SQS (Managed, Scalable)

Best for: AWS environments, zero ops overhead, massive scale

import json
import boto3

sqs = boto3.client('sqs')
queue_url = "https://sqs.us-east-1.amazonaws.com/123456/tasks"

# Producer
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({"type": "email", "user_id": "123"})
)

# Consumer
while True:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20  # Long polling
    )
    
    for message in response.get('Messages', []):
        try:
            task = json.loads(message['Body'])
            process_task(task)
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
        except Exception:
            pass  # Message returns to queue after visibility timeout

Pros: Zero ops, auto-scaling, pay-per-use
Cons: AWS lock-in, 256KB message limit, no complex routing

Essential Patterns

Idempotency

Messages can be delivered more than once. Handle it:

def process_email_task(task):
    # Check if already processed
    if db.query(ProcessedTask).filter_by(
        task_id=task['id']
    ).first():
        return  # Already done
    
    # Do the work
    send_email(task['user_id'], task['template'])
    
    # Mark as processed
    db.add(ProcessedTask(task_id=task['id']))
    db.commit()

Or use natural idempotency keys:

def charge_order(order_id):
    # Stripe idempotency key = order_id
    # Same order_id = same charge, no duplicates
    stripe.Charge.create(
        amount=1000,
        idempotency_key=f"order-{order_id}"
    )
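
The check-then-mark version above leaves a gap: two workers can both pass the check before either one records the task. One way to close it is to let a unique constraint arbitrate, inserting the marker first and treating a constraint violation as "already handled". This is a sketch, not a drop-in replacement: the `processed_tasks` table and the `process_once` helper are illustrative names, and SQLite stands in for the real database.

```python
import sqlite3
from typing import Callable

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_tasks (task_id TEXT PRIMARY KEY)")


def process_once(task: dict, work: Callable[[dict], None]) -> bool:
    """Run `work` at most once per task id. Returns True if it ran."""
    try:
        # The primary key rejects a second insert for the same task id.
        conn.execute(
            "INSERT INTO processed_tasks (task_id) VALUES (?)",
            (task["id"],),
        )
    except sqlite3.IntegrityError:
        conn.rollback()
        return False  # duplicate delivery: skip the work
    try:
        work(task)
    except Exception:
        conn.rollback()  # undo the marker so a retry can run the work again
        raise
    conn.commit()
    return True


sent = []
task = {"id": "task-1", "user_id": "123", "template": "welcome"}
ran_first = process_once(task, sent.append)
ran_again = process_once(task, sent.append)  # simulated redelivery
```

The side effect itself (sending the email) is still outside the transaction, so a crash between the send and the commit can produce a repeat. The constraint narrows the window; it does not make delivery exactly-once.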

Dead Letter Queues

Failed messages need somewhere to go:

# RabbitMQ: Declare DLQ
channel.queue_declare(queue='tasks_dlq', durable=True)
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'tasks_dlq'
    }
)

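# Messages rejected or nacked with requeue=False (or expired by TTL) are routed to tasks_dlq.
# The retry limit is not automatic here: the consumer has to track attempts
# and reject with requeue=False once it gives up.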

Monitor your DLQ. Messages there need human attention.
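
SQS has a counterpart: a redrive policy on the source queue. Once a message has been received `maxReceiveCount` times without being deleted, SQS moves it to the dead-letter queue. Below is a minimal sketch of building that attribute. The ARN is a placeholder and `redrive_attributes` is a hypothetical helper; its result would be passed as `Attributes` to boto3's `create_queue` or `set_queue_attributes`.

```python
import json


def redrive_attributes(dlq_arn: str, max_receives: int = 3) -> dict:
    """Build the queue attributes that attach a dead-letter queue."""
    return {
        # SQS expects the policy as a JSON string, not a nested dict.
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": str(max_receives),
        })
    }


# Placeholder ARN; substitute the ARN of your own dead-letter queue.
attrs = redrive_attributes("arn:aws:sqs:us-east-1:123456789012:tasks-dlq")
# e.g. sqs.set_queue_attributes(QueueUrl=queue_url, Attributes=attrs)
```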

Poison Message Protection

One bad message can block your queue:

def process_with_protection(message):
    attempts = get_attempt_count(message.id)
    
    if attempts >= 3:
        move_to_dlq(message)
        return
    
    try:
        process(message)
        ack(message)
    except Exception as e:
        increment_attempts(message.id)
        nack(message)  # Return to queue
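
The helpers in that snippet (`get_attempt_count`, `move_to_dlq`, and so on) are left abstract. As an illustration only, here is an in-memory version that makes the control flow concrete. The deque, the list, and the dict counter are stand-ins for whatever your broker and datastore provide.

```python
from collections import defaultdict, deque

MAX_ATTEMPTS = 3
queue = deque()               # stand-in for the broker queue
dead_letters = []             # stand-in for the DLQ
attempts = defaultdict(int)   # message id -> failed attempts so far
processed = []                # ids of messages handled successfully


def handler(message: dict) -> None:
    """Toy handler: fails on any message whose body is 'bad'."""
    if message["body"] == "bad":
        raise ValueError("cannot parse")
    processed.append(message["id"])


def drain() -> None:
    """Process messages until the queue is empty."""
    while queue:
        message = queue.popleft()
        if attempts[message["id"]] >= MAX_ATTEMPTS:
            dead_letters.append(message)  # give up: park it for a human
            continue
        try:
            handler(message)
        except Exception:
            attempts[message["id"]] += 1
            queue.append(message)  # back of the queue, not the front


queue.extend([{"id": 1, "body": "bad"}, {"id": 2, "body": "ok"}])
drain()
```

Requeueing at the back matters here: message 2 is processed while message 1 is still cycling through its retries, so the bad message never blocks the good one.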

Backpressure

Don’t let producers overwhelm consumers:

# Check queue depth before accepting work
def accept_request():
    queue_depth = get_queue_depth()
    
    if queue_depth > 10000:
        raise HTTPException(503, "System busy, try later")
    
    enqueue_task(...)
    return {"status": "accepted"}
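
Checking the depth and then enqueueing are two separate steps, so concurrent requests can overshoot the limit. For an in-process queue, Python's `queue.Queue` enforces the bound atomically. A small sketch follows; the limit of 2 and the `accept` helper are chosen only to keep the example short.

```python
import queue

tasks = queue.Queue(maxsize=2)


def accept(task: dict) -> dict:
    """Enqueue without blocking; signal overload instead of waiting."""
    try:
        tasks.put_nowait(task)
    except queue.Full:
        # In an HTTP API this would map to a 503 response.
        return {"status": "rejected", "reason": "queue full"}
    return {"status": "accepted"}


results = [accept({"n": n}) for n in range(3)]
```

With an external broker there is no equivalent atomic check, so treat the depth threshold as a soft limit and leave headroom.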

Monitoring

Key Metrics

# Queue depth (messages waiting)
queue_depth = channel.queue_declare(queue='tasks', passive=True).method.message_count

# Consumer lag (how far behind)
lag = newest_message_timestamp - last_processed_timestamp

# Processing rate
messages_per_second = processed_count / time_window

# Error rate
error_rate = failed_count / total_count

Alerts

  • Queue depth growing → consumers can’t keep up
  • Consumer lag increasing → processing too slow
  • DLQ not empty → failures need attention
  • Zero consumers → workers died
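
These conditions translate fairly directly into a check over two metric snapshots. A rough sketch: the `Snapshot` fields and the `alerts` function are hypothetical, and the rule for "growing" (strictly larger than the previous sample) is deliberately naive.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Snapshot:
    queue_depth: int
    lag_seconds: float
    dlq_depth: int
    consumers: int


def alerts(prev: Snapshot, curr: Snapshot) -> List[str]:
    """Return the names of alerts triggered between two samples."""
    fired = []
    if curr.queue_depth > prev.queue_depth:
        fired.append("queue_depth_growing")
    if curr.lag_seconds > prev.lag_seconds:
        fired.append("consumer_lag_increasing")
    if curr.dlq_depth > 0:
        fired.append("dlq_not_empty")
    if curr.consumers == 0:
        fired.append("no_consumers")
    return fired


earlier = Snapshot(queue_depth=100, lag_seconds=2.0, dlq_depth=0, consumers=4)
later = Snapshot(queue_depth=450, lag_seconds=2.0, dlq_depth=1, consumers=0)
fired = alerts(earlier, later)
```

A production rule would compare over a longer window than two samples, otherwise normal jitter in queue depth will page someone.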

Common Mistakes

1. Not Handling Duplicates

# Bad: Assumes exactly-once delivery
def process(task):
    charge_customer(task['amount'])  # Double charge if replayed!

# Good: Idempotent
def process(task):
    if not already_charged(task['order_id']):
        charge_customer(task['amount'], idempotency_key=task['order_id'])

2. Unbounded Retries

# Bad: Retry forever
except Exception:
    requeue(message)

# Good: Limited retries, then DLQ
except Exception:
    if attempts < 3:
        requeue_with_delay(message)
    else:
        move_to_dlq(message)
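
`requeue_with_delay` is not defined above. One reasonable way to pick the delay is capped exponential backoff with jitter, sketched below. The base, the cap, and the function name are assumptions, not values taken from any particular broker.

```python
import random


def retry_delay(attempt: int, base: float = 2.0, cap: float = 300.0,
                rng=None) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    Uses "full jitter": a uniform draw between 0 and the capped
    exponential ceiling, so retries from many workers spread out
    instead of arriving together.
    """
    rng = rng or random
    ceiling = min(cap, base * 2 ** (attempt - 1))
    return rng.uniform(0, ceiling)


# Ceilings for attempts 1, 2, 3 and 10 are 2, 4, 8 and 300 seconds.
delays = [retry_delay(n, rng=random.Random(n)) for n in (1, 2, 3, 10)]
```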

3. Synchronous in Disguise

# Bad: Blocks waiting for result
task_id = enqueue(task)
while not is_complete(task_id):
    time.sleep(0.1)  # This is just slow synchronous
return get_result(task_id)

# Good: True async
task_id = enqueue(task)
return {"status": "accepted", "task_id": task_id}
# Client polls or gets webhook when done
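
Stripped of HTTP, the "accepted, poll later" contract is two functions: one that returns immediately with an id, and one that reports status. The in-memory dict and the function names below are illustrative only; a real service would back this with the task table or the queue's result store.

```python
import uuid

_tasks = {}  # task_id -> task record; stand-in for a real store


def submit(payload: dict) -> dict:
    """Record the task and return right away (the body of a 202 response)."""
    task_id = str(uuid.uuid4())
    _tasks[task_id] = {"status": "pending", "payload": payload, "result": None}
    return {"status": "accepted", "task_id": task_id}


def get_status(task_id: str) -> dict:
    """What a GET on the task's status URL would return."""
    task = _tasks.get(task_id)
    if task is None:
        return {"status": "not_found"}
    return {"status": task["status"], "result": task["result"]}


def worker_step() -> None:
    """Stand-in for the background worker: finish every pending task."""
    for task in _tasks.values():
        if task["status"] == "pending":
            task["result"] = task["payload"]["x"] * 2
            task["status"] = "completed"


ticket = submit({"x": 21})
status_before = get_status(ticket["task_id"])
worker_step()
status_after = get_status(ticket["task_id"])
```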

4. Huge Messages

# Bad: 10MB payload in queue
enqueue({"data": huge_file_contents})

# Good: Reference to data
upload_to_s3(huge_file, key)
enqueue({"file_key": key})

Keep messages small. Store large data elsewhere.
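
One way to enforce this is a thin wrapper that offloads oversized payloads before they reach the queue. The sketch below uses a dict as a stand-in blob store and a list as the queue. The 64 KB threshold and all the names are assumptions made for the example.

```python
import json
import uuid

MAX_INLINE_BYTES = 64 * 1024
blob_store = {}   # stand-in for S3 or similar
queue = []        # stand-in for the real queue


def enqueue(payload: dict) -> dict:
    """Enqueue the payload inline if small, otherwise as a reference."""
    body = json.dumps(payload).encode("utf-8")
    if len(body) <= MAX_INLINE_BYTES:
        message = {"inline": payload}
    else:
        key = f"payloads/{uuid.uuid4()}"
        blob_store[key] = body
        message = {"blob_key": key}
    queue.append(message)
    return message


def load(message: dict) -> dict:
    """Consumer side: resolve a message back to its payload."""
    if "inline" in message:
        return message["inline"]
    return json.loads(blob_store[message["blob_key"]])


small = enqueue({"user_id": "123"})
large = enqueue({"data": "x" * 200_000})
```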

Start Simple

  1. Start with database-as-queue — it’s probably enough
  2. Move to Redis — when you need speed
  3. Move to RabbitMQ/SQS — when you need reliability or routing
  4. Always handle duplicates — at-least-once is the norm
  5. Monitor queue depth — it tells you everything

Queues add complexity. Make sure you’re getting value for it.


The best queue is the one you don’t add until you need it. The second best is the simplest one that solves your problem.