When a downstream service is failing, continuing to call it makes everything worse. Circuit breakers stop the cascade.

The Pattern

Three states:

  1. Closed: Normal operation, requests pass through
  2. Open: Service is failing, requests fail immediately
  3. Half-Open: Testing if service recovered
[CLOSED] --(failure threshold)--> [OPEN] --(timeout)--> [HALF-OPEN]
[HALF-OPEN] --(success)--> [CLOSED]
[HALF-OPEN] --(failure)--> [OPEN]

Basic Implementation

import time
from enum import Enum
from threading import Lock

class State(Enum):
    """Lifecycle states of a circuit breaker."""

    CLOSED = "closed"        # normal operation: calls pass through
    OPEN = "open"            # tripped: calls are rejected immediately
    HALF_OPEN = "half_open"  # probing: a limited number of trial calls pass


class CircuitBreaker:
    """Thread-safe circuit breaker with CLOSED / OPEN / HALF_OPEN states.

    Protocol for callers: ask ``can_execute()`` before the protected call,
    then report the outcome with ``record_success()`` or
    ``record_failure()``.

    Args:
        failure_threshold: Failures (since the last success) that trip a
            CLOSED breaker to OPEN.
        recovery_timeout: Seconds an OPEN breaker waits before letting
            trial calls through (HALF_OPEN).
        half_open_max_calls: Number of trial calls admitted while
            HALF_OPEN, and the number of successes needed to close again.

    Note:
        ``last_failure_time`` and ``half_open_since`` are
        ``time.monotonic()`` readings, not wall-clock timestamps, so the
        recovery timer is immune to system clock adjustments.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = State.CLOSED
        self.failure_count = 0
        self.success_count = 0
        # Trial calls admitted in the current HALF_OPEN probe window.
        self.half_open_calls = 0
        # Monotonic time at which the current probe window started.
        self.half_open_since = None
        self.last_failure_time = None
        self.lock = Lock()

    def _start_probe_window(self, now: float) -> None:
        """Begin a fresh HALF_OPEN probe window (caller holds the lock)."""
        self.half_open_calls = 0
        self.half_open_since = now

    def can_execute(self) -> bool:
        """Return True if the caller may attempt the protected call.

        CLOSED always admits. OPEN rejects until ``recovery_timeout``
        seconds have passed since the last failure, then switches to
        HALF_OPEN. HALF_OPEN admits at most ``half_open_max_calls`` trial
        calls (always at least one); each True returned in that state
        consumes one trial slot.
        """
        with self.lock:
            if self.state == State.CLOSED:
                return True

            now = time.monotonic()
            # Always allow at least one probe, otherwise a breaker
            # configured with 0 could never leave HALF_OPEN.
            limit = max(1, self.half_open_max_calls)

            if self.state == State.OPEN:
                if now - self.last_failure_time <= self.recovery_timeout:
                    return False
                self.state = State.HALF_OPEN
                self.success_count = 0
                self._start_probe_window(now)
            elif self.half_open_since is None or (
                self.half_open_calls >= limit
                and now - self.half_open_since > self.recovery_timeout
            ):
                # Either the state was set to HALF_OPEN from outside, or
                # every admitted probe has been outstanding for a full
                # recovery period without reporting back. Start a new
                # window so lost probes cannot wedge the breaker.
                self._start_probe_window(now)

            if self.half_open_calls < limit:
                self.half_open_calls += 1
                return True
            return False

    def record_success(self):
        """Report a successful call.

        In HALF_OPEN, counts toward the ``half_open_max_calls`` successes
        required to close the breaker. In any other state, clears the
        failure count.
        """
        with self.lock:
            if self.state == State.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.half_open_max_calls:
                    self.state = State.CLOSED
                    self.failure_count = 0
            else:
                self.failure_count = 0

    def record_failure(self):
        """Report a failed call.

        Any failure while HALF_OPEN re-opens the breaker immediately;
        otherwise the breaker opens once ``failure_threshold`` failures
        have accumulated. The recovery timer restarts on every failure.
        """
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()

            if self.state == State.HALF_OPEN:
                self.state = State.OPEN
            elif self.failure_count >= self.failure_threshold:
                self.state = State.OPEN

Using the Circuit Breaker

payment_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60)

def process_payment(order):
    """Charge *order* through ``payment_service`` under ``payment_breaker``.

    Returns whatever ``payment_service.charge`` returns. Raises
    ``ServiceUnavailable`` without calling the service when the breaker
    refuses the call; any exception from the charge is recorded as a
    failure on the breaker and re-raised unchanged.
    """
    if payment_breaker.can_execute():
        try:
            charge_result = payment_service.charge(order)
            payment_breaker.record_success()
        except Exception:
            # Count the failure, then let the caller see the original error.
            payment_breaker.record_failure()
            raise
        return charge_result

    raise ServiceUnavailable("Payment service circuit open")

Decorator Pattern

from functools import wraps

def circuit_breaker(breaker: CircuitBreaker):
    """Build a decorator that routes every call of a function through *breaker*.

    The decorated function raises ``CircuitOpenError`` when the breaker
    refuses the call. Otherwise the call runs; success and failure are
    reported to the breaker, and exceptions propagate unchanged.
    """
    def decorate(func):
        @wraps(func)
        def guarded(*args, **kwargs):
            if breaker.can_execute():
                try:
                    outcome = func(*args, **kwargs)
                    breaker.record_success()
                except Exception:
                    # Report the failure but keep the original exception.
                    breaker.record_failure()
                    raise
                return outcome

            raise CircuitOpenError(f"Circuit breaker open for {func.__name__}")
        return guarded
    return decorate

# Usage: one breaker instance, shared by every call to the decorated function.
payment_cb = CircuitBreaker()

@circuit_breaker(payment_cb)
def charge_customer(customer_id, amount):
    """Charge *amount* to *customer_id* via ``payment_api``, guarded by ``payment_cb``."""
    return payment_api.charge(customer_id, amount)

With Fallback

def get_user_recommendations(user_id):
    """Return recommendations for *user_id*, degrading gracefully.

    The live service is consulted only when ``recommendations_breaker``
    admits the call. If the breaker refuses, or the service call raises,
    the cached recommendations are returned, falling back to
    ``DEFAULT_RECOMMENDATIONS`` when the cache yields nothing truthy.
    """
    if recommendations_breaker.can_execute():
        try:
            fresh = recommendations_service.get(user_id)
            recommendations_breaker.record_success()
            return fresh
        except Exception:
            recommendations_breaker.record_failure()

    # Shared fallback path: breaker refused, or the live call failed.
    return get_cached_recommendations(user_id) or DEFAULT_RECOMMENDATIONS

Library: pybreaker

import pybreaker

# One breaker guarding database access.
# NOTE(review): fail_max / reset_timeout presumably mean "failures before
# opening" and "seconds before a trial call" - confirm against the pybreaker docs.
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=30
)

# The breaker instance itself is applied as the decorator.
@db_breaker
def query_database(sql):
    """Run *sql* through ``db.execute`` under ``db_breaker`` and return its result."""
    return db.execute(sql)

# Check state
print(db_breaker.current_state)  # 'closed', 'open', or 'half-open'

Library: tenacity (with circuit breaker)

import pybreaker
from tenacity import retry, stop_after_attempt

# tenacity only provides retry policies; it does not export a CircuitBreaker,
# so the breaker comes from pybreaker and is combined with tenacity's retry.
cb = pybreaker.CircuitBreaker(fail_max=3, reset_timeout=60)

# Decorator order matters: the breaker wraps the raw call, so every retry
# attempt is seen (and counted) by the breaker.
# NOTE(review): once the circuit opens, the remaining attempts presumably fail
# immediately with pybreaker's circuit-open error; consider excluding that
# error from the retry policy.
@retry(stop=stop_after_attempt(3))
@cb
def call_external_api():
    """Fetch the example endpoint with up to three attempts behind the breaker."""
    # Without a timeout a hung connection never returns, so the breaker
    # would never get a failure to count.
    return requests.get("https://api.example.com/data", timeout=5)

Per-Service Breakers

class ServiceRegistry:
    """Hands out one lazily created ``CircuitBreaker`` per service name."""

    def __init__(self):
        # service name -> CircuitBreaker
        self.breakers = {}
        # Guards creation so concurrent first requests for the same service
        # cannot each install a different breaker (and lose recorded state).
        self._lock = Lock()

    def get_breaker(self, service_name: str) -> CircuitBreaker:
        """Return the breaker for *service_name*, creating it on first use.

        Repeated calls with the same name always return the same instance.
        """
        with self._lock:
            if service_name not in self.breakers:
                self.breakers[service_name] = CircuitBreaker()
            return self.breakers[service_name]

registry = ServiceRegistry()

def call_service(service_name: str, endpoint: str):
    """GET ``http://{service_name}/{endpoint}`` under that service's breaker.

    Raises ``ServiceUnavailable`` without making a request when the
    breaker obtained from ``registry`` refuses the call. Otherwise the
    outcome is reported to the breaker and any exception from the
    request is re-raised unchanged.
    """
    service_breaker = registry.get_breaker(service_name)

    if service_breaker.can_execute():
        try:
            response = http_client.get(f"http://{service_name}/{endpoint}")
            service_breaker.record_success()
        except Exception:
            service_breaker.record_failure()
            raise
        return response

    raise ServiceUnavailable(f"{service_name} circuit is open")

Monitoring

from prometheus_client import Counter, Gauge

# Current breaker state per service, encoded numerically as described in the
# help text (0=closed, 1=open, 2=half-open).
circuit_state = Gauge(
    'circuit_breaker_state',
    'Circuit breaker state (0=closed, 1=open, 2=half-open)',
    ['service']
)
# Failures recorded by the breaker, labelled by service.
circuit_failures = Counter(
    'circuit_breaker_failures_total',
    'Circuit breaker failure count',
    ['service']
)
# Requests refused because the circuit was open, labelled by service.
circuit_rejections = Counter(
    'circuit_breaker_rejections_total',
    'Requests rejected by open circuit',
    ['service']
)

# Update metrics in circuit breaker
def record_failure(self, service_name):
    """Count a failure for *service_name* and publish the breaker's state.

    Increments ``circuit_failures`` and sets ``circuit_state`` to the
    numeric code from the gauge's help text (0=closed, 1=open, 2=half-open).
    """
    circuit_failures.labels(service=service_name).inc()
    # ... rest of failure logic
    # A gauge takes a number, but the State enum values are strings
    # ("closed", "open", "half_open"), so translate before publishing.
    state_codes = {"closed": 0, "open": 1, "half_open": 2}
    circuit_state.labels(service=service_name).set(state_codes[self.state.value])

Configuration Guidelines

| Scenario | Threshold | Timeout |
|---|---|---|
| Critical service, fast recovery | 3-5 failures | 15-30s |
| Non-critical, can wait | 5-10 failures | 60-120s |
| Flaky external API | 3 failures | 30-60s |
| Database | 5 failures | 30s |

Anti-Patterns

1. Single global breaker

# Bad: every service shares one breaker, so failures in any one of them
# open the circuit for all of them.
global_breaker = CircuitBreaker()

# Good: one breaker per downstream service, so each trips independently.
payment_breaker = CircuitBreaker()
inventory_breaker = CircuitBreaker()

2. No fallback

# Bad: just fails
if not breaker.can_execute():
    raise Error()

# Good: graceful degradation
if not breaker.can_execute():
    return cached_response() or default_response()

3. Wrong failure detection

# Bad: all exceptions trip the breaker
except Exception:
    breaker.record_failure()

# Good: only infrastructure failures
except (ConnectionError, Timeout):
    breaker.record_failure()
except ValidationError:
    pass  # Client error, not service failure

The Philosophy

Circuit breakers embody the “fail fast” principle:

  • Don’t waste resources on doomed requests
  • Give failing services time to recover
  • Protect your system from cascade failures

A tripped circuit breaker is not a bug—it’s the system working as designed.