Webhooks flip the API model: instead of polling, the service calls you. Here’s how to handle them without losing events.

Basic Webhook Handler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import hashlib
import hmac
import json

from fastapi import FastAPI, Request, HTTPException

app = FastAPI()


@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Receive a Stripe webhook, verify it, and dispatch payment successes.

    The raw request body is read first and passed, together with the
    ``Stripe-Signature`` header, to ``verify_stripe_signature``. Only
    after verification is the body parsed as JSON and
    ``payment_intent.succeeded`` events handed to
    ``handle_payment_success``.

    Returns:
        ``{"received": True}`` for every verified, well-formed event,
        including event types this handler does not act on.

    Raises:
        HTTPException: 400 when the signature header is missing or does
            not verify, or when the body is not a JSON object.
    """
    payload = await request.body()
    sig_header = request.headers.get("Stripe-Signature")

    # Verify signature first. A missing header is rejected here so the
    # verifier is never handed ``None``.
    if not sig_header or not verify_stripe_signature(payload, sig_header):
        raise HTTPException(status_code=400, detail="Invalid signature")

    # Malformed bodies are a client error, not a server crash.
    try:
        event = json.loads(payload)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid payload") from exc
    if not isinstance(event, dict):
        raise HTTPException(status_code=400, detail="Invalid payload")

    # Process event; unknown or missing types are acknowledged and ignored.
    if event.get("type") == "payment_intent.succeeded":
        handle_payment_success(event["data"]["object"])

    # Always return 200 quickly
    return {"received": True}

Signature Verification

Never trust unverified webhooks.

Stripe

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import stripe

def verify_stripe_signature(payload: bytes, sig_header: str) -> bool:
    """Return True when *payload* verifies against *sig_header*.

    Verification is delegated to ``stripe.Webhook.construct_event`` using
    ``STRIPE_WEBHOOK_SECRET``; the constructed event is discarded and only
    the outcome is reported.

    Args:
        payload: Raw request body, exactly as received.
        sig_header: Value of the ``Stripe-Signature`` header. ``None`` or
            an empty string is treated as a failed verification.

    Returns:
        True if the library accepted the payload, False otherwise.
    """
    # A missing header can never verify; bail out before the Stripe
    # library tries to parse it.
    if not sig_header:
        return False

    try:
        stripe.Webhook.construct_event(
            payload, sig_header, STRIPE_WEBHOOK_SECRET
        )
    except (ValueError, stripe.error.SignatureVerificationError):
        # ValueError covers an undecodable or non-JSON payload; either way
        # the webhook must not be trusted.
        return False
    return True

GitHub

1
2
3
4
5
6
7
def verify_github_signature(payload: bytes, signature: str) -> bool:
    """Return True when *signature* matches the HMAC-SHA256 of *payload*.

    The expected value is ``"sha256="`` followed by the hex HMAC-SHA256 of
    the payload keyed with ``GITHUB_SECRET``.

    Args:
        payload: Raw request body, exactly as received.
        signature: Signature header value to check. A missing, empty or
            non-string value is treated as a mismatch.

    Returns:
        True on an exact match, False otherwise.
    """
    # Header lookups yield None when the header is absent; that is a
    # failed verification, not a crash.
    if not isinstance(signature, str) or not signature:
        return False

    expected = "sha256=" + hmac.new(
        GITHUB_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for str
    # arguments containing non-ASCII characters.
    return hmac.compare_digest(expected.encode(), signature.encode())

Generic HMAC

1
2
3
4
5
6
7
def verify_hmac(payload: bytes, signature: str, secret: str) -> bool:
    """Return True when *signature* is the hex HMAC-SHA256 of *payload*.

    Args:
        payload: Raw bytes the signature was computed over.
        signature: Hex digest supplied by the sender. A missing, empty or
            non-string value is treated as a mismatch.
        secret: Shared secret; encoded with the default codec to key the
            HMAC.

    Returns:
        True on an exact match, False otherwise.
    """
    # Header lookups yield None when the header is absent; that is a
    # failed verification, not a crash.
    if not isinstance(signature, str) or not signature:
        return False

    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for str
    # arguments containing non-ASCII characters.
    return hmac.compare_digest(expected.encode(), signature.encode())

The 200 OK Problem

Services expect quick responses. If you do slow work in the handler:

1
2
3
4
5
6
7
8
# Bad: the response waits for every downstream step to finish
@app.post("/webhooks/order")
async def order_webhook(event: dict):
    """Anti-pattern: run all order follow-up work before acknowledging."""
    order = process_order(event)
    # Steps run one after another, each to completion, in this order.
    for slow_step in (
        send_confirmation_email,  # Takes 2 seconds
        update_inventory,         # Takes 1 second
        notify_warehouse,         # Takes 500ms
    ):
        slow_step(order)
    return {"ok": True}  # 3.5 seconds later

The sender might time out and retry, causing duplicate processing.

Pattern: Async Processing

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from redis import Redis
import json

redis = Redis()


@app.post("/webhooks/order")
async def order_webhook(request: Request):
    """Verify an order webhook and enqueue its raw body for a worker.

    The handler does no processing itself: after ``verify_signature``
    accepts the body and headers, the payload bytes are pushed onto the
    ``webhook_queue`` Redis list and the request is acknowledged.

    Returns:
        ``{"received": True}`` once the payload has been queued.

    Raises:
        HTTPException: 400 when signature verification fails.
    """
    # Imported locally so this snippet stays self-contained.
    from fastapi.concurrency import run_in_threadpool

    payload = await request.body()

    # Verify signature
    if not verify_signature(payload, request.headers):
        raise HTTPException(400)

    # Queue for async processing. ``redis`` is the synchronous client, so
    # the call is pushed to the threadpool; calling it inline would stall
    # the event loop (and every other request) for the network round trip.
    await run_in_threadpool(redis.lpush, "webhook_queue", payload)

    # Return immediately
    return {"received": True}

# Separate worker process
def process_webhooks():
    """Consume ``webhook_queue`` forever, handling one event at a time.

    Each iteration blocks on ``BRPOP`` until a payload is available,
    decodes it as JSON and passes it to ``handle_event``. A payload that
    cannot be decoded or whose handler raises is logged and pushed onto
    the ``webhook_dead_letter`` list, so one bad event neither stops the
    worker nor disappears.

    This function never returns.
    """
    # Imported locally so this snippet stays self-contained.
    import logging

    log = logging.getLogger(__name__)

    while True:
        _, payload = redis.brpop("webhook_queue")
        try:
            handle_event(json.loads(payload))
        except Exception:
            # The payload has already been popped; without this it would
            # be lost and the loop would terminate.
            log.exception(
                "Webhook processing failed; moving payload to dead-letter queue"
            )
            redis.lpush("webhook_dead_letter", payload)

Idempotency

Webhooks can be delivered multiple times. Handle it.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from functools import wraps

processed_events = set()  # Use Redis in production


def idempotent(func):
    """Decorate an event handler so each event ID is processed only once.

    The event ID is taken from the event's ``"id"`` key, falling back to
    ``"event_id"``. IDs are recorded in ``processed_events`` only after
    the handler returns, so a handler that raises can be retried.

    Events with no usable ID are always passed through to the handler and
    never recorded: there is nothing to de-duplicate on, and recording
    ``None`` would cause every later ID-less event to be dropped.

    Args:
        func: Handler taking the event mapping as its first argument.

    Returns:
        A wrapper that returns ``{"status": "already_processed"}`` for a
        repeated ID and the handler's own result otherwise.
    """
    @wraps(func)
    def wrapper(event, *args, **kwargs):
        event_id = event.get("id") or event.get("event_id")

        # No ID: process the event, but do not remember it.
        if not event_id:
            return func(event, *args, **kwargs)

        if event_id in processed_events:
            return {"status": "already_processed"}

        result = func(event, *args, **kwargs)
        processed_events.add(event_id)
        return result
    return wrapper

@idempotent
def handle_payment(event):
    """Mark the order for this event's ``data.object.id`` as paid."""
    # Safe to call multiple times: @idempotent short-circuits event IDs
    # it has already seen.
    update_order_status(event["data"]["object"]["id"], "paid")

