Coverage for astrocyte/mip/router.py: 86%

149 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""MIP router — orchestrates rule_engine → ambiguity detection → intent layer. 

2 

3See docs/_design/memory-intent-protocol.md for the design specification. 

4""" 

5 

6from __future__ import annotations 

7 

8import logging 

9import re 

10from datetime import datetime, timezone 

11from typing import TYPE_CHECKING 

12 

13from astrocyte.errors import MipRoutingError 

14from astrocyte.mip.rule_engine import ( 

15 RuleEngineInput, 

16 RuleMatch, 

17 evaluate_match_block, 

18 evaluate_rules, 

19 interpolate_template, 

20) 

21from astrocyte.mip.schema import ForgetSpec, MipConfig, PipelineSpec, RoutingRule 

22from astrocyte.types import RoutingDecision 

23 

24if TYPE_CHECKING: 

25 from astrocyte.provider import LLMProvider 

26 

27logger = logging.getLogger("astrocyte.mip") 

28 

29 

30_TEMPLATE_PLACEHOLDER = re.compile(r"\{[^}]+\}") 

31 

32 

33def _bank_matches(template: str, bank_id: str) -> bool: 

34 """Whether a templated bank pattern (``"student-{id}"``) matches a concrete bank_id. 

35 

36 Each ``{...}`` placeholder is treated as a non-greedy ``.+?`` wildcard; 

37 surrounding literal text is regex-escaped. 

