When Service A needs to tell Service B something happened, the simplest approach is a direct HTTP call. But what happens when Service B is slow? Or down? Or overwhelmed? Message queues decouple your services, letting them communicate reliably even when things go wrong.

Why Queues?

Without a queue:

User → Request → API → Service → Response (if the service is slow, the user waits; if it is down, the request fails)

With a queue:

User → Request → API → Queue → Worker (the API responds immediately; the worker processes the task asynchronously and retries on failure)

Redis Queues (Simple & Fast)

For basic queue needs, Redis is hard to beat:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
import json
import time
from datetime import datetime, timezone
from typing import Callable

import redis

class RedisQueue:
    def __init__(self, redis_url: str, queue_name: str):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.processing_queue = f"{queue_name}:processing"
    
    def enqueue(self, task_type: str, payload: dict, priority: int = 0):
        """Add a task to the queue."""
        task = {
            "id": f"{task_type}:{datetime.utcnow().timestamp()}",
            "type": task_type,
            "payload": payload,
            "created_at": datetime.utcnow().isoformat(),
            "attempts": 0
        }
        
        # Use sorted set for priority queue
        self.redis.zadd(self.queue_name, {json.dumps(task): priority})
        return task["id"]
    
    def dequeue(self, timeout: int = 5) -> dict | None:
        """Get the next task from the queue."""
        # Move from main queue to processing queue (atomic)
        result = self.redis.bzpopmin(self.queue_name, timeout)
        
        if result:
            _, task_json, _ = result
            task = json.loads(task_json)
            task["attempts"] += 1
            
            # Track in processing queue with timeout
            self.redis.setex(
                f"{self.processing_queue}:{task['id']}",
                300,  # 5 minute processing timeout
                json.dumps(task)
            )
            return task
        return None
    
    def complete(self, task_id: str):
        """Mark a task as complete."""
        self.redis.delete(f"{self.processing_queue}:{task_id}")
    
    def fail(self, task: dict, error: str, retry: bool = True):
        """Handle a failed task."""
        self.redis.delete(f"{self.processing_queue}:{task['id']}")
        
        if retry and task["attempts"] < 3:
            # Re-queue with backoff
            delay = 2 ** task["attempts"]
            task["last_error"] = error
            self.redis.zadd(
                self.queue_name,
                {json.dumps(task): time.time() + delay}
            )
        else:
            # Send to dead letter queue
            self.redis.lpush(f"{self.queue_name}:dead", json.dumps(task))


# Worker
def run_worker(queue: RedisQueue, handlers: dict[str, Callable]):
    """Consume tasks forever, dispatching each to its registered handler.

    Unknown task types are failed without retry; handler exceptions are
    reported via queue.fail so the task can be retried or dead-lettered.
    """
    while True:
        task = queue.dequeue()
        if not task:
            # Dequeue timed out with nothing to do — poll again.
            continue

        task_type = task["type"]
        handler = handlers.get(task_type)
        if handler is None:
            # No handler registered: dead-letter immediately.
            queue.fail(task, f"Unknown task type: {task_type}", retry=False)
            continue

        try:
            handler(task["payload"])
            queue.complete(task["id"])
        except Exception as exc:
            queue.fail(task, str(exc))


# Usage
queue = RedisQueue("redis://localhost:6379", "tasks")

# Producer
# Enqueue a welcome-email task; a worker picks it up asynchronously.
queue.enqueue("send_email", {
    "to": "user@example.com",
    "template": "welcome"
})

# Consumer
# Map task types to handlers. NOTE(review): send_email and charge_card
# are assumed to be defined elsewhere — they are not shown in this snippet.
handlers = {
    "send_email": lambda p: send_email(p["to"], p["template"]),
    "process_payment": lambda p: charge_card(p["amount"], p["card_id"]),
}
# Blocks forever, processing tasks as they arrive.
run_worker(queue, handlers)

RabbitMQ (Robust & Feature-Rich)

For complex routing and guaranteed delivery:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import pika
import json
from typing import Callable

