Webhooks flip the API model: instead of polling, the service calls you. Here’s how to handle them without losing events.
Basic Webhook Handler#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
app = FastAPI()
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Receive a Stripe webhook, verify it, and dispatch on the event type.

    Responds with 400 when the ``Stripe-Signature`` header is absent, when
    ``verify_stripe_signature`` rejects the request, or when the body is not
    a JSON object. Otherwise acknowledges with ``{"received": True}``.
    """
    # Local import: this snippet's import list does not bring in ``json``.
    import json

    # Keep the raw bytes; the same bytes are handed to the signature check.
    payload = await request.body()
    sig_header = request.headers.get("Stripe-Signature")

    # Verify signature first. A missing header can never be valid, so reject
    # it here instead of passing None to the verifier.
    if not sig_header or not verify_stripe_signature(payload, sig_header):
        raise HTTPException(status_code=400, detail="Invalid signature")

    try:
        event = json.loads(payload)
    except ValueError as exc:
        # Covers JSONDecodeError and undecodable bytes: a client error, not a 500.
        raise HTTPException(status_code=400, detail="Invalid payload") from exc
    if not isinstance(event, dict):
        raise HTTPException(status_code=400, detail="Invalid payload")

    # Process event
    if event.get("type") == "payment_intent.succeeded":
        handle_payment_success(event["data"]["object"])

    # Always return 200 quickly
    return {"received": True}
|
Signature Verification#
Never trust unverified webhooks.
Stripe#
1
2
3
4
5
6
7
8
9
10
| import stripe
def verify_stripe_signature(payload: bytes, sig_header: str) -> bool:
    """Return True when Stripe's library accepts the payload and signature.

    Args:
        payload: Raw request body bytes, exactly as received.
        sig_header: Value of the ``Stripe-Signature`` header.

    Returns:
        False for a missing/empty header, a rejected signature, or a payload
        the library cannot parse; True otherwise.
    """
    if not sig_header:
        return False
    try:
        stripe.Webhook.construct_event(
            payload, sig_header, STRIPE_WEBHOOK_SECRET
        )
    except (ValueError, stripe.error.SignatureVerificationError):
        # ValueError: per Stripe's documentation, raised for an invalid
        # payload. Treated like a bad signature so the caller gets a bool.
        return False
    return True
|
GitHub#
1
2
3
4
5
6
7
def verify_github_signature(payload: bytes, signature: str) -> bool:
    """Check a ``sha256=<hex>`` signature against an HMAC-SHA256 of the payload.

    Args:
        payload: Raw request body bytes.
        signature: Signature header value; a missing header (``None``) or an
            empty string is tolerated and yields False.

    Returns:
        True only when the signature equals ``"sha256="`` followed by the hex
        HMAC-SHA256 of ``payload`` keyed with ``GITHUB_SECRET``.
    """
    # A missing header arrives as None; compare_digest would raise TypeError.
    if not isinstance(signature, str) or not signature:
        return False
    digest = hmac.new(
        GITHUB_SECRET.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    expected = "sha256=" + digest
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(expected.encode(), signature.encode())
|
Generic HMAC#
1
2
3
4
5
6
7
def verify_hmac(payload: bytes, signature: str, secret: str) -> bool:
    """Check a hex HMAC-SHA256 signature of ``payload`` in constant time.

    Args:
        payload: Raw request body bytes.
        signature: Hex digest supplied by the sender; ``None``, an empty
            string, or a non-string value yields False.
        secret: Shared secret used as the HMAC key.

    Returns:
        True only when ``signature`` equals the hex HMAC-SHA256 of ``payload``.
    """
    # A missing header arrives as None; compare_digest would raise TypeError.
    if not isinstance(signature, str) or not signature:
        return False
    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(expected.encode(), signature.encode())
|
The 200 OK Problem#
Services expect quick responses. If you do slow work in the handler:
1
2
3
4
5
6
7
8
| # Bad: blocks until email sends
@app.post("/webhooks/order")
async def order_webhook(event: dict):
    """Anti-pattern: finish every side effect before acknowledging the webhook."""
    processed = process_order(event)
    # The steps run one after another, so their durations add up before the
    # sender sees a response.
    send_confirmation_email(processed)  # Takes 2 seconds
    update_inventory(processed)  # Takes 1 second
    notify_warehouse(processed)  # Takes 500ms
    return {"ok": True}  # 3.5 seconds later
|
The sender might time out and retry, causing duplicate processing.
Pattern: Async Processing#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| from redis import Redis
import json
redis = Redis()
@app.post("/webhooks/order")
async def order_webhook(request: Request):
    """Verify an order webhook, enqueue its raw payload, and acknowledge.

    Responds with 400 when ``verify_signature`` rejects the request; otherwise
    pushes the body onto the ``webhook_queue`` Redis list and returns
    ``{"received": True}`` without processing the event inline.
    """
    # Local import keeps the snippet self-contained; it ships with FastAPI.
    from fastapi.concurrency import run_in_threadpool

    payload = await request.body()

    # Verify signature
    if not verify_signature(payload, request.headers):
        raise HTTPException(400)

    # Queue for async processing. ``redis`` is the synchronous client, so run
    # the network round trip in a worker thread rather than on the event loop.
    await run_in_threadpool(redis.lpush, "webhook_queue", payload)

    # Return immediately
    return {"received": True}
# Separate worker process
def process_webhooks():
    """Worker loop: pop payloads from ``webhook_queue`` and handle them.

    Runs forever. A payload that fails to parse or whose handler raises is
    logged and pushed onto the ``webhook_queue:failed`` list, so one bad
    event neither stops the worker nor disappears.
    """
    import logging

    log = logging.getLogger(__name__)
    while True:
        # brpop blocks until an item is available and removes it from the
        # list, so from here on this function holds the only copy.
        _, payload = redis.brpop("webhook_queue")
        try:
            event = json.loads(payload)
            handle_event(event)
        except Exception:
            # Loop boundary: keep the worker alive and park the payload for
            # inspection or replay instead of losing it.
            log.exception("Webhook processing failed; moving payload to webhook_queue:failed")
            redis.lpush("webhook_queue:failed", payload)
|
Idempotency#
Webhooks can be delivered multiple times. Handle it.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| from functools import wraps
processed_events = set()  # Use Redis in production


def idempotent(func):
    """Decorate an event handler so each event ID is handled at most once.

    The wrapped handler receives a dict-like ``event``. Its ID is taken from
    the ``"id"`` key, falling back to ``"event_id"``. A repeated ID returns
    ``{"status": "already_processed"}`` without calling ``func``. The ID is
    recorded only after ``func`` returns, so a handler that raises can be
    retried.

    Events that carry no ID cannot be deduplicated; they are passed straight
    to ``func`` and nothing is recorded for them.
    """
    @wraps(func)
    def wrapper(event):
        event_id = event.get("id") or event.get("event_id")
        if event_id is None:
            # Recording None would make every later ID-less event look like
            # a duplicate and silently drop it.
            return func(event)
        if event_id in processed_events:
            return {"status": "already_processed"}
        result = func(event)
        processed_events.add(event_id)
        return result
    return wrapper
@idempotent
def handle_payment(event):
    """Mark the order identified by the event's data object as paid."""
    # Safe to call multiple times
    paid_object = event["data"]["object"]
    update_order_status(paid_object["id"], "paid")
