Coverage for astrocyte/policy/escalation.py: 97%

71 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Escalation policies — circuit breaker, degraded mode. 

2 

3All functions are sync (Rust migration candidates). 

4See docs/_design/policy-layer.md section 4. 

5""" 

6 

7from __future__ import annotations 

8 

9import threading 

10import time 

11from dataclasses import dataclass 

12from typing import Literal 

13 

14from astrocyte.errors import ProviderUnavailable 

15from astrocyte.types import RecallResult 

16 

17# --------------------------------------------------------------------------- 

18# Circuit breaker 

19# --------------------------------------------------------------------------- 

20 

21 

@dataclass
class CircuitBreakerState:
    """Mutable bookkeeping owned by a single ``CircuitBreaker``.

    A fresh instance is created when a breaker is constructed or reset. After
    that, the breaker mutates the fields in place.
    """

    # Current breaker position; "closed" means calls are let through.
    state: Literal["closed", "open", "half_open"] = "closed"
    # Failures recorded since the last success (or since creation).
    failure_count: int = 0
    # time.monotonic() reading of the most recent failure; 0.0 until one occurs.
    last_failure_time: float = 0.0
    # Probe calls admitted since the breaker last entered "half_open".
    half_open_calls: int = 0

28 

29 

class CircuitBreaker:
    """Circuit breaker for provider calls.

    States: closed → open (after failures) → half_open (after timeout) → closed (after success).
    Sync, self-contained — Rust migration candidate.

    Every public method performs its whole read-modify-write under ``self._lock``
    in a single critical section. As a result, concurrent ``check`` calls cannot
    admit more than ``half_open_max_calls`` probes while the breaker is half-open.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout_seconds: float = 30.0,
        half_open_max_calls: int = 2,
    ) -> None:
        """Create a breaker in the closed state.

        Args:
            failure_threshold: Failures (with no intervening success) that trip
                the breaker open.
            recovery_timeout_seconds: Seconds after the most recent failure before
                an open breaker moves to half_open.
            half_open_max_calls: Probe calls admitted while half_open.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout_seconds
        self.half_open_max_calls = half_open_max_calls
        self._state = CircuitBreakerState()
        self._lock = threading.Lock()

    # -- internal helpers: caller must already hold self._lock ---------------

    def _refresh_locked(self) -> Literal["closed", "open", "half_open"]:
        """Apply the open → half_open transition once the recovery timeout elapses.

        Caller must hold ``self._lock``. Returns the (possibly updated) state.
        """
        st = self._state
        if st.state == "open" and time.monotonic() - st.last_failure_time >= self.recovery_timeout:
            st.state = "half_open"
            st.half_open_calls = 0
        return st.state

    def _blocked_locked(self) -> bool:
        """Return True if a call must be rejected now. Caller must hold ``self._lock``."""
        current = self._refresh_locked()
        if current == "open":
            return True
        # Half-open admits only a bounded number of probe calls.
        return current == "half_open" and self._state.half_open_calls >= self.half_open_max_calls

    # -- public API ------------------------------------------------------------

    @property
    def state(self) -> Literal["closed", "open", "half_open"]:
        """Current state, with automatic open → half_open transition on timeout."""
        with self._lock:
            return self._refresh_locked()

    def is_open(self) -> bool:
        """Check if the breaker blocks calls (open, or half_open with its probe budget spent)."""
        with self._lock:
            return self._blocked_locked()

    def check(self, provider: str) -> None:
        """Check if a call is allowed, consuming a probe slot when half_open.

        Raises:
            ProviderUnavailable: if the breaker currently blocks calls.
        """
        # The blocked test and the probe accounting must share one critical
        # section. With two separate lock acquisitions, concurrent callers could
        # all pass the test before any of them consumed a probe slot, letting
        # more than half_open_max_calls calls through.
        with self._lock:
            blocked = self._blocked_locked()
            if not blocked and self._state.state == "half_open":
                self._state.half_open_calls += 1
        if blocked:
            raise ProviderUnavailable(provider, reason="circuit breaker open")

    def record_success(self) -> None:
        """Record a successful call. Resets breaker to closed."""
        with self._lock:
            self._state.state = "closed"
            self._state.failure_count = 0
            self._state.half_open_calls = 0

    def record_failure(self) -> None:
        """Record a failed call. May trip breaker to open."""
        with self._lock:
            self._state.failure_count += 1
            # Also restarts the recovery timeout measured by _refresh_locked.
            self._state.last_failure_time = time.monotonic()

            if self._state.failure_count >= self.failure_threshold:
                self._state.state = "open"
            elif self._state.state == "half_open":
                # Any failure in half_open trips back to open
                self._state.state = "open"

    def reset(self) -> None:
        """Force reset to closed state."""
        with self._lock:
            self._state = CircuitBreakerState()

102 

103 

104# --------------------------------------------------------------------------- 

105# Degraded mode handler 

106# --------------------------------------------------------------------------- 

107 

108 

class DegradedModeHandler:
    """Handle operations when provider is unavailable.

    Sync, and holds no state beyond the configured ``mode`` — Rust migration candidate.

    Modes:
        ``"empty_recall"``: recalls return an empty ``RecallResult``; retains are dropped.
        ``"error"``: recalls and retains raise ``ProviderUnavailable``.
        ``"cache"``: not implemented in Phase 1; currently behaves like ``"empty_recall"``.
    """

    # Modes accepted by __init__, in the order reported by its error message.
    _SUPPORTED_MODES: tuple[str, ...] = ("empty_recall", "error", "cache")

    def __init__(self, mode: str = "empty_recall") -> None:
        """Configure the handler.

        Raises:
            ValueError: if *mode* is not one of the supported modes.
        """
        # Fail loudly on unknown modes. Otherwise a typo (e.g. "errror") would
        # silently fall through to the empty-recall / drop behavior.
        if mode not in self._SUPPORTED_MODES:
            raise ValueError(
                f"unknown degraded mode {mode!r}; expected one of: "
                + ", ".join(self._SUPPORTED_MODES)
            )
        self.mode = mode

    def handle_recall(self, provider: str) -> RecallResult:
        """Handle a recall when *provider* is unavailable.

        Returns:
            An empty ``RecallResult`` in ``"empty_recall"`` and ``"cache"`` modes.

        Raises:
            ProviderUnavailable: in ``"error"`` mode.
        """
        if self.mode == "error":
            raise ProviderUnavailable(provider, reason="degraded mode: error")
        # "empty_recall", and "cache" (not implemented in Phase 1), both yield nothing.
        return RecallResult(hits=[], total_available=0, truncated=False)

    def handle_retain(self, provider: str) -> None:
        """Handle a retain when *provider* is unavailable.

        Raises:
            ProviderUnavailable: in ``"error"`` mode.
        """
        if self.mode == "error":
            raise ProviderUnavailable(provider, reason="degraded mode: error")
        # "empty_recall" / "cache": retain is silently dropped (could queue for retry).