Database-Backed Idempotency

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Handle a Stripe event once, tracking handled IDs in the database.

    The event returned by ``parse_and_verify`` is looked up in
    ``processed_webhooks``; a hit short-circuits with
    ``{"status": "duplicate"}``. Otherwise the event is handled and its
    ID recorded inside a single transaction, so the record is only
    committed together with the handler's work.
    """
    lookup_sql = "SELECT id FROM processed_webhooks WHERE event_id = :id"
    record_sql = "INSERT INTO processed_webhooks (event_id, processed_at) VALUES (:id, NOW())"

    event = await parse_and_verify(request)

    # Check if already processed
    # NOTE(review): this check runs outside the transaction, so two
    # concurrent deliveries of the same event could both pass it.
    # Presumably a unique constraint on event_id backs this up; confirm.
    already_recorded = await db.fetch_one(lookup_sql, {"id": event["id"]})
    if already_recorded:
        return {"status": "duplicate"}

    # Process in transaction: handler first, then the bookkeeping row.
    async with db.transaction():
        await handle_event(event)
        await db.execute(record_sql, {"id": event["id"]})

    return {"received": True}

Retry Handling

When your endpoint fails, most services retry automatically (some, such as GitHub, expect you to trigger redelivery yourself). Be ready.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# Log for debugging retries
@app.post("/webhooks/github")
async def github_webhook(request: Request):
    """Log, verify and process a GitHub webhook delivery.

    Every delivery is logged with its ``X-GitHub-Delivery`` ID so repeated
    deliveries can be correlated. The raw body is then checked against the
    ``X-Hub-Signature-256`` header with ``verify_github_signature`` before
    it is parsed and passed to ``process_event``.

    Returns:
        ``{"ok": True}`` when the event was processed.

    Raises:
        HTTPException: 400 when the signature header is missing or does
            not verify; 500 when parsing or processing fails.
    """
    delivery_id = request.headers.get("X-GitHub-Delivery")
    event_type = request.headers.get("X-GitHub-Event")

    # GitHub sends no attempt counter; this header identifies the resource
    # the hook is installed on, so it is logged under an accurate key.
    logger.info(f"Received {event_type}", extra={
        "delivery_id": delivery_id,
        "installation_target_id": request.headers.get("X-GitHub-Hook-Installation-Target-ID")
    })

    # Never trust unverified webhooks: check the signature over the raw
    # bytes before parsing. A missing header is rejected outright.
    payload = await request.body()
    signature = request.headers.get("X-Hub-Signature-256")
    if not signature or not verify_github_signature(payload, signature):
        raise HTTPException(status_code=400, detail="Invalid signature")

    try:
        await process_event(json.loads(payload))
        return {"ok": True}
    except Exception as e:
        logger.exception(f"Webhook failed: {e}", extra={"delivery_id": delivery_id})
        # A 5xx marks the delivery as failed. Many providers retry on
        # this; GitHub does not redeliver automatically, so failed
        # deliveries must be redelivered manually or via its API.
        raise HTTPException(500) from e

Event Types to Handle

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Maps an event's "type" to the coroutine that receives its data.object.
HANDLERS = {
    "customer.created": handle_new_customer,
    "customer.updated": handle_customer_update,
    "invoice.paid": handle_invoice_paid,
    "invoice.payment_failed": handle_payment_failed,
}


@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Dispatch a verified event to the handler registered for its type.

    Event types without an entry in ``HANDLERS`` are logged and still
    acknowledged, so the response is ``{"received": True}`` either way.
    """
    event = await parse_and_verify(request)
    handler = HANDLERS.get(event["type"])

    # Guard clause: nothing registered for this type.
    if not handler:
        logger.info(f"Unhandled event type: {event['type']}")
        return {"received": True}

    await handler(event["data"]["object"])
    return {"received": True}

