Coverage for astrocyte/policy/llm_scanner.py: 90%
71 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""LLM-based PII scanner — contextual PII detection via LLM.
3Detects PII that regex and NER miss: medical records, contextual references
4("my mother's maiden name is Smith"), and implicit PII patterns.
6Async — requires LLM API call.
7"""
9from __future__ import annotations
11import json
12import logging
13from typing import TYPE_CHECKING
15from astrocyte.types import Message, PiiMatch
17if TYPE_CHECKING:
18 from astrocyte.provider import LLMProvider
20logger = logging.getLogger("astrocyte.pii")
# System prompt sent with every scan request. It asks the model for a bare JSON
# array of {"type", "text", "start", "end"} objects. Models do not always comply,
# so _parse_llm_response also accepts a fenced code block and re-checks offsets.
_LLM_PII_SYSTEM_PROMPT = """You are a PII detection system. Analyze user-provided text for personally identifiable information.

For each PII item found, return a JSON object with:
- "type": the PII category (name, email, phone, address, ssn, credit_card, medical_record, date_of_birth, national_id, passport, financial_account)
- "text": the exact matched text from the content
- "start": character offset where the PII starts within the content
- "end": character offset where the PII ends within the content

Return a JSON array of all detected items. If no PII is found, return: []
Respond with ONLY the JSON array, no other text."""
class LlmPiiScanner:
    """LLM-based contextual PII detection.

    Async — requires LLM provider for inference.
    Falls back to empty list on any failure.
    """

    def __init__(self, llm_provider: LLMProvider) -> None:
        """Store the provider whose ``complete`` coroutine performs the scan."""
        self._llm = llm_provider

    async def scan(self, text: str) -> list[PiiMatch]:
        """Ask LLM to identify PII with positions. Returns matches.

        Only the first 2000 characters of ``text`` are sent to the model; the
        full ``text`` is still handed to the parser so reported offsets can be
        verified (and re-located) against the original string.

        Any exception raised by the provider call or by parsing is swallowed
        and reported as "no matches" — this scanner is best-effort.
        """
        user_content = f"<content>\n{text[:2000]}\n</content>"

        try:
            completion = await self._llm.complete(
                messages=[
                    Message(role="system", content=_LLM_PII_SYSTEM_PROMPT),
                    Message(role="user", content=user_content),
                ],
                max_tokens=500,
                temperature=0,
            )
            return _parse_llm_response(completion.text, text)
        except Exception as exc:
            # Log only the exception class: the message or traceback of a
            # provider error may echo the scanned text, which is exactly the
            # data this module exists to keep out of logs.
            logger.warning(
                "LLM PII scan failed, returning empty matches (%s)",
                type(exc).__name__,
            )
            return []
# Replacement map for LLM-detected types.
# Keys are the PII categories listed in _LLM_PII_SYSTEM_PROMPT; values are the
# placeholder tokens attached to each PiiMatch. A type that is not listed here
# gets a generated "[<TYPE>_REDACTED]" placeholder in _parse_llm_response.
_TYPE_REPLACEMENTS: dict[str, str] = {
    "name": "[NAME_REDACTED]",
    "email": "[EMAIL_REDACTED]",
    "phone": "[PHONE_REDACTED]",
    "address": "[ADDRESS_REDACTED]",
    "ssn": "[SSN_REDACTED]",
    "credit_card": "[CC_REDACTED]",
    "medical_record": "[MEDICAL_REDACTED]",
    "date_of_birth": "[DOB_REDACTED]",
    "national_id": "[NATIONAL_ID_REDACTED]",
    "passport": "[PASSPORT_REDACTED]",
    "financial_account": "[ACCOUNT_REDACTED]",
}
79def _parse_llm_response(response: str, original_text: str) -> list[PiiMatch]:
80 """Parse LLM JSON response into PiiMatch list. Graceful fallback."""
81 try:
82 text = response.strip()
83 # Extract from code block if present
84 if "```" in text:
85 start = text.index("```") + 3
86 if text[start:].startswith("json"):
87 start += 4
88 end = text.index("```", start)
89 text = text[start:end].strip()
91 items = json.loads(text)
92 if not isinstance(items, list):
93 return []
95 matches: list[PiiMatch] = []
96 used_offsets: set[int] = set() # Track used start positions to avoid duplicate mapping
98 for item in items:
99 if not isinstance(item, dict):
100 continue
101 pii_type = item.get("type", "unknown")
102 matched_text = item.get("text", "")
103 start = item.get("start")
104 end = item.get("end")
106 # Validate positions — LLMs sometimes get offsets wrong
107 if start is not None and end is not None:
108 start = int(start)
109 end = int(end)
110 # Verify the offsets actually point to the claimed text
111 if start >= 0 and end <= len(original_text):
112 actual = original_text[start:end]
113 if actual != matched_text and matched_text:
114 # LLM gave wrong offsets — re-locate
115 start, end = _find_unused_occurrence(original_text, matched_text, used_offsets)
116 if start is None:
117 continue
118 elif matched_text:
119 # No offsets provided — find unused occurrence in original
120 start, end = _find_unused_occurrence(original_text, matched_text, used_offsets)
121 if start is None:
122 continue
124 if start is None or end is None:
125 continue
127 used_offsets.add(start)
128 replacement = _TYPE_REPLACEMENTS.get(pii_type, f"[{pii_type.upper()}_REDACTED]")
129 matches.append(
130 PiiMatch(
131 pii_type=pii_type,
132 start=start,
133 end=end,
134 matched_text=matched_text,
135 replacement=replacement,
136 )
137 )
139 return matches
140 except (json.JSONDecodeError, ValueError, KeyError):
141 logger.warning("Failed to parse LLM PII response")
142 return []
145def _find_unused_occurrence(text: str, needle: str, used: set[int]) -> tuple[int | None, int | None]:
146 """Find the first occurrence of needle in text that hasn't been used yet."""
147 search_start = 0
148 while True:
149 idx = text.find(needle, search_start)
150 if idx < 0:
151 return None, None
152 if idx not in used:
153 return idx, idx + len(needle)
154 search_start = idx + 1