Webhooks flip the API model: instead of polling for changes, services push events to you. GitHub notifies you of commits. Stripe tells you about payments. Twilio delivers SMS receipts.

The concept is simple. Production-grade implementation requires care.

Basic Webhook Receiver

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    payload = await request.json()
    event_type = payload.get("type")
    
    if event_type == "payment_intent.succeeded":
        handle_payment_success(payload["data"]["object"])
    elif event_type == "payment_intent.failed":
        handle_payment_failure(payload["data"]["object"])
    
    return {"status": "received"}

This works for demos. Production needs more.

Signature Verification

Never trust unverified webhooks. Attackers can forge them:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import hmac
import hashlib

def verify_stripe_signature(payload: bytes, signature: str, secret: str) -> bool:
    timestamp, sig = parse_signature_header(signature)
    
    # Check timestamp to prevent replay attacks
    if abs(time.time() - int(timestamp)) > 300:  # 5 minute tolerance
        return False
    
    # Compute expected signature
    signed_payload = f"{timestamp}.{payload.decode()}"
    expected = hmac.new(
        secret.encode(),
        signed_payload.encode(),
        hashlib.sha256
    ).hexdigest()
    
    return hmac.compare_digest(expected, sig)

@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    payload = await request.body()
    signature = request.headers.get("Stripe-Signature")
    
    if not verify_stripe_signature(payload, signature, STRIPE_WEBHOOK_SECRET):
        raise HTTPException(401, "Invalid signature")
    
    # Process verified webhook
    data = json.loads(payload)
    # ...

Most providers include signature headers. Always verify them.

Respond Fast, Process Later

Webhook senders have timeouts (often 5-30 seconds). Long processing causes retries:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from fastapi import BackgroundTasks

@app.post("/webhooks/github")
async def github_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()
    
    # Verify signature first
    if not verify_github_signature(request):
        raise HTTPException(401)
    
    # Queue for background processing
    background_tasks.add_task(process_github_event, payload)
    
    # Return immediately
    return {"status": "accepted"}

async def process_github_event(payload: dict):
    # This runs after response is sent
    event_type = payload.get("action")
    # ... heavy processing ...

Or use a proper queue:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@app.post("/webhooks/github")
async def github_webhook(request: Request):
    payload = await request.json()
    
    # Push to queue, return immediately
    await redis.lpush("webhook_queue", json.dumps({
        "source": "github",
        "payload": payload,
        "received_at": time.time()
    }))
    
    return {"status": "queued"}

Idempotency: Handle Duplicates

Webhooks can be delivered multiple times. Your handler must be idempotent:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    payload = await request.json()
    event_id = payload["id"]
    
    # Check if already processed
    if await redis.exists(f"webhook:processed:{event_id}"):
        return {"status": "already_processed"}
    
    # Process the event
    await handle_event(payload)
    
    # Mark as processed (with TTL for cleanup)
    await redis.setex(f"webhook:processed:{event_id}", 86400 * 7, "1")
    
    return {"status": "processed"}

Use the event ID provided by the sender, not your own.

Retry Handling

When your endpoint fails, senders retry. Handle retries gracefully:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
async def handle_event(payload: dict):
    event_id = payload["id"]
    
    # Idempotent check
    existing = await db.get_event(event_id)
    if existing and existing.status == "completed":
        return  # Already done
    
    # Record attempt
    await db.upsert_event(event_id, status="processing")
    
    try:
        await process_event(payload)
        await db.update_event(event_id, status="completed")
    except Exception as e:
        await db.update_event(event_id, status="failed", error=str(e))
        raise  # Let the sender know to retry

Ordering Considerations

Webhooks may arrive out of order. Handle it:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
async def handle_order_event(payload: dict):
    order_id = payload["order_id"]
    event_time = payload["created_at"]
    
    async with db.transaction():
        order = await db.get_order(order_id, for_update=True)
        
        # Skip if we've seen a newer event
        if order and order.last_event_time > event_time:
            return
        
        # Process this event
        await update_order(order_id, payload)
        await db.update_order(order_id, last_event_time=event_time)