|
Database-Backed Idempotency#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Handle a Stripe event once, recording its ID in ``processed_webhooks``.

    Returns ``{"status": "duplicate"}`` when a row for the event ID already
    exists; otherwise handles the event and inserts the row inside one
    transaction, then returns ``{"received": True}``.
    """
    event = await parse_and_verify(request)
    id_param = {"id": event["id"]}

    # Check if already processed
    # NOTE(review): this lookup runs before the transaction below, so two
    # concurrent deliveries of the same event could both pass it. Presumably
    # a unique constraint on event_id is the backstop; confirm against schema.
    already_seen = await db.fetch_one(
        "SELECT id FROM processed_webhooks WHERE event_id = :id",
        id_param,
    )
    if already_seen:
        return {"status": "duplicate"}

    # Process in transaction
    async with db.transaction():
        await handle_event(event)
        await db.execute(
            "INSERT INTO processed_webhooks (event_id, processed_at) VALUES (:id, NOW())",
            id_param,
        )
    return {"received": True}
|
Retry Handling#
When your endpoint fails, many services retry automatically (Stripe does; GitHub marks the delivery as failed and leaves redelivery to you). Be ready.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| # Log for debugging retries
@app.post("/webhooks/github")
async def github_webhook(request: Request):
    """Log a GitHub delivery, process it, and answer 500 on failure.

    The delivery ID from ``X-GitHub-Delivery`` is attached to every log
    record so a failed delivery can be correlated with later redeliveries.
    """
    delivery_id = request.headers.get("X-GitHub-Delivery")
    event_type = request.headers.get("X-GitHub-Event")
    logger.info(f"Received {event_type}", extra={
        "delivery_id": delivery_id,
        # This header identifies the resource the hook is installed on; it is
        # not a retry counter, so it is no longer logged as "attempt".
        "installation_target_id": request.headers.get("X-GitHub-Hook-Installation-Target-ID"),
    })
    try:
        await process_event(await request.json())
    except Exception as e:
        # logger.exception keeps the traceback alongside the delivery ID.
        logger.exception(f"Webhook failed: {e}", extra={"delivery_id": delivery_id})
        # A non-2xx response marks the delivery as failed. GitHub does not
        # redeliver automatically; redeliver from the UI or the REST API.
        raise HTTPException(500) from e
    return {"ok": True}
|
Event Types to Handle#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| HANDLERS = {
"customer.created": handle_new_customer,
"customer.updated": handle_customer_update,
"invoice.paid": handle_invoice_paid,
"invoice.payment_failed": handle_payment_failed,
}
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Route a verified Stripe event to its entry in ``HANDLERS``.

    Event types without a registered handler are logged and still
    acknowledged with ``{"received": True}``.
    """
    event = await parse_and_verify(request)
    handler = HANDLERS.get(event["type"])

    # Guard clause: nothing registered for this type.
    if not handler:
        logger.info(f"Unhandled event type: {event['type']}")
        return {"received": True}

    await handler(event["data"]["object"])
    return {"received": True}
