Coverage for astrocyte/policy/barriers.py: 97%

156 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Barrier policies — PII scanning, content validation, metadata sanitization. 

2 

3All functions are sync (Rust migration candidates) except scan_async/apply_async. 

4See docs/_design/policy-layer.md section 2 and docs/_design/data-governance.md. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import logging 

11import re 

12from typing import TYPE_CHECKING 

13 

14from astrocyte.errors import PiiRejected 

15from astrocyte.types import Metadata, PiiMatch 

16 

17if TYPE_CHECKING: 

18 from astrocyte.policy.llm_scanner import LlmPiiScanner 

19 from astrocyte.policy.ner_scanner import NerPiiScanner 

20 

21logger = logging.getLogger("astrocyte.pii") 

22 

23# --------------------------------------------------------------------------- 

24# PII regex patterns — global 

25# --------------------------------------------------------------------------- 

26 

27_PII_PATTERNS: dict[str, tuple[re.Pattern[str], str]] = { 

28 "email": ( 

29 re.compile( 

30 r"\b[a-zA-Z0-9](?:[a-zA-Z0-9._%+\-]*[a-zA-Z0-9])?@[a-zA-Z0-9](?:[a-zA-Z0-9\-]*[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9\-]*[a-zA-Z0-9])?)*\.[a-zA-Z]{2,}\b" 

31 ), 

32 "[EMAIL_REDACTED]", 

33 ), 

34 "phone": ( 

35 re.compile(r"(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}"), 

36 "[PHONE_REDACTED]", 

37 ), 

38 "ssn": ( 

39 re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), 

40 "[SSN_REDACTED]", 

41 ), 

42 "credit_card": ( 

43 re.compile(r"\b(?:\d{4}[-\s]?){3}\d{4}\b"), 

44 "[CC_REDACTED]", 

45 ), 

46 "ip_address": ( 

47 re.compile( 

48 r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b" 

49 r"|(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}" 

50 ), 

51 "[IP_REDACTED]", 

52 ), 

53 # Global: date of birth (with context words) 

54 "date_of_birth": ( 

55 re.compile( 

56 r"(?i)(?:born|dob|date\s+of\s+birth|birthday)[:\s]+(\d{4}[-/]\d{2}[-/]\d{2}|\d{1,2}[-/]\d{1,2}[-/]\d{2,4})" 

57 ), 

58 "[DOB_REDACTED]", 

59 ), 

60 # Global: IBAN 

61 "iban": ( 

62 re.compile(r"\b[A-Z]{2}\d{2}\s?[A-Z0-9]{4}(?:\s?[A-Z0-9]{4}){2,7}(?:\s?[A-Z0-9]{1,4})?\b"), 

63 "[IBAN_REDACTED]", 

64 ), 

65} 

66 

67# --------------------------------------------------------------------------- 

68# Country-specific patterns 

69# --------------------------------------------------------------------------- 

70 

