In distributed systems, failures are inevitable. A single slow or failing service can cascade through your entire architecture, turning a minor issue into a major outage. Circuit breakers prevent this by detecting failures and stopping the cascade before it spreads.

The Problem: Cascading Failures

Imagine Service A calls Service B, which calls Service C. If Service C becomes slow:

  1. Requests to C start timing out
  2. Service B’s thread pool fills up waiting for C
  3. Service B becomes slow
  4. Service A’s threads fill up waiting for B
  5. Your entire system grinds to a halt

One slow service just took down everything.
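
A back-of-the-envelope calculation shows why the pool fills so quickly. By Little's law, a pool of N workers whose requests each hold a worker for L seconds can complete at most N / L requests per second. The pool size and latencies below are hypothetical numbers chosen only for illustration.

```python
def max_throughput(pool_size: int, latency_s: float) -> float:
    """Upper bound on requests/second a fixed-size worker pool can complete.

    Each request occupies one worker for latency_s seconds, so the pool
    finishes at most pool_size requests every latency_s seconds.
    """
    if latency_s <= 0:
        raise ValueError("latency_s must be positive")
    return pool_size / latency_s


# Hypothetical Service B with 20 worker threads.
healthy = max_throughput(20, 0.05)  # C answers in 50 ms
degraded = max_throughput(20, 5.0)  # C hangs until a 5 s timeout
print(f"healthy: {healthy:.0f} req/s, degraded: {degraded:.0f} req/s")
# prints "healthy: 400 req/s, degraded: 4 req/s"
```

A hundredfold slowdown in C cuts B's capacity a hundredfold. Once incoming traffic exceeds that lower ceiling, requests queue and B looks slow to A.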

The Solution: Circuit Breakers

A circuit breaker wraps calls to external services and monitors for failures. When failures exceed a threshold, the circuit “opens” and immediately rejects requests without attempting the call. After a timeout, it allows limited requests through to test if the service has recovered.

Three states:

  • Closed: Normal operation, requests flow through
  • Open: Failures exceeded threshold, requests rejected immediately
  • Half-Open: Testing if service recovered, limited requests allowed
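
State transitions: Closed → Open when failures reach the threshold; Open → Half-Open when the recovery timeout expires; Half-Open → Closed when the trial calls succeed; Half-Open → Open when a trial call fails.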
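
As a compact summary, the three states and their transitions fit in a lookup table. This is an illustrative sketch only. The event names trip, timeout, probe_ok and probe_failed are made up here. probe_ok stands for whatever success rule the breaker applies; the implementation below requires half_open_max_calls successful probes.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# (current state, event) -> next state. Hypothetical event labels:
# "trip" = failures reached the threshold, "timeout" = recovery timeout expired,
# "probe_ok" / "probe_failed" = outcome of the trial calls in half-open.
TRANSITIONS = {
    (State.CLOSED, "trip"): State.OPEN,
    (State.OPEN, "timeout"): State.HALF_OPEN,
    (State.HALF_OPEN, "probe_ok"): State.CLOSED,
    (State.HALF_OPEN, "probe_failed"): State.OPEN,
}


def next_state(state: State, event: str) -> State:
    """Return the state after `event`; events that don't apply leave it unchanged."""
    return TRANSITIONS.get((state, event), state)


s = State.CLOSED
for event in ["trip", "timeout", "probe_failed", "timeout", "probe_ok"]:
    s = next_state(s, event)
    print(f"{event:>12} -> {s.value}")
```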

Basic Implementation

import time
from enum import Enum
from threading import Lock
from typing import Callable, Any
from functools import wraps

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.last_failure_time = None
        self.half_open_calls = 0
        self._lock = Lock()
    
    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to try again."""
        if self.last_failure_time is None:
            return False
        return time.time() - self.last_failure_time >= self.recovery_timeout
    
    def _record_success(self):
        """Record a successful call."""
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.successes += 1
                if self.successes >= self.half_open_max_calls:
                    # Enough successes, close the circuit
                    self.state = CircuitState.CLOSED
                    self.failures = 0
                    self.successes = 0
                    self.half_open_calls = 0
            else:
                # In closed state, reset failure count on success
                self.failures = 0
    
    def _record_failure(self):
        """Record a failed call."""
        with self._lock:
            self.failures += 1
            self.last_failure_time = time.time()
            
            if self.state == CircuitState.HALF_OPEN:
                # Any failure in half-open goes back to open
                self.state = CircuitState.OPEN
                self.half_open_calls = 0
                self.successes = 0
            elif self.failures >= self.failure_threshold:
                # Too many failures, open the circuit
                self.state = CircuitState.OPEN
    
    def can_execute(self) -> bool:
        """Check if a call should be attempted."""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True
            
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 1  # this call is the first probe
                    self.successes = 0
                    return True
                return False
            
            # Half-open: allow limited calls
            if self.half_open_calls < self.half_open_max_calls:
                self.half_open_calls += 1
                return True
            return False
    
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute a function through the circuit breaker."""
        if not self.can_execute():
            raise CircuitOpenError(f"Circuit is {self.state.value}")
        
        try:
            result = func(*args, **kwargs)
            self._record_success()
            return result
        except Exception:
            self._record_failure()
            raise


class CircuitOpenError(Exception):
    """Raised when circuit is open and rejecting calls."""
    pass


# Decorator version
def circuit_breaker(failure_threshold=5, recovery_timeout=30):
    breaker = CircuitBreaker(failure_threshold, recovery_timeout)
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return breaker.call(func, *args, **kwargs)
        wrapper.circuit_breaker = breaker  # Expose for monitoring
        return wrapper
    return decorator

Usage Examples

Protecting HTTP Calls

import httpx

@circuit_breaker(failure_threshold=3, recovery_timeout=60)
def fetch_user_data(user_id: str) -> dict:
    response = httpx.get(
        f"https://api.example.com/users/{user_id}",
        timeout=5.0
    )
    response.raise_for_status()
    return response.json()

# Usage with fallback
def get_user(user_id: str) -> dict:
    try:
        return fetch_user_data(user_id)
    except CircuitOpenError:
        # Return cached data or default
        return get_cached_user(user_id) or {"id": user_id, "status": "unavailable"}
    except httpx.HTTPError:
        # The breaker has already recorded this failure; degrade gracefully
        return {"id": user_id, "status": "error"}

Per-Service Circuit Breakers

Different services need different thresholds:

class CircuitBreakerRegistry:
    def __init__(self):
        self._breakers: dict[str, CircuitBreaker] = {}
        self._lock = Lock()
    
    def get(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: int = 30
    ) -> CircuitBreaker:
        with self._lock:
            if name not in self._breakers:
                self._breakers[name] = CircuitBreaker(
                    failure_threshold=failure_threshold,
                    recovery_timeout=recovery_timeout
                )
            return self._breakers[name]
    
    def get_all_states(self) -> dict[str, str]:
        """Get states of all circuit breakers for monitoring."""
        with self._lock:  # avoid iterating while another thread adds a breaker
            return {
                name: breaker.state.value
                for name, breaker in self._breakers.items()
            }


# Global registry
breakers = CircuitBreakerRegistry()

# Configure per-service
def call_payment_service(data):
    cb = breakers.get("payment", failure_threshold=2, recovery_timeout=120)
    return cb.call(payment_client.process, data)

def call_inventory_service(sku):
    cb = breakers.get("inventory", failure_threshold=5, recovery_timeout=30)
    return cb.call(inventory_client.check, sku)
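
CircuitBreakerRegistry.get applies its threshold arguments only the first time a name is seen. A later caller that passes different values silently gets the original breaker. Keeping per-service settings in one table avoids that drift. Here is a sketch with hypothetical names and values that mirror the examples above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BreakerSettings:
    failure_threshold: int = 5
    recovery_timeout: int = 30  # seconds


# Single source of truth for per-service tuning (hypothetical values).
SERVICE_SETTINGS = {
    "payment": BreakerSettings(failure_threshold=2, recovery_timeout=120),
    "inventory": BreakerSettings(failure_threshold=5, recovery_timeout=30),
}


def settings_for(service: str) -> BreakerSettings:
    """Look up a service's settings, falling back to the defaults."""
    return SERVICE_SETTINGS.get(service, BreakerSettings())
```

Call sites then pass the fields of settings_for(name) to breakers.get, so every caller of a given name agrees on its thresholds.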

Async Circuit Breaker

For async code:

from typing import Any, Awaitable, Callable

class AsyncCircuitBreaker(CircuitBreaker):
    async def call_async(
        self, func: Callable[..., Awaitable[Any]], *args, **kwargs
    ) -> Any:
        # Take a coroutine *function*, not a coroutine object: when the circuit
        # is open the coroutine is never created, so nothing is left un-awaited.
        if not self.can_execute():
            raise CircuitOpenError(f"Circuit is {self.state.value}")
        
        try:
            result = await func(*args, **kwargs)
            self._record_success()
            return result
        except Exception:
            self._record_failure()
            raise


# Usage
payment_breaker = AsyncCircuitBreaker(failure_threshold=3)

async def process_payment(order_id: str):
    try:
        return await payment_breaker.call_async(payment_api.charge, order_id)
    except CircuitOpenError:
        # Queue for retry later
        await retry_queue.push({"order_id": order_id, "action": "charge"})
        return {"status": "queued"}

Advanced Patterns

Sliding Window Failure Tracking

Instead of counting consecutive failures, track the failure rate over a sliding window of recent calls:

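from collections import deque

class SlidingWindowCircuitBreaker(CircuitBreaker):
    def __init__(
        self,
        failure_rate_threshold: float = 0.5,  # 50% failure rate
        window_size: int = 10,                 # Last 10 calls
        min_calls: int = 5,                    # Minimum calls before evaluating
        **kwargs
    ):
        super().__init__(**kwargs)
        self.failure_rate_threshold = failure_rate_threshold
        self.window_size = window_size
        self.min_calls = min_calls
        self.results = deque(maxlen=window_size)
    
    def _record_success(self):
        self._record(True)
    
    def _record_failure(self):
        self._record(False)
    
    def _record(self, succeeded: bool):
        with self._lock:
            if self.state == CircuitState.OPEN:
                # Late result from a call that started before the circuit
                # opened; keep it out of the fresh window used for probes.
                return
            self.results.append(succeeded)
            if not succeeded:
                self.last_failure_time = time.time()
            self._evaluate_state()
    
    def _evaluate_state(self):
        # Caller holds self._lock.
        if self.state == CircuitState.HALF_OPEN:
            # The window was cleared when the circuit opened, so it now holds
            # only half-open probes. Wait until all of them have reported.
            if len(self.results) < min(self.half_open_max_calls, self.window_size):
                return
            failure_rate = self.results.count(False) / len(self.results)
            # Demand a clearly healthier rate to close than to open (hysteresis);
            # anything else re-opens, so half-open can never get stuck.
            if failure_rate < self.failure_rate_threshold / 2:
                self.state = CircuitState.CLOSED
            else:
                self.state = CircuitState.OPEN
            self.results.clear()
            return
        
        if len(self.results) < self.min_calls:
            return
        
        failure_rate = self.results.count(False) / len(self.results)
        if failure_rate >= self.failure_rate_threshold:
            self.state = CircuitState.OPEN
            self.results.clear()  # half-open probes start from an empty window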
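
A variant judges the failure rate over a period of time rather than over a fixed number of calls. It uses a deque of timestamped outcomes and evicts entries older than the window before each evaluation. This is a standalone sketch of that alternative, not a drop-in subclass. The class name TimeWindowFailureRate and its injectable clock argument are assumptions.

```python
import time
from collections import deque
from typing import Callable, Deque, Optional, Tuple


class TimeWindowFailureRate:
    """Failure rate over the calls made in the last `window_seconds` seconds."""

    def __init__(
        self,
        window_seconds: float = 60.0,
        min_calls: int = 5,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.window_seconds = window_seconds
        self.min_calls = min_calls
        self._clock = clock
        self._events: Deque[Tuple[float, bool]] = deque()  # (timestamp, succeeded)

    def _evict_old(self, now: float) -> None:
        # Events arrive in time order, so stale ones are always at the left end.
        while self._events and now - self._events[0][0] > self.window_seconds:
            self._events.popleft()

    def record(self, succeeded: bool) -> None:
        now = self._clock()
        self._events.append((now, succeeded))
        self._evict_old(now)

    def failure_rate(self) -> Optional[float]:
        """Return the failure rate, or None if there are too few calls to judge."""
        self._evict_old(self._clock())
        if len(self._events) < self.min_calls:
            return None
        failures = sum(1 for _, ok in self._events if not ok)
        return failures / len(self._events)


# Demo with a hand-driven clock (hypothetical timings)
now = [0.0]
window = TimeWindowFailureRate(window_seconds=10, min_calls=2, clock=lambda: now[0])
window.record(False)
window.record(False)
now[0] = 5.0
window.record(True)
window.record(True)
print(window.failure_rate())  # 0.5: two failures out of four calls
now[0] = 12.0
print(window.failure_rate())  # 0.0: the failures at t=0 have aged out
```

The class itself is not thread-safe. Inside a breaker, you would call record() from _record_success and _record_failure while holding the breaker's lock, then compare failure_rate() with the threshold.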

Bulkhead Pattern (Bonus)

Combine circuit breakers with bulkheads, which cap how many concurrent calls each dependency may consume, so one slow service cannot tie up every thread:

from contextlib import contextmanager
from threading import BoundedSemaphore

class Bulkhead:
    """Caps how many calls to one dependency can be in flight at once."""
    def __init__(self, name: str, max_concurrent: int, max_wait: float = 1.0):
        self.name = name
        self._slots = BoundedSemaphore(max_concurrent)
        self.max_wait = max_wait
    
    @contextmanager
    def execute(self):
        # Wait up to max_wait seconds for a free slot, then fail fast.
        # The slot is held for the whole with-block, so the protected call
        # itself counts against the limit.
        if not self._slots.acquire(timeout=self.max_wait):
            raise BulkheadFullError(f"Bulkhead {self.name} is full")
        try:
            yield
        finally:
            self._slots.release()


class BulkheadFullError(Exception):
    pass


# Combine bulkhead + circuit breaker
payment_bulkhead = Bulkhead("payment", max_concurrent=10)
payment_breaker = CircuitBreaker(failure_threshold=5)

def protected_payment(order_id):
    with payment_bulkhead.execute():
        return payment_breaker.call(payment_api.charge, order_id)

Monitoring

Expose circuit breaker state for observability:

from fastapi import FastAPI
from prometheus_client import Gauge, Counter

app = FastAPI()

# Metrics
circuit_state = Gauge(
    'circuit_breaker_state',
    'Current state (0=closed, 1=open, 2=half_open)',
    ['service']
)
circuit_failures = Counter(
    'circuit_breaker_failures_total',
    'Total failures',
    ['service']
)

@app.get("/health/circuits")
def circuit_health():
    return {
        name: {
            "state": breaker.state.value,
            "failures": breaker.failures,
            "last_failure": breaker.last_failure_time
        }
        for name, breaker in list(breakers._breakers.items())  # snapshot: the registry may grow concurrently
    }
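
The circuit_state gauge above is declared but never set. One way to keep it current is to map each state to the number the gauge's help text promises (0=closed, 1=open, 2=half_open). This assumes you refresh the gauge periodically, or on each scrape, from the registry's get_all_states(). The sketch takes the setter as a callback so it runs without prometheus_client. With the real gauge, the setter would be circuit_state.labels(service=name).set(value).

```python
from typing import Callable, Mapping

# Numeric encoding promised by the gauge's help string.
STATE_TO_GAUGE = {"closed": 0, "open": 1, "half_open": 2}


def export_states(
    states: Mapping[str, str],
    set_gauge: Callable[[str, int], None],
) -> None:
    """Push each breaker's state (name -> state value) to a gauge setter."""
    for name, state in states.items():
        set_gauge(name, STATE_TO_GAUGE[state])


recorded: dict[str, int] = {}


def fake_set(name: str, value: int) -> None:
    recorded[name] = value  # stand-in for circuit_state.labels(service=name).set(value)


export_states({"payment": "open", "inventory": "closed"}, fake_set)
print(recorded)  # prints {'payment': 1, 'inventory': 0}
```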

Best Practices

  1. Set appropriate thresholds: Too low = flapping, too high = slow response to failures
  2. Use fallbacks: Always have a degraded-but-working response when circuits open
  3. Monitor state changes: Alert when circuits open — it indicates a real problem
  4. Different settings per service: Critical services might need lower thresholds
  5. Test your circuit breakers: Chaos engineering — intentionally break things to verify behavior
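  6. Combine with retries carefully: Put the retry loop outside the breaker, so each attempt is recorded and retries stop the moment the circuit opens; retries buried inside the protected call delay failure detection and multiply load on a struggling service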
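
Testing state transitions against real time makes tests slow and flaky; injecting the clock lets a test jump forward instantly. The CircuitBreaker above reads time.time() directly. This sketch therefore uses a trimmed-down, hypothetical MiniBreaker that accepts a clock callable: consecutive-failure threshold, a single half-open probe, no locking. A FakeClock, which the test advances by hand, drives it. The same pattern applies if you add a clock parameter to the full class.

```python
from typing import Callable


class CircuitOpenError(Exception):
    pass


class MiniBreaker:
    """Consecutive-failure breaker with an injectable clock (single-threaded sketch)."""

    def __init__(self, threshold: int, recovery_timeout: float, clock: Callable[[], float]):
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func: Callable[[], object]) -> object:
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit is open")
            self.state = "half_open"  # let this one call through as a probe
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.state = "closed"
        self.failures = 0
        return result


class FakeClock:
    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


def boom() -> None:
    raise ConnectionError("downstream unavailable")


clock = FakeClock()
breaker = MiniBreaker(threshold=2, recovery_timeout=30, clock=clock)
for _ in range(2):
    try:
        breaker.call(boom)
    except ConnectionError:
        pass
assert breaker.state == "open"

try:
    breaker.call(lambda: "ok")
except CircuitOpenError:
    pass  # rejected without calling downstream
else:
    raise AssertionError("expected the open circuit to reject the call")

clock.now = 30.0  # jump past the recovery timeout instantly
assert breaker.call(lambda: "ok") == "ok"
assert breaker.state == "closed"
```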

Circuit breakers are essential infrastructure for any distributed system. They transform “everything is broken” into “this one thing is broken, everything else is fine.” That’s the difference between a minor incident and a major outage.