Synchronous processing is a lie. At some point, your request-response cycle will hit a wall: sending emails, processing images, charging credit cards, generating reports. The solution: message queues. Here’s how to use them without creating distributed system nightmares.
## Why Queues?

```
Without a queue:
User → API → [Send Email] → [Process Image] → [Charge Card] → Response
                                                   (3+ seconds)

With a queue:
User → API → [Queue] → Response (200ms)
                └─→ Worker (processes tasks async)
```
Benefits:
- **Faster responses**: return immediately, process later
- **Resilience**: if a service is down, retry later
- **Scaling**: add more workers when backlogged
- **Decoupling**: services don't need to know about each other

## Queue Architecture

```
┌──────────┐     ┌────────────────┐     ┌──────────┐
│ Producer │ ──▶ │     Queue      │ ──▶ │ Consumer │
│  (API)   │     │ Redis/RabbitMQ │     │ (Worker) │
└──────────┘     └────────────────┘     └──────────┘
```
**Producer**: creates messages (your API)

**Queue**: stores messages until processed

**Consumer**: processes messages (worker processes)
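Before reaching for a broker, the core mechanics fit in a few lines of standard-library Python. This is only a sketch — real systems swap `queue.Queue` for Redis or RabbitMQ — but the decoupling is the same: the producer returns immediately while a worker drains the queue in the background.

```python
import queue
import threading

task_queue = queue.Queue()
results = []

def worker():
    # Consumer: block until a message arrives, process it, repeat
    while True:
        message = task_queue.get()
        if message is None:   # Sentinel: shut down
            break
        results.append(f"processed {message['task']}")
        task_queue.task_done()

t = threading.Thread(target=worker, daemon=True)
t.start()

# Producer: enqueue and return immediately
task_queue.put({"task": "send_welcome_email", "user_id": 42})
task_queue.put({"task": "resize_avatar", "user_id": 42})

task_queue.join()     # Wait for the worker to finish (demo only)
task_queue.put(None)  # Stop the worker
t.join()
```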
## Redis Queues (Simple)

### With Python RQ

```python
# tasks.py
from redis import Redis
from rq import Queue

redis_conn = Redis(host='localhost', port=6379)
queue = Queue(connection=redis_conn)

def send_welcome_email(user_id):
    user = db.get_user(user_id)
    email.send(
        to=user.email,
        subject="Welcome!",
        body=render_template("welcome.html", user=user)
    )
```

```python
# Producer: enqueue task
from tasks import queue, send_welcome_email

@app.route('/signup', methods=['POST'])
def signup():
    user = create_user(request.json)
    queue.enqueue(send_welcome_email, user.id)
    return jsonify({"status": "created"}), 201
```
```bash
# Start worker
rq worker --url redis://localhost:6379
```
### With Bull (Node.js)

```javascript
// queue.js
const Queue = require('bull');
const emailQueue = new Queue('email', 'redis://localhost:6379');

// Define processor
emailQueue.process(async (job) => {
  const { userId, template } = job.data;
  const user = await db.getUser(userId);
  await sendEmail(user.email, template);
});

// Producer
app.post('/signup', async (req, res) => {
  const user = await createUser(req.body);
  await emailQueue.add({ userId: user.id, template: 'welcome' });
  res.status(201).json({ status: 'created' });
});
```
## RabbitMQ (Robust)

### Setup

```python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare queue (idempotent)
channel.queue_declare(queue='tasks', durable=True)
```
### Producer

```python
import json

def publish_task(task_type, data):
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=json.dumps({'type': task_type, 'data': data}),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
            content_type='application/json'
        )
    )
```
### Consumer

```python
def callback(ch, method, properties, body):
    try:
        message = json.loads(body)
        process_task(message)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Reject without requeueing; goes to the dead-letter queue if configured
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_qos(prefetch_count=1)  # One message at a time
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
```
### Exchange Patterns

```python
# Direct: route to a specific queue by routing key
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='error', body=message)

# Fanout: broadcast to all bound queues
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')
channel.basic_publish(exchange='broadcast', routing_key='', body=message)

# Topic: pattern matching on routing keys
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.basic_publish(exchange='topic_logs', routing_key='user.created', body=message)
# Consumers can bind with patterns like 'user.*' or '*.created'
```
## Celery (Python Standard)

### Setup

```python
# celery_app.py
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)
```
### Tasks

```python
# tasks.py
from celery_app import app

@app.task(bind=True, max_retries=3)
def send_email(self, user_id, template):
    try:
        user = db.get_user(user_id)
        email.send(user.email, template)
    except ConnectionError as e:
        raise self.retry(exc=e, countdown=60)  # Retry in 60s

@app.task
def process_image(image_id):
    image = storage.get(image_id)
    thumbnail = resize(image, 200, 200)
    storage.put(f"{image_id}_thumb", thumbnail)
```
### Calling Tasks

```python
# Async call
send_email.delay(user_id, 'welcome')

# With options
send_email.apply_async(
    args=[user_id, 'welcome'],
    countdown=300,  # Delay 5 minutes
    expires=3600,   # Expire after 1 hour
)

# Chain tasks (each result feeds the next)
from celery import chain
result = chain(
    download_image.s(url),
    process_image.s(),
    upload_to_cdn.s()
).apply_async()

# Group (parallel)
from celery import group
result = group(
    send_email.s(user_id, 'welcome')
    for user_id in user_ids
).apply_async()
```
### Worker

```bash
celery -A celery_app worker --loglevel=info --concurrency=4
```
## Message Design

### Good Message Structure

```json
{
  "id": "msg_abc123",
  "type": "user.welcome_email",
  "version": "1.0",
  "timestamp": "2024-03-12T15:30:00Z",
  "data": {
    "user_id": "usr_456",
    "email": "alice@example.com"
  },
  "metadata": {
    "correlation_id": "req_789",
    "retry_count": 0
  }
}
```
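An envelope matching this structure can be produced by a small helper. This is a sketch: the `msg_` prefix and field names follow the example above, and `correlation_id` is assumed to be supplied by the caller (typically from the incoming request).

```python
import json
import uuid
from datetime import datetime, timezone

def build_message(msg_type, data, correlation_id, version="1.0"):
    # Every field the checklist below calls for: id, type, version,
    # timestamp, and a correlation id for cross-service tracing
    return {
        "id": f"msg_{uuid.uuid4().hex[:12]}",
        "type": msg_type,
        "version": version,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "data": data,
        "metadata": {"correlation_id": correlation_id, "retry_count": 0},
    }

msg = build_message(
    "user.welcome_email",
    {"user_id": "usr_456", "email": "alice@example.com"},
    correlation_id="req_789",
)
body = json.dumps(msg)  # What actually goes on the wire
```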
Include:

- **ID**: for deduplication
- **Type**: what kind of task
- **Version**: schema version for evolution
- **Timestamp**: when created
- **Correlation ID**: trace across services

### Idempotency

Messages may be delivered more than once. Design for it.
```python
def process_payment(message):
    payment_id = message['data']['payment_id']

    # Check if already processed
    if db.payment_exists(payment_id):
        return  # Already done

    # Process within a transaction
    with db.transaction():
        charge_card(message['data'])
        db.mark_payment_complete(payment_id)
```
## Error Handling

### Retry with Backoff

```python
@app.task(bind=True, max_retries=5)
def unreliable_task(self, data):
    try:
        external_api.call(data)
    except APIError as e:
        # Exponential backoff: 1m, 2m, 4m, 8m, 16m
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=e, countdown=countdown)
```
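The schedule itself is just arithmetic, so it can live in a pure function independent of any queue library (a sketch; the cap and jitter values are assumptions). Adding random jitter avoids a "thundering herd" of simultaneous retries when many tasks fail at the same moment.

```python
import random

def backoff_seconds(retries, base=60, cap=3600, jitter=0.1):
    # Exponential growth, capped so late retries don't wait for hours
    delay = min(cap, base * (2 ** retries))
    # Spread retries within ±jitter of the nominal delay
    return delay * random.uniform(1 - jitter, 1 + jitter)

# Nominal schedule without jitter: 60s, 120s, 240s, 480s, 960s
schedule = [60 * (2 ** r) for r in range(5)]
```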
### Dead Letter Queues

```python
# RabbitMQ DLQ setup
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'tasks_dlq'
    }
)
channel.queue_declare(queue='tasks_dlq', durable=True)
```
Failed messages go to DLQ for inspection/replay.
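When replaying, RabbitMQ's `x-death` header records each dead-lettering event, so a replay tool can decide whether a message deserves another attempt or should stay parked. A minimal sketch of that decision (the `max_replays` threshold is an assumption):

```python
def should_replay(headers, max_replays=3):
    # RabbitMQ appends an entry to 'x-death' each time a message is
    # dead-lettered; 'count' tracks repeats per (queue, reason) pair
    deaths = (headers or {}).get('x-death', [])
    total = sum(d.get('count', 1) for d in deaths)
    return total < max_replays

# Dead-lettered once from 'tasks' → still worth replaying
headers = {'x-death': [{'queue': 'tasks', 'reason': 'rejected', 'count': 1}]}
```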
### Circuit Breaker

```python
import requests
from circuitbreaker import circuit, CircuitBreakerError

@circuit(failure_threshold=5, recovery_timeout=60)
def call_external_api(data):
    return requests.post('https://api.example.com', json=data)

@app.task(bind=True)
def process_with_breaker(self, data):
    try:
        call_external_api(data)
    except CircuitBreakerError:
        # Circuit is open; retry later
        raise self.retry(countdown=120)
```
## Monitoring

### Key Metrics

```python
# Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge

tasks_total = Counter('tasks_total', 'Total tasks', ['type', 'status'])
task_duration = Histogram('task_duration_seconds', 'Task duration', ['type'])
queue_size = Gauge('queue_size', 'Messages in queue', ['queue'])

@app.task
def monitored_task(data):
    with task_duration.labels(type='email').time():
        try:
            send_email(data)
            tasks_total.labels(type='email', status='success').inc()
        except Exception:
            tasks_total.labels(type='email', status='failure').inc()
            raise
```
### Celery Flower

```bash
pip install flower
celery -A celery_app flower --port=5555
```
Web UI showing:

- Active workers
- Task success/failure rates
- Queue depths
- Task history

### Alerts

```yaml
# Prometheus alerts
groups:
  - name: queues
    rules:
      - alert: QueueBacklog
        expr: queue_size > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue backlog exceeds 10k messages"
      - alert: HighFailureRate
        expr: |
          rate(tasks_total{status="failure"}[5m])
            / rate(tasks_total[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
```
## Scaling Workers

### Horizontal Scaling

```yaml
# docker-compose.yml
services:
  worker:
    image: myapp:latest
    command: celery -A celery_app worker --concurrency=4
    deploy:
      replicas: 3
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
```
### Auto-scaling on Queue Depth

```yaml
# Kubernetes HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: rabbitmq_queue_messages
          selector:
            matchLabels:
              queue: tasks
        target:
          type: AverageValue
          averageValue: "100"  # ~100 messages per worker
```
## Queue Selection Guide

| Need | Use |
|------|-----|
| Simple, low volume | Redis + RQ |
| Python ecosystem | Celery + Redis/RabbitMQ |
| Complex routing | RabbitMQ |
| High throughput | Kafka |
| AWS native | SQS |
| Guaranteed delivery | RabbitMQ or SQS |
## The Checklist

### Start Here

- **Today**: Move one slow operation to a queue
- **This week**: Add retry logic and a DLQ
- **This month**: Implement monitoring and alerts
- **This quarter**: Auto-scale workers based on load

Queues turn "please wait" into "done instantly." Your users will thank you.
The best queue is invisible. Users just experience responsiveness.