Coverage for astrocyte/policy/barriers.py: 97%
156 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Barrier policies — PII scanning, content validation, metadata sanitization.
3All functions are sync (Rust migration candidates) except scan_async/apply_async.
4See docs/_design/policy-layer.md section 2 and docs/_design/data-governance.md.
5"""
7from __future__ import annotations
9import json
10import logging
11import re
12from typing import TYPE_CHECKING
14from astrocyte.errors import PiiRejected
15from astrocyte.types import Metadata, PiiMatch
17if TYPE_CHECKING:
18 from astrocyte.policy.llm_scanner import LlmPiiScanner
19 from astrocyte.policy.ner_scanner import NerPiiScanner
21logger = logging.getLogger("astrocyte.pii")
23# ---------------------------------------------------------------------------
24# PII regex patterns — global
25# ---------------------------------------------------------------------------
# Maps PII type name -> (compiled detector, redaction placeholder).
# Insertion order is the order in which the scanner iterates the patterns.
_PII_PATTERNS: dict[str, tuple[re.Pattern[str], str]] = {
    pii_type: (re.compile(regex), placeholder)
    for pii_type, regex, placeholder in (
        (
            "email",
            r"\b[a-zA-Z0-9](?:[a-zA-Z0-9._%+\-]*[a-zA-Z0-9])?@[a-zA-Z0-9](?:[a-zA-Z0-9\-]*[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9\-]*[a-zA-Z0-9])?)*\.[a-zA-Z]{2,}\b",
            "[EMAIL_REDACTED]",
        ),
        (
            # Optional "+1"/"1" prefix, then 3-3-4 digits with optional separators.
            # No word boundaries: this also matches inside longer digit runs.
            "phone",
            r"(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}",
            "[PHONE_REDACTED]",
        ),
        (
            "ssn",
            r"\b\d{3}-\d{2}-\d{4}\b",
            "[SSN_REDACTED]",
        ),
        (
            # Four groups of four digits; the scanner additionally applies a Luhn check.
            "credit_card",
            r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
            "[CC_REDACTED]",
        ),
        (
            # Dotted-quad IPv4, or a full (uncompressed) eight-group IPv6 address.
            "ip_address",
            r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b"
            r"|(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}",
            "[IP_REDACTED]",
        ),
        (
            # Date of birth: only matched when preceded by a context word.
            "date_of_birth",
            r"(?i)(?:born|dob|date\s+of\s+birth|birthday)[:\s]+(\d{4}[-/]\d{2}[-/]\d{2}|\d{1,2}[-/]\d{1,2}[-/]\d{2,4})",
            "[DOB_REDACTED]",
        ),
        (
            # IBAN: two letters, two digits, then space-optional alphanumeric groups.
            "iban",
            r"\b[A-Z]{2}\d{2}\s?[A-Z0-9]{4}(?:\s?[A-Z0-9]{4}){2,7}(?:\s?[A-Z0-9]{1,4})?\b",
            "[IBAN_REDACTED]",
        ),
    )
}
67# ---------------------------------------------------------------------------
68# Country-specific patterns
69# ---------------------------------------------------------------------------
# Maps upper-case country code -> {PII type name -> (compiled detector, placeholder)}.
# These are only merged into a scanner's pattern set when that country is requested.
_COUNTRY_PATTERNS: dict[str, dict[str, tuple[re.Pattern[str], str]]] = {
    country_code: {
        pii_type: (re.compile(regex), placeholder)
        for pii_type, regex, placeholder in rows
    }
    for country_code, rows in (
        # ── Singapore ──
        (
            "SG",
            [
                ("nric", r"\b[STFGM]\d{7}[A-Z]\b", "[NRIC_REDACTED]"),
                ("sg_phone", r"\+65\s?\d{4}\s?\d{4}\b", "[PHONE_REDACTED]"),
            ],
        ),
        # ── India ──
        (
            "IN",
            [
                # Aadhaar: 12 digits in 4-4-4 groups, first digit restricted to 2-9.
                ("aadhaar", r"\b[2-9]\d{3}\s?\d{4}\s?\d{4}\b", "[AADHAAR_REDACTED]"),
                ("pan", r"\b[A-Z]{5}\d{4}[A-Z]\b", "[PAN_REDACTED]"),
                ("in_phone", r"\+91\s?\d{5}\s?\d{5}\b", "[PHONE_REDACTED]"),
            ],
        ),
        # ── United States ──
        (
            "US",
            [
                # Passport: optional single letter followed by 8-9 digits.
                ("us_passport", r"\b[A-Z]?\d{8,9}\b", "[PASSPORT_REDACTED]"),
            ],
        ),
        # ── United Kingdom ──
        (
            "UK",
            [
                ("uk_nino", r"\b[A-Z]{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?[A-D]\b", "[NINO_REDACTED]"),
                ("uk_nhs", r"\b\d{3}\s?\d{3}\s?\d{4}\b", "[NHS_REDACTED]"),
                ("uk_phone", r"(?:\+44\s?\d{4}\s?\d{6}|0\d{4}\s?\d{6})\b", "[PHONE_REDACTED]"),
            ],
        ),
        # ── EU: Germany ──
        (
            "DE",
            [
                # German ID as matched here: one letter from a restricted set, one digit,
                # six alphanumerics and a trailing digit (structured, not a generic run).
                ("de_personalausweis", r"\b[CFGHJKLMNPRTVWXYZ]\d[A-Z0-9]{6}\d\b", "[DE_ID_REDACTED]"),
            ],
        ),
        # ── EU: France ──
        (
            "FR",
            [
                ("fr_insee", r"\b[12]\s?\d{2}\s?\d{2}\s?\d{2}\s?\d{3}\s?\d{3}\s?\d{2}\b", "[FR_SSN_REDACTED]"),
            ],
        ),
        # ── EU: Italy ──
        (
            "IT",
            [
                ("it_codice_fiscale", r"\b[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]\b", "[IT_CF_REDACTED]"),
            ],
        ),
        # ── EU: Spain ──
        (
            "ES",
            [
                ("es_dni", r"\b\d{8}[A-Z]\b", "[ES_DNI_REDACTED]"),
                ("es_nie", r"\b[XYZ]\d{7}[A-Z]\b", "[ES_NIE_REDACTED]"),
            ],
        ),
        # ── Australia ──
        (
            "AU",
            [
                ("au_tfn", r"\b\d{3}\s?\d{3}\s?\d{2,3}\b", "[TFN_REDACTED]"),
                ("au_medicare", r"\b\d{4}\s?\d{5}\s?\d{1,2}\b", "[MEDICARE_REDACTED]"),
                ("au_phone", r"\+61\s?\d\s?\d{4}\s?\d{4}\b", "[PHONE_REDACTED]"),
            ],
        ),
        # ── Canada ──
        (
            "CA",
            [
                # The scanner additionally applies a Luhn check to "ca_sin" matches.
                ("ca_sin", r"\b\d{3}\s?\d{3}\s?\d{3}\b", "[SIN_REDACTED]"),
            ],
        ),
        # ── Japan ──
        (
            "JP",
            [
                # My Number: 12 digits, context-aware to avoid Aadhaar overlap.
                (
                    "jp_my_number",
                    r"(?i)(?:my\s*number|マイナンバー)[:\s]+(\d{4}\s?\d{4}\s?\d{4})\b",
                    "[MY_NUMBER_REDACTED]",
                ),
                ("jp_phone", r"\+81\s?\d{1,4}\s?\d{1,4}\s?\d{4}\b", "[PHONE_REDACTED]"),
            ],
        ),
        # ── China ──
        (
            "CN",
            [
                # Resident ID: 6 digits, a 19xx/20xx date (YYYYMMDD), 3 digits, then digit or X.
                (
                    "cn_resident_id",
                    r"\b\d{6}(?:19|20)\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b",
                    "[CN_ID_REDACTED]",
                ),
                ("cn_phone", r"\+86\s?1\d{2}\s?\d{4}\s?\d{4}\b", "[PHONE_REDACTED]"),
            ],
        ),
    )
}
def _luhn_check(number: str) -> bool:
    """Return True when the digits in *number* pass the Luhn checksum.

    Non-digit characters (spaces, dashes, ...) are ignored. Inputs with fewer
    than 9 or more than 19 digits are rejected outright.
    """
    numerals = [int(ch) for ch in number if ch.isdigit()]
    if not 9 <= len(numerals) <= 19:
        return False

    # Walk from the rightmost digit: every second digit is doubled, and a
    # doubled value above 9 has 9 subtracted (equivalent to summing its digits).
    right_to_left = numerals[::-1]
    untouched_sum = sum(right_to_left[0::2])
    doubled_sum = sum(
        twice - 9 if twice > 9 else twice
        for twice in (digit * 2 for digit in right_to_left[1::2])
    )
    return (untouched_sum + doubled_sum) % 10 == 0
218# ---------------------------------------------------------------------------
219# PII Scanner
220# ---------------------------------------------------------------------------
class PiiScanner:
    """PII detection with multiple modes: regex, NER, LLM, rules_then_llm.

    Sync ``scan()`` / ``apply()`` work for regex and NER modes.
    Async ``scan_async()`` / ``apply_async()`` are required for the ``llm`` and
    ``rules_then_llm`` modes, because the LLM scanner is awaited.
    """

    def __init__(
        self,
        mode: str = "regex",
        action: str = "redact",
        custom_patterns: dict[str, tuple[str, str]] | None = None,
        countries: list[str] | None = None,
        type_overrides: dict[str, dict[str, str]] | None = None,
        llm_provider: object | None = None,
        ner_model: str = "en_core_web_sm",
    ) -> None:
        """Configure the scanner.

        Args:
            mode: "regex", "ner", "llm", "rules_then_llm" or "disabled".
            action: Default action for detected PII: "redact", "reject" or "warn".
            custom_patterns: Extra ``name -> (regex source, replacement)`` entries;
                they are compiled here and override built-ins of the same name.
            countries: Country codes (case-insensitive) whose patterns are added.
                Unknown codes are ignored.
            type_overrides: Per-PII-type settings; the keys read by this class are
                "action" and "replacement".
            llm_provider: Passed to the LLM scanner when an LLM mode is selected.
            ner_model: Model name passed to the NER scanner.
        """
        self.mode = mode
        self.action = action  # "redact" | "reject" | "warn"
        self._type_overrides = type_overrides or {}

        # Build pattern dict: global + country-specific + custom (later wins).
        self._patterns = dict(_PII_PATTERNS)
        if countries:
            for country in countries:
                country_upper = country.upper()
                if country_upper in _COUNTRY_PATTERNS:
                    self._patterns.update(_COUNTRY_PATTERNS[country_upper])
        if custom_patterns:
            for name, (pattern, replacement) in custom_patterns.items():
                self._patterns[name] = (re.compile(pattern), replacement)

        # NER scanner: only created for the modes that use it.
        self._ner_scanner: NerPiiScanner | None = None
        if mode in ("ner", "rules_then_llm"):
            self._init_ner(ner_model)

        # LLM scanner: only created for LLM modes and when a provider is given.
        self._llm_scanner: LlmPiiScanner | None = None
        if mode in ("llm", "rules_then_llm") and llm_provider:
            from astrocyte.policy.llm_scanner import LlmPiiScanner as _LlmScanner

            self._llm_scanner = _LlmScanner(llm_provider)

    def _init_ner(self, model: str) -> None:
        """Initialize NER scanner. Fails gracefully if spaCy not installed."""
        try:
            from astrocyte.policy.ner_scanner import NerPiiScanner as _NerScanner

            self._ner_scanner = _NerScanner(model)
        except ImportError:
            # Deliberate best-effort: the scanner keeps working with regex only.
            logger.warning("NER mode requested but spaCy not installed. Install with: pip install astrocyte[ner]")

    # ── Sync scanning (regex + NER) ──

    def scan(self, text: str) -> list[PiiMatch]:
        """Scan text for PII. Returns list of matches.

        Works for regex, ner, and rules_then_llm (regex+NER portion) modes.
        For llm mode, use scan_async().
        """
        if self.mode == "disabled":
            return []

        matches = self._scan_regex(text)

        # Include NER for ner mode and rules_then_llm (NER is part of "rules")
        if self.mode in ("ner", "rules_then_llm") and self._ner_scanner:
            ner_matches = self._ner_scanner.scan(text)
            matches = self._merge_matches(matches, ner_matches)

        return matches

    def _scan_regex(self, text: str) -> list[PiiMatch]:
        """Regex-only scan. Sync, pure.

        Matches from different patterns may overlap each other; they are all
        reported and overlaps are resolved only when redacting.
        """
        matches: list[PiiMatch] = []
        for pii_type, (pattern, replacement) in self._patterns.items():
            for m in pattern.finditer(text):
                matched_text = m.group()

                # Credit card and CA SIN candidates must also pass the Luhn checksum.
                if pii_type in ("credit_card", "ca_sin") and not _luhn_check(matched_text):
                    continue

                matches.append(
                    PiiMatch(
                        pii_type=pii_type,
                        start=m.start(),
                        end=m.end(),
                        matched_text=matched_text,
                        replacement=replacement,
                    )
                )

        return matches

    @staticmethod
    def _merge_matches(a: list[PiiMatch], b: list[PiiMatch]) -> list[PiiMatch]:
        """Merge two match lists, removing overlaps (prefer earlier/longer)."""
        combined = sorted(a + b, key=lambda m: (m.start, -(m.end - m.start)))
        merged: list[PiiMatch] = []
        last_end = -1
        for match in combined:
            # Keep a match only if it starts at or after the end of the last kept one.
            if match.start >= last_end:
                merged.append(match)
                last_end = match.end
        return merged

    # ── Async scanning (LLM + rules_then_llm) ──

    async def scan_async(self, text: str) -> list[PiiMatch]:
        """Async scan — supports all modes including LLM.

        If an LLM mode is selected but no LLM scanner was configured, this
        falls back to the sync scan.
        """
        if self.mode in ("disabled", "regex", "ner"):
            return self.scan(text)

        if self.mode == "llm" and self._llm_scanner:
            return await self._llm_scanner.scan(text)

        if self.mode == "rules_then_llm":
            # Try regex + NER first
            matches = self.scan(text)
            if matches:
                return matches
            # Fall back to LLM
            if self._llm_scanner:
                return await self._llm_scanner.scan(text)

        return self.scan(text)

    # ── Apply actions ──

    def apply(self, text: str) -> tuple[str, list[PiiMatch]]:
        """Scan and apply action. Returns (processed_text, matches).

        Raises PiiRejected if action is 'reject' and PII is found.
        """
        matches = self.scan(text)
        return self._apply_matches(text, matches)

    async def apply_async(self, text: str) -> tuple[str, list[PiiMatch]]:
        """Async scan and apply — supports LLM modes."""
        matches = await self.scan_async(text)
        return self._apply_matches(text, matches)

    def _effective_action(self, pii_type: str) -> str:
        """Return the action for *pii_type*: its override's "action", else the default."""
        override = self._type_overrides.get(pii_type)
        if override:
            return override.get("action", self.action)
        return self.action

    def _apply_matches(self, text: str, matches: list[PiiMatch]) -> tuple[str, list[PiiMatch]]:
        """Apply the configured action to detected matches.

        Returns:
            ``(processed_text, matches)``. ``matches`` is returned as given; the
            text is unchanged for matches whose action is "warn".

        Raises:
            PiiRejected: if the effective action of any match is "reject".
        """
        if not matches:
            return text, []

        # Resolve the action once per match so reject and redact agree on the
        # fallback to the default action when an override has no "action" key.
        actions = [self._effective_action(match.pii_type) for match in matches]

        reject_types = [match.pii_type for match, action in zip(matches, actions) if action == "reject"]
        if reject_types:
            raise PiiRejected(reject_types)

        # Collect (start, end, replacement) for every match that must be redacted.
        spans: list[tuple[int, int, str]] = []
        for match, action in zip(matches, actions):
            if action != "redact":
                continue  # warn: leave in place
            override = self._type_overrides.get(match.pii_type)
            replacement = override.get("replacement", match.replacement) if override else match.replacement
            spans.append((match.start, match.end, replacement or "[REDACTED]"))

        if not spans:
            # Nothing to redact: return original text with matches for logging.
            return text, matches

        # Different patterns can hit overlapping ranges (e.g. a phone pattern
        # matching the first digits of a card number). Substituting each one
        # independently would use stale offsets and leave PII fragments behind,
        # so overlapping spans are fused and replaced once, using the
        # replacement of the earliest (then longest) span in the cluster.
        spans.sort(key=lambda span: (span[0], -span[1]))
        fused: list[list] = []
        for start, end, replacement in spans:
            if fused and start < fused[-1][1]:
                fused[-1][1] = max(fused[-1][1], end)
            else:
                fused.append([start, end, replacement])

        pieces: list[str] = []
        cursor = 0
        for start, end, replacement in fused:
            pieces.append(text[cursor:start])
            pieces.append(replacement)
            cursor = end
        pieces.append(text[cursor:])
        return "".join(pieces), matches