|
Testing Webhooks Locally#
1
2
3
4
5
6
7
8
9
10
| # ngrok for temporary public URL
ngrok http 8000
# Use the ngrok URL in webhook settings
# Stripe CLI
stripe listen --forward-to localhost:8000/webhooks/stripe
stripe trigger payment_intent.succeeded
# GitHub CLI (needs the webhook extension: gh extension install cli/gh-webhook)
gh webhook forward --repo=myorg/myrepo --events=push --url=http://localhost:8000/webhooks/github
|
Webhook Security Checklist#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@app.post("/webhooks/secure")
async def secure_webhook(request: Request):
    """Apply signature, timestamp, and source-IP checks before processing.

    Raises:
        HTTPException: 401 for a bad signature or a missing, malformed, or
            out-of-window ``X-Timestamp``; 403 when the client address is
            unknown or not allowed.
    """
    # Local import: this snippet does not import ``time`` at the top.
    import time

    # 1. Verify signature
    # Read the body here and pass it with the headers, matching the
    # verify_signature(payload, headers) form used by the queueing handler.
    payload = await request.body()
    if not verify_signature(payload, request.headers):
        raise HTTPException(401, "Invalid signature")

    # 2. Check timestamp (prevent replay attacks)
    # NOTE(review): this only helps if the provider signs the timestamp
    # together with the body; confirm for your provider.
    try:
        timestamp = int(request.headers.get("X-Timestamp", 0))
    except ValueError:
        # A non-numeric header is a client error, not a server crash.
        raise HTTPException(401, "Invalid timestamp") from None
    if abs(time.time() - timestamp) > 300:  # 5 minutes
        raise HTTPException(401, "Timestamp too old")

    # 3. Validate source IP (if provider publishes IP ranges)
    # request.client can be None when the server has no peer address.
    client_ip = request.client.host if request.client else None
    if client_ip is None or not is_allowed_ip(client_ip):
        raise HTTPException(403, "IP not allowed")

    # 4. Process idempotently
    return await process_idempotent(request)
|
Monitoring#
Track these metrics:
- Webhook receive rate
- Processing latency
- Failure rate by event type
- Duplicate event rate
- Queue depth (if using async)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| from prometheus_client import Counter, Histogram
webhook_received = Counter('webhooks_received_total', 'Webhooks received', ['source', 'event_type'])
webhook_latency = Histogram('webhook_processing_seconds', 'Webhook processing time')
webhook_errors = Counter('webhook_errors_total', 'Webhook processing errors', ['source', 'error_type'])
@app.post("/webhooks/{source}")
async def webhook(source: str, request: Request):
    """Process a webhook while recording receive, latency, and error metrics.

    Increments ``webhook_received`` per source and event type, times
    ``process_event`` with ``webhook_latency``, and counts failures in
    ``webhook_errors`` by exception class before re-raising. Returns
    ``{"received": True}`` on success, like the other handlers.
    """
    event = await parse_event(request)
    # NOTE(review): ``source`` comes straight from the URL path and becomes a
    # label value; consider restricting it to known providers to keep the
    # number of label combinations bounded.
    webhook_received.labels(source=source, event_type=event["type"]).inc()
    with webhook_latency.time():
        try:
            await process_event(event)
        except Exception as e:
            webhook_errors.labels(source=source, error_type=type(e).__name__).inc()
            # Re-raise so the sender sees the failure.
            raise
    return {"received": True}
|
The goal: receive quickly, process reliably, never lose events. Webhooks are only as reliable as your handler.
📬 Get the Newsletter
Weekly insights on DevOps, automation, and CLI mastery. No spam, unsubscribe anytime.