Your dependencies will fail. The database goes down, a third-party API times out, the cache disappears. The question isn’t whether this happens; it’s whether your users notice. Graceful degradation keeps core features working when individual components fail.

The Philosophy

Instead of: “Redis is down → Application crashes”
Think: “Redis is down → Features using Redis degrade → Core features work”

import json
import logging

from redis import RedisError

logger = logging.getLogger(__name__)
# Assumed: `redis` is a connected redis-py client, `db` is your database handle

# Brittle: Cache failure = Application failure
def get_user(user_id):
    cached = redis.get(f"user:{user_id}")  # Throws if Redis down
    if cached:
        return json.loads(cached)
    return db.query("SELECT * FROM users WHERE id = %s", user_id)

# Resilient: Cache failure = Slower, but working
def get_user(user_id):
    try:
        cached = redis.get(f"user:{user_id}")
        if cached:
            return json.loads(cached)
    except RedisError:
        logger.warning("Cache unavailable, falling back to database")

    return db.query("SELECT * FROM users WHERE id = %s", user_id)

Timeouts: The First Defense

Never wait forever:

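import httpx

# BAD: timeout=None disables httpx's default 5-second timeout, so this can hang forever
# (some clients, such as requests, have no default timeout at all)
response = httpx.get("https://api.example.com/data", timeout=None)

# GOOD: Fail fast with explicit limits (5s for read/write/pool, 2s to connect)
response = httpx.get(
    "https://api.example.com/data",
    timeout=httpx.Timeout(5.0, connect=2.0)
)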

Set timeouts on everything: HTTP calls, database queries, cache operations, file I/O.
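
Some calls expose no timeout parameter of their own. One option in that case is to enforce a deadline from the outside. The following is a minimal sketch using only the standard library; `call_with_timeout`, `slow_lookup`, and `fast_lookup` are hypothetical names for illustration, and the pool size is an assumption.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

# Shared pool for calls that need an external deadline (size is an assumption).
_deadline_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="deadline")


def call_with_timeout(fn, timeout, *args, default=None, **kwargs):
    """Run fn(*args, **kwargs); return `default` if it exceeds `timeout` seconds.

    Caveat: the worker thread is not killed on timeout. It keeps running until
    fn returns, so prefer a library's native timeout wherever one exists.
    """
    future = _deadline_pool.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        future.cancel()  # only has an effect if the call has not started yet
        return default


def slow_lookup(key):
    time.sleep(0.5)  # stands in for a hung dependency
    return f"value-for-{key}"


def fast_lookup(key):
    return f"value-for-{key}"
```

Because the abandoned call still occupies a worker, a small dedicated pool like this also limits how many hung calls can pile up at once.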

Circuit Breakers

Stop calling a service that’s failing:

from circuitbreaker import CircuitBreakerError, circuit

@circuit(failure_threshold=5, recovery_timeout=30)
def call_payment_api(amount):
    return payment_client.charge(amount)

def process_payment(amount):
    try:
        return call_payment_api(amount)
    except CircuitBreakerError:
        # Circuit is open - service is known to be down
        logger.error("Payment service unavailable")
        return queue_for_later(amount)  # Graceful fallback

How it works:

  1. Track consecutive failures for each service
  2. After N failures, “open” the circuit
  3. While open, fail immediately (don’t waste time calling a broken service)
  4. After the recovery timeout, let one trial request through (“half-open”)
  5. If it works, close the circuit; if not, reopen it and wait again
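
The steps above can be sketched as a small state machine. This is an illustrative, single-threaded sketch and not how the `circuitbreaker` package is implemented; `SimpleCircuitBreaker` and `CircuitOpenError` are hypothetical names, and the injectable `clock` exists only to make the behavior easy to test.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0      # consecutive failures seen so far
        self._opened_at = None  # None means the circuit is closed

    @property
    def state(self):
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half-open"
        return "open"

    def call(self, fn, *args, **kwargs):
        if self.state == "open":
            # Step 3: fail immediately without touching the service
            raise CircuitOpenError("circuit open, call rejected")
        # Closed or half-open: let the request through
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            tripped = self._failures >= self.failure_threshold
            if self._opened_at is not None or tripped:
                # Step 2 (open) or step 5 (trial failed, reopen)
                self._opened_at = self._clock()
            raise
        # Step 5: success closes the circuit and resets the count
        self._failures = 0
        self._opened_at = None
        return result
```

A production breaker would also need locking so that only one trial request passes in the half-open state when several threads call it at once.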

Fallback Chains

Try alternatives when primary fails:

async def get_exchange_rate(currency: str) -> float:
    """Try multiple sources, fall back to cached, fall back to hardcoded."""
    
    # Try primary API
    try:
        return await primary_forex_api.get_rate(currency)
    except Exception as e:
        logger.warning(f"Primary forex API failed: {e}")
    
    # Try secondary API
    try:
        return await secondary_forex_api.get_rate(currency)
    except Exception as e:
        logger.warning(f"Secondary forex API failed: {e}")
    
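    # Try cached value (the cache is a dependency too, so guard it)
    try:
        cached = await cache.get(f"exchange_rate:{currency}")
    except Exception as e:
        logger.warning(f"Cache lookup failed: {e}")
        cached = None
    if cached is not None:
        logger.warning(f"Using cached exchange rate for {currency}")
        return float(cached)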
    
    # Last resort: hardcoded rates (stale but better than nothing)
    if currency in HARDCODED_RATES:
        logger.error(f"Using hardcoded exchange rate for {currency}")
        return HARDCODED_RATES[currency]
    
    raise ExchangeRateUnavailable(currency)

Feature Degradation Levels

Define what degrades and when:

class ServiceHealth:
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    CRITICAL = "critical"

class FeatureConfig:
    def __init__(self):
        self.health = ServiceHealth.HEALTHY
    
    def should_enable(self, feature: str) -> bool:
        if self.health == ServiceHealth.HEALTHY:
            return True
        
        # Degraded: disable nice-to-haves
        if self.health == ServiceHealth.DEGRADED:
            return feature not in [
                "recommendations",
                "analytics_tracking",
                "image_thumbnails",
            ]
        
        # Critical: only core functionality
        if self.health == ServiceHealth.CRITICAL:
            return feature in [
                "authentication",
                "checkout",
                "order_status",
            ]
        
        return False

config = FeatureConfig()

@app.get("/products/{id}")
def get_product(id: str):
    product = db.get_product(id)
    
    if config.should_enable("recommendations"):
        product["recommendations"] = get_recommendations(id)
    
    if config.should_enable("reviews"):
        product["reviews"] = get_reviews(id)
    
    return product
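
The example leaves open how `config.health` changes. One possible approach is to derive it from periodic dependency checks. The sketch below is an assumption, not part of the original design: `derive_health` and `CORE_DEPENDENCIES` are hypothetical names, and the rule (any core dependency down means critical) is just one reasonable policy.

```python
from typing import Mapping

# Hypothetical classification: dependencies the core features cannot work without.
CORE_DEPENDENCIES = {"database", "payment_api"}


def derive_health(check_results: Mapping[str, bool]) -> str:
    """Map dependency check results (name -> is_up) to a health level.

    Returns the same strings ServiceHealth uses: "healthy", "degraded", "critical".
    """
    failed = {name for name, is_up in check_results.items() if not is_up}
    if not failed:
        return "healthy"
    if failed & CORE_DEPENDENCIES:
        return "critical"  # a core dependency is down
    return "degraded"      # only nice-to-have dependencies are down
```

A background task could then run the checks every few seconds and assign the result, for example `config.health = derive_health({"database": True, "cache": False})`.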

Bulkheads: Isolate Failures

Don’t let one component take down everything:

import asyncio
from concurrent.futures import ThreadPoolExecutor

# Separate thread pools so one slow service cannot exhaust another's workers
payment_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="payment")
inventory_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="inventory")
notification_pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="notify")

async def checkout(order):
    loop = asyncio.get_running_loop()

    # If notifications are slow/broken, payments still work.
    # run_in_executor returns awaitables, so the event loop is never blocked.
    payment_future = loop.run_in_executor(payment_pool, process_payment, order)
    inventory_future = loop.run_in_executor(inventory_pool, reserve_inventory, order)

    # These must succeed
    payment_result = await asyncio.wait_for(payment_future, timeout=10)
    inventory_result = await asyncio.wait_for(inventory_future, timeout=10)

    # This can fail without breaking checkout
    try:
        confirmation = loop.run_in_executor(notification_pool, send_confirmation, order)
        await asyncio.wait_for(confirmation, timeout=5)
    except Exception:
        logger.warning("Failed to send confirmation, will retry later")
        queue_notification_retry(order)

    return {"payment": payment_result, "inventory": inventory_result}

Async Fallbacks

Queue for later when immediate processing fails:

from datetime import datetime, timezone

async def send_email(user_id: str, template: str, data: dict):
    try:
        await email_service.send(user_id, template, data)
    except EmailServiceError:
        # Queue for retry instead of failing
        await message_queue.publish("email_retry", {
            "user_id": user_id,
            "template": template,
            "data": data,
            "attempt": 1,
            # Timezone-aware timestamp so consumers on other hosts read it correctly
            "queued_at": datetime.now(timezone.utc).isoformat()
        })
        logger.warning(f"Email queued for retry: {user_id}")
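
The consumer side of the retry queue needs to decide when to try again and when to give up. A minimal sketch of that decision, assuming the message shape published above; `next_retry` and the three constants are hypothetical, and the values are placeholders to tune for your system.

```python
import random

MAX_ATTEMPTS = 5          # assumed limit before giving up
BASE_DELAY_SECONDS = 30   # assumed delay before the second attempt
MAX_DELAY_SECONDS = 3600  # assumed cap on the backoff


def next_retry(message: dict, jitter=random.random):
    """Return (delay_seconds, updated_message), or None when retries are exhausted."""
    attempt = message["attempt"]
    if attempt >= MAX_ATTEMPTS:
        return None  # caller should dead-letter the message and alert
    # Exponential backoff: 30s, 60s, 120s, ... capped at MAX_DELAY_SECONDS
    delay = min(BASE_DELAY_SECONDS * 2 ** (attempt - 1), MAX_DELAY_SECONDS)
    # Up to 10% random jitter so retries do not all land at the same moment
    delay += delay * 0.1 * jitter()
    return delay, {**message, "attempt": attempt + 1}
```

Backing off with jitter keeps the retry traffic from hammering the email service just as it starts to recover.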

User Communication

Tell users what’s happening:

@app.get("/dashboard")
def dashboard():
    data = {"user": get_current_user()}
    warnings = []
    
    try:
        data["recent_orders"] = get_recent_orders()
    except ServiceUnavailable:
        data["recent_orders"] = None
        warnings.append("Order history is temporarily unavailable")
    
    try:
        data["recommendations"] = get_recommendations()
    except ServiceUnavailable:
        data["recommendations"] = []
        # Don't warn - users don't care about missing recommendations
    
    return {
        "data": data,
        "warnings": warnings,
        "degraded": bool(warnings)
    }

Quick Checklist

For each external dependency:

  • Timeout configured
  • Circuit breaker in place
  • Fallback behavior defined
  • Failure logged and monitored
  • Users informed if they’re affected
  • Core functionality continues without it

The goal: When something breaks—and it will—users might see degraded functionality but never a complete outage. Partial functionality beats zero functionality every time.