Coverage for astrocyte/policy/escalation.py: 97%
71 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Escalation policies — circuit breaker, degraded mode.
3All functions are sync (Rust migration candidates).
4See docs/_design/policy-layer.md section 4.
5"""
7from __future__ import annotations
9import threading
10import time
11from dataclasses import dataclass
12from typing import Literal
14from astrocyte.errors import ProviderUnavailable
15from astrocyte.types import RecallResult
17# ---------------------------------------------------------------------------
18# Circuit breaker
19# ---------------------------------------------------------------------------
@dataclass
class CircuitBreakerState:
    """Mutable bookkeeping owned by a single CircuitBreaker.

    CircuitBreaker reads and writes these fields only while holding its lock,
    and replaces the whole object on reset.
    """

    # Current phase: "closed" lets calls through, "open" blocks them,
    # "half_open" admits a limited number of probe calls.
    state: Literal["closed", "open", "half_open"] = "closed"
    # Failures recorded since the last success (or since construction/reset).
    failure_count: int = 0
    # time.monotonic() reading taken at the most recent failure; 0.0 until one occurs.
    last_failure_time: float = 0.0
    # Probe calls admitted since the breaker last entered half_open.
    half_open_calls: int = 0
class CircuitBreaker:
    """Circuit breaker for provider calls.

    States: closed → open (after failures) → half_open (after timeout) → closed (after success).
    Sync, self-contained — Rust migration candidate.

    Every read or write of the breaker state happens inside a single
    acquisition of ``self._lock``. In particular, ``check`` decides whether a
    call is allowed *and* reserves a half-open probe slot in one critical
    section. Concurrent callers therefore cannot collectively exceed
    ``half_open_max_calls``, and cannot slip through after another thread has
    re-opened the breaker.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout_seconds: float = 30.0,
        half_open_max_calls: int = 2,
    ) -> None:
        """Create a breaker in the closed state.

        Args:
            failure_threshold: Recorded failures (with no intervening success)
                that trip the breaker to open.
            recovery_timeout_seconds: Seconds since the last failure after
                which an open breaker moves to half_open.
            half_open_max_calls: Probe calls admitted while half_open before
                further calls are blocked.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout_seconds
        self.half_open_max_calls = half_open_max_calls
        self._state = CircuitBreakerState()
        # Non-reentrant lock: the *_locked helpers below must only be called
        # by code that already holds it.
        self._lock = threading.Lock()

    def _refresh_locked(self) -> None:
        """Apply the open → half_open transition once the recovery timeout elapsed.

        Caller must hold ``self._lock``.
        """
        if self._state.state != "open":
            return
        elapsed = time.monotonic() - self._state.last_failure_time
        if elapsed >= self.recovery_timeout:
            self._state.state = "half_open"
            # A fresh half-open window starts with no probe calls admitted.
            self._state.half_open_calls = 0

    def _is_blocked_locked(self) -> bool:
        """Return True if a call would be rejected right now.

        Applies the timeout transition first. Caller must hold ``self._lock``.
        """
        self._refresh_locked()
        if self._state.state == "open":
            return True
        return (
            self._state.state == "half_open"
            and self._state.half_open_calls >= self.half_open_max_calls
        )

    @property
    def state(self) -> Literal["closed", "open", "half_open"]:
        """Current state, with automatic open → half_open transition on timeout."""
        with self._lock:
            self._refresh_locked()
            return self._state.state

    def is_open(self) -> bool:
        """Check if circuit breaker blocks calls.

        True when the breaker is open, or half_open with all probe slots used.
        Does not consume a probe slot.
        """
        with self._lock:
            return self._is_blocked_locked()

    def check(self, provider: str) -> None:
        """Check if call is allowed. Raises ProviderUnavailable if blocked.

        When the breaker is half_open and the call is allowed, one probe slot
        is consumed. The decision and the slot reservation happen atomically.

        Raises:
            ProviderUnavailable: if the breaker is blocking calls.
        """
        with self._lock:
            if self._is_blocked_locked():
                raise ProviderUnavailable(provider, reason="circuit breaker open")
            if self._state.state == "half_open":
                self._state.half_open_calls += 1

    def record_success(self) -> None:
        """Record a successful call. Resets breaker to closed."""
        with self._lock:
            self._state.state = "closed"
            self._state.failure_count = 0
            self._state.half_open_calls = 0

    def record_failure(self) -> None:
        """Record a failed call. May trip breaker to open.

        The breaker opens when the failure count reaches ``failure_threshold``,
        or immediately if it is currently half_open.
        """
        with self._lock:
            self._state.failure_count += 1
            self._state.last_failure_time = time.monotonic()

            if self._state.failure_count >= self.failure_threshold:
                self._state.state = "open"
            elif self._state.state == "half_open":
                # Any failure in half_open trips back to open
                self._state.state = "open"

    def reset(self) -> None:
        """Force reset to closed state."""
        with self._lock:
            self._state = CircuitBreakerState()
104# ---------------------------------------------------------------------------
105# Degraded mode handler
106# ---------------------------------------------------------------------------
class DegradedModeHandler:
    """Decide what recall/retain do while the provider is unavailable.

    Sync; holds only the configured mode — Rust migration candidate.

    ``mode`` selects the policy:
      * ``"error"``        — raise ProviderUnavailable.
      * ``"empty_recall"`` — recall yields an empty result; retain is dropped.
      * ``"cache"``        — not implemented in Phase 1; behaves like ``"empty_recall"``.
    Any other string is not rejected and also behaves like ``"empty_recall"``.
    """

    def __init__(self, mode: str = "empty_recall") -> None:
        # Stored as given; no validation is performed here.
        self.mode = mode  # "empty_recall" | "error" | "cache"

    def handle_recall(self, provider: str) -> RecallResult:
        """Return the fallback recall result for an unavailable ``provider``.

        Raises:
            ProviderUnavailable: when ``mode`` is ``"error"``.
        """
        if self.mode == "error":
            raise ProviderUnavailable(provider, reason="degraded mode: error")
        # "empty_recall" and the Phase-1 placeholder "cache" mode (and any
        # unrecognised mode) all fall back to an empty, untruncated result.
        return RecallResult(hits=[], total_available=0, truncated=False)

    def handle_retain(self, provider: str) -> None:
        """Handle a retain for an unavailable ``provider``.

        In every mode other than ``"error"`` the retain is silently dropped
        (it could be queued for retry in the future).

        Raises:
            ProviderUnavailable: when ``mode`` is ``"error"``.
        """
        if self.mode != "error":
            return
        raise ProviderUnavailable(provider, reason="degraded mode: error")