class RabbitMQQueue:
    """Thin wrapper over a blocking pika channel with dead-letter routing."""

    def __init__(self, url: str, exchange: str = "tasks"):
        params = pika.URLParameters(url)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.exchange = exchange

        # One durable topic exchange for live traffic, a second one for
        # messages that get rejected downstream.
        self.channel.exchange_declare(exchange, "topic", durable=True)
        self.channel.exchange_declare(f"{exchange}.dead", "topic", durable=True)

    def declare_queue(self, queue_name: str, routing_key: str):
        """Create a durable queue whose rejects route to a dead-letter queue."""
        dead_queue = f"{queue_name}.dead"
        dead_exchange = f"{self.exchange}.dead"

        # Dead-letter side first, so rejected messages have somewhere to go.
        self.channel.queue_declare(dead_queue, durable=True)
        self.channel.queue_bind(dead_queue, dead_exchange, routing_key)

        # Live queue, wired to the dead-letter exchange on nack/expiry.
        self.channel.queue_declare(
            queue_name,
            durable=True,
            arguments={
                "x-dead-letter-exchange": dead_exchange,
                "x-dead-letter-routing-key": routing_key,
            }
        )
        self.channel.queue_bind(queue_name, self.exchange, routing_key)

    def publish(self, routing_key: str, message: dict):
        """Publish `message` as persistent JSON under `routing_key`."""
        props = pika.BasicProperties(
            delivery_mode=2,  # persist to disk so broker restarts keep it
            content_type="application/json"
        )
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=props
        )

    def consume(self, queue_name: str, callback: Callable):
        """Block forever, feeding decoded messages to `callback` one at a time."""
        def on_message(ch, method, properties, body):
            try:
                callback(json.loads(body))
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception:
                # Reject without requeue: the broker routes the message to
                # the dead-letter exchange configured on the queue.
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )

        # One unacked message at a time: fair dispatch across workers.
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue_name, on_message)
        self.channel.start_consuming()


# Usage
mq = RabbitMQQueue("amqp://localhost")

# Setup
# Topic bindings: "email.*" matches e.g. "email.welcome", "email.reset".
mq.declare_queue("email_tasks", "email.*")
mq.declare_queue("payment_tasks", "payment.*")

# Publish
mq.publish("email.welcome", {
    "user_id": 123,
    "email": "user@example.com"
})

mq.publish("payment.charge", {
    "order_id": "ord-456",
    "amount": 99.99
})

# Consume (in separate workers)
# NOTE(review): consume() blocks forever, so these two calls must run in
# separate processes; send_welcome_email and process_payment are assumed
# to be defined elsewhere.
mq.consume("email_tasks", send_welcome_email)
mq.consume("payment_tasks", process_payment)

AWS SQS (Managed & Scalable)

For cloud-native applications:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
import boto3
import json
from typing import Callable
import time

class SQSQueue:
    def __init__(self, queue_url: str, dlq_url: str = None):
        self.sqs = boto3.client('sqs')
        self.queue_url = queue_url
        self.dlq_url = dlq_url
    
    def send(self, message: dict, delay_seconds: int = 0, 
             deduplication_id: str = None):
        """Send a message to the queue."""
        params = {
            "QueueUrl": self.queue_url,
            "MessageBody": json.dumps(message),
            "DelaySeconds": delay_seconds,
        }
        
        # For FIFO queues
        if deduplication_id:
            params["MessageDeduplicationId"] = deduplication_id
            params["MessageGroupId"] = message.get("group", "default")
        
        return self.sqs.send_message(**params)
    
    def receive(self, max_messages: int = 10, 
                wait_seconds: int = 20) -> list[dict]:
        """Receive messages from the queue."""
        response = self.sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=wait_seconds,
            AttributeNames=["ApproximateReceiveCount"],
        )
        
        messages = []
        for msg in response.get("Messages", []):
            messages.append({
                "id": msg["MessageId"],
                "receipt_handle": msg["ReceiptHandle"],
                "body": json.loads(msg["Body"]),
                "receive_count": int(
                    msg["Attributes"]["ApproximateReceiveCount"]
                )
            })
        return messages
    
    def delete(self, receipt_handle: str):
        """Delete a processed message."""
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=receipt_handle
        )
    
    def send_to_dlq(self, message: dict, error: str):
        """Send failed message to dead letter queue."""
        if self.dlq_url:
            message["_error"] = error
            message["_failed_at"] = time.time()
            self.sqs.send_message(
                QueueUrl=self.dlq_url,
                MessageBody=json.dumps(message)
            )


