Coverage for astrocyte/_output_scanner.py: 100%
47 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""DLP output scanner — scans recall/reflect results for PII."""
3from __future__ import annotations
5from astrocyte.config import AstrocyteConfig
6from astrocyte.policy.barriers import PiiScanner
7from astrocyte.policy.observability import StructuredLogger
8from astrocyte.types import MemoryHit, RecallResult, ReflectResult
11class OutputScanner:
12 """Scans recall/reflect output for PII and applies redact/reject/warn actions."""
14 def __init__(self, config: AstrocyteConfig, logger: StructuredLogger) -> None:
15 self._config = config
16 self._logger = logger
17 self._scanner: PiiScanner | None = None
18 if config.dlp.scan_recall_output or config.dlp.scan_reflect_output:
19 self._scanner = PiiScanner(mode="regex", action=config.dlp.output_pii_action)
21 @property
22 def has_scanner(self) -> bool:
23 return self._scanner is not None
25 def scan_recall(self, result: RecallResult) -> RecallResult:
26 """Scan recall hits for PII. Redact/warn/reject per DLP config."""
27 if not self._scanner:
28 return result
29 action = self._config.dlp.output_pii_action
30 scanned_hits: list[MemoryHit] = []
31 for hit in result.hits:
32 matches = self._scanner.scan(hit.text)
33 if not matches:
34 scanned_hits.append(hit)
35 continue
36 if action == "reject":
37 continue # Drop hit silently
38 if action == "redact":
39 redacted, _ = self._scanner.apply(hit.text)
40 scanned_hits.append(
41 MemoryHit(
42 text=redacted,
43 score=hit.score,
44 fact_type=hit.fact_type,
45 metadata=hit.metadata,
46 tags=hit.tags,
47 occurred_at=hit.occurred_at,
48 source=hit.source,
49 memory_id=hit.memory_id,
50 bank_id=hit.bank_id,
51 memory_layer=hit.memory_layer,
52 utility_score=hit.utility_score,
53 )
54 )
55 else:
56 # warn — pass through with logging
57 self._logger.log(
58 "astrocyte.dlp.recall_pii_detected",
59 bank_id=hit.bank_id or "",
60 operation="recall",
61 data={"pii_types": ",".join(m.pii_type for m in matches), "memory_id": hit.memory_id or ""},
62 )
63 scanned_hits.append(hit)
65 return RecallResult(
66 hits=scanned_hits,
67 total_available=result.total_available,
68 truncated=result.truncated,
69 trace=result.trace,
70 )
72 def scan_reflect(self, result: ReflectResult) -> ReflectResult:
73 """Scan reflect answer for PII. Redact/warn/reject per DLP config."""
74 if not self._scanner:
75 return result
76 matches = self._scanner.scan(result.answer)
77 if not matches:
78 return result
80 action = self._config.dlp.output_pii_action
81 if action == "reject":
82 return ReflectResult(
83 answer="",
84 confidence=None,
85 sources=result.sources,
86 observations=["Reflect output blocked by DLP policy: PII detected"],
87 )
88 if action == "redact":
89 redacted, _ = self._scanner.apply(result.answer)
90 return ReflectResult(
91 answer=redacted,
92 confidence=result.confidence,
93 sources=result.sources,
94 observations=result.observations,
95 )
96 # warn
97 self._logger.log(
98 "astrocyte.dlp.reflect_pii_detected",
99 operation="reflect",
100 data={"pii_types": ",".join(m.pii_type for m in matches)},
101 )
102 return result