406# ---------------------------------------------------------------------------
407# Content validator
408# ---------------------------------------------------------------------------
class ContentValidator:
    """Checks retain content against simple policy rules.

    Holds only its configuration and performs no I/O — Rust migration candidate.
    """

    def __init__(
        self,
        max_content_length: int = 50000,
        reject_empty: bool = True,
        allowed_content_types: list[str] | None = None,
    ) -> None:
        """Store the limits; a missing or empty type list falls back to the defaults."""
        self.max_content_length = max_content_length
        self.reject_empty = reject_empty
        if allowed_content_types:
            self.allowed_content_types = allowed_content_types
        else:
            self.allowed_content_types = ["text", "conversation", "document"]

    def validate(self, content: str, content_type: str = "text") -> list[str]:
        """Return every rule violation for *content*; an empty list means valid."""
        size = len(content)
        allowed = self.allowed_content_types

        # (violated?, message) pairs, in the order the messages are reported.
        checks = (
            (
                self.reject_empty and not content.strip(),
                "Content is empty",
            ),
            (
                size > self.max_content_length,
                f"Content too long: {size} > {self.max_content_length}",
            ),
            (
                content_type not in allowed,
                f"Content type '{content_type}' not allowed. Allowed: {allowed}",
            ),
        )
        return [message for violated, message in checks if violated]
