# Circuit Breakers: Fail Fast, Recover Gracefully

When a downstream service is failing, continuing to call it makes everything worse. Circuit breakers stop the cascade.

## The Pattern

Three states:

- **Closed**: Normal operation, requests pass through
- **Open**: Service is failing, requests fail immediately
- **Half-Open**: Testing if service recovered

```text
[CLOSED] --- failure threshold ---> [OPEN] --- timeout ---> [HALF-OPEN]
    ^                                  ^                         |
    |                                  +-------- failure --------+
    |                                                            |
    +------------------------- success --------------------------+
```

## Basic Implementation

```python
import time
from enum import Enum
from threading import Lock


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = State.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.lock = Lock()

    def can_execute(self) -> bool:
        with self.lock:
            if self.state == State.CLOSED:
                return True

            if self.state == State.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = State.HALF_OPEN
                    self.success_count = 0
                    return True
                return False

            if self.state == State.HALF_OPEN:
                return self.success_count < self.half_open_max_calls

        return False

    def record_success(self):
        with self.lock:
            if self.state == State.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.half_open_max_calls:
                    self.state = State.CLOSED
                    self.failure_count = 0
            else:
                self.failure_count = 0

    def record_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.state == State.HALF_OPEN:
                self.state = State.OPEN
            elif self.failure_count >= self.failure_threshold:
                self.state = State.OPEN
```

## Using the Circuit Breaker

```python
payment_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60)


def process_payment(order):
    if not payment_breaker.can_execute():
        raise ServiceUnavailable("Payment service circuit open")

    try:
        result = payment_service.charge(order)
        payment_breaker.record_success()
        return result
    except Exception as e:
        payment_breaker.record_failure()
        raise
```

## Decorator Pattern

```python
from functools import wraps


def circuit_breaker(breaker: CircuitBreaker):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not breaker.can_execute():
                raise CircuitOpenError(f"Circuit breaker open for {func.__name__}")

            try:
                result = func(*args, **kwargs)
                breaker.record_success()
                return result
            except Exception as e:
                breaker.record_failure()
                raise
        return wrapper
    return decorator


# Usage
payment_cb = CircuitBreaker()


@circuit_breaker(payment_cb)
def charge_customer(customer_id, amount):
    return payment_api.charge(customer_id, amount)
```

## With Fallback

```python
def get_user_recommendations(user_id):
    if not recommendations_breaker.can_execute():
        # Fallback to cached or default recommendations
        return get_cached_recommendations(user_id) or DEFAULT_RECOMMENDATIONS

    try:
        result = recommendations_service.get(user_id)
        recommendations_breaker.record_success()
        return result
    except Exception:
        recommendations_breaker.record_failure()
        return get_cached_recommendations(user_id) or DEFAULT_RECOMMENDATIONS
```

## Library: pybreaker

```python
import pybreaker

db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=30
)


@db_breaker
def query_database(sql):
    return db.execute(sql)


# Check state
print(db_breaker.current_state)  # 'closed', 'open', or 'half-open'
```

## Library: tenacity (with circuit breaker)

> **Note:** tenacity itself does not export a `CircuitBreaker` class, so the import below fails as written. To combine retries with a breaker, stack tenacity's `@retry` on top of a breaker from elsewhere, e.g. a `pybreaker.CircuitBreaker` instance or the `circuit_breaker(...)` decorator shown above.

```python
from tenacity import retry, stop_after_attempt, CircuitBreaker

cb = CircuitBreaker(failure_threshold=3, recovery_time=60)


@retry(stop=stop_after_attempt(3))
@cb
def call_external_api():
    return requests.get("https://api.example.com/data")
```

## Per-Service Breakers

```python
class ServiceRegistry:
    def __init__(self):
        self.breakers = {}

    def get_breaker(self, service_name: str) -> CircuitBreaker:
        if service_name not in self.breakers:
            self.breakers[service_name] = CircuitBreaker()
        return self.breakers[service_name]


registry = ServiceRegistry()


def call_service(service_name: str, endpoint: str):
    breaker = registry.get_breaker(service_name)

    if not breaker.can_execute():
        raise ServiceUnavailable(f"{service_name} circuit is open")

    try:
        result = http_client.get(f"http://{service_name}/{endpoint}")
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        raise
```

## Monitoring

```python
from prometheus_client import Counter, Gauge

circuit_state = Gauge(
    'circuit_breaker_state',
    'Circuit breaker state (0=closed, 1=open, 2=half-open)',
    ['service']
)

circuit_failures = Counter(
    'circuit_breaker_failures_total',
    'Circuit breaker failure count',
    ['service']
)

circuit_rejections = Counter(
    'circuit_breaker_rejections_total',
    'Requests rejected by open circuit',
    ['service']
)


# Update metrics in circuit breaker
def record_failure(self, service_name):
    circuit_failures.labels(service=service_name).inc()
    # ... rest of failure logic
    circuit_state.labels(service=service_name).set(self.state.value)
```

> **Note:** `Gauge.set()` needs a numeric value. The `State` enum in the basic implementation uses string values (`"closed"`, `"open"`, `"half_open"`), so map each state to the 0/1/2 encoding from the metric description before calling `set()`.

## Configuration Guidelines

| Scenario | Threshold | Timeout |
| --- | --- | --- |
| Critical service, fast recovery | 3-5 failures | 15-30s |
| Non-critical, can wait | 5-10 failures | 60-120s |
| Flaky external API | 3 failures | 30-60s |
| Database | 5 failures | 30s |

## Anti-Patterns

1. Single global breaker ...