Coverage for astrocyte/mip/rule_engine.py: 93%
127 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""MIP rule engine — match DSL evaluator.
3All functions are sync and pure (no I/O). Rust migration candidates.
4"""
6from __future__ import annotations
8import re
9from dataclasses import dataclass
11from astrocyte.mip.schema import MatchBlock, MatchSpec, RoutingRule
12from astrocyte.types import ActorIdentity, Metadata
@dataclass
class RuleEngineInput:
    """All data available for rule matching.

    Only ``content`` is required; every other field has a default meaning
    "not provided". ``resolve_field`` reads these attributes when a rule's
    match spec or an action template names a field.
    """

    # Raw content being routed; addressable in rules as ``content``.
    content: str
    # Addressable in rules as ``content_type``.
    content_type: str | None = None
    # Caller-supplied metadata; rules address entries as ``metadata.<key>``.
    metadata: Metadata | None = None
    # Exposed to rules as a single comma-joined string (``None`` when empty).
    tags: list[str] | None = None
    # Addressable in rules as ``pii_detected``.
    pii_detected: bool = False
    # Addressable in rules as ``source``.
    source: str | None = None
    # Rules address entries as ``signals.<key>``.
    signals: dict[str, float] | None = None  # Computed signals (word_count, novelty_score)
    # Identity-aware routing (identity spec §3 Gap 2). Populated by the
    # caller when a resolved ActorIdentity is available — e.g. after the
    # JWT identity middleware classifies the inbound token. Rules can
    # branch on ``principal_type``, ``principal_id``, ``principal_upn``,
    # ``principal_app_id`` in their match blocks and interpolate
    # ``{principal.*}`` in action templates. Absent means "no identity
    # resolved" — rules with principal_* conditions will not match.
    actor_identity: ActorIdentity | None = None
@dataclass
class RuleMatch:
    """A rule that matched the input."""

    # The routing rule whose match block evaluated to true.
    rule: RoutingRule
    # ``evaluate_rules`` fills this from ``rule.action.confidence``.
    confidence: float
# Matches ``{placeholder}`` tokens; group 1 captures the text between the
# braces (one or more characters, none of which is ``}``).
_TEMPLATE_PATTERN = re.compile(r"\{([^}]+)\}")
def evaluate_rules(rules: list[RoutingRule], input_data: RuleEngineInput) -> list[RuleMatch]:
    """Evaluate all rules against input. Returns matched rules sorted by priority.

    Rules are visited in ascending ``priority`` order; rules with equal
    priority keep their relative order from ``rules``.

    Override rules (override=True) are checked first and short-circuit: the
    first override rule whose match block passes is returned on its own.
    Override rules that do not match never appear in the normal result list.

    Args:
        rules: Candidate routing rules, in any order.
        input_data: The data the rules' match blocks are evaluated against.

    Returns:
        A single-element list holding the winning override rule, or every
        matching non-override rule in priority order (possibly empty).
    """
    # Sort once and reuse the ordering for both passes; sorted() is stable,
    # so ties behave exactly as they would with two separate sorts.
    ordered = sorted(rules, key=lambda r: r.priority)

    # Pass 1: the first matching override rule wins outright.
    for rule in ordered:
        if rule.override and evaluate_match_block(rule.match, input_data):
            return [RuleMatch(rule=rule, confidence=rule.action.confidence)]

    # Pass 2: collect every matching normal rule.
    return [
        RuleMatch(rule=rule, confidence=rule.action.confidence)
        for rule in ordered
        if not rule.override and evaluate_match_block(rule.match, input_data)
    ]
def evaluate_match_block(block: MatchBlock, input_data: RuleEngineInput) -> bool:
    """Evaluate a MatchBlock (all/any/none composition).

    Each of the three condition lists is optional (``None`` means "not
    specified") and the specified ones are AND-ed together:

    * ``all_conditions`` — every spec must match; an empty list matches
      everything (fallback rule).
    * ``any_conditions`` — at least one spec must match; an empty list can
      therefore never match.
    * ``none_conditions`` — no spec may match; an empty list always passes.

    A block with no condition lists at all matches everything.

    Args:
        block: The match block to evaluate.
        input_data: The data the individual specs are resolved against.

    Returns:
        ``True`` when every specified condition group is satisfied.
    """
    # all() over an empty list is True, so no separate emptiness guard is needed.
    if block.all_conditions is not None and not all(
        evaluate_match_spec(spec, input_data) for spec in block.all_conditions
    ):
        return False

    if block.any_conditions is not None and not any(
        evaluate_match_spec(spec, input_data) for spec in block.any_conditions
    ):
        return False

    if block.none_conditions is not None and any(
        evaluate_match_spec(spec, input_data) for spec in block.none_conditions
    ):
        return False

    # Nothing rejected the input: either every group passed or none was given.
    return True
def evaluate_match_spec(spec: MatchSpec, input_data: RuleEngineInput) -> bool:
    """Evaluate a single MatchSpec against input data.

    The spec's field is resolved with ``resolve_field`` and compared
    according to ``spec.operator``:

    * ``present`` / ``absent`` — the resolved value is / is not ``None``.
    * ``eq`` — the resolved value equals ``spec.value``.
    * ``in`` — ``spec.value`` is a list containing the resolved value.
    * ``gte`` / ``lte`` / ``gt`` / ``lt`` — numeric comparison after coercing
      both sides with ``float()``.

    Args:
        spec: The condition to evaluate.
        input_data: The data the spec's field is resolved against.

    Returns:
        ``True`` when the condition holds. Unknown operators, and numeric
        comparisons whose operands are missing or cannot be converted to
        ``float``, yield ``False``.
    """
    value = resolve_field(spec.field, input_data)
    op = spec.operator

    if op == "present":
        return value is not None
    if op == "absent":
        return value is None
    if op == "eq":
        return value == spec.value
    if op == "in":
        # Only list operands are supported; anything else never matches.
        return isinstance(spec.value, list) and value in spec.value
    if op in ("gte", "lte", "gt", "lt"):
        if value is None or spec.value is None:
            return False
        try:
            actual = float(value)
            bound = float(spec.value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: float() rejects ints too large for a double.
            # Treat that like any other non-comparable operand.
            return False
        if op == "gte":
            return actual >= bound
        if op == "lte":
            return actual <= bound
        if op == "gt":
            return actual > bound
        return actual < bound

    return False
def resolve_field(field_path: str, input_data: RuleEngineInput) -> str | int | float | bool | None:
    """Look up the value a (possibly dotted) field path refers to.

    Only the first dot splits the path: the part before it selects the
    source, the rest is the key within that source.

    Examples:
        "content_type"        → input_data.content_type
        "metadata.student_id" → input_data.metadata["student_id"]
        "signals.word_count"  → input_data.signals["word_count"]
        "pii_detected"        → input_data.pii_detected
        "principal_type"      → input_data.actor_identity.type (identity spec §3 Gap 2)
        "principal.id"        → input_data.actor_identity.id
        "principal.upn"       → input_data.actor_identity.claims["upn"]

    Returns ``None`` for unknown paths and for paths whose source (metadata,
    signals, identity, claims) is missing or empty.
    """
    head, dot, remainder = field_path.partition(".")

    # Scalar attributes exposed under their own name.
    if head in ("content_type", "pii_detected", "source", "content"):
        return getattr(input_data, head)

    if head == "tags":
        # Rules see tags as one comma-joined string.
        labels = input_data.tags
        return ",".join(labels) if labels else None

    # Identity-aware fields (identity spec §3 Gap 2). The flat forms are the
    # ergonomic spelling for match blocks; the dotted ``principal.*`` form
    # further down serves action templates. Both are resolved here so rules
    # can mix them freely.
    if head in ("principal_type", "principal_id"):
        actor = input_data.actor_identity
        if not actor:
            return None
        return actor.type if head == "principal_type" else actor.id

    flat_claims = {"principal_upn": "upn", "principal_app_id": "app_id"}
    if head in flat_claims:
        actor = input_data.actor_identity
        if actor and actor.claims:
            return actor.claims.get(flat_claims[head])
        return None

    # Everything below needs a "<source>.<key>" path.
    if not dot:
        return None

    if head == "metadata":
        meta = input_data.metadata
        return meta.get(remainder) if meta else None

    if head == "signals":
        computed = input_data.signals
        return computed.get(remainder) if computed else None

    if head == "principal":
        actor = input_data.actor_identity
        if not actor:
            return None
        # Structured attributes win over same-named entries in claims.
        if remainder == "type":
            return actor.type
        # "oid" / "oid_or_app_id" are template-friendly aliases for id: id
        # is already the stable identifier (oid for users, app_id for
        # service accounts) per the JWT classifier.
        if remainder in ("id", "oid", "oid_or_app_id"):
            return actor.id
        if actor.claims:
            return actor.claims.get(remainder)

    return None
def interpolate_template(template: str, input_data: RuleEngineInput) -> str:
    """Fill ``{field.path}`` placeholders in *template* from *input_data*.

    Every ``{...}`` token is looked up with ``resolve_field``. A token whose
    lookup yields ``None`` is left in the output unchanged; any other value
    is inserted via ``str()``.
    """

    def _substitute(found: re.Match[str]) -> str:
        resolved = resolve_field(found.group(1), input_data)
        # group(0) is the whole "{...}" token: keep it when nothing resolves.
        return found.group(0) if resolved is None else str(resolved)

    return _TEMPLATE_PATTERN.sub(_substitute, template)