71_COUNTRY_PATTERNS: dict[str, dict[str, tuple[re.Pattern[str], str]]] = { 

72 # ── Singapore ── 

73 "SG": { 

74 "nric": ( 

75 re.compile(r"\b[STFGM]\d{7}[A-Z]\b"), 

76 "[NRIC_REDACTED]", 

77 ), 

78 "sg_phone": ( 

79 re.compile(r"\+65\s?\d{4}\s?\d{4}\b"), 

80 "[PHONE_REDACTED]", 

81 ), 

82 }, 

83 # ── India ── 

84 "IN": { 

85 "aadhaar": ( 

86 # Aadhaar: 12 digits, first digit 2-9 (never starts with 0 or 1) 

87 re.compile(r"\b[2-9]\d{3}\s?\d{4}\s?\d{4}\b"), 

88 "[AADHAAR_REDACTED]", 

89 ), 

90 "pan": ( 

91 re.compile(r"\b[A-Z]{5}\d{4}[A-Z]\b"), 

92 "[PAN_REDACTED]", 

93 ), 

94 "in_phone": ( 

95 re.compile(r"\+91\s?\d{5}\s?\d{5}\b"), 

96 "[PHONE_REDACTED]", 

97 ), 

98 }, 

99 # ── United States ── 

100 "US": { 

101 "us_passport": ( 

102 # US passport: letter prefix (optional since 2021) + 8-9 digits 

103 re.compile(r"\b[A-Z]?\d{8,9}\b"), 

104 "[PASSPORT_REDACTED]", 

105 ), 

106 }, 

107 # ── United Kingdom ── 

108 "UK": { 

109 "uk_nino": ( 

110 re.compile(r"\b[A-Z]{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?[A-D]\b"), 

111 "[NINO_REDACTED]", 

112 ), 

113 "uk_nhs": ( 

114 re.compile(r"\b\d{3}\s?\d{3}\s?\d{4}\b"), 

115 "[NHS_REDACTED]", 

116 ), 

117 "uk_phone": ( 

118 re.compile(r"(?:\+44\s?\d{4}\s?\d{6}|0\d{4}\s?\d{6})\b"), 

119 "[PHONE_REDACTED]", 

120 ), 

121 }, 

122 # ── EU: Germany ── 

123 "DE": { 

124 "de_personalausweis": ( 

125 # German ID: letter + digit + 8 alphanum + check digit (structured, not generic 10-char) 

126 re.compile(r"\b[CFGHJKLMNPRTVWXYZ]\d[A-Z0-9]{6}\d\b"), 

127 "[DE_ID_REDACTED]", 

128 ), 

129 }, 

130 # ── EU: France ── 

131 "FR": { 

132 "fr_insee": ( 

133 re.compile(r"\b[12]\s?\d{2}\s?\d{2}\s?\d{2}\s?\d{3}\s?\d{3}\s?\d{2}\b"), 

134 "[FR_SSN_REDACTED]", 

135 ), 

136 }, 

137 # ── EU: Italy ── 

138 "IT": { 

139 "it_codice_fiscale": ( 

140 re.compile(r"\b[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]\b"), 

141 "[IT_CF_REDACTED]", 

142 ), 

143 }, 

144 # ── EU: Spain ── 

145 "ES": { 

146 "es_dni": ( 

147 re.compile(r"\b\d{8}[A-Z]\b"), 

148 "[ES_DNI_REDACTED]", 

149 ), 

150 "es_nie": ( 

151 re.compile(r"\b[XYZ]\d{7}[A-Z]\b"), 

152 "[ES_NIE_REDACTED]", 

153 ), 

154 }, 

155 # ── Australia ── 

156 "AU": { 

157 "au_tfn": ( 

158 re.compile(r"\b\d{3}\s?\d{3}\s?\d{2,3}\b"), 

159 "[TFN_REDACTED]", 

160 ), 

161 "au_medicare": ( 

162 re.compile(r"\b\d{4}\s?\d{5}\s?\d{1,2}\b"), 

163 "[MEDICARE_REDACTED]", 

164 ), 

165 "au_phone": ( 

166 re.compile(r"\+61\s?\d\s?\d{4}\s?\d{4}\b"), 

167 "[PHONE_REDACTED]", 

168 ), 

169 }, 

170 # ── Canada ── 

171 "CA": { 

172 "ca_sin": ( 

173 re.compile(r"\b\d{3}\s?\d{3}\s?\d{3}\b"), 

174 "[SIN_REDACTED]", 

175 ), 

176 }, 

177 # ── Japan ── 

178 "JP": { 

179 "jp_my_number": ( 

180 # My Number: 12 digits, context-aware to avoid Aadhaar overlap 

181 re.compile(r"(?i)(?:my\s*number|マイナンバー)[:\s]+(\d{4}\s?\d{4}\s?\d{4})\b"), 

182 "[MY_NUMBER_REDACTED]", 

183 ), 

184 "jp_phone": ( 

185 re.compile(r"\+81\s?\d{1,4}\s?\d{1,4}\s?\d{4}\b"), 

186 "[PHONE_REDACTED]", 

187 ), 

188 }, 

189 # ── China ── 

190 "CN": { 

191 "cn_resident_id": ( 

192 re.compile(r"\b\d{6}(?:19|20)\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b"), 

193 "[CN_ID_REDACTED]", 

194 ), 

195 "cn_phone": ( 

196 re.compile(r"\+86\s?1\d{2}\s?\d{4}\s?\d{4}\b"), 

197 "[PHONE_REDACTED]", 

198 ), 

199 }, 

200} 

201 

202 

203def _luhn_check(number: str) -> bool: 

204 """Validate credit card or SIN number with Luhn algorithm.""" 

205 digits = [int(d) for d in number if d.isdigit()] 

206 if len(digits) < 9 or len(digits) > 19: 

207 return False 

208 checksum = 0 

209 for i, d in enumerate(reversed(digits)): 

210 if i % 2 == 1: 

211 d *= 2 

212 if d > 9: 

213 d -= 9 

214 checksum += d 

215 return checksum % 10 == 0 

216 

217 

218# --------------------------------------------------------------------------- 

219# PII Scanner 

220# --------------------------------------------------------------------------- 

