""" Circuit Breaker implementation for inter-service communication Prevents cascading failures by failing fast when a service is unhealthy """ import time import structlog from enum import Enum from typing import Callable, Any, Optional import asyncio logger = structlog.get_logger() class CircuitState(Enum): """Circuit breaker states""" CLOSED = "closed" # Normal operation, requests pass through OPEN = "open" # Service is failing, reject requests immediately HALF_OPEN = "half_open" # Testing if service has recovered class CircuitBreakerOpenException(Exception): """Raised when circuit breaker is open and rejects a request""" pass class CircuitBreaker: """ Circuit breaker pattern implementation for preventing cascading failures. States: - CLOSED: Normal operation, all requests pass through - OPEN: Service is failing, reject all requests immediately - HALF_OPEN: Testing recovery, allow one request through Transitions: - CLOSED -> OPEN: After failure_threshold consecutive failures - OPEN -> HALF_OPEN: After timeout seconds have passed - HALF_OPEN -> CLOSED: If test request succeeds - HALF_OPEN -> OPEN: If test request fails """ def __init__( self, service_name: str, failure_threshold: int = 5, timeout: int = 60, success_threshold: int = 2 ): """ Initialize circuit breaker. Args: service_name: Name of the service being protected failure_threshold: Number of consecutive failures before opening circuit timeout: Seconds to wait before attempting recovery (half-open state) success_threshold: Consecutive successes needed to close from half-open """ self.service_name = service_name self.failure_threshold = failure_threshold self.timeout = timeout self.success_threshold = success_threshold self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time: Optional[float] = None self._lock = asyncio.Lock() logger.info( "Circuit breaker initialized", service=service_name, failure_threshold=failure_threshold, timeout=timeout ) async def call(self, func: Callable, *args, **kwargs) -> Any: """ Execute function with circuit breaker protection. Args: func: Async function to execute *args, **kwargs: Arguments to pass to func Returns: Result from func Raises: CircuitBreakerOpenException: If circuit is open Exception: Any exception raised by func """ async with self._lock: # Check if circuit should transition to half-open if self.state == CircuitState.OPEN: if self._should_attempt_reset(): logger.info( "Circuit breaker transitioning to half-open", service=self.service_name ) self.state = CircuitState.HALF_OPEN self.success_count = 0 else: # Circuit is open, reject request raise CircuitBreakerOpenException( f"Circuit breaker is OPEN for {self.service_name}. " f"Service will be retried in {self._time_until_retry():.0f} seconds." ) # Execute function try: result = await func(*args, **kwargs) await self._on_success() return result except Exception as e: await self._on_failure(e) raise def _should_attempt_reset(self) -> bool: """Check if enough time has passed to attempt recovery""" if self.last_failure_time is None: return True return time.time() - self.last_failure_time >= self.timeout def _time_until_retry(self) -> float: """Calculate seconds until next retry attempt""" if self.last_failure_time is None: return 0.0 elapsed = time.time() - self.last_failure_time return max(0.0, self.timeout - elapsed) async def _on_success(self): """Handle successful request""" async with self._lock: self.failure_count = 0 if self.state == CircuitState.HALF_OPEN: self.success_count += 1 logger.debug( "Circuit breaker success in half-open state", service=self.service_name, success_count=self.success_count, success_threshold=self.success_threshold ) if self.success_count >= self.success_threshold: logger.info( "Circuit breaker closing - service recovered", service=self.service_name ) self.state = CircuitState.CLOSED self.success_count = 0 async def _on_failure(self, exception: Exception): """Handle failed request""" async with self._lock: self.failure_count += 1 self.last_failure_time = time.time() if self.state == CircuitState.HALF_OPEN: logger.warning( "Circuit breaker opening - recovery attempt failed", service=self.service_name, error=str(exception) ) self.state = CircuitState.OPEN self.success_count = 0 elif self.state == CircuitState.CLOSED: logger.warning( "Circuit breaker failure recorded", service=self.service_name, failure_count=self.failure_count, threshold=self.failure_threshold, error=str(exception) ) if self.failure_count >= self.failure_threshold: logger.error( "Circuit breaker opening - failure threshold reached", service=self.service_name, failure_count=self.failure_count ) self.state = CircuitState.OPEN def get_state(self) -> str: """Get current circuit breaker state""" return self.state.value def is_closed(self) -> bool: """Check if circuit is closed (normal operation)""" return self.state == CircuitState.CLOSED def is_open(self) -> bool: """Check if circuit is open (failing fast)""" return self.state == CircuitState.OPEN def is_half_open(self) -> bool: """Check if circuit is half-open (testing recovery)""" return self.state == CircuitState.HALF_OPEN async def reset(self): """Manually reset circuit breaker to closed state""" async with self._lock: logger.info( "Circuit breaker manually reset", service=self.service_name, previous_state=self.state.value ) self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time = None