Webhooks seem simple: receive an HTTP POST, do something. In practice, they’re a minefield of security issues, reliability problems, and edge cases. Let’s build webhooks that actually work in production.

Receiving Webhooks

Basic Receiver

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio
import hashlib
import hmac
import ipaddress
import os
import time
import uuid
from datetime import datetime, timezone
from typing import Optional

from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.concurrency import run_in_threadpool

app = FastAPI()

# Stripe endpoint signing secret. Read it from the environment so the real
# value never lives in source control. The placeholder fallback only keeps the
# example importable; a publicly known secret lets anyone forge webhooks, so
# always set STRIPE_WEBHOOK_SECRET in production.
WEBHOOK_SECRET = os.environ.get("STRIPE_WEBHOOK_SECRET", "your-webhook-secret")

@app.post("/webhooks/stripe")
async def stripe_webhook(
    request: Request,
    stripe_signature: Optional[str] = Header(None, alias="Stripe-Signature")
):
    """Authenticate an incoming Stripe event and route it to its handler.

    The raw body is read first because the signature covers the exact bytes
    Stripe sent. An unauthenticated request gets a 401. Event types with no
    handler are still acknowledged with ``{"received": True}``.
    """
    raw_body = await request.body()

    authentic = verify_stripe_signature(raw_body, stripe_signature, WEBHOOK_SECRET)
    if not authentic:
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = await request.json()
    kind = event.get("type")

    if kind == "payment_intent.succeeded":
        await handle_payment_success(event["data"]["object"])
    elif kind == "customer.subscription.deleted":
        await handle_subscription_cancelled(event["data"]["object"])
    # Any other event type falls through: acknowledged, nothing to do.

    return {"received": True}


def verify_stripe_signature(
    payload: bytes,
    signature: Optional[str],
    secret: str,
    tolerance: Optional[int] = 300,
) -> bool:
    """Verify a Stripe ``Stripe-Signature`` header against the raw request body.

    The header looks like ``t=<unix-ts>,v1=<hex>[,v1=<hex>...][,v0=...]``. The
    expected ``v1`` value is the hex HMAC-SHA256, keyed by ``secret``, of
    ``b"<t>." + payload``. Several ``v1`` entries may be present (for example,
    during secret rotation); any one of them matching is sufficient.

    Args:
        payload: Raw request body bytes, exactly as received.
        signature: The ``Stripe-Signature`` header value (may be None or empty).
        secret: The endpoint's signing secret.
        tolerance: Maximum allowed distance in seconds between ``t`` and the
            current time (replay protection). Pass ``None`` to disable the check.

    Returns:
        True if a ``v1`` signature matches and the timestamp is within
        tolerance. False for a missing, malformed, stale, or mismatched
        header; this function never raises on bad input.
    """
    if not signature:
        return False

    timestamp = None
    candidates = []
    for element in signature.split(","):
        key, sep, value = element.strip().partition("=")
        if not sep:
            continue  # malformed element: ignore it instead of crashing
        if key == "t":
            timestamp = value
        elif key == "v1":
            candidates.append(value)

    if timestamp is None or not candidates:
        return False

    if tolerance is not None:
        try:
            sent_at = int(timestamp)
        except ValueError:
            return False
        if abs(time.time() - sent_at) > tolerance:
            return False

    # Sign the raw bytes directly; decoding to str would fail on non-UTF-8 bodies.
    signed_payload = timestamp.encode() + b"." + payload
    computed_sig = hmac.new(
        secret.encode(),
        signed_payload,
        hashlib.sha256
    ).hexdigest().encode()

    # Compare bytes: compare_digest raises TypeError on non-ASCII str input,
    # and header values come from untrusted clients.
    return any(
        hmac.compare_digest(computed_sig, candidate.encode())
        for candidate in candidates
    )

Idempotent Processing

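Webhook providers deliver events at least once, so the same event can arrive more than once (for example, when your response times out and the provider retries). Make your handlers idempotent: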

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import redis
from datetime import timedelta

# Shared synchronous Redis connection, spelled out with redis-py's defaults
# (localhost:6379, database 0).
redis_client = redis.Redis(host="localhost", port=6379, db=0)

async def handle_payment_success(payment_intent: dict, *, dedup_ttl: int = 86400):
    """Credit the user for a succeeded PaymentIntent at most once per payment id.

    Before doing any work, the handler claims the Redis key
    ``webhook:processed:<payment_id>`` with ``SET NX``. A later delivery that
    finds the key already present is reported as a duplicate and skipped. If
    processing fails, the key is removed so that a redelivery can try again.

    Args:
        payment_intent: The event's ``data.object``. Reads ``id``, ``amount``
            and ``metadata["user_id"]``.
        dedup_ttl: Seconds the claim key lives. NOTE(review): this should be
            at least as long as the provider's redelivery window; otherwise a
            late retry would be processed a second time. Confirm against the
            provider's documentation.

    Returns:
        ``{"status": "duplicate" | "processed", "payment_id": <id>}``.

    Raises:
        Whatever ``credit_user_account`` raises (or a KeyError for missing
        fields), after the claim has been released.
    """
    payment_id = payment_intent["id"]
    lock_key = f"webhook:processed:{payment_id}"

    # SET NX EX: atomically claim the key only if it is absent, with expiry.
    # NOTE(review): redis_client is the synchronous client (its calls are not
    # awaited), so this briefly blocks the event loop; redis.asyncio avoids that.
    if not redis_client.set(lock_key, "1", nx=True, ex=dedup_ttl):
        return {"status": "duplicate", "payment_id": payment_id}

    try:
        await credit_user_account(
            user_id=payment_intent["metadata"]["user_id"],
            amount=payment_intent["amount"],
            payment_id=payment_id,
        )
    except BaseException:
        # BaseException rather than Exception: asyncio.CancelledError is not an
        # Exception subclass, and a cancelled task must also release the claim.
        # Otherwise every retry would be skipped as a "duplicate" until the key
        # expired.
        redis_client.delete(lock_key)
        raise

    return {"status": "processed", "payment_id": payment_id}

Async Processing with Queues

Don’t process webhooks synchronously—acknowledge quickly, process later:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from fastapi import BackgroundTasks
import json

# Placeholder in-process queue. The GitHub handler below uses FastAPI
# BackgroundTasks and never touches this list. For durable processing, use a
# real broker (Redis/RabbitMQ/SQS).
webhook_queue = list()

@app.post("/webhooks/github")
async def github_webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    x_hub_signature_256: str = Header(None)
):
    """Verify a GitHub webhook, then defer the real work to a background task.

    The handler only authenticates the request and schedules
    ``process_github_event``. It answers immediately with
    ``{"status": "queued"}`` so that GitHub gets a fast response.
    """
    raw = await request.body()

    signature_ok = verify_github_signature(raw, x_hub_signature_256)
    if not signature_ok:
        raise HTTPException(status_code=401, detail="Invalid signature")

    body = await request.json()
    github_event = request.headers.get("X-GitHub-Event")

    background_tasks.add_task(process_github_event, github_event, body)
    return {"status": "queued"}


async def process_github_event(event_type: str, event: dict):
    """Dispatch a verified GitHub event to its handler.

    This is scheduled by ``github_webhook`` as a background task, so there is
    no caller to propagate errors to. Failures are logged together with their
    traceback and then dropped. Nothing here retries; if failed events must be
    retried, use a durable queue.

    Args:
        event_type: Value of the ``X-GitHub-Event`` header (may be None).
        event: Parsed JSON payload.

    Event types other than push / pull_request / issues are ignored.
    """
    try:
        if event_type == "push":
            await trigger_ci_pipeline(event)
        elif event_type == "pull_request":
            await handle_pr_event(event)
        elif event_type == "issues":
            await sync_issue_to_tracker(event)
    except Exception as e:
        # .exception() logs at ERROR level *with* the traceback; .error() would
        # lose it. NOTE(review): `logger` is defined outside this snippet, and
        # the keyword-argument style assumes a structlog-style logger — confirm.
        logger.exception("webhook_processing_failed",
            event_type=event_type,
            error=str(e))

Production Queue Pattern

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import boto3
import json

# SQS client and destination queue for Shopify webhooks. Credentials and region
# come from boto3's default configuration chain.
sqs = boto3.client(service_name="sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/webhooks"

@app.post("/webhooks/shopify")
async def shopify_webhook(request: Request):
    """Verify a Shopify webhook and hand the raw body to SQS for processing.

    The ``X-Shopify-Topic`` and ``X-Shopify-Shop-Domain`` headers are forwarded
    as the ``EventType`` and ``ShopDomain`` message attributes when present.
    Returns ``{"status": "accepted"}`` once the message has been enqueued; an
    unverified request gets a 401.
    """
    payload = await request.body()

    if not verify_shopify_signature(request, payload):
        raise HTTPException(status_code=401)

    # SQS rejects attributes whose StringValue is None or empty, so forward
    # only the headers that were actually sent (a missing one used to be a 500).
    attributes = {}
    for attribute_name, header_name in (
        ("EventType", "X-Shopify-Topic"),
        ("ShopDomain", "X-Shopify-Shop-Domain"),
    ):
        value = request.headers.get(header_name)
        if value:
            attributes[attribute_name] = {"StringValue": value, "DataType": "String"}

    message = {"QueueUrl": QUEUE_URL, "MessageBody": payload.decode()}
    if attributes:
        message["MessageAttributes"] = attributes

    # boto3 is synchronous; run the network call in a worker thread so it does
    # not stall every other request on the event loop.
    await run_in_threadpool(sqs.send_message, **message)

    return {"status": "accepted"}

Sending Webhooks

Basic Sender with Retries

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import httpx
import hashlib
import hmac
import time
import json
from typing import Optional

class WebhookSender:
    """Deliver signed JSON webhooks over HTTP, retrying transient failures.

    Every request carries ``X-Webhook-Signature: t=<unix-ts>,v1=<hex>``, where
    ``<hex>`` is the hex HMAC-SHA256 of ``f"{t}.{body}"`` keyed by ``secret``.
    """

    # 4xx responses that signal a temporary condition (request timeout, rate
    # limiting). These are retried like 5xx responses.
    RETRYABLE_CLIENT_STATUSES = frozenset({408, 429})

    def __init__(self, secret: str, max_retries: int = 5, timeout: float = 30.0):
        """Create a sender.

        Args:
            secret: Shared HMAC signing secret.
            max_retries: Total number of delivery attempts, including the first.
            timeout: Per-request timeout in seconds.
        """
        self.secret = secret
        self.max_retries = max_retries
        self.timeout = timeout
        # Seconds to wait after attempt i. The last entry is reused when
        # max_retries exceeds the list length.
        self.retry_delays = [1, 5, 30, 120, 600]  # seconds

    def sign_payload(self, payload: str, timestamp: int) -> str:
        """Return the ``t=<timestamp>,v1=<hex HMAC-SHA256>`` signature header value."""
        message = f"{timestamp}.{payload}"
        signature = hmac.new(
            self.secret.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
        return f"t={timestamp},v1={signature}"

    async def send(
        self,
        url: str,
        event_type: str,
        data: dict,
        webhook_id: Optional[str] = None
    ) -> dict:
        """POST one event to ``url``, retrying network errors, 5xx, 408 and 429.

        Args:
            url: Receiver endpoint.
            event_type: Stored as ``type`` in the JSON body.
            data: Stored as ``data`` in the JSON body; must be JSON-serialisable.
            webhook_id: Stable event id. A random UUID4 is used when omitted.
                The same id is sent as the body's ``id`` and as the
                ``X-Webhook-ID`` header, so receivers can de-duplicate retries.

        Returns:
            ``{"status": "delivered", "attempt": n}`` on any 2xx response;
            ``{"status": "failed", "error": ..., "attempt": n}`` on a
            non-retryable response; and ``{"status": "failed", "error": ...,
            "attempts": max_retries}`` once every attempt has failed.
            ``created_at`` in the body is a timezone-aware UTC ISO-8601 string.
        """
        # Resolve the id once so the body and the header always agree. A None
        # header value would make httpx reject every single attempt.
        event_id = webhook_id or str(uuid.uuid4())
        payload = json.dumps({
            "id": event_id,
            "type": event_type,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "data": data
        })

        last_error = None
        # One client for all attempts, so connections can be reused.
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            for attempt in range(self.max_retries):
                # Re-sign on every attempt so the timestamp stays fresh for
                # receivers that reject stale timestamps (replay protection).
                timestamp = int(time.time())
                headers = {
                    "Content-Type": "application/json",
                    "X-Webhook-Signature": self.sign_payload(payload, timestamp),
                    "X-Webhook-Timestamp": str(timestamp),
                    "X-Webhook-ID": event_id,
                }

                try:
                    response = await client.post(url, content=payload, headers=headers)
                except Exception as e:
                    # Connection failures and timeouts are transient: record and retry.
                    last_error = str(e)
                else:
                    status = response.status_code
                    if 200 <= status < 300:
                        return {"status": "delivered", "attempt": attempt + 1}
                    if status < 500 and status not in self.RETRYABLE_CLIENT_STATUSES:
                        # Permanent rejection (e.g. 400/401/404): retrying won't help.
                        return {
                            "status": "failed",
                            "error": f"Client error: {status}",
                            "attempt": attempt + 1
                        }
                    if status >= 500:
                        last_error = f"Server error: {status}"
                    else:
                        last_error = f"Retryable client error: {status}"

                if attempt < self.max_retries - 1:
                    delay = self.retry_delays[min(attempt, len(self.retry_delays) - 1)]
                    await asyncio.sleep(delay)

        return {
            "status": "failed",
            "error": last_error,
            "attempts": self.max_retries
        }

Webhook Delivery Service

For production, track deliveries in a database:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from sqlalchemy import Column, String, Integer, DateTime, JSON
from sqlalchemy.ext.asyncio import AsyncSession

class WebhookDelivery(Base):
    """One outbound webhook and the outcome of delivering it.

    Rows live in the ``webhook_deliveries`` table. The attribute names and
    their order define the table's columns.
    """

    __tablename__ = "webhook_deliveries"
    
    id = Column(String, primary_key=True)  # delivery id; WebhookService also sends it as the webhook id
    endpoint_id = Column(String, index=True)  # subscriber endpoint this delivery targets
    event_type = Column(String)
    payload = Column(JSON)  # the event `data` dict given to WebhookService.queue_webhook
    status = Column(String)  # pending, delivered, failed
    attempts = Column(Integer, default=0)
    last_attempt_at = Column(DateTime)  # naive UTC (WebhookService writes datetime.utcnow())
    last_error = Column(String, nullable=True)  # error text from the most recent failed delivery
    created_at = Column(DateTime, default=datetime.utcnow)  # naive UTC, set on insert


class WebhookService:
    """Record outbound webhooks in the database and deliver them in the background.

    Each webhook is stored as a ``WebhookDelivery`` row with status
    ``pending`` before delivery starts. That leaves an audit trail even if the
    process dies mid-delivery.

    NOTE(review): ``deliver`` runs as a background task but reuses the
    ``AsyncSession`` passed to the constructor. An AsyncSession must not be
    used concurrently, and a request-scoped session may already be closed
    when the task runs. Consider giving the service a session factory instead.
    """

    def __init__(self, db: AsyncSession, sender: WebhookSender):
        self.db = db
        self.sender = sender
        # The event loop keeps only weak references to tasks, so a
        # fire-and-forget task must be held here until it finishes.
        self._background_tasks = set()

    async def queue_webhook(
        self,
        endpoint_url: str,
        endpoint_id: str,
        event_type: str,
        data: dict
    ) -> str:
        """Persist a pending delivery, start sending it, and return its id.

        The row is committed before delivery is scheduled, so it exists even if
        delivery never completes. Delivery runs concurrently; this call does
        not wait for it.
        """
        delivery_id = str(uuid.uuid4())

        delivery = WebhookDelivery(
            id=delivery_id,
            endpoint_id=endpoint_id,
            event_type=event_type,
            payload=data,
            status="pending"
        )
        self.db.add(delivery)
        await self.db.commit()

        task = asyncio.create_task(self.deliver(delivery_id, endpoint_url))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

        return delivery_id

    async def deliver(self, delivery_id: str, url: str):
        """Send the stored webhook ``delivery_id`` to ``url`` and record the outcome.

        Sets ``status`` to ``delivered`` or ``failed``, stores the sender's
        error text on failure, and adds the number of HTTP attempts the sender
        made to ``attempts``. Does nothing if the row no longer exists.
        """
        delivery = await self.db.get(WebhookDelivery, delivery_id)
        if delivery is None:
            # Row was deleted or never committed: nothing to deliver.
            return

        result = await self.sender.send(
            url=url,
            event_type=delivery.event_type,
            data=delivery.payload,
            webhook_id=delivery_id
        )

        # send() retries internally. It reports "attempt" when it stops early
        # (success or a permanent error) and "attempts" once retries run out.
        http_attempts = result.get("attempt", result.get("attempts", 1))
        delivery.attempts = (delivery.attempts or 0) + http_attempts
        delivery.last_attempt_at = datetime.utcnow()

        if result["status"] == "delivered":
            delivery.status = "delivered"
        else:
            delivery.status = "failed"
            delivery.last_error = result.get("error")

        await self.db.commit()

Security Best Practices

Signature Verification (Don’t Skip This!)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def verify_signature(
    payload: bytes,
    signature: Optional[str],
    secret: str,
    prefix: str = "",
) -> bool:
    """Check a hex HMAC-SHA256 ``signature`` of ``payload`` keyed by ``secret``.

    Args:
        payload: Raw request body bytes, exactly as received.
        signature: Hex digest from the request header (may be None or empty).
        secret: Shared signing secret.
        prefix: Optional scheme prefix to strip first, e.g. ``"sha256="`` for
            GitHub's ``X-Hub-Signature-256``. If it is given, a signature
            without the prefix is rejected.

    Returns:
        True only if the signature matches. A missing or malformed value
        returns False instead of raising.
    """
    if not signature:
        return False
    if prefix:
        if not signature.startswith(prefix):
            return False
        signature = signature[len(prefix):]

    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    # Constant-time comparison prevents timing attacks. Compare bytes, because
    # compare_digest raises TypeError on non-ASCII str input (headers are
    # attacker-controlled).
    return hmac.compare_digest(expected.encode(), signature.encode())

Timestamp Validation

1
2
3
4
5
6
7
8
def verify_timestamp(timestamp: str, tolerance_seconds: int = 300) -> bool:
    """Return True only if ``timestamp`` is within ``tolerance_seconds`` of now.

    This is replay protection: a signed request that is captured and re-sent
    later fails the check. ``timestamp`` is a Unix time in whole seconds (any
    value ``int()`` accepts). Unparseable or missing values are rejected, and
    so are timestamps too far in the future.
    """
    try:
        sent_at = int(timestamp)
    except (ValueError, TypeError):
        return False

    drift = int(time.time()) - sent_at
    return -tolerance_seconds <= drift <= tolerance_seconds

IP Allowlisting

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from fastapi import Request

# Sources allowed to call the webhook endpoints. Each entry may be a single
# address or a CIDR range; verify_source_ip matches both forms.
ALLOWED_IPS = {
    "192.0.2.1",  # Example webhook source IP
    "198.51.100.0/24",  # CIDR range
}


def verify_source_ip(request: Request) -> bool:
    """Return True if the request's direct peer address is in ``ALLOWED_IPS``.

    Each allowlist entry is parsed with ``ipaddress.ip_network``, so a bare
    address acts as a single-host network and a CIDR range matches every
    address inside it. Unparseable entries are skipped. An IPv4-mapped IPv6
    peer (``::ffff:a.b.c.d``) is compared as its IPv4 address.

    NOTE: ``request.client.host`` is the TCP peer. Behind a reverse proxy or
    load balancer, that is the proxy's address, not the webhook sender's.

    Returns False when the peer is unknown or its host is not an IP literal.
    """
    client = request.client
    if client is None or not client.host:
        return False
    try:
        address = ipaddress.ip_address(client.host)
    except ValueError:
        return False
    if address.version == 6 and address.ipv4_mapped is not None:
        address = address.ipv4_mapped

    for entry in ALLOWED_IPS:
        try:
            network = ipaddress.ip_network(entry, strict=False)
        except ValueError:
            continue
        # Membership is simply False when the IP versions differ.
        if address in network:
            return True
    return False

Monitoring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from prometheus_client import Counter, Histogram

# Inbound webhooks, labelled by sender, event type and outcome.
webhook_received = Counter(
    name='webhooks_received_total',
    documentation='Total webhooks received',
    labelnames=('source', 'event_type', 'status'),
)

# Latency of inbound webhook handling, per sender and event type.
webhook_processing_time = Histogram(
    name='webhook_processing_seconds',
    documentation='Time spent processing webhooks',
    labelnames=('source', 'event_type'),
)

# Outbound delivery attempts, per destination endpoint and result.
# NOTE(review): if `endpoint` values are raw URLs or per-customer ids, label
# cardinality can grow without bound; prefer a bounded identifier.
webhook_delivery_attempts = Counter(
    name='webhook_delivery_attempts_total',
    documentation='Webhook delivery attempts',
    labelnames=('endpoint', 'status'),
)

Summary

| Pattern | When to use |
|---|---|
| Sync processing | Simple, low-volume webhooks |
| Background tasks | Medium volume, non-critical |
| Message queue | High volume, mission-critical |
| Delivery tracking | When you need audit trails |

Webhooks are deceptively simple. The difference between a webhook that works in development and one that survives production is retry logic, idempotency, signature verification, and proper monitoring. Build these in from the start.