221 

222 

class PiiScanner:
    """PII detection with multiple modes: regex, NER, LLM, rules_then_llm.

    Sync ``scan()``/``apply()`` cover the regex and NER modes. The async
    ``scan_async()``/``apply_async()`` variants are required for the ``llm``
    and ``rules_then_llm`` modes because the LLM scanner is awaited. Mode
    ``"disabled"`` makes ``scan()`` return no matches.
    """

    def __init__(
        self,
        mode: str = "regex",
        action: str = "redact",
        custom_patterns: dict[str, tuple[str, str]] | None = None,
        countries: list[str] | None = None,
        type_overrides: dict[str, dict[str, str]] | None = None,
        llm_provider: object | None = None,
        ner_model: str = "en_core_web_sm",
    ) -> None:
        """Configure the scanner.

        Args:
            mode: Detection mode ("regex", "ner", "llm", "rules_then_llm" or
                "disabled").
            action: Default action for detected PII: "redact", "reject" or "warn".
            custom_patterns: Extra ``name -> (regex source, replacement)``
                entries; a name that already exists replaces the built-in.
            countries: Country codes (case-insensitive) whose pattern packs are
                enabled. Codes without a pack are ignored.
            type_overrides: Per-PII-type settings; the keys read are "action"
                and "replacement".
            llm_provider: Provider handed to the LLM scanner. Only used for the
                "llm" and "rules_then_llm" modes.
            ner_model: Model name handed to the NER scanner.
        """
        self.mode = mode
        self.action = action  # "redact" | "reject" | "warn"
        self._type_overrides = type_overrides or {}

        # Pattern table: global patterns first, then country packs, then
        # custom patterns. Later entries replace earlier ones of the same name.
        self._patterns = dict(_PII_PATTERNS)
        if countries:
            for country in countries:
                country_upper = country.upper()
                if country_upper in _COUNTRY_PATTERNS:
                    self._patterns.update(_COUNTRY_PATTERNS[country_upper])
        if custom_patterns:
            for name, (pattern, replacement) in custom_patterns.items():
                self._patterns[name] = (re.compile(pattern), replacement)

        # NER scanner: built right here (not on first use) for the modes that
        # need it; stays None when the optional dependency is missing.
        self._ner_scanner: NerPiiScanner | None = None
        if mode in ("ner", "rules_then_llm"):
            self._init_ner(ner_model)

        # LLM scanner: only built when a provider was supplied. Without one,
        # scan_async() falls back to the sync scan.
        self._llm_scanner: LlmPiiScanner | None = None
        if mode in ("llm", "rules_then_llm") and llm_provider:
            from astrocyte.policy.llm_scanner import LlmPiiScanner as _LlmScanner

            self._llm_scanner = _LlmScanner(llm_provider)

    def _init_ner(self, model: str) -> None:
        """Initialize NER scanner. Fails gracefully if spaCy not installed."""
        try:
            from astrocyte.policy.ner_scanner import NerPiiScanner as _NerScanner

            self._ner_scanner = _NerScanner(model)
        except ImportError:
            logger.warning("NER mode requested but spaCy not installed. Install with: pip install astrocyte[ner]")

    # ── Sync scanning (regex + NER) ──

    def scan(self, text: str) -> list[PiiMatch]:
        """Scan text for PII. Returns list of matches.

        Works for regex, ner, and rules_then_llm (regex+NER portion) modes.
        For llm mode, use scan_async(); called synchronously in llm mode this
        method only runs the regex patterns.

        Regex matches from different patterns may overlap; overlaps are only
        removed when NER results are merged in.
        """
        if self.mode == "disabled":
            return []

        matches = self._scan_regex(text)

        # Include NER for ner mode and rules_then_llm (NER is part of "rules")
        if self.mode in ("ner", "rules_then_llm") and self._ner_scanner:
            ner_matches = self._ner_scanner.scan(text)
            matches = self._merge_matches(matches, ner_matches)

        return matches

    def _scan_regex(self, text: str) -> list[PiiMatch]:
        """Regex-only scan. Sync, pure.

        Matches are returned grouped by pattern, in pattern-table order.
        Credit card and Canadian SIN candidates must also pass the Luhn
        checksum to be reported.
        """
        matches: list[PiiMatch] = []
        for pii_type, (pattern, replacement) in self._patterns.items():
            for m in pattern.finditer(text):
                matched_text = m.group()

                # Digit-only patterns are very permissive; the Luhn checksum
                # filters out most accidental hits for these two types.
                if pii_type in ("credit_card", "ca_sin") and not _luhn_check(matched_text):
                    continue

                matches.append(
                    PiiMatch(
                        pii_type=pii_type,
                        start=m.start(),
                        end=m.end(),
                        matched_text=matched_text,
                        replacement=replacement,
                    )
                )

        return matches

    @staticmethod
    def _merge_matches(a: list[PiiMatch], b: list[PiiMatch]) -> list[PiiMatch]:
        """Merge two match lists, removing overlaps (prefer earlier/longer)."""
        # Sort by start; for equal starts the longer match comes first so it
        # is the one that survives.
        combined = sorted(a + b, key=lambda m: (m.start, -(m.end - m.start)))
        merged: list[PiiMatch] = []
        last_end = -1
        for match in combined:
            if match.start >= last_end:
                merged.append(match)
                last_end = match.end
        return merged

    # ── Async scanning (LLM + rules_then_llm) ──

    async def scan_async(self, text: str) -> list[PiiMatch]:
        """Async scan — supports all modes including LLM.

        In rules_then_llm mode the LLM is consulted only when the regex/NER
        pass finds nothing. When no LLM scanner is configured, the result of
        the sync scan is returned.
        """
        if self.mode in ("disabled", "regex", "ner"):
            return self.scan(text)

        if self.mode == "llm" and self._llm_scanner:
            return await self._llm_scanner.scan(text)

        if self.mode == "rules_then_llm":
            # Try regex + NER first
            matches = self.scan(text)
            if matches:
                return matches
            # Fall back to LLM
            if self._llm_scanner:
                return await self._llm_scanner.scan(text)

        return self.scan(text)

    # ── Apply actions ──

    def apply(self, text: str) -> tuple[str, list[PiiMatch]]:
        """Scan and apply action. Returns (processed_text, matches).

        Raises PiiRejected if action is 'reject' and PII is found.
        """
        matches = self.scan(text)
        return self._apply_matches(text, matches)

    async def apply_async(self, text: str) -> tuple[str, list[PiiMatch]]:
        """Async scan and apply — supports LLM modes.

        Raises PiiRejected under the same conditions as apply().
        """
        matches = await self.scan_async(text)
        return self._apply_matches(text, matches)

    def _resolve_action(self, match: PiiMatch) -> tuple[str, dict[str, str]]:
        """Return (effective action, override dict) for one match.

        A per-type override that does not name an action inherits the
        scanner-wide default action.
        """
        override = self._type_overrides.get(match.pii_type) or {}
        return override.get("action", self.action), override

    def _apply_matches(self, text: str, matches: list[PiiMatch]) -> tuple[str, list[PiiMatch]]:
        """Apply the configured action to detected matches.

        Args:
            text: The text the match offsets refer to.
            matches: Matches found in *text*.

        Returns:
            ``(processed_text, matches)``. The match list is returned as given;
            only matches whose effective action is "redact" change the text.

        Raises:
            PiiRejected: If the effective action of any match is "reject".
        """
        if not matches:
            return text, []

        resolved = [(match, *self._resolve_action(match)) for match in matches]

        # Reject wins over everything else. The effective action is used here
        # so that an override carrying only a replacement cannot bypass a
        # scanner-wide reject policy.
        reject_types = [match.pii_type for match, action, _ in resolved if action == "reject"]
        if reject_types:
            raise PiiRejected(reject_types)

        # Collect spans to redact. Regex matches of different types can
        # overlap (e.g. a card number that also looks like a phone number);
        # splicing each one independently would use stale offsets and could
        # leave part of the value in the output. Overlapping spans are merged
        # into their union and take the replacement of the earliest (then
        # longest) match.
        to_redact = sorted(
            ((match, override) for match, action, override in resolved if action == "redact"),
            key=lambda item: (item[0].start, -(item[0].end - item[0].start)),
        )
        spans: list[tuple[int, int, str]] = []
        for match, override in to_redact:
            if spans and match.start < spans[-1][1]:
                start, end, replacement = spans[-1]
                spans[-1] = (start, max(end, match.end), replacement)
                continue
            replacement = override.get("replacement", match.replacement) or "[REDACTED]"
            spans.append((match.start, match.end, replacement))

        # Splice right-to-left so earlier offsets stay valid. Matches whose
        # action is "warn" are left in place.
        result = text
        for start, end, replacement in reversed(spans):
            result = result[:start] + replacement + result[end:]
        return result, matches