Testing Webhooks Locally

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# ngrok for temporary public URL
# Exposes local port 8000, the port the other commands below forward to.
ngrok http 8000
# Use the ngrok URL in webhook settings

# Stripe CLI
# First command forwards events to the local /webhooks/stripe route;
# second command fires a test payment_intent.succeeded event.
stripe listen --forward-to localhost:8000/webhooks/stripe
stripe trigger payment_intent.succeeded

# GitHub CLI
# NOTE(review): `gh webhook` appears to come from the cli/gh-webhook
# extension rather than core gh - confirm it is installed first.
gh webhook forward --repo=myorg/myrepo --events=push --url=http://localhost:8000/webhooks/github

Webhook Security Checklist

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@app.post("/webhooks/secure")
async def secure_webhook(request: Request):
    """Apply the full security checklist before processing a webhook.

    Checks run in order: signature, timestamp freshness (five-minute
    window either side of now), source IP, then idempotent processing via
    ``process_idempotent``.

    Raises:
        HTTPException: 401 for a bad signature or a missing, malformed or
            stale ``X-Timestamp``; 403 when the client address is unknown
            or rejected by ``is_allowed_ip``.
    """
    # Imported locally: this snippet has no import block of its own.
    import time

    # 1. Verify signature over the raw body, using the same
    # (payload, headers) call shape as the queueing handler.
    payload = await request.body()
    if not verify_signature(payload, request.headers):
        raise HTTPException(401, "Invalid signature")

    # 2. Check timestamp (prevent replay attacks)
    # NOTE(review): this only helps if the provider signs the timestamp
    # along with the body - confirm for the provider in use.
    try:
        timestamp = int(request.headers.get("X-Timestamp", 0))
    except ValueError:
        # A non-numeric header is a bad request, not a server error.
        raise HTTPException(401, "Invalid timestamp") from None
    if abs(time.time() - timestamp) > 300:  # 5 minutes
        raise HTTPException(401, "Timestamp too old")

    # 3. Validate source IP (if provider publishes IP ranges)
    # request.client can be None when the server has no peer address.
    # NOTE(review): behind a reverse proxy this is presumably the proxy's
    # address, not the sender's - confirm the deployment topology.
    client_ip = request.client.host if request.client else None
    if client_ip is None or not is_allowed_ip(client_ip):
        raise HTTPException(403, "IP not allowed")

    # 4. Process idempotently
    return await process_idempotent(request)

