Coverage for astrocyte/pipeline/extraction.py: 91%
218 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""M3 — inbound extraction: normalize → chunking profile resolution → metadata/tags.
3Pipeline shape: **Raw → Normalizer → Chunker → Entity extract (optional) → retain**.
5See ``docs/_design/product-roadmap.md`` (M3) and ``built-in-pipeline.md``.
6"""
8from __future__ import annotations
10import dataclasses
11import json
12import re
13from dataclasses import dataclass
14from functools import lru_cache
16import yaml
18from astrocyte.config import AstrocyteConfig, ExtractionProfileConfig, SourceConfig
19from astrocyte.mip.schema import ChunkerSpec
20from astrocyte.types import Metadata, MetadataValue, RetainRequest
22_STRATEGY_ALIASES: dict[str, str] = {
23 "semantic": "sentence",
24 "dialogue": "dialogue",
25 "sentence": "sentence",
26 "paragraph": "paragraph",
27 "fixed": "fixed",
28}
# Profiles defined in code. Packaged ``extraction_builtin.yaml`` (if present) is merged
# on top of these, and user configuration is merged on top of that.
BUILTIN_EXTRACTION_PROFILES: dict[str, ExtractionProfileConfig] = {
    # Plain text, chunked by sentence.
    "builtin_text": ExtractionProfileConfig(content_type="text", chunking_strategy="sentence"),
    # Conversations, chunked by dialogue.
    "builtin_conversation": ExtractionProfileConfig(content_type="conversation", chunking_strategy="dialogue"),
    # Conversation profile that also sets ``entity_extraction`` and ``fact_type``.
    "locomo_conversation": ExtractionProfileConfig(
        content_type="conversation",
        chunking_strategy="dialogue",
        entity_extraction="metadata",
        fact_type="experience",
    ),
    # Document profile chunked by paragraph, stored with the ``wiki`` fact type.
    "locomo_persona": ExtractionProfileConfig(
        content_type="document",
        chunking_strategy="paragraph",
        entity_extraction="metadata",
        fact_type="wiki",
    ),
}
def _extraction_profile_from_mapping(data: dict[str, object]) -> ExtractionProfileConfig:
    """Build an ``ExtractionProfileConfig`` from a plain mapping.

    Keys that are not dataclass fields of ``ExtractionProfileConfig`` are dropped
    rather than passed to the constructor.
    """
    known_fields = frozenset(field.name for field in dataclasses.fields(ExtractionProfileConfig))
    kwargs: dict[str, object] = {}
    for key, value in data.items():
        if key in known_fields:
            kwargs[key] = value
    return ExtractionProfileConfig(**kwargs)
@lru_cache(maxsize=1)
def _packaged_yaml_builtin_profiles() -> dict[str, ExtractionProfileConfig]:
    """Read the optional ``extraction_builtin.yaml`` resource of ``astrocyte.pipeline``.

    The resource is located with ``importlib.resources`` and parsed with
    ``yaml.safe_load``. An empty dict is returned when the resource cannot be
    read or when the document is not a mapping; entries whose value is not a
    mapping are skipped. The result is memoized by ``lru_cache``, so the same
    dict object is returned on every call.
    """
    try:
        from importlib import resources

        resource = resources.files("astrocyte.pipeline").joinpath("extraction_builtin.yaml")
        yaml_text = resource.read_text(encoding="utf-8")
    except (OSError, TypeError, FileNotFoundError, ValueError, ModuleNotFoundError):
        # Missing package or resource: treat as "no packaged profiles".
        return {}
    document = yaml.safe_load(yaml_text) or {}
    if not isinstance(document, dict):
        return {}
    return {
        str(profile_name): _extraction_profile_from_mapping(fields)
        for profile_name, fields in document.items()
        if isinstance(fields, dict)
    }
def _all_builtin_profiles() -> dict[str, ExtractionProfileConfig]:
    """Return a new dict of code builtins with packaged YAML profiles layered on top."""
    combined = dict(BUILTIN_EXTRACTION_PROFILES)
    # Packaged YAML entries replace same-named code builtins and add new names.
    combined.update(_packaged_yaml_builtin_profiles())
    return combined
def merged_user_and_builtin_profiles(
    user: dict[str, ExtractionProfileConfig] | None,
) -> dict[str, ExtractionProfileConfig]:
    """Layer an optional user profile table over the builtin profiles.

    On a name clash the user entry replaces the builtin one. A new dict is returned.
    """
    profiles = dict(_all_builtin_profiles())
    if user:
        profiles.update(user)
    return profiles
def merged_extraction_profiles(config: AstrocyteConfig) -> dict[str, ExtractionProfileConfig]:
    """Return builtin profiles overlaid with ``config.extraction_profiles`` (user entries win)."""
    user_profiles = config.extraction_profiles
    return merged_user_and_builtin_profiles(user_profiles)
def extraction_profile_for_source(
    source_id: str,
    sources: dict[str, SourceConfig] | None,
) -> str | None:
    """Look up the ``extraction_profile`` configured for ``source_id``.

    Returns ``None`` when ``sources`` is empty or ``None``, or when it has no
    entry for ``source_id``; otherwise returns that entry's ``extraction_profile``.
    """
    source = sources.get(source_id) if sources else None
    return None if source is None else source.extraction_profile
def effective_content_type(request_ct: str, profile: ExtractionProfileConfig | None) -> str:
    """Return the normalized content type for a retain call.

    The profile's ``content_type`` wins when it is set to a non-blank value;
    otherwise ``request_ct`` is used. The result is stripped and lower-cased,
    and falls back to ``"text"`` when nothing usable is provided.
    """
    if profile is not None and profile.content_type:
        from_profile = str(profile.content_type).strip().lower()
        # A whitespace-only profile value is treated as unset instead of
        # yielding an empty content type.
        if from_profile:
            return from_profile
    return (request_ct or "text").strip().lower() or "text"
def _normalize_chunking_strategy(name: str) -> str:
    """Map a strategy name or alias to its canonical name.

    Lookup is done on the stripped, lower-cased name.

    Raises:
        ValueError: if the name is not a key of ``_STRATEGY_ALIASES``.
    """
    canonical = _STRATEGY_ALIASES.get((name or "").strip().lower())
    if canonical is None:
        raise ValueError(f"Unknown chunking strategy: {name!r}")
    return canonical
@dataclass(frozen=True)
class ChunkingDecision:
    """Immutable chunking parameters resolved for a single retain call.

    ``overlap`` stays ``None`` when no source set it explicitly, in which case
    callers should fall through to ``chunk_text``'s default.
    """

    # Chunking strategy name.
    strategy: str
    # Maximum chunk size.
    max_size: int
    # Explicit overlap, or ``None`` when nothing set it.
    overlap: int | None = None
def resolve_retain_chunking(
    content_type: str,
    *,
    profile: ExtractionProfileConfig | None,
    default_strategy: str,
    default_max_chunk_size: int,
    mip_chunker: ChunkerSpec | None = None,
) -> ChunkingDecision:
    """Resolve the chunking parameters for one retain call.

    Sources are applied from lowest to highest precedence, each one only
    overriding the fields it actually sets:

    1. ``default_strategy`` / ``default_max_chunk_size`` (orchestrator defaults)
    2. ``content_type`` (built-in strategy for known types)
    3. ``profile`` (``chunk_size`` and ``chunking_strategy``)
    4. ``mip_chunker`` (``strategy``, ``max_size``, ``overlap``)

    Profile and MIP strategy names are normalized with
    ``_normalize_chunking_strategy``, which raises ``ValueError`` for unknown
    names. ``default_strategy`` is used as given. ``overlap`` is only ever set
    by ``mip_chunker`` and is ``None`` otherwise.
    """
    strategy_by_content_type = {
        "conversation": "dialogue",
        "transcript": "dialogue",
        "document": "paragraph",
        "email": "paragraph",
        "event": "sentence",
    }

    # Defaults, then the content-type routing on top of them.
    kind = (content_type or "").strip().lower()
    chosen_strategy: str = strategy_by_content_type.get(kind, default_strategy)
    chosen_size: int = default_max_chunk_size
    chosen_overlap: int | None = None

    # Per-source profile overrides.
    if profile is not None:
        if profile.chunk_size is not None:
            chosen_size = int(profile.chunk_size)
        if profile.chunking_strategy:
            chosen_strategy = _normalize_chunking_strategy(str(profile.chunking_strategy))

    # MIP per-rule overrides have the final say.
    if mip_chunker is not None:
        if mip_chunker.strategy is not None:
            chosen_strategy = _normalize_chunking_strategy(mip_chunker.strategy)
        if mip_chunker.max_size is not None:
            chosen_size = int(mip_chunker.max_size)
        if mip_chunker.overlap is not None:
            chosen_overlap = int(mip_chunker.overlap)

    return ChunkingDecision(strategy=chosen_strategy, max_size=chosen_size, overlap=chosen_overlap)
191# --- Line endings & per-type normalizer (conservative heuristics) ---
194def _normalize_line_endings(s: str) -> str:
195 return s.replace("\r\n", "\n").replace("\r", "\n")
198def _looks_like_rfc_headers(block: str) -> bool:
199 lines = [ln for ln in block.split("\n") if ln.strip()]
200 if len(lines) < 2:
201 return False
202 headerish = sum(1 for ln in lines if re.match(r"^[A-Za-z][A-Za-z0-9-]*:", ln))
203 return headerish >= 2 and headerish >= len(lines) - 1
206def _normalize_email_body(s: str) -> str:
207 if "\n\n" in s:
208 head, rest = s.split("\n\n", 1)
209 if _looks_like_rfc_headers(head):
210 s = rest
211 if "\n-- \n" in s:
212 s = s.split("\n-- \n", 1)[0]
213 return s.strip()
216def _collapse_blank_runs(s: str) -> str:
217 return re.sub(r"\n{3,}", "\n\n", s.strip())
def normalize_content(raw: str, content_type: str, *, profile: ExtractionProfileConfig | None = None) -> str:
    """Normalize raw inbound text before it reaches the chunker.

    Leading byte-order marks are removed and line endings are converted to
    LF for every content type. After that:

    * ``email`` goes through ``_normalize_email_body``;
    * ``document``, ``conversation`` and ``transcript`` go through ``_collapse_blank_runs``;
    * everything else (including ``event``) is only stripped.

    The handling is intentionally shallow: MIME decoding and full mail
    parsing are **not** done here. ``profile`` is accepted but not used yet.
    """
    del profile  # reserved for future profile-driven normalizer options
    kind = (content_type or "text").strip().lower()
    cleaned = _normalize_line_endings((raw or "").lstrip("\ufeff"))

    if kind == "email":
        return _normalize_email_body(cleaned)
    if kind in ("document", "conversation", "transcript"):
        return _collapse_blank_runs(cleaned)
    return cleaned.strip()
245def _json_path_get(root: object, path: str) -> MetadataValue | None:
246 """Minimal ``$.a.b`` style path for JSON objects (leading ``$`` optional)."""
247 p = path.strip()
248 if p.startswith("$"):
249 p = p[1:]
250 if p.startswith("."):
251 p = p[1:]
252 if not p:
253 return None
254 cur: object = root
255 for part in p.split("."):
256 if not isinstance(cur, dict) or part not in cur:
257 return None
258 cur = cur[part]
259 if isinstance(cur, (str, int, float, bool)) or cur is None:
260 return cur
261 if isinstance(cur, list):
262 return json.dumps(cur)
263 return str(cur)
def apply_metadata_mapping(
    content: str,
    profile: ExtractionProfileConfig | None,
) -> Metadata | None:
    """Derive metadata from JSON ``content`` using ``profile.metadata_mapping``.

    Each mapping value is either a ``$``-prefixed path resolved with
    ``_json_path_get`` (keys whose path resolves to ``None`` are omitted) or a
    literal string that is stored as-is after stripping.

    Returns ``None`` when there is no profile or mapping, when ``content`` is
    not JSON, when the parsed JSON is neither an object nor an array, or when
    no key was produced.
    """
    if profile is None or not profile.metadata_mapping:
        return None

    try:
        parsed = json.loads(content)
    except (json.JSONDecodeError, TypeError):
        return None
    if isinstance(parsed, dict):
        root = parsed
    elif isinstance(parsed, list):
        # Arrays are wrapped so path lookups always start from a dict.
        root = {"$": parsed}
    else:
        return None

    result: Metadata = {}
    for meta_key, spec in profile.metadata_mapping.items():
        target = str(spec).strip()
        if target.startswith("$"):
            value = _json_path_get(root, target)
            if value is not None:
                result[str(meta_key)] = value
        else:
            # Literal string value (no JSON extraction)
            result[str(meta_key)] = target
    return result or None
def apply_tag_rules(content: str, profile: ExtractionProfileConfig | None) -> list[str] | None:
    """Collect tags from ``profile.tag_rules`` whose needle occurs in ``content``.

    Each rule is a dict with a ``contains`` (or, as a fallback, ``match``)
    substring and a ``tags`` value. Matching is a plain substring test on the
    string form of the needle. Rules that are not dicts, or that lack a needle
    or tags, are skipped.

    ``tags`` may be a collection or a single scalar: a bare string or number
    (for example YAML ``tags: urgent``) is treated as one tag instead of being
    split into characters.

    Returns the collected tags in rule order, or ``None`` when there is no
    profile, no rules, or no rule matched.
    """
    if profile is None or not profile.tag_rules:
        return None
    extra: list[str] = []
    for rule in profile.tag_rules:
        if not isinstance(rule, dict):
            continue
        needle = rule.get("contains")
        if needle is None:
            needle = rule.get("match")
        tags = rule.get("tags")
        if needle is None or not tags:
            continue
        if isinstance(tags, (str, int, float)):
            # Scalar shorthand: one tag, not an iterable of characters.
            tags = [tags]
        if str(needle) in content:
            extra.extend(str(t) for t in tags)
    return extra or None
def should_extract_entities(profile: ExtractionProfileConfig | None, *, graph_store_configured: bool) -> bool:
    """Decide whether entity extraction should run for this retain call.

    * A profile with ``entity_extraction == "metadata"`` always enables it,
      even without a graph store.
    * Otherwise a configured graph store is required.
    * With a graph store, extraction runs unless the profile sets
      ``entity_extraction`` to ``False`` or ``"disabled"``.
    """
    mode = None if profile is None else profile.entity_extraction
    if mode == "metadata":
        return True
    if not graph_store_configured:
        return False
    return not (mode is False or mode == "disabled")
def merge_tags(base: list[str] | None, extra: list[str] | None) -> list[str] | None:
    """Concatenate two tag collections, dropping empty tags and duplicates.

    First-seen order is preserved (``base`` before ``extra``). Either argument
    may be ``None`` or any iterable of tags. Returns ``None`` when no tag
    remains.
    """
    combined = [*(base or ()), *(extra or ())]
    # dict.fromkeys keeps first-seen order and de-duplicates in linear time.
    unique = list(dict.fromkeys(tag for tag in combined if tag))
    return unique or None
def merge_metadata(
    mapped: Metadata | None,
    request_meta: Metadata | None,
) -> Metadata | None:
    """Combine profile-derived metadata with request metadata.

    On a key conflict the **request** value wins. Returns ``None`` when both
    inputs are empty or ``None``; otherwise a new dict.
    """
    if not (mapped or request_meta):
        return None
    combined: Metadata = dict(mapped or {})
    combined.update(request_meta or {})
    return combined or None
@dataclass(frozen=True)
class PreparedRetainInput:
    """Immutable output of the pre-chunk stage: normalized text plus merged metadata and tags."""

    # Normalized content text.
    text: str
    # Merged metadata, or ``None`` when there is none.
    metadata: Metadata | None
    # Merged tags, or ``None`` when there are none.
    tags: list[str] | None
    # Whether entity extraction should run.
    extract_entities: bool
    # Content type after applying the profile override.
    effective_content_type: str
    # Fact type to store chunks under.
    fact_type: str
def resolve_retain_fact_type(profile: ExtractionProfileConfig | None) -> str:
    """Return the fact type for stored chunks.

    A non-blank profile ``fact_type`` (stripped) is used when present; the
    default is ``"world"``.
    """
    if profile is None or not profile.fact_type:
        return "world"
    cleaned = str(profile.fact_type).strip()
    return cleaned or "world"
def prepare_retain_input(
    request: RetainRequest,
    profile: ExtractionProfileConfig | None,
    *,
    graph_store_configured: bool,
) -> PreparedRetainInput:
    """Run the whole pre-chunk extraction chain for one retain request.

    Steps, in order: resolve the effective content type, normalize the
    content, derive metadata and tags from the normalized text via the
    profile, merge them with the request's own metadata and tags, and decide
    on entity extraction and fact type.
    """
    content_kind = effective_content_type(request.content_type, profile)
    clean_text = normalize_content(request.content, content_kind, profile=profile)

    # Profile-derived values are computed from the normalized text, not the raw content.
    profile_meta = apply_metadata_mapping(clean_text, profile)
    metadata = merge_metadata(profile_meta, request.metadata)
    profile_tags = apply_tag_rules(clean_text, profile)
    tags = merge_tags(request.tags, profile_tags)

    run_entities = should_extract_entities(profile, graph_store_configured=graph_store_configured)
    fact_type = resolve_retain_fact_type(profile)
    return PreparedRetainInput(
        text=clean_text,
        metadata=metadata,
        tags=tags,
        extract_entities=run_entities,
        effective_content_type=content_kind,
        fact_type=fact_type,
    )