# Worker pattern
def run_sqs_worker(queue: SQSQueue, handler: Callable):
    """Poll the queue forever, handling each message and deleting on success.

    A message that fails three times is copied to the DLQ and removed;
    earlier failures are left to reappear after the visibility timeout.
    """
    while True:
        for msg in queue.receive():
            try:
                handler(msg["body"])
                queue.delete(msg["receipt_handle"])
            except Exception as exc:
                if msg["receive_count"] >= 3:
                    # Third strike: park in the DLQ and ack the original.
                    queue.send_to_dlq(msg["body"], str(exc))
                    queue.delete(msg["receipt_handle"])
                # Otherwise, message returns to queue after visibility timeout


# Usage
queue = SQSQueue(
    "https://sqs.us-east-1.amazonaws.com/123456789/tasks",
    "https://sqs.us-east-1.amazonaws.com/123456789/tasks-dlq"
)

# Send
queue.send({"type": "process_order", "order_id": "123"})

# With delay (e.g., for scheduled tasks)
queue.send(
    {"type": "send_reminder", "user_id": 456},
    delay_seconds=3600  # 1 hour
)

# Process
# NOTE(review): blocks forever; process_task is assumed to be defined
# elsewhere — it is not shown in this snippet.
run_sqs_worker(queue, process_task)

Patterns

Fan-Out

One message triggers multiple handlers:

1
2
3
4
5
6
7
# RabbitMQ fan-out with topic exchange
# (mq is the RabbitMQQueue from the earlier snippet; order_data is the
# message payload dict)
mq.publish("order.created", order_data)

# Multiple queues bound to "order.*" each receive the message:
# - inventory_queue: reserve items
# - email_queue: send confirmation
# - analytics_queue: track conversion

Request-Reply

When you need a response:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import uuid

def request_reply(queue: RedisQueue, task_type: str, payload: dict,
                  timeout: int = 30) -> dict:
    """Send a task and block until a reply arrives on a private channel.

    Raises TimeoutError if no reply arrives within `timeout` seconds.

    Fixes over the original:
    - Subscribe BEFORE enqueueing: Redis pub/sub has no replay, so a fast
      worker could publish the reply before we were listening and the
      reply would be lost forever.
    - Honor `timeout`: pubsub.listen() blocks indefinitely, which made
      the original TimeoutError unreachable; we poll with a deadline
      instead.
    """
    reply_channel = f"reply:{uuid.uuid4()}"

    pubsub = queue.redis.pubsub()
    pubsub.subscribe(reply_channel)

    try:
        queue.enqueue(task_type, {
            **payload,
            "_reply_to": reply_channel
        })

        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("No response received")
            message = pubsub.get_message(timeout=remaining)
            if message and message["type"] == "message":
                return json.loads(message["data"])
    finally:
        # Always drop the subscription so the channel doesn't leak.
        pubsub.close()

Best Practices

  1. Make handlers idempotent — messages may be delivered more than once
  2. Use dead letter queues — don’t lose failed messages
  3. Set visibility timeouts — prevent stuck messages from blocking
  4. Monitor queue depth — alert when backlog grows
  5. Use batching — process multiple messages together when possible
  6. Log message IDs — for debugging and tracing
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Monitoring example
def check_queue_health(queue_url: str) -> dict:
    """Fetch queue depth metrics, alert on backlog, and return the metrics.

    Returns {"pending": int, "in_flight": int}. Fix: the original computed
    in_flight but never used it; returning both lets callers record them.
    NOTE(review): `alert` is assumed to be defined elsewhere.
    """
    sqs = boto3.client('sqs')
    attrs = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['ApproximateNumberOfMessages',
                       'ApproximateNumberOfMessagesNotVisible']
    )

    pending = int(attrs['Attributes']['ApproximateNumberOfMessages'])
    in_flight = int(attrs['Attributes']['ApproximateNumberOfMessagesNotVisible'])

    if pending > 1000:
        alert(f"Queue backlog: {pending} messages pending")

    return {"pending": pending, "in_flight": in_flight}

Message queues transform brittle synchronous systems into resilient async ones. Start simple with Redis, graduate to RabbitMQ or SQS as needs grow.