Coverage for astrocyte/_output_scanner.py: 100%

47 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""DLP output scanner — scans recall/reflect results for PII.""" 

2 

3from __future__ import annotations 

4 

5from astrocyte.config import AstrocyteConfig 

6from astrocyte.policy.barriers import PiiScanner 

7from astrocyte.policy.observability import StructuredLogger 

8from astrocyte.types import MemoryHit, RecallResult, ReflectResult 

9 

10 

class OutputScanner:
    """Scans recall/reflect output for PII and applies redact/reject/warn actions.

    A ``PiiScanner`` is only created when ``config.dlp.scan_recall_output`` or
    ``config.dlp.scan_reflect_output`` is enabled. Without one, both scan
    methods return their input unchanged.
    """

    def __init__(self, config: AstrocyteConfig, logger: StructuredLogger) -> None:
        """Store the config and logger, and build the scanner if DLP output scanning is on.

        Args:
            config: Source of the ``dlp`` settings read by this class
                (``scan_recall_output``, ``scan_reflect_output``,
                ``output_pii_action``).
            logger: Receives an event for each detection handled in "warn" mode.
        """
        self._config = config
        self._logger = logger
        self._scanner: PiiScanner | None = None
        if config.dlp.scan_recall_output or config.dlp.scan_reflect_output:
            self._scanner = PiiScanner(mode="regex", action=config.dlp.output_pii_action)

    @property
    def has_scanner(self) -> bool:
        """Whether a PII scanner was configured at construction time."""
        return self._scanner is not None

    @staticmethod
    def _copy_with_text(hit: MemoryHit, text: str) -> MemoryHit:
        """Return a new ``MemoryHit`` carrying *text* and the other fields of *hit*."""
        return MemoryHit(
            text=text,
            score=hit.score,
            fact_type=hit.fact_type,
            metadata=hit.metadata,
            tags=hit.tags,
            occurred_at=hit.occurred_at,
            source=hit.source,
            memory_id=hit.memory_id,
            bank_id=hit.bank_id,
            memory_layer=hit.memory_layer,
            utility_score=hit.utility_score,
        )

    def scan_recall(self, result: RecallResult) -> RecallResult:
        """Scan recall hits for PII. Redact/warn/reject per DLP config.

        Hits without matches pass through untouched. For hits with matches the
        configured ``output_pii_action`` decides:

        * ``"reject"`` - the hit is dropped and nothing is logged.
        * ``"redact"`` - the hit is replaced by a copy whose text is the first
          element returned by ``PiiScanner.apply``.
        * anything else - treated as "warn": the hit is kept as-is and an
          ``astrocyte.dlp.recall_pii_detected`` event is logged.

        Returns:
            ``result`` itself when no scanner is configured, otherwise a new
            ``RecallResult`` with the processed hits; ``total_available``,
            ``truncated`` and ``trace`` are carried over unchanged.
        """
        scanner = self._scanner
        # Identity check, matching has_scanner: a scanner object that happens
        # to evaluate falsy must not silently disable DLP.
        if scanner is None:
            return result

        action = self._config.dlp.output_pii_action
        scanned_hits: list[MemoryHit] = []
        for hit in result.hits:
            matches = scanner.scan(hit.text)
            if not matches:
                scanned_hits.append(hit)
            elif action == "reject":
                continue  # Drop hit silently
            elif action == "redact":
                redacted, _ = scanner.apply(hit.text)
                scanned_hits.append(self._copy_with_text(hit, redacted))
            else:
                # warn — pass through with logging
                self._logger.log(
                    "astrocyte.dlp.recall_pii_detected",
                    bank_id=hit.bank_id or "",
                    operation="recall",
                    data={
                        "pii_types": ",".join(m.pii_type for m in matches),
                        "memory_id": hit.memory_id or "",
                    },
                )
                scanned_hits.append(hit)

        return RecallResult(
            hits=scanned_hits,
            total_available=result.total_available,
            truncated=result.truncated,
            trace=result.trace,
        )

    def scan_reflect(self, result: ReflectResult) -> ReflectResult:
        """Scan reflect answer for PII. Redact/warn/reject per DLP config.

        Returns:
            ``result`` itself when no scanner is configured, when the answer
            has no matches, or in "warn" mode (after logging an
            ``astrocyte.dlp.reflect_pii_detected`` event). In ``"reject"`` mode
            a new ``ReflectResult`` with an empty answer, no confidence, the
            original sources and a single explanatory observation. In
            ``"redact"`` mode a new ``ReflectResult`` whose answer is the first
            element returned by ``PiiScanner.apply``.
        """
        scanner = self._scanner
        # Identity check, matching has_scanner (see scan_recall).
        if scanner is None:
            return result

        matches = scanner.scan(result.answer)
        if not matches:
            return result

        action = self._config.dlp.output_pii_action
        if action == "reject":
            return ReflectResult(
                answer="",
                confidence=None,
                sources=result.sources,
                observations=["Reflect output blocked by DLP policy: PII detected"],
            )
        if action == "redact":
            redacted, _ = scanner.apply(result.answer)
            return ReflectResult(
                answer=redacted,
                confidence=result.confidence,
                sources=result.sources,
                observations=result.observations,
            )

        # warn — pass through with logging
        self._logger.log(
            "astrocyte.dlp.reflect_pii_detected",
            operation="reflect",
            data={"pii_types": ",".join(m.pii_type for m in matches)},
        )
        return result