404 

405 

406# --------------------------------------------------------------------------- 

407# Content validator 

408# --------------------------------------------------------------------------- 

409 

410 

class ContentValidator:
    """Checks retain content against size, emptiness and content-type rules.

    Sync, stateless — Rust migration candidate.
    """

    def __init__(
        self,
        max_content_length: int = 50000,
        reject_empty: bool = True,
        allowed_content_types: list[str] | None = None,
    ) -> None:
        """Store the policy limits.

        Args:
            max_content_length: Largest accepted ``len(content)``.
            reject_empty: Whether blank (whitespace-only) content is an error.
            allowed_content_types: Accepted content types; ``None`` or an empty
                list selects "text", "conversation" and "document".
        """
        self.max_content_length = max_content_length
        self.reject_empty = reject_empty
        if allowed_content_types:
            self.allowed_content_types = allowed_content_types
        else:
            self.allowed_content_types = ["text", "conversation", "document"]

    def validate(self, content: str, content_type: str = "text") -> list[str]:
        """Return the list of policy violations for *content* (empty = valid).

        All rules are evaluated, so several messages can be returned at once,
        in the order: emptiness, length, content type.
        """
        size = len(content)
        checks = (
            (self.reject_empty and not content.strip(), "Content is empty"),
            (size > self.max_content_length, f"Content too long: {size} > {self.max_content_length}"),
            (
                content_type not in self.allowed_content_types,
                f"Content type '{content_type}' not allowed. Allowed: {self.allowed_content_types}",
            ),
        )
        return [message for failed, message in checks if failed]

