Coverage for astrocyte/mip/router.py: 86%
149 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""MIP router — orchestrates rule_engine → ambiguity detection → intent layer.
3See docs/_design/memory-intent-protocol.md for the design specification.
4"""
6from __future__ import annotations
8import logging
9import re
10from datetime import datetime, timezone
11from typing import TYPE_CHECKING
13from astrocyte.errors import MipRoutingError
14from astrocyte.mip.rule_engine import (
15 RuleEngineInput,
16 RuleMatch,
17 evaluate_match_block,
18 evaluate_rules,
19 interpolate_template,
20)
21from astrocyte.mip.schema import ForgetSpec, MipConfig, PipelineSpec, RoutingRule
22from astrocyte.types import RoutingDecision
24if TYPE_CHECKING:
25 from astrocyte.provider import LLMProvider
27logger = logging.getLogger("astrocyte.mip")
30_TEMPLATE_PLACEHOLDER = re.compile(r"\{[^}]+\}")
33def _bank_matches(template: str, bank_id: str) -> bool:
34 """Whether a templated bank pattern (``"student-{id}"``) matches a concrete bank_id.
36 Each ``{...}`` placeholder is treated as a non-greedy ``.+?`` wildcard;
37 surrounding literal text is regex-escaped.
38 """
39 if "{" not in template:
40 return template == bank_id
41 parts = _TEMPLATE_PLACEHOLDER.split(template)
42 pattern = "^" + ".+?".join(re.escape(p) for p in parts) + "$"
43 return re.match(pattern, bank_id) is not None
46class MipRouter:
47 """Top-level MIP router. Orchestrates rule_engine → escalation → intent layer."""
49 def __init__(self, config: MipConfig, llm_provider: LLMProvider | None = None) -> None:
50 self._config = config
51 self._llm_provider = llm_provider
52 self._rules = sorted(config.rules or [], key=lambda r: r.priority)
54 def route_sync(self, input_data: RuleEngineInput) -> RoutingDecision | None:
55 """Attempt synchronous (mechanical) routing only.
57 Returns RoutingDecision if a confident match is found.
58 Returns None if escalation to intent layer is needed.
59 """
60 # Phase 5 — filter rules outside their activation window.
61 eligible = [r for r in self._rules if _is_active(r)]
63 # Phase 5 — shadow rules: evaluate them off to the side, log the match
64 # for observability, then exclude them from real routing.
65 shadow_rules = [r for r in eligible if r.shadow]
66 live_rules = [r for r in eligible if not r.shadow]
67 for rule in shadow_rules:
68 if evaluate_match_block(rule.match, input_data):
69 logger.info(
70 "mip shadow match (no action taken): rule=%s priority=%d tags=%s",
71 rule.name,
72 rule.priority,
73 rule.observability_tags,
74 )
76 matches = evaluate_rules(live_rules, input_data)
78 if not matches:
79 return None
81 # Phase 5 — tie-breaking when multiple non-override rules match at the
82 # same top priority. Override rules already short-circuit in
83 # evaluate_rules (returns a single match). When a tie is resolved
84 # explicitly by tie_breaker, bypass escalation — the author has
85 # declared deterministic intent for the priority collision.
86 top_priority = matches[0].rule.priority
87 tied = [m for m in matches if m.rule.priority == top_priority]
88 if len(tied) > 1 and not matches[0].rule.override:
89 top = self._resolve_top(matches)
90 return self._apply_action(top, input_data)
91 top = matches[0]
93 # Override rule — compliance lock, always return
94 if top.rule.override:
95 return self._apply_action(top, input_data)
97 # Check for escalation action
98 if top.rule.action.escalate == "mip":
99 return None
101 # Confident single match — accept
102 if len(matches) == 1 and top.confidence >= 0.8:
103 return self._apply_action(top, input_data)
105 # Low confidence or multiple matches — check escalation policy
106 if self._should_escalate(matches):
107 return None
109 # Escalation policy says don't escalate — use highest priority match
110 return self._apply_action(top, input_data)
112 def _resolve_top(self, matches: list[RuleMatch]) -> RuleMatch:
113 """Apply tie_breaker policy when multiple matches share the top priority.
115 ``matches`` is already sorted ascending by priority.
116 """
117 top_priority = matches[0].rule.priority
118 tied = [m for m in matches if m.rule.priority == top_priority]
119 if len(tied) <= 1:
120 return tied[0]
122 policy = self._config.tie_breaker
123 if policy == "first":
124 return tied[0]
125 if policy == "error":
126 names = ", ".join(m.rule.name for m in tied)
127 raise MipRoutingError(
128 f"MIP tie_breaker=error: {len(tied)} rules matched at priority {top_priority}: {names}"
129 )
130 if policy == "most_specific":
131 return max(tied, key=lambda m: _condition_count(m.rule))
132 return tied[0] # defensive fallback
134 def _should_escalate(self, matches: list[RuleMatch]) -> bool:
135 """Check escalation conditions from intent_policy.escalate_when."""
136 policy = self._config.intent_policy
137 if not policy or not policy.escalate_when:
138 return True # Default: escalate when no explicit policy
140 for condition in policy.escalate_when:
141 if condition.condition == "matched_rules":
142 count = len(matches)
143 if self._compare(count, condition.operator, condition.value):
144 return True
145 elif condition.condition == "confidence":
146 if matches:
147 top_confidence = matches[0].confidence
148 if self._compare(top_confidence, condition.operator, condition.value):
149 return True
150 elif condition.condition == "conflicting_rules":
151 if condition.value and len(matches) > 1:
152 return True
153 return False
155 @staticmethod
156 def _compare(actual: int | float, operator: str, expected: str | int | float | bool) -> bool:
157 """Compare a value against a condition."""
158 try:
159 a = float(actual)
160 e = float(expected)
161 except (TypeError, ValueError):
162 return actual == expected
163 if operator == "eq":
164 return a == e
165 if operator == "lt":
166 return a < e
167 if operator == "gt":
168 return a > e
169 if operator == "gte":
170 return a >= e
171 if operator == "lte":
172 return a <= e
173 return a == e
175 async def route(self, input_data: RuleEngineInput) -> RoutingDecision:
176 """Full routing: mechanical rules first, then intent layer if needed.
178 1. Evaluate override rules → if match, return immediately (compliance lock)
179 2. Evaluate normal rules → if confident match, return
180 3. Check escalation conditions
181 4. If escalation needed and LLM available, call intent layer
182 5. If no LLM, return passthrough decision
183 """
184 # Try mechanical routing first
185 decision = self.route_sync(input_data)
186 if decision is not None:
187 return decision
189 # Need escalation — try intent layer
190 if self._llm_provider and self._config.intent_policy:
191 from astrocyte.mip.intent import resolve_intent
193 return await resolve_intent(
194 input_data=input_data,
195 intent_policy=self._config.intent_policy,
196 available_banks=self._config.banks or [],
197 llm_provider=self._llm_provider,
198 )
200 # No LLM available — passthrough
201 return RoutingDecision(resolved_by="passthrough", reasoning="No mechanical match and no LLM available")
203 def resolve_forget_for_bank(self, bank_id: str) -> ForgetSpec | None:
204 """Resolve the highest-priority rule's ForgetSpec targeting ``bank_id`` (Phase 4).
206 At forget time the original RetainRequest context is gone, but forget
207 still needs to know the policy (mode/audit/legal_hold/min_age/max_per_call)
208 configured for the bank being purged. This walks rules in priority order
209 and returns the first ``action.forget`` whose ``action.bank`` resolves
210 to ``bank_id``. Bank templates (``"student-{id}"``) match concrete IDs.
211 """
212 for rule in self._rules:
213 if rule.action.forget is None:
214 continue
215 template = rule.action.bank
216 if not template:
217 continue
218 if _bank_matches(template, bank_id):
219 return rule.action.forget
220 return None
222 def resolve_pipeline_for_bank(self, bank_id: str) -> PipelineSpec | None:
223 """Resolve the highest-priority rule's PipelineSpec targeting ``bank_id`` (P3).
225 At recall time the original RetainRequest context (content, metadata)
226 is gone, but recall still needs to know the rerank/reflect overrides
227 configured for the bank being read. This walks rules in priority order
228 and returns the first ``action.pipeline`` whose ``action.bank``
229 resolves to ``bank_id``.
231 ``action.bank`` may contain ``{...}`` template placeholders. Each
232 placeholder is treated as a wildcard for matching purposes
233 (``"student-{id}"`` matches ``"student-42"``, ``"student-foo"``).
234 """
235 for rule in self._rules:
236 if rule.action.pipeline is None:
237 continue
238 template = rule.action.bank
239 if not template:
240 continue
241 if _bank_matches(template, bank_id):
242 return rule.action.pipeline
243 return None
245 def _apply_action(self, match: RuleMatch, input_data: RuleEngineInput) -> RoutingDecision:
246 """Convert a RuleMatch into a RoutingDecision, interpolating templates."""
247 action = match.rule.action
249 bank_id = interpolate_template(action.bank, input_data) if action.bank else None
250 tags = [interpolate_template(t, input_data) for t in action.tags] if action.tags else None
252 return RoutingDecision(
253 bank_id=bank_id,
254 tags=tags,
255 retain_policy=action.retain_policy,
256 resolved_by="mechanical",
257 rule_name=match.rule.name,
258 confidence=match.confidence,
259 pipeline=action.pipeline,
260 forget=action.forget,
261 observability_tags=match.rule.observability_tags,
262 )
265# ---------------------------------------------------------------------------
266# Phase 5 helpers
267# ---------------------------------------------------------------------------
270def _is_active(rule: RoutingRule) -> bool:
271 """Whether ``rule`` is within its ``active_from``/``active_until`` window."""
272 if rule.active_from is None and rule.active_until is None:
273 return True
274 now = datetime.now(timezone.utc)
275 if rule.active_from is not None and now < rule.active_from:
276 return False
277 if rule.active_until is not None and now > rule.active_until:
278 return False
279 return True
282def _condition_count(rule: RoutingRule) -> int:
283 """Count match conditions (used by tie_breaker=most_specific)."""
284 block = rule.match
285 count = 0
286 if block.all_conditions:
287 count += len(block.all_conditions)
288 if block.any_conditions:
289 count += len(block.any_conditions)
290 if block.none_conditions:
291 count += len(block.none_conditions)
292 return count