Webhooks flip the API model: instead of polling for changes, services push events to you. GitHub notifies you of commits. Stripe tells you about payments. Twilio delivers SMS receipts.
The concept is simple. Production-grade implementation requires care.
Basic Webhook Receiver#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Minimal Stripe webhook receiver: parse the JSON body and dispatch on type.

    Demo-grade only: the payload is used as received, with no signature
    verification.

    Args:
        request: Incoming request whose body is the JSON event.

    Returns:
        ``{"status": "received"}`` for every event, including types that
        are not handled here.
    """
    payload = await request.json()
    event_type = payload.get("type")

    if event_type == "payment_intent.succeeded":
        handle_payment_success(payload["data"]["object"])
    # Stripe's failure event is "payment_intent.payment_failed"; there is no
    # "payment_intent.failed" event, so matching on that name never fires.
    elif event_type == "payment_intent.payment_failed":
        handle_payment_failure(payload["data"]["object"])

    return {"status": "received"}
|
This works for demos. Production needs more.
Signature Verification#
Never trust unverified webhooks. Attackers can forge them:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| import hmac
import hashlib
def verify_stripe_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Check a timestamped HMAC-SHA256 signature against the raw request body.

    Args:
        payload: Raw request body bytes, exactly as received.
        signature: Value of the signature header. A missing or empty value
            is rejected rather than parsed.
        secret: Shared webhook signing secret.

    Returns:
        True only when the header's timestamp is within five minutes of now
        and the HMAC of ``"<timestamp>.<payload>"`` matches the header's
        signature; False for anything else, including malformed input.
    """
    import time  # local import: the surrounding import list omits ``time``

    if not signature:
        return False

    timestamp, sig = parse_signature_header(signature)

    # Check timestamp to prevent replay attacks. A non-numeric timestamp is
    # attacker-controlled input and must fail closed, not raise.
    try:
        age = abs(time.time() - int(timestamp))
    except (TypeError, ValueError):
        return False
    if age > 300:  # 5 minute tolerance
        return False

    # Sign the raw bytes directly. Decoding the body to text and re-encoding
    # it raises UnicodeDecodeError on bodies that are not valid UTF-8.
    signed_payload = f"{timestamp}.".encode() + payload
    expected = hmac.new(
        secret.encode(),
        signed_payload,
        hashlib.sha256,
    ).hexdigest()

    # compare_digest raises TypeError for non-ASCII str (or non-str) input;
    # treat that as a mismatch.
    try:
        return hmac.compare_digest(expected, sig)
    except TypeError:
        return False
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Stripe receiver that verifies the signature before parsing the body.

    Args:
        request: Incoming request; the raw body bytes are what gets verified.

    Raises:
        HTTPException: 401 when the ``Stripe-Signature`` header is missing
            or does not verify against ``STRIPE_WEBHOOK_SECRET``.
    """
    # Read the raw bytes: the signature covers the body exactly as sent,
    # so it must be checked before any JSON parsing.
    payload = await request.body()
    signature = request.headers.get("Stripe-Signature")

    # headers.get() yields None when the header is absent; reject that here
    # instead of handing None to a verifier that expects a string.
    if signature is None or not verify_stripe_signature(
        payload, signature, STRIPE_WEBHOOK_SECRET
    ):
        raise HTTPException(401, "Invalid signature")

    # Process verified webhook
    data = json.loads(payload)
    # ...
|
Most providers include signature headers. Always verify them.
Respond Fast, Process Later#
Webhook senders have timeouts (often 5-30 seconds). Long processing causes retries:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| from fastapi import BackgroundTasks
@app.post("/webhooks/github")
async def github_webhook(request: Request, background_tasks: BackgroundTasks):
    """Acknowledge a GitHub delivery immediately and defer the real work.

    Args:
        request: Incoming webhook request.
        background_tasks: Task collector used to schedule
            ``process_github_event`` with the parsed payload.

    Returns:
        ``{"status": "accepted"}`` once the task has been scheduled.

    Raises:
        HTTPException: 401 when ``verify_github_signature`` rejects the request.
    """
    event = await request.json()

    # NOTE(review): the body is parsed before the signature check runs;
    # confirm whether verify_github_signature depends on that ordering
    # before moving the check ahead of the parse.
    signature_ok = verify_github_signature(request)
    if not signature_ok:
        raise HTTPException(401)

    # Hand the heavy lifting to a background task so the response is not
    # held up by processing time.
    background_tasks.add_task(process_github_event, event)
    return {"status": "accepted"}
async def process_github_event(payload: dict):
    """Process one GitHub webhook payload (placeholder body).

    Intended to run as a background task, after the HTTP response has
    already been sent.

    Args:
        payload: Parsed JSON body of the delivery.
    """
    # Only the payload's "action" field is read so far.
    action = payload.get("action")
    # ... heavy processing ...
|
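Background tasks run inside the web process, so queued work is lost if that process restarts. For durability, use a proper queue: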
1
2
3
4
5
6
7
8
9
10
11
12
@app.post("/webhooks/github")
async def github_webhook(request: Request):
    """Enqueue a GitHub delivery on the ``webhook_queue`` list and return.

    The parsed payload is wrapped in an envelope recording its source and
    receive time, serialized to JSON, and pushed with ``lpush``.

    Returns:
        ``{"status": "queued"}`` after the push completes.
    """
    envelope = {
        "source": "github",
        "payload": await request.json(),
        "received_at": time.time(),
    }
    # Push to queue, return immediately
    await redis.lpush("webhook_queue", json.dumps(envelope))
    return {"status": "queued"}
|
Idempotency: Handle Duplicates#
Webhooks can be delivered multiple times. Your handler must be idempotent:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Process a Stripe event at most once, keyed on the sender's event ID.

    Returns:
        ``{"status": "already_processed"}`` when another delivery of the same
        event has already claimed it, otherwise ``{"status": "processed"}``.

    Raises:
        Whatever ``handle_event`` raises; the claim is released first so a
        redelivery from the sender can try again.
    """
    payload = await request.json()
    event_id = payload["id"]
    dedupe_key = f"webhook:processed:{event_id}"

    # Claim the event atomically (SET ... NX EX). A separate "exists" check
    # followed by a later write lets two concurrent deliveries of the same
    # event both pass the check and both run the handler.
    # NOTE(review): assumes `redis` is a redis-py asyncio client, where
    # set(..., nx=True) returns a falsy value when the key already exists.
    claimed = await redis.set(dedupe_key, "1", nx=True, ex=86400 * 7)
    if not claimed:
        return {"status": "already_processed"}

    try:
        await handle_event(payload)
    except Exception:
        # Release the claim so the sender's retry is not mistaken for a
        # duplicate of an event that never finished processing.
        await redis.delete(dedupe_key)
        raise

    return {"status": "processed"}
|
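Deduplicate on the event ID provided by the sender, not one you generate yourself: only the sender's ID stays the same across redeliveries of the same event.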
Retry Handling#
When your endpoint fails, senders retry. Handle retries gracefully:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def handle_event(payload: dict):
    """Run ``process_event`` for an event while tracking its status in ``db``.

    An event already recorded as "completed" is skipped. Otherwise the event
    is marked "processing", processed, then marked "completed"; on any
    exception it is marked "failed" with the error text and the exception
    is re-raised.

    Args:
        payload: Event body; its ``"id"`` key identifies the event.
    """
    event_id = payload["id"]

    # Idempotent check: nothing to do for an event that already finished.
    record = await db.get_event(event_id)
    already_completed = record and record.status == "completed"
    if already_completed:
        return

    # Record attempt
    await db.upsert_event(event_id, status="processing")

    try:
        await process_event(payload)
        await db.update_event(event_id, status="completed")
    except Exception as exc:
        await db.update_event(event_id, status="failed", error=str(exc))
        raise  # Let the sender know to retry
|
Ordering Considerations#
Webhooks may arrive out of order. Handle it:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def handle_order_event(payload: dict):
    """Apply an order event unless a newer one has already been applied.

    Inside a single ``db.transaction()``, the order is loaded with
    ``for_update=True`` and its ``last_event_time`` is compared with this
    event's ``created_at``. Events older than the stored time are dropped;
    anything else is applied and its time recorded.

    Args:
        payload: Event body containing ``"order_id"`` and ``"created_at"``.
    """
    order_id, event_time = payload["order_id"], payload["created_at"]

    async with db.transaction():
        # NOTE(review): for_update=True presumably locks the row for the
        # duration of the transaction; confirm against the db layer.
        current = await db.get_order(order_id, for_update=True)

        # Stale means we have already seen a strictly newer event.
        is_stale = current and current.last_event_time > event_time
        if not is_stale:
            await update_order(order_id, payload)
            await db.update_order(order_id, last_event_time=event_time)
|
Or design for eventual consistency and reconcile periodically.
Security Checklist#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@app.post("/webhooks/{provider}")
async def webhook_handler(provider: str, request: Request):
    """Run the security checklist on a delivery, then queue it.

    Args:
        provider: Provider name taken from the URL path.
        request: Incoming webhook request.

    Returns:
        ``{"status": "accepted"}`` once the payload has been queued.

    Raises:
        HTTPException: 401 for a bad signature, 403 for a disallowed or
            unknown source address, 415 for a non-JSON content type,
            413 for a body over 1 MB.
    """
    # 1. Verify signature
    if not verify_signature(provider, request):
        raise HTTPException(401, "Invalid signature")

    # 2. Validate source IP (if provider publishes IP ranges).
    # request.client can be None when the peer address is unavailable;
    # treat an unknown source as not allowed instead of raising.
    client_host = request.client.host if request.client else None
    if client_host is None or not is_allowed_ip(provider, client_host):
        raise HTTPException(403, "IP not allowed")

    # 3. Check content type. Compare only the media type: senders commonly
    # append parameters ("application/json; charset=utf-8") and media types
    # are case-insensitive, so an exact string match rejects valid requests.
    content_type = request.headers.get("content-type", "")
    media_type = content_type.split(";", 1)[0].strip().lower()
    if media_type != "application/json":
        raise HTTPException(415, "Expected JSON")

    # 4. Parse with size limit
    body = await request.body()
    if len(body) > 1_000_000:  # 1MB limit
        raise HTTPException(413, "Payload too large")

    # 5. Process
    payload = json.loads(body)
    await queue_webhook(provider, payload)
    return {"status": "accepted"}
|
Monitoring Webhooks#
Track webhook health:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| from prometheus_client import Counter, Histogram
# Counter of received webhooks, labelled by provider, event type and status.
webhook_received = Counter(
    "webhooks_received_total",
    "Webhooks received",
    labelnames=["provider", "event_type", "status"],
)

# Histogram of webhook processing time in seconds, labelled by provider.
webhook_latency = Histogram(
    "webhook_processing_seconds",
    "Webhook processing time",
    labelnames=["provider"],
)
@app.post("/webhooks/{provider}")
async def webhook_handler(provider: str, request: Request):
    """Process a webhook while recording outcome and latency metrics.

    Increments ``webhook_received`` with status "success" (using the event
    type reported by ``process_webhook``) or "error" (event type "unknown"),
    and always observes the elapsed time on ``webhook_latency``.

    Returns:
        ``{"status": "ok"}`` on success.

    Raises:
        Any exception from processing, re-raised after the error is counted.
    """
    # perf_counter is monotonic; wall-clock time can jump (NTP, manual
    # adjustment) and yield negative or inflated durations.
    started = time.perf_counter()
    try:
        result = await process_webhook(provider, request)
        webhook_received.labels(provider, result.event_type, "success").inc()
        return {"status": "ok"}
    except Exception:
        webhook_received.labels(provider, "unknown", "error").inc()
        raise
    finally:
        # NOTE(review): `provider` comes straight from the URL path, so
        # arbitrary paths create new label values; consider mapping
        # unknown providers to a fixed label.
        webhook_latency.labels(provider).observe(time.perf_counter() - started)
|
Alert on:
- High error rates
- Processing latency spikes
- Missing expected webhooks
Sending Webhooks#
If you’re the sender, be a good citizen:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class WebhookSender:
    """Deliver signed webhook payloads over HTTP with retries.

    Owns an ``httpx.AsyncClient``; call ``aclose()`` when done, or use the
    sender as an async context manager, so pooled connections are released.
    """

    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30)

    async def aclose(self):
        """Close the underlying HTTP client and its connection pool."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.aclose()

    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(min=1, max=300),
    )
    async def send(self, url: str, payload: dict, secret: str):
        """POST ``payload`` as JSON to ``url`` with signature, ID and timestamp headers.

        Up to five attempts are made with exponential backoff. No ``retry=``
        predicate is given, so with tenacity's defaults any exception raised
        here triggers another attempt.

        Args:
            url: Receiver endpoint.
            payload: Event body; must contain an ``"id"`` key.
            secret: Shared secret used to sign the serialized body.

        Returns:
            The final response. Non-retryable 4xx responses are logged and
            returned rather than raised.

        Raises:
            RetryableError: On 5xx, 408 or 429 responses (drives the retry).
        """
        body = json.dumps(payload)
        signature = self.compute_signature(body, secret)
        # NOTE(review): the signature covers only the body, so a receiver
        # cannot authenticate X-Webhook-Timestamp for replay protection.
        response = await self.client.post(
            url,
            content=body,
            headers={
                "Content-Type": "application/json",
                "X-Webhook-Signature": signature,
                # Header values must be strings; IDs are not always.
                "X-Webhook-ID": str(payload["id"]),
                "X-Webhook-Timestamp": str(int(time.time())),
            },
        )

        status = response.status_code
        if status >= 500:
            raise RetryableError("Server error, will retry")
        if status in (408, 429):
            # Request Timeout and Too Many Requests are transient: the
            # receiver is asking for a later attempt, not rejecting the event.
            raise RetryableError(f"Transient rejection ({status}), will retry")
        if status >= 400:
            # Log but don't retry client errors
            logger.error(f"Webhook rejected: {response.status_code}")
        return response

    def compute_signature(self, body: str, secret: str) -> str:
        """Return the hex HMAC-SHA256 of ``body`` keyed with ``secret``."""
        return hmac.new(
            secret.encode(),
            body.encode(),
            hashlib.sha256,
        ).hexdigest()
|
Good sender practices:
- Include unique event ID
- Include timestamp
- Sign the payload
- Retry with exponential backoff
- Respect the receiver’s response codes
- Provide event history API for missed webhooks
Testing Webhooks#
Use tools to receive webhooks locally:
1
2
3
4
5
6
| # ngrok exposes localhost
ngrok http 8000
# Use the ngrok URL as webhook endpoint
# Or use webhook.site for inspection
# https://webhook.site gives you a temporary URL
|
Write integration tests:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def test_stripe_webhook():
    """A signed ``payment_intent.succeeded`` event returns 200 and is recorded."""
    event_id = "evt_test_123"
    payload = {
        "id": event_id,
        "type": "payment_intent.succeeded",
        "data": {"object": {"id": "pi_123", "amount": 1000}},
    }
    # NOTE(review): the signature is computed from the dict while the client
    # serializes the body itself; confirm both produce identical bytes.
    headers = {"Stripe-Signature": compute_test_signature(payload)}

    response = await client.post(
        "/webhooks/stripe",
        json=payload,
        headers=headers,
    )

    assert response.status_code == 200
    assert await db.get_event(event_id) is not None
|
Webhooks are deceptively simple. The HTTP part is easy. The reliability part — verification, idempotency, ordering, retries — is where production systems succeed or fail.
Verify every signature. Respond fast, process async. Handle duplicates. Monitor everything. The webhook that silently fails is the one that costs you money.