Coverage for astrocyte/policy/llm_scanner.py: 90%

71 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""LLM-based PII scanner — contextual PII detection via LLM. 

2 

3Detects PII that regex and NER miss: medical records, contextual references 

4("my mother's maiden name is Smith"), and implicit PII patterns. 

5 

6Async — requires LLM API call. 

7""" 

8 

9from __future__ import annotations 

10 

11import json 

12import logging 

13from typing import TYPE_CHECKING 

14 

15from astrocyte.types import Message, PiiMatch 

16 

17if TYPE_CHECKING: 

18 from astrocyte.provider import LLMProvider 

19 

# Logger for this module, registered under the name "astrocyte.pii".
logger = logging.getLogger("astrocyte.pii")

# System prompt sent with every scan request. It instructs the model to reply
# with a bare JSON array of objects carrying "type", "text", "start" and "end",
# which are the fields _parse_llm_response reads from each item.
_LLM_PII_SYSTEM_PROMPT = """You are a PII detection system. Analyze user-provided text for personally identifiable information.

For each PII item found, return a JSON object with:
- "type": the PII category (name, email, phone, address, ssn, credit_card, medical_record, date_of_birth, national_id, passport, financial_account)
- "text": the exact matched text from the content
- "start": character offset where the PII starts within the content
- "end": character offset where the PII ends within the content

