Coverage for astrocyte/mip/rule_engine.py: 93%

127 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""MIP rule engine — match DSL evaluator. 

2 

3All functions are sync and pure (no I/O). Rust migration candidates. 

4""" 

5 

6from __future__ import annotations 

7 

8import re 

9from dataclasses import dataclass 

10 

11from astrocyte.mip.schema import MatchBlock, MatchSpec, RoutingRule 

12from astrocyte.types import ActorIdentity, Metadata 

13 

14 

@dataclass
class RuleEngineInput:
    """Bundle of everything a rule's match block can be evaluated against.

    Only ``content`` is required. Every other field defaults to "not
    supplied" (``None``, or ``False`` for ``pii_detected``).
    """

    # Resolvable in rules as ``content``.
    content: str
    # Resolvable in rules as ``content_type``.
    content_type: str | None = None
    # Resolvable in rules through dotted paths such as ``metadata.student_id``.
    metadata: Metadata | None = None
    # Exposed to rules as a single comma-joined string under ``tags``.
    tags: list[str] | None = None
    # Resolvable in rules as ``pii_detected``.
    pii_detected: bool = False
    # Resolvable in rules as ``source``.
    source: str | None = None
    # Computed signals (word_count, novelty_score), resolvable through
    # dotted paths such as ``signals.word_count``.
    signals: dict[str, float] | None = None
    # Identity-aware routing (identity spec §3 Gap 2). The caller fills this
    # in when a resolved ActorIdentity is available — e.g. after the JWT
    # identity middleware classifies the inbound token. Match blocks can
    # branch on ``principal_type``, ``principal_id``, ``principal_upn`` and
    # ``principal_app_id``, and action templates can interpolate
    # ``{principal.*}``. ``None`` means "no identity resolved", in which
    # case rules with principal_* conditions will not match.
    actor_identity: ActorIdentity | None = None

34 

35 

@dataclass
class RuleMatch:
    """Pairs a routing rule that matched the input with its confidence."""

    # The rule whose match block accepted the input.
    rule: RoutingRule
    # Confidence reported for the match; ``evaluate_rules`` copies it from
    # ``rule.action.confidence``.
    confidence: float

42 

43 

# Matches one ``{placeholder}``; group 1 captures the text between the braces
# (one or more characters other than ``}``).
_TEMPLATE_PATTERN = re.compile(r"\{([^}]+)\}")

45 

46 

def evaluate_rules(rules: list[RoutingRule], input_data: RuleEngineInput) -> list[RuleMatch]:
    """Evaluate all rules against input. Returns matched rules sorted by priority.

    Override rules (``override=True``) are checked first, in ascending
    ``priority`` order; the first one that matches short-circuits and is
    returned on its own. If no override rule matches, every matching
    non-override rule is returned, also in ascending ``priority`` order.

    Args:
        rules: Candidate routing rules, in any order.
        input_data: The data each rule's match block is evaluated against.

    Returns:
        A single-element list holding the winning override rule, or the
        (possibly empty) list of matching normal rules.
    """
    # Sort once and reuse the ordering for both passes. Besides avoiding a
    # second sort, this keeps the normal-rule pass correct if a one-shot
    # iterable is passed instead of a list.
    ordered = sorted(rules, key=lambda r: r.priority)

    # Pass 1: the first matching override rule wins outright.
    for rule in ordered:
        if rule.override and evaluate_match_block(rule.match, input_data):
            return [RuleMatch(rule=rule, confidence=rule.action.confidence)]

    # Pass 2: collect every matching normal rule, preserving priority order.
    return [
        RuleMatch(rule=rule, confidence=rule.action.confidence)
        for rule in ordered
        if not rule.override and evaluate_match_block(rule.match, input_data)
    ]

66 

67 

def evaluate_match_block(block: MatchBlock, input_data: RuleEngineInput) -> bool:
    """Evaluate a MatchBlock (all/any/none composition).

    Each of the three condition lists is optional (``None`` means "not
    specified"); the ones that are specified are ANDed together:

    * ``all_conditions`` — every spec must match. An empty list matches
      everything (fallback rule).
    * ``any_conditions`` — at least one spec must match, so an empty list
      never matches.
    * ``none_conditions`` — no spec may match.

    A block that specifies no condition list at all matches everything.

    Args:
        block: The match block to evaluate.
        input_data: The data each spec is evaluated against.

    Returns:
        ``True`` when no specified condition list rejects the input.
    """
    if block.all_conditions is not None and not all(
        evaluate_match_spec(spec, input_data) for spec in block.all_conditions
    ):
        return False

    if block.any_conditions is not None and not any(
        evaluate_match_spec(spec, input_data) for spec in block.any_conditions
    ):
        return False

    if block.none_conditions is not None and any(
        evaluate_match_spec(spec, input_data) for spec in block.none_conditions
    ):
        return False

    # Nothing rejected the input. This covers both "all specified lists
    # passed" and "no lists specified", so a plain True is sufficient.
    return True

92 

93 

def evaluate_match_spec(spec: MatchSpec, input_data: RuleEngineInput) -> bool:
    """Check one MatchSpec against the input.

    The spec's field is resolved with ``resolve_field`` and compared
    according to ``spec.operator``:

    * ``present`` / ``absent`` — the resolved value is / is not ``None``.
    * ``eq`` — the resolved value equals ``spec.value``.
    * ``in`` — ``spec.value`` is a list containing the resolved value.
    * ``gte`` / ``lte`` / ``gt`` / ``lt`` — numeric comparison after
      converting both sides with ``float``; a ``None`` on either side or a
      failed conversion yields ``False``.

    Any other operator yields ``False``.
    """
    op = spec.operator
    actual = resolve_field(spec.field, input_data)
    expected = spec.value

    if op == "present":
        return actual is not None
    if op == "absent":
        return actual is None
    if op == "eq":
        return actual == expected
    if op == "in":
        # Membership is only defined against a list of candidates.
        return isinstance(expected, list) and actual in expected

    if op not in ("gte", "lte", "gt", "lt"):
        return False

    # Numeric comparisons need two values that both convert to float.
    if actual is None or expected is None:
        return False
    try:
        lhs = float(actual)
        rhs = float(expected)
    except (TypeError, ValueError):
        return False

    if op == "gte":
        return lhs >= rhs
    if op == "lte":
        return lhs <= rhs
    if op == "gt":
        return lhs > rhs
    return lhs < rhs

126 

127 

def resolve_field(field_path: str, input_data: RuleEngineInput) -> str | int | float | bool | None:
    """Look up the value a (possibly dotted) field path refers to.

    The path is split at its first ``.`` only, so everything after that dot
    is treated as a single key.

    Examples:
        "content_type"        → input_data.content_type
        "metadata.student_id" → input_data.metadata["student_id"]
        "signals.word_count"  → input_data.signals["word_count"]
        "pii_detected"        → input_data.pii_detected
        "principal_type"      → input_data.actor_identity.type (identity spec §3 Gap 2)
        "principal.id"        → input_data.actor_identity.id
        "principal.upn"       → input_data.actor_identity.claims["upn"]

    Returns ``None`` when the path is unknown or the data it points at is
    missing.
    """
    head, dot, tail = field_path.partition(".")

    # Plain attributes; matched on the first path segment alone.
    if head in ("content_type", "pii_detected", "source", "content"):
        return getattr(input_data, head)

    if head == "tags":
        # Tags are exposed as one comma-joined string for matching.
        tag_list = input_data.tags
        return ",".join(tag_list) if tag_list else None

    # Identity-aware fields (identity spec §3 Gap 2). The flat
    # ``principal_*`` names are the ergonomic form for match blocks; the
    # dotted ``principal.*`` form further down serves action templates.
    # Both are resolved here so a rule can mix and match.
    if head in ("principal_type", "principal_id", "principal_upn", "principal_app_id"):
        actor = input_data.actor_identity
        if head == "principal_type":
            return actor.type if actor else None
        if head == "principal_id":
            return actor.id if actor else None
        claim_key = "upn" if head == "principal_upn" else "app_id"
        if actor and actor.claims:
            return actor.claims.get(claim_key)
        return None

    # Everything below needs a sub-key after the first dot.
    if not dot:
        return None

    if head == "metadata":
        meta = input_data.metadata
        return meta.get(tail) if meta else None

    if head == "signals":
        computed = input_data.signals
        return computed.get(tail) if computed else None

    if head == "principal":
        actor = input_data.actor_identity
        if not actor:
            return None
        # Structured fields take precedence over the claims dict.
        if tail == "type":
            return actor.type
        # ``oid`` and ``oid_or_app_id`` are convenience aliases for action
        # templates: ``id`` is already the stable identifier (oid for users,
        # app_id for service accounts) per the JWT classifier.
        if tail in ("id", "oid", "oid_or_app_id"):
            return actor.id
        if actor.claims:
            return actor.claims.get(tail)

    return None

199 

200 

201def interpolate_template(template: str, input_data: RuleEngineInput) -> str: 

202 """Interpolate {metadata.student_id} style templates. 

203 

204 Uses resolve_field for each {placeholder}. Leaves unresolved placeholders intact. 

205 """ 

206 

207 def _replace(match: re.Match[str]) -> str: 

208 field_path = match.group(1) 

209 value = resolve_field(field_path, input_data) 

210 if value is None: 

211 return match.group(0) # Leave unresolved 

212 return str(value) 

213 

214 return _TEMPLATE_PATTERN.sub(_replace, template)