Or design for eventual consistency and reconcile periodically.

Security Checklist

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@app.post("/webhooks/{provider}")
async def webhook_handler(provider: str, request: Request):
    # 1. Verify signature
    if not verify_signature(provider, request):
        raise HTTPException(401, "Invalid signature")
    
    # 2. Validate source IP (if provider publishes IP ranges)
    if not is_allowed_ip(provider, request.client.host):
        raise HTTPException(403, "IP not allowed")
    
    # 3. Check content type
    if request.headers.get("content-type") != "application/json":
        raise HTTPException(415, "Expected JSON")
    
    # 4. Parse with size limit
    body = await request.body()
    if len(body) > 1_000_000:  # 1MB limit
        raise HTTPException(413, "Payload too large")
    
    # 5. Process
    payload = json.loads(body)
    await queue_webhook(provider, payload)
    
    return {"status": "accepted"}

Monitoring Webhooks

Track webhook health:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from prometheus_client import Counter, Histogram

webhook_received = Counter(
    'webhooks_received_total',
    'Webhooks received',
    ['provider', 'event_type', 'status']
)

webhook_latency = Histogram(
    'webhook_processing_seconds',
    'Webhook processing time',
    ['provider']
)

@app.post("/webhooks/{provider}")
async def webhook_handler(provider: str, request: Request):
    start = time.time()
    
    try:
        result = await process_webhook(provider, request)
        webhook_received.labels(provider, result.event_type, "success").inc()
        return {"status": "ok"}
    except Exception as e:
        webhook_received.labels(provider, "unknown", "error").inc()
        raise
    finally:
        webhook_latency.labels(provider).observe(time.time() - start)

Alert on:

  • High error rates
  • Processing latency spikes
  • Missing expected webhooks

Sending Webhooks

If you’re the sender, be a good citizen:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class WebhookSender:
    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30)
    
    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(min=1, max=300)
    )
    async def send(self, url: str, payload: dict, secret: str):
        body = json.dumps(payload)
        signature = self.compute_signature(body, secret)
        
        response = await self.client.post(
            url,
            content=body,
            headers={
                "Content-Type": "application/json",
                "X-Webhook-Signature": signature,
                "X-Webhook-ID": payload["id"],
                "X-Webhook-Timestamp": str(int(time.time()))
            }
        )
        
        if response.status_code >= 500:
            raise RetryableError("Server error, will retry")
        elif response.status_code >= 400:
            # Log but don't retry client errors
            logger.error(f"Webhook rejected: {response.status_code}")
        
        return response
    
    def compute_signature(self, body: str, secret: str) -> str:
        return hmac.new(
            secret.encode(),
            body.encode(),
            hashlib.sha256
        ).hexdigest()

Good sender practices:

  • Include unique event ID
  • Include timestamp
  • Sign the payload
  • Retry with exponential backoff
  • Respect receiver’s response codes
  • Provide event history API for missed webhooks

Testing Webhooks

Use tools to receive webhooks locally:

1
2
3
4
5
6
# ngrok exposes localhost
ngrok http 8000
# Use the ngrok URL as webhook endpoint

# Or use webhook.site for inspection
# https://webhook.site gives you a temporary URL

Write integration tests:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
async def test_stripe_webhook():
    payload = {
        "id": "evt_test_123",
        "type": "payment_intent.succeeded",
        "data": {"object": {"id": "pi_123", "amount": 1000}}
    }
    
    signature = compute_test_signature(payload)
    
    response = await client.post(
        "/webhooks/stripe",
        json=payload,
        headers={"Stripe-Signature": signature}
    )
    
    assert response.status_code == 200
    assert await db.get_event("evt_test_123") is not None

Webhooks are deceptively simple. The HTTP part is easy. The reliability part — verification, idempotency, ordering, retries — is where production systems succeed or fail.

Verify every signature. Respond fast, process async. Handle duplicates. Monitor everything. The webhook that silently fails is the one that costs you money.