38 """ 

39 if "{" not in template: 

40 return template == bank_id 

41 parts = _TEMPLATE_PLACEHOLDER.split(template) 

42 pattern = "^" + ".+?".join(re.escape(p) for p in parts) + "$" 

43 return re.match(pattern, bank_id) is not None 

44 

45 

class MipRouter:
    """Top-level MIP router. Orchestrates rule_engine → escalation → intent layer.

    The router keeps the configured rules sorted ascending by ``priority`` and
    offers two entry points: :meth:`route_sync` (mechanical rules only) and
    :meth:`route` (mechanical rules, then the LLM-backed intent layer when
    mechanical routing declines to decide).
    """

    def __init__(self, config: MipConfig, llm_provider: LLMProvider | None = None) -> None:
        """Store the config and pre-sort its rules by ascending priority.

        Args:
            config: MIP configuration; ``config.rules`` may be empty or ``None``.
            llm_provider: Optional provider used by :meth:`route` for the
                intent layer. Without it, escalations fall back to passthrough.
        """
        self._config = config
        self._llm_provider = llm_provider
        # Sorted once here; every lookup below walks rules in this order.
        self._rules = sorted(config.rules or [], key=lambda r: r.priority)

    def route_sync(self, input_data: RuleEngineInput) -> RoutingDecision | None:
        """Attempt synchronous (mechanical) routing only.

        Returns RoutingDecision if a confident match is found.
        Returns None if escalation to intent layer is needed.

        Raises:
            MipRoutingError: When several rules tie at the top priority and
                the configured ``tie_breaker`` is ``"error"``.
        """
        # Phase 5 — filter rules outside their activation window.
        eligible = [r for r in self._rules if _is_active(r)]

        # Phase 5 — shadow rules: evaluate them off to the side, log the match
        # for observability, then exclude them from real routing.
        shadow_rules = [r for r in eligible if r.shadow]
        live_rules = [r for r in eligible if not r.shadow]
        for rule in shadow_rules:
            if evaluate_match_block(rule.match, input_data):
                logger.info(
                    "mip shadow match (no action taken): rule=%s priority=%d tags=%s",
                    rule.name,
                    rule.priority,
                    rule.observability_tags,
                )

        matches = evaluate_rules(live_rules, input_data)
        if not matches:
            return None

        top = matches[0]

        # Phase 5 — tie-breaking when multiple non-override rules match at the
        # same top priority. Override rules already short-circuit in
        # evaluate_rules (returns a single match). When a tie is resolved
        # explicitly by tie_breaker, bypass escalation — the author has
        # declared deterministic intent for the priority collision.
        top_priority = top.rule.priority
        tied = [m for m in matches if m.rule.priority == top_priority]
        if len(tied) > 1 and not top.rule.override:
            return self._apply_action(self._resolve_top(matches), input_data)

        # Override rule — compliance lock, always return
        if top.rule.override:
            return self._apply_action(top, input_data)

        # The rule itself asks for escalation to the intent layer.
        if top.rule.action.escalate == "mip":
            return None

        # Confident single match — accept
        if len(matches) == 1 and top.confidence >= 0.8:
            return self._apply_action(top, input_data)

        # Low confidence or multiple matches — check escalation policy
        if self._should_escalate(matches):
            return None

        # Escalation policy says don't escalate — use highest priority match
        return self._apply_action(top, input_data)

    def _resolve_top(self, matches: list[RuleMatch]) -> RuleMatch:
        """Apply tie_breaker policy when multiple matches share the top priority.

        ``matches`` is already sorted ascending by priority and must be
        non-empty.

        Returns:
            The winning match: the first tied match for ``"first"`` (and for
            any unrecognised policy), or the tied match with the most match
            conditions for ``"most_specific"`` (the earliest one wins when
            condition counts are equal).

        Raises:
            MipRoutingError: When the policy is ``"error"`` and more than one
                match shares the top priority.
        """
        top_priority = matches[0].rule.priority
        tied = [m for m in matches if m.rule.priority == top_priority]
        if len(tied) <= 1:
            return tied[0]

        policy = self._config.tie_breaker
        if policy == "first":
            return tied[0]
        if policy == "error":
            names = ", ".join(m.rule.name for m in tied)
            raise MipRoutingError(
                f"MIP tie_breaker=error: {len(tied)} rules matched at priority {top_priority}: {names}"
            )
        if policy == "most_specific":
            return max(tied, key=lambda m: _condition_count(m.rule))
        return tied[0]  # defensive fallback

    def _should_escalate(self, matches: list[RuleMatch]) -> bool:
        """Check escalation conditions from intent_policy.escalate_when.

        Returns ``True`` as soon as any configured condition holds, and also
        when no intent policy (or no ``escalate_when`` list) is configured.
        Conditions with an unrecognised ``condition`` name are ignored.
        """
        policy = self._config.intent_policy
        if not policy or not policy.escalate_when:
            return True  # Default: escalate when no explicit policy

        for condition in policy.escalate_when:
            if condition.condition == "matched_rules":
                if self._compare(len(matches), condition.operator, condition.value):
                    return True
            elif condition.condition == "confidence":
                # Only the top match's confidence is considered.
                if matches and self._compare(matches[0].confidence, condition.operator, condition.value):
                    return True
            elif condition.condition == "conflicting_rules":
                if condition.value and len(matches) > 1:
                    return True
        return False

    @staticmethod
    def _compare(actual: int | float, operator: str, expected: str | int | float | bool) -> bool:
        """Compare a value against a condition.

        Both operands are coerced to ``float`` (so ``"2"`` and ``True`` are
        accepted as expected values). If coercion fails, the raw values are
        compared for equality instead. Supported operators are ``eq``, ``lt``,
        ``gt``, ``gte`` and ``lte``; any other operator is treated as ``eq``.
        """
        try:
            a = float(actual)
            e = float(expected)
        except (TypeError, ValueError):
            return actual == expected
        if operator == "lt":
            return a < e
        if operator == "gt":
            return a > e
        if operator == "gte":
            return a >= e
        if operator == "lte":
            return a <= e
        # "eq" and every unrecognised operator fall back to equality.
        return a == e

    async def route(self, input_data: RuleEngineInput) -> RoutingDecision:
        """Full routing: mechanical rules first, then intent layer if needed.

        1. Evaluate override rules → if match, return immediately (compliance lock)
        2. Evaluate normal rules → if confident match, return
        3. Check escalation conditions
        4. If escalation needed and LLM available, call intent layer
        5. If no LLM, return passthrough decision
        """
        # Try mechanical routing first
        decision = self.route_sync(input_data)
        if decision is not None:
            return decision

        # Need escalation — the intent layer requires both a provider and a policy.
        if self._llm_provider and self._config.intent_policy:
            # Imported lazily, as in the original code, so the intent module
            # is only loaded when an escalation actually happens.
            from astrocyte.mip.intent import resolve_intent

            return await resolve_intent(
                input_data=input_data,
                intent_policy=self._config.intent_policy,
                available_banks=self._config.banks or [],
                llm_provider=self._llm_provider,
            )

        # No LLM available — passthrough
        return RoutingDecision(resolved_by="passthrough", reasoning="No mechanical match and no LLM available")

    def _resolve_spec_for_bank(self, spec_attr: str, bank_id: str) -> ForgetSpec | PipelineSpec | None:
        """Return the first ``action.<spec_attr>`` whose ``action.bank`` matches ``bank_id``.

        Rules are walked in ascending priority order. Rules whose spec is
        ``None`` or whose ``action.bank`` is empty are skipped. Bank templates
        (``"student-{id}"``) are matched against the concrete ID via
        ``_bank_matches``.

        NOTE(review): this walks every configured rule — unlike ``route_sync``
        it does not filter out shadow rules or rules outside their activation
        window. Confirm that is intended.
        """
        for rule in self._rules:
            spec = getattr(rule.action, spec_attr)
            if spec is None:
                continue
            template = rule.action.bank
            if template and _bank_matches(template, bank_id):
                return spec
        return None

    def resolve_forget_for_bank(self, bank_id: str) -> ForgetSpec | None:
        """Resolve the highest-priority rule's ForgetSpec targeting ``bank_id`` (Phase 4).

        At forget time the original RetainRequest context is gone, but forget
        still needs to know the policy (mode/audit/legal_hold/min_age/max_per_call)
        configured for the bank being purged. This walks rules in priority order
        and returns the first ``action.forget`` whose ``action.bank`` resolves
        to ``bank_id``. Bank templates (``"student-{id}"``) match concrete IDs.

        Returns ``None`` when no rule with a forget spec targets the bank.
        """
        return self._resolve_spec_for_bank("forget", bank_id)

    def resolve_pipeline_for_bank(self, bank_id: str) -> PipelineSpec | None:
        """Resolve the highest-priority rule's PipelineSpec targeting ``bank_id`` (P3).

        At recall time the original RetainRequest context (content, metadata)
        is gone, but recall still needs to know the rerank/reflect overrides
        configured for the bank being read. This walks rules in priority order
        and returns the first ``action.pipeline`` whose ``action.bank``
        resolves to ``bank_id``.

        ``action.bank`` may contain ``{...}`` template placeholders. Each
        placeholder is treated as a wildcard for matching purposes
        (``"student-{id}"`` matches ``"student-42"``, ``"student-foo"``).

        Returns ``None`` when no rule with a pipeline spec targets the bank.
        """
        return self._resolve_spec_for_bank("pipeline", bank_id)

    def _apply_action(self, match: RuleMatch, input_data: RuleEngineInput) -> RoutingDecision:
        """Convert a RuleMatch into a RoutingDecision, interpolating templates.

        ``action.bank`` and every entry of ``action.tags`` are interpolated
        against ``input_data``; an empty/missing bank or tag list becomes
        ``None`` in the decision. The decision is always marked
        ``resolved_by="mechanical"``.
        """
        action = match.rule.action

        bank_id = interpolate_template(action.bank, input_data) if action.bank else None
        tags = [interpolate_template(t, input_data) for t in action.tags] if action.tags else None

        return RoutingDecision(
            bank_id=bank_id,
            tags=tags,
            retain_policy=action.retain_policy,
            resolved_by="mechanical",
            rule_name=match.rule.name,
            confidence=match.confidence,
            pipeline=action.pipeline,
            forget=action.forget,
            observability_tags=match.rule.observability_tags,
        )

263 

264 

265# --------------------------------------------------------------------------- 

266# Phase 5 helpers 

267# --------------------------------------------------------------------------- 

268 

269 

270def _is_active(rule: RoutingRule) -> bool: 

271 """Whether ``rule`` is within its ``active_from``/``active_until`` window.""" 

272 if rule.active_from is None and rule.active_until is None: 

273 return True 

274 now = datetime.now(timezone.utc) 

275 if rule.active_from is not None and now < rule.active_from: 

276 return False 

277 if rule.active_until is not None and now > rule.active_until: 

278 return False 

279 return True 

280 

281 

282def _condition_count(rule: RoutingRule) -> int: 

283 """Count match conditions (used by tie_breaker=most_specific).""" 

284 block = rule.match 

285 count = 0 

286 if block.all_conditions: 

287 count += len(block.all_conditions) 

288 if block.any_conditions: 

289 count += len(block.any_conditions) 

290 if block.none_conditions: 

291 count += len(block.none_conditions) 

292 return count