1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import json
import logging
from typing import Callable

import pika
class RabbitMQQueue:
    """Small wrapper around a blocking pika connection for topic-routed queues.

    Constructing an instance opens a connection and a channel and declares two
    durable topic exchanges: ``exchange`` for normal traffic and
    ``f"{exchange}.dead"`` for rejected messages. The instance can be used as
    a context manager so the connection is closed when the block exits.
    """

    # AMQP ``delivery_mode`` value that marks a message as persistent.
    _PERSISTENT = 2

    _log = logging.getLogger(__name__)

    def __init__(self, url: str, exchange: str = "tasks"):
        """Connect to the broker and declare the main and dead-letter exchanges.

        Args:
            url: AMQP URL handed to ``pika.URLParameters``.
            exchange: Name of the main topic exchange; the dead-letter
                exchange is named ``f"{exchange}.dead"``.
        """
        self.connection = pika.BlockingConnection(pika.URLParameters(url))
        self.channel = self.connection.channel()
        self.exchange = exchange
        # Declare exchange and dead letter exchange
        self.channel.exchange_declare(
            exchange=exchange, exchange_type="topic", durable=True
        )
        self.channel.exchange_declare(
            exchange=f"{exchange}.dead", exchange_type="topic", durable=True
        )

    def __enter__(self) -> "RabbitMQQueue":
        """Return the instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection on leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()

    def declare_queue(self, queue_name: str, routing_key: str):
        """Declare a durable queue together with its dead-letter queue.

        Two queues are declared: ``f"{queue_name}.dead"``, bound to the
        dead-letter exchange, and ``queue_name``, bound to the main exchange.
        Both bindings use ``routing_key``.

        Args:
            queue_name: Name of the main queue.
            routing_key: Topic binding pattern (e.g. ``"email.*"``).
        """
        dead_exchange = f"{self.exchange}.dead"
        dead_queue = f"{queue_name}.dead"

        # Dead letter queue
        self.channel.queue_declare(queue=dead_queue, durable=True)
        self.channel.queue_bind(
            queue=dead_queue, exchange=dead_exchange, routing_key=routing_key
        )

        # Main queue
        # NOTE(review): the dead-letter routing key is the binding pattern
        # itself, so the message's original routing key is presumably not
        # kept when it is dead-lettered - confirm this is intended.
        self.channel.queue_declare(
            queue=queue_name,
            durable=True,
            arguments={
                "x-dead-letter-exchange": dead_exchange,
                "x-dead-letter-routing-key": routing_key,
            },
        )
        self.channel.queue_bind(
            queue=queue_name, exchange=self.exchange, routing_key=routing_key
        )

    def publish(self, routing_key: str, message: dict):
        """Publish ``message`` as JSON to the main exchange.

        Args:
            routing_key: Routing key the message is published with.
            message: JSON-serialisable payload.

        Raises:
            TypeError: If ``message`` cannot be serialised by ``json.dumps``.
        """
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=self._PERSISTENT,
                content_type="application/json",
            ),
        )

    def consume(self, queue_name: str, callback: Callable[[dict], None]):
        """Consume messages from ``queue_name``, one unacknowledged at a time.

        Each message body is decoded as JSON and passed to ``callback``. If
        decoding or the callback raises, the error is logged and the message
        is rejected without requeue. Otherwise the message is acknowledged.
        The method finishes by calling ``start_consuming()`` on the channel.

        Args:
            queue_name: Queue to read from.
            callback: Called with the decoded message payload.
        """

        def wrapper(ch, method, properties, body):
            try:
                message = json.loads(body)
                callback(message)
            except Exception:
                # Record why the message failed before rejecting it;
                # previously the error was discarded silently.
                self._log.exception(
                    "Rejecting message from %r (delivery_tag=%s)",
                    queue_name,
                    method.delivery_tag,
                )
                # Reject and send to dead letter queue
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            else:
                # Ack outside the try so an ack failure is not mistaken for
                # a processing failure and turned into a nack.
                ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=queue_name, on_message_callback=wrapper)
        self.channel.start_consuming()
# Usage example: connect to the broker at amqp://localhost using the default
# "tasks" exchange.
mq = RabbitMQQueue("amqp://localhost")
# Setup: declare each work queue (and its ".dead" companion queue) and bind it
# with a topic pattern.
mq.declare_queue("email_tasks", "email.*")
mq.declare_queue("payment_tasks", "payment.*")
# Publish: one message per queue; the routing keys below fall under the
# "email.*" and "payment.*" patterns bound above.
mq.publish("email.welcome", {
"user_id": 123,
"email": "user@example.com"
})
mq.publish("payment.charge", {
"order_id": "ord-456",
"amount": 99.99
})
# Consume (in separate workers)
# NOTE(review): consume() ends by calling start_consuming(), so in a single
# script the second call is presumably only reached once the first stops
# consuming - run each call in its own worker process.
# NOTE(review): send_welcome_email and process_payment are not defined in this
# snippet - TODO confirm they are provided elsewhere.
mq.consume("email_tasks", send_welcome_email)
mq.consume("payment_tasks", process_payment)
|