Monitoring

Track these metrics:

  • Webhook receive rate
  • Processing latency
  • Failure rate by event type
  • Duplicate event rate
  • Queue depth (if using async)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from prometheus_client import Counter, Histogram

# Prometheus metrics for the generic webhook endpoint.
# NOTE(review): ``source`` comes from the URL path and ``event_type`` from
# the request payload, so label cardinality is caller-controlled - consider
# restricting both to known values.
webhook_received = Counter('webhooks_received_total', 'Webhooks received', ['source', 'event_type'])
webhook_latency = Histogram('webhook_processing_seconds', 'Webhook processing time')
webhook_errors = Counter('webhook_errors_total', 'Webhook processing errors', ['source', 'error_type'])


@app.post("/webhooks/{source}")
async def webhook(source: str, request: Request):
    """Process a webhook while recording receive, latency and error metrics.

    The receive counter is incremented as soon as the event is parsed.
    Processing is timed by ``webhook_latency`` whether it succeeds or
    fails; failures increment ``webhook_errors`` labelled with the
    exception class name and are then re-raised.

    Returns:
        ``{"received": True}`` after successful processing, matching the
        acknowledgement returned by the other handlers.
    """
    event = await parse_event(request)
    webhook_received.labels(source=source, event_type=event["type"]).inc()

    with webhook_latency.time():
        try:
            await process_event(event)
        except Exception as e:
            webhook_errors.labels(source=source, error_type=type(e).__name__).inc()
            raise

    # Explicit acknowledgement instead of falling off the end (null body).
    return {"received": True}

The goal: receive quickly, process reliably, never lose events. Webhooks are only as reliable as your handler.