443# ---------------------------------------------------------------------------
444# Metadata sanitizer
445# ---------------------------------------------------------------------------
class MetadataSanitizer:
    """Strips blocked keys and enforces size limits on metadata.

    Sync, stateless — Rust migration candidate.
    """

    def __init__(
        self,
        blocked_keys: list[str] | None = None,
        max_size_bytes: int = 4096,
    ) -> None:
        """Store the blocked key names and the serialized-size limit.

        A missing or empty ``blocked_keys`` falls back to the default set.
        """
        self.blocked_keys = set(blocked_keys or ["api_key", "password", "token", "secret"])
        self.max_size_bytes = max_size_bytes

    def _blocked_pattern(self) -> re.Pattern[str] | None:
        """Build one regex matching any blocked key as a whole key or delimited segment."""
        # Metadata keys are lowercased before matching, so the blocked names must
        # be lowercased too; otherwise a configured "API_KEY" could never match.
        lowered = sorted({blocked.lower() for blocked in self.blocked_keys})
        if not lowered:
            return None
        alternatives = "|".join(re.escape(blocked) for blocked in lowered)
        return re.compile(rf"(?:^|[_.\-])(?:{alternatives})(?:$|[_.\-])")

    @staticmethod
    def _serialized_size(metadata: Metadata) -> int:
        """Return the UTF-8 byte length of *metadata* serialized as JSON."""
        # default=str stringifies any value json cannot encode natively.
        return len(json.dumps(metadata, default=str).encode("utf-8"))

    def sanitize(self, metadata: Metadata | None) -> tuple[Metadata | None, list[str]]:
        """Sanitize metadata. Returns (cleaned_metadata, warnings).

        Keys are dropped when, case-insensitively, they equal a blocked key or
        contain one as a segment delimited by ``_``, ``.`` or ``-`` (so
        ``user_token`` is blocked but ``tokenizer`` is not). If the remaining
        metadata is still larger than ``max_size_bytes`` when serialized, keys
        are removed in reverse alphabetical order until it fits.

        ``cleaned_metadata`` is ``None`` when the input is ``None`` or when no
        key survives.
        """
        if metadata is None:
            return None, []

        warnings: list[str] = []
        cleaned: Metadata = {}

        # Compiled once per call instead of once per (key, blocked key) pair.
        blocked = self._blocked_pattern()
        for key, value in metadata.items():
            if blocked is not None and blocked.search(key.lower()):
                warnings.append(f"Blocked metadata key: '{key}'")
                continue
            cleaned[key] = value

        # Size check — remove keys in reverse alphabetical order (deterministic)
        if self._serialized_size(cleaned) > self.max_size_bytes:
            warnings.append(f"Metadata exceeds {self.max_size_bytes} bytes, truncated")
            for drop_key in sorted(cleaned, reverse=True):  # z→a
                if self._serialized_size(cleaned) <= self.max_size_bytes:
                    break
                cleaned.pop(drop_key)

        return cleaned if cleaned else None, warnings