441 

442 

443# --------------------------------------------------------------------------- 

444# Metadata sanitizer 

445# --------------------------------------------------------------------------- 

446 

447 

class MetadataSanitizer:
    """Strips blocked keys and enforces size limits on metadata.

    Sync, stateless — Rust migration candidate.
    """

    def __init__(
        self,
        blocked_keys: list[str] | None = None,
        max_size_bytes: int = 4096,
    ) -> None:
        """Store the sanitization policy.

        Args:
            blocked_keys: Key names to strip; ``None`` or an empty list selects
                "api_key", "password", "token" and "secret". Matching is
                case-insensitive.
            max_size_bytes: Upper bound on the UTF-8 size of the JSON-serialized
                metadata.
        """
        self.blocked_keys = set(blocked_keys or ["api_key", "password", "token", "secret"])
        self.max_size_bytes = max_size_bytes

    def _blocked_key_pattern(self) -> re.Pattern[str] | None:
        """Build one regex matching any blocked key as a whole key segment.

        Blocked keys are lowercased here because candidate keys are lowercased
        before matching; without this, a blocked key configured with capital
        letters could never match. Returns None when nothing is blocked.
        """
        lowered = sorted({blocked.lower() for blocked in self.blocked_keys})
        if not lowered:
            return None
        alternatives = "|".join(re.escape(blocked) for blocked in lowered)
        # A blocked name matches when it is the whole key or is delimited by
        # "_", "." or "-" (so "user.token" is blocked, "tokenizer" is not).
        return re.compile(rf"(?:^|[_.\-])(?:{alternatives})(?:$|[_.\-])")

    def sanitize(self, metadata: Metadata | None) -> tuple[Metadata | None, list[str]]:
        """Sanitize metadata. Returns (cleaned_metadata, warnings).

        Blocked keys are removed first. If the remaining metadata still
        serializes to more than ``max_size_bytes``, keys are dropped in reverse
        alphabetical order until it fits. ``cleaned_metadata`` is None when the
        input is None or nothing is left. The input mapping is not modified.
        """
        if metadata is None:
            return None, []

        warnings: list[str] = []
        cleaned: Metadata = {}

        # Compiled once per call (not per key) from the current blocked set,
        # so later changes to self.blocked_keys are still honoured.
        blocked = self._blocked_key_pattern()
        for key, value in metadata.items():
            if blocked is not None and blocked.search(key.lower()):
                warnings.append(f"Blocked metadata key: '{key}'")
                continue
            cleaned[key] = value

        # Size check — remove keys in reverse alphabetical order (deterministic)
        serialized = json.dumps(cleaned, default=str)
        if len(serialized.encode("utf-8")) > self.max_size_bytes:
            warnings.append(f"Metadata exceeds {self.max_size_bytes} bytes, truncated")
            keys_by_priority = sorted(cleaned.keys(), reverse=True)  # z→a: drop least likely important first
            for drop_key in keys_by_priority:
                # Re-measure before each drop so no more keys than necessary go.
                if len(json.dumps(cleaned, default=str).encode("utf-8")) <= self.max_size_bytes:
                    break
                cleaned.pop(drop_key)

        return cleaned if cleaned else None, warnings