Return a JSON array of all detected items. If no PII is found, return: []
Respond with ONLY the JSON array, no other text."""

32 

33 

class LlmPiiScanner:
    """LLM-based contextual PII detection.

    Async — requires an LLM provider for inference.
    Scanning is best-effort: any failure while calling the provider or
    parsing its reply yields an empty list instead of an exception.
    """

    # Only this many leading characters of the input are sent to the LLM.
    _MAX_INPUT_CHARS = 2000
    # Completion length requested from the provider.
    _MAX_RESPONSE_TOKENS = 500

    def __init__(self, llm_provider: LLMProvider) -> None:
        """Store the provider whose ``complete`` coroutine performs inference."""
        self._llm = llm_provider

    async def scan(self, text: str) -> list[PiiMatch]:
        """Ask the LLM to identify PII with positions.

        Args:
            text: Content to scan. Only the first ``_MAX_INPUT_CHARS``
                characters are included in the request; the full text is
                still used to validate and re-locate reported offsets.

        Returns:
            The parsed matches, or an empty list when ``text`` is empty or
            whitespace-only, or when the request or parsing fails.
        """
        # Nothing to detect: skip the (slow, billable) provider round-trip.
        if not text or not text.strip():
            return []

        user_content = f"<content>\n{text[:self._MAX_INPUT_CHARS]}\n</content>"

        try:
            completion = await self._llm.complete(
                messages=[
                    Message(role="system", content=_LLM_PII_SYSTEM_PROMPT),
                    Message(role="user", content=user_content),
                ],
                max_tokens=self._MAX_RESPONSE_TOKENS,
                temperature=0,
            )
            return _parse_llm_response(completion.text, text)
        except Exception as exc:
            # Deliberate best-effort boundary. Log only the exception class:
            # provider error messages may echo the scanned content.
            logger.warning(
                "LLM PII scan failed (%s), returning empty matches",
                type(exc).__name__,
            )
            return []

61 

62 

# Redaction placeholder for each PII category the LLM is asked to report.
# Categories missing from this table get a placeholder derived from their name
# at the point of use.
_TYPE_REPLACEMENTS: dict[str, str] = dict(
    (
        ("name", "[NAME_REDACTED]"),
        ("email", "[EMAIL_REDACTED]"),
        ("phone", "[PHONE_REDACTED]"),
        ("address", "[ADDRESS_REDACTED]"),
        ("ssn", "[SSN_REDACTED]"),
        ("credit_card", "[CC_REDACTED]"),
        ("medical_record", "[MEDICAL_REDACTED]"),
        ("date_of_birth", "[DOB_REDACTED]"),
        ("national_id", "[NATIONAL_ID_REDACTED]"),
        ("passport", "[PASSPORT_REDACTED]"),
        ("financial_account", "[ACCOUNT_REDACTED]"),
    )
)

77 

78 

def _coerce_offset(value: object) -> int | None:
    """Return ``value`` as an integer offset, or None when it cannot be one."""
    # JSON booleans are ints in Python but are never meaningful offsets.
    if value is None or isinstance(value, bool):
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _parse_llm_response(response: str, original_text: str) -> list[PiiMatch]:
    """Parse the LLM's JSON reply into a list of PiiMatch. Graceful fallback.

    Args:
        response: Raw completion text; either a bare JSON array or one wrapped
            in a Markdown code fence (optionally tagged ``json``).
        original_text: The text that was scanned. Reported offsets are checked
            against it and re-located by text search when they do not fit.

    Returns:
        One PiiMatch per usable item. Items that are not objects, or that have
        neither a usable span nor a locatable ``text``, are skipped. A reply
        that is not valid JSON, or whose top level is not an array, yields
        an empty list.
    """
    payload = response.strip()
    # Extract from code block if present.
    if "```" in payload:
        body_start = payload.index("```") + 3
        if payload[body_start:].startswith("json"):
            body_start += 4
        body_end = payload.find("```", body_start)
        if body_end < 0:
            # Unclosed fence: parse whatever follows the opening fence.
            body_end = len(payload)
        payload = payload[body_start:body_end].strip()

    try:
        items = json.loads(payload)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        logger.warning("Failed to parse LLM PII response")
        return []
    if not isinstance(items, list):
        return []

    matches: list[PiiMatch] = []
    used_offsets: set[int] = set()  # Start positions already mapped to a match
    text_len = len(original_text)

    for item in items:
        if not isinstance(item, dict):
            continue

        # Malformed fields are neutralised per item so that one bad entry
        # cannot discard the remaining detections.
        raw_type = item.get("type", "unknown")
        pii_type = raw_type if isinstance(raw_type, str) else "unknown"
        raw_text = item.get("text", "")
        matched_text = raw_text if isinstance(raw_text, str) else ""
        start = _coerce_offset(item.get("start"))
        end = _coerce_offset(item.get("end"))

        # LLMs sometimes get offsets wrong: require a non-empty, in-range span
        # that actually covers the claimed text.
        span_ok = (
            start is not None
            and end is not None
            and 0 <= start < end <= text_len
        )
        if span_ok and matched_text and original_text[start:end] != matched_text:
            span_ok = False

        if span_ok:
            if not matched_text:
                # Offsets without text: recover the text from the span.
                matched_text = original_text[start:end]
        elif matched_text:
            # Missing or wrong offsets — find an unused occurrence instead.
            start, end = _find_unused_occurrence(original_text, matched_text, used_offsets)
            if start is None or end is None:
                continue
        else:
            # Neither a usable span nor text to search for.
            continue

        used_offsets.add(start)
        replacement = _TYPE_REPLACEMENTS.get(pii_type, f"[{pii_type.upper()}_REDACTED]")
        matches.append(
            PiiMatch(
                pii_type=pii_type,
                start=start,
                end=end,
                matched_text=matched_text,
                replacement=replacement,
            )
        )

    return matches

143 

144 

def _find_unused_occurrence(text: str, needle: str, used: set[int]) -> tuple[int | None, int | None]:
    """Locate the earliest occurrence of ``needle`` whose start is not in ``used``.

    Returns a ``(start, end)`` pair for that occurrence, or ``(None, None)``
    when every occurrence starts at an already-used position or ``needle``
    does not occur at all. Overlapping occurrences are considered.
    """
    needle_len = len(needle)
    position = text.find(needle)
    while position >= 0:
        if position not in used:
            return position, position + needle_len
        # Resume one character later so overlapping occurrences are not skipped.
        position = text.find(needle, position + 1)
    return None, None

154 search_start = idx + 1