Coverage for astrocyte/pipeline/extraction.py: 91%

218 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M3 — inbound extraction: normalize → chunking profile resolution → metadata/tags. 

2 

3Pipeline shape: **Raw → Normalizer → Chunker → Entity extract (optional) → retain**. 

4 

5See ``docs/_design/product-roadmap.md`` (M3) and ``built-in-pipeline.md``. 

6""" 

7 

8from __future__ import annotations 

9 

10import dataclasses 

11import json 

12import re 

13from dataclasses import dataclass 

14from functools import lru_cache 

15 

16import yaml 

17 

18from astrocyte.config import AstrocyteConfig, ExtractionProfileConfig, SourceConfig 

19from astrocyte.mip.schema import ChunkerSpec 

20from astrocyte.types import Metadata, MetadataValue, RetainRequest 

21 

# Accepted ``chunking_strategy`` spellings mapped to the canonical chunker name.
# "semantic" is an alias for sentence chunking; each canonical name also maps to itself.
_STRATEGY_ALIASES: dict[str, str] = {
    "semantic": "sentence",
    **{canonical: canonical for canonical in ("dialogue", "sentence", "paragraph", "fixed")},
}

29 

# Code defaults; packaged ``extraction_builtin.yaml`` (if present) merges on top, then user config.
BUILTIN_EXTRACTION_PROFILES: dict[str, ExtractionProfileConfig] = {
    # Generic profiles: only content type and chunking strategy are set.
    "builtin_text": ExtractionProfileConfig(chunking_strategy="sentence", content_type="text"),
    "builtin_conversation": ExtractionProfileConfig(chunking_strategy="dialogue", content_type="conversation"),
    # ``locomo_*`` profiles additionally request metadata entity extraction and pin a fact type.
    "locomo_conversation": ExtractionProfileConfig(
        chunking_strategy="dialogue",
        content_type="conversation",
        entity_extraction="metadata",
        fact_type="experience",
    ),
    "locomo_persona": ExtractionProfileConfig(
        chunking_strategy="paragraph",
        content_type="document",
        entity_extraction="metadata",
        fact_type="wiki",
    ),
}

53 

54 

def _extraction_profile_from_mapping(data: dict[str, object]) -> ExtractionProfileConfig:
    """Build an :class:`ExtractionProfileConfig` from a raw mapping, silently dropping unknown keys."""
    known_fields = frozenset(field.name for field in dataclasses.fields(ExtractionProfileConfig))
    kwargs: dict[str, object] = {}
    for key, value in data.items():
        if key in known_fields:
            kwargs[key] = value
    return ExtractionProfileConfig(**kwargs)

59 

60 

@lru_cache(maxsize=1)
def _packaged_yaml_builtin_profiles() -> dict[str, ExtractionProfileConfig]:
    """Read the optional ``extraction_builtin.yaml`` shipped beside this module.

    The file is located through :mod:`importlib.resources`, so it also resolves from an
    installed wheel. If it cannot be located or read, ``{}`` is returned. A top-level YAML
    value that is not a mapping also yields ``{}``, and entries whose value is not a mapping
    are skipped. YAML parse errors from ``yaml.safe_load`` are not caught here.
    The return value is memoized via ``lru_cache(maxsize=1)``.
    """
    try:
        from importlib import resources

        resource = resources.files("astrocyte.pipeline").joinpath("extraction_builtin.yaml")
        raw = resource.read_text(encoding="utf-8")
    except (OSError, TypeError, FileNotFoundError, ValueError, ModuleNotFoundError):
        return {}
    document = yaml.safe_load(raw) or {}
    if not isinstance(document, dict):
        return {}
    return {
        str(name): _extraction_profile_from_mapping(entry)
        for name, entry in document.items()
        if isinstance(entry, dict)
    }

79 

80 

def _all_builtin_profiles() -> dict[str, ExtractionProfileConfig]:
    """Return a fresh dict of code builtins, with packaged-YAML profiles overriding same-named ones."""
    profiles = dict(BUILTIN_EXTRACTION_PROFILES)
    profiles.update(_packaged_yaml_builtin_profiles())
    return profiles

84 

85 

def merged_user_and_builtin_profiles(
    user: dict[str, ExtractionProfileConfig] | None,
) -> dict[str, ExtractionProfileConfig]:
    """Combine builtin (code + packaged YAML) profiles with ``user``; user entries win on name clash."""
    combined = dict(_all_builtin_profiles())
    if user:
        combined.update(user)
    return combined

91 

92 

def merged_extraction_profiles(config: AstrocyteConfig) -> dict[str, ExtractionProfileConfig]:
    """Return every builtin profile plus ``config.extraction_profiles`` (user wins on same name)."""
    user_profiles = config.extraction_profiles
    return merged_user_and_builtin_profiles(user_profiles)

96 

97 

def extraction_profile_for_source(
    source_id: str,
    sources: dict[str, SourceConfig] | None,
) -> str | None:
    """Look up ``sources[source_id].extraction_profile`` for ingest callers (full webhook wiring is M4).

    Returns ``None`` when ``sources`` is empty/``None`` or has no entry for ``source_id``.
    """
    source = (sources or {}).get(source_id)
    return None if source is None else source.extraction_profile

109 

110 

def effective_content_type(request_ct: str, profile: ExtractionProfileConfig | None) -> str:
    """Resolve the content type used for normalization and chunking.

    The profile's ``content_type`` wins when it is set to a non-blank value; otherwise the
    request's ``content_type`` is used, falling back to ``"text"`` when that is empty/blank.
    The result is always stripped and lower-cased.

    Args:
        request_ct: ``RetainRequest.content_type`` as supplied by the caller.
        profile: Optional extraction profile whose ``content_type`` overrides the request.

    Returns:
        A non-empty, lower-cased content type string.
    """
    if profile is not None and profile.content_type:
        from_profile = str(profile.content_type).strip().lower()
        # A whitespace-only profile value is truthy but strips to "": fall through to the request
        # instead of returning an empty content type.
        if from_profile:
            return from_profile
    return (request_ct or "text").strip().lower() or "text"

116 

117 

def _normalize_chunking_strategy(name: str) -> str:
    """Map a user-supplied strategy name (case/whitespace-insensitive) to its canonical form.

    Raises:
        ValueError: if the name is not a known strategy or alias.
    """
    canonical = _STRATEGY_ALIASES.get((name or "").strip().lower())
    if canonical is None:
        raise ValueError(f"Unknown chunking strategy: {name!r}")
    return canonical

123 

124 

@dataclasses.dataclass(frozen=True)
class ChunkingDecision:
    """Immutable chunking parameters resolved for a single retain call.

    ``overlap`` stays ``None`` unless some layer set it explicitly; callers should then use
    ``chunk_text``'s own default.
    """

    strategy: str  # canonical strategy name
    max_size: int  # maximum chunk size
    overlap: int | None = None  # explicit overlap, or None for the chunker default

136 

137 

def resolve_retain_chunking(
    content_type: str,
    *,
    profile: ExtractionProfileConfig | None,
    default_strategy: str,
    default_max_chunk_size: int,
    mip_chunker: ChunkerSpec | None = None,
) -> ChunkingDecision:
    """Pick chunking parameters for :class:`~astrocyte.types.RetainRequest`.

    Layers, from highest to lowest precedence:

    1. ``mip_chunker`` -- per-rule overrides from a MIP ``RoutingDecision``
    2. ``profile`` -- per-source ``ExtractionProfileConfig``
    3. ``content_type`` -- built-in routing for known types
    4. ``default_strategy`` / ``default_max_chunk_size`` -- orchestrator defaults

    Each field is resolved independently, so a MIP override that only sets ``max_size``
    still lets ``profile`` or ``content_type`` decide the strategy.

    Raises:
        ValueError: if a profile or MIP strategy name is not recognized.
    """
    # Layers 4 + 3: content-type routing, falling back to the orchestrator default.
    content_type_routes = {
        "conversation": "dialogue",
        "transcript": "dialogue",
        "document": "paragraph",
        "email": "paragraph",
        "event": "sentence",
    }
    routed_ct = (content_type or "").strip().lower()
    strategy = content_type_routes.get(routed_ct, default_strategy)
    max_size = default_max_chunk_size
    overlap: int | None = None

    # Layer 2: per-source profile (size checked before strategy, as before).
    if profile is not None:
        if profile.chunk_size is not None:
            max_size = int(profile.chunk_size)
        if profile.chunking_strategy:
            strategy = _normalize_chunking_strategy(str(profile.chunking_strategy))

    # Layer 1: MIP rule override, field by field.
    if mip_chunker is not None:
        if mip_chunker.strategy is not None:
            strategy = _normalize_chunking_strategy(mip_chunker.strategy)
        if mip_chunker.max_size is not None:
            max_size = int(mip_chunker.max_size)
        if mip_chunker.overlap is not None:
            overlap = int(mip_chunker.overlap)

    return ChunkingDecision(strategy=strategy, max_size=max_size, overlap=overlap)

189 

190 

191# --- Line endings & per-type normalizer (conservative heuristics) --- 

192 

193 

def _normalize_line_endings(s: str) -> str:
    """Convert Windows (CRLF) and old-Mac (CR) line endings to ``\\n``."""
    unix_crlf = s.replace("\r\n", "\n")
    return unix_crlf.replace("\r", "\n")

196 

197 

def _looks_like_rfc_headers(block: str) -> bool:
    """Heuristically decide whether ``block`` is a run of ``Name: value`` header lines.

    Needs at least two non-blank lines, at least two of which look like headers, and
    tolerates at most one non-blank line that does not.
    """
    non_empty = [line for line in block.split("\n") if line.strip()]
    total = len(non_empty)
    if total < 2:
        return False
    header_count = 0
    for line in non_empty:
        if re.match(r"^[A-Za-z][A-Za-z0-9-]*:", line):
            header_count += 1
    return header_count >= max(2, total - 1)

204 

205 

def _normalize_email_body(s: str) -> str:
    """Strip a leading header block and a trailing ``-- `` signature from an email body.

    The header block (text before the first blank line) is dropped only when it looks like
    RFC-style headers. Everything from the first ``\\n-- \\n`` delimiter onward is removed.
    """
    head, sep, body = s.partition("\n\n")
    if sep and _looks_like_rfc_headers(head):
        s = body
    signature_at = s.find("\n-- \n")
    if signature_at != -1:
        s = s[:signature_at]
    return s.strip()

214 

215 

def _collapse_blank_runs(s: str) -> str:
    """Trim ``s`` and squeeze every run of three or more newlines into a single blank line."""
    trimmed = s.strip()
    return re.sub(r"\n{3,}", "\n\n", trimmed)

218 

219 

def normalize_content(raw: str, content_type: str, *, profile: ExtractionProfileConfig | None = None) -> str:
    """Normalize raw inbound text for chunking/embed (stage before chunker).

    Per-type behavior is intentionally shallow: line-ending cleanup, light email/event handling,
    and transcript/document blank-line collapse. MIME decoding and full mail parsing are **not** done here.
    """
    _ = profile  # reserved for future profile-driven normalizer options
    kind = (content_type or "text").strip().lower()
    text = _normalize_line_endings((raw or "").lstrip("\ufeff"))

    if kind == "email":
        return _normalize_email_body(text)
    if kind in ("document", "conversation", "transcript"):
        return _collapse_blank_runs(text)
    # "event", "text", and any unrecognized type: only trim surrounding whitespace.
    return text.strip()

243 

244 

def _json_path_get(root: object, path: str) -> MetadataValue | None:
    """Resolve a minimal ``$.a.b`` dotted path against nested dicts (leading ``$`` optional).

    Only dict key traversal is supported (no list indexing or wildcards). Scalars
    (str/int/float/bool/None) are returned as-is. Lists and dicts are returned as JSON text.
    Any other object is returned as ``str()``.

    Returns:
        The resolved value, or ``None`` when the path is empty or any segment is missing.
    """
    p = path.strip()
    if p.startswith("$"):
        p = p[1:]
    if p.startswith("."):
        p = p[1:]
    if not p:
        return None
    cur: object = root
    for part in p.split("."):
        if not isinstance(cur, dict) or part not in cur:
            return None
        cur = cur[part]
    if cur is None or isinstance(cur, (str, int, float, bool)):
        return cur
    if isinstance(cur, (list, dict)):
        # Serialize containers as JSON; str(dict) would produce a Python repr (single quotes),
        # which is not parseable JSON and was inconsistent with how lists were handled.
        return json.dumps(cur)
    return str(cur)

264 

265 

def apply_metadata_mapping(
    content: str,
    profile: ExtractionProfileConfig | None,
) -> Metadata | None:
    """Apply ``ExtractionProfileConfig.metadata_mapping`` (JSON paths → metadata keys).

    Only runs when ``content`` parses as a JSON object or array (an array is exposed under
    the ``"$"`` key). Specs starting with ``$`` are resolved as JSON paths; missing paths
    are skipped. Any other spec is stored verbatim (after ``str()`` and strip) as a literal.

    Returns:
        The mapped metadata, or ``None`` when nothing applies or the content is not JSON.
    """
    if profile is None or not profile.metadata_mapping:
        return None
    try:
        parsed: object | None = json.loads(content)
    except (json.JSONDecodeError, TypeError):
        return None
    if isinstance(parsed, dict):
        root = parsed
    elif isinstance(parsed, list):
        root = {"$": parsed}
    else:
        return None

    result: Metadata = {}
    for key, spec in profile.metadata_mapping.items():
        expr = str(spec).strip()
        if expr.startswith("$"):
            value = _json_path_get(root, expr)
            if value is not None:
                result[str(key)] = value
        else:
            # Literal string value (no JSON extraction)
            result[str(key)] = expr
    return result or None

292 

293 

def apply_tag_rules(content: str, profile: ExtractionProfileConfig | None) -> list[str] | None:
    """Apply ``tag_rules``: a ``contains`` / ``match`` substring hit adds the rule's ``tags``.

    Each rule is a dict with a needle under ``contains`` (preferred) or ``match`` and a
    ``tags`` value. Non-dict rules and rules without a needle or with empty ``tags`` are
    skipped. A bare string ``tags`` value is treated as one tag.

    Returns:
        Tags from every matching rule in rule order (may contain duplicates), or ``None``.
    """
    if profile is None or not profile.tag_rules:
        return None
    extra: list[str] = []
    for rule in profile.tag_rules:
        if not isinstance(rule, dict):
            continue
        needle = rule.get("contains")
        if needle is None:
            needle = rule.get("match")
        tags = rule.get("tags")
        if needle is None or not tags:
            continue
        if isinstance(tags, str):
            # ``tags: urgent`` in YAML yields a str; iterating it would emit one tag per character.
            tags = [tags]
        if str(needle) in content:
            extra.extend(str(t) for t in tags)
    return extra or None

311 

312 

def should_extract_entities(profile: ExtractionProfileConfig | None, *, graph_store_configured: bool) -> bool:
    """Decide whether the retain path should run entity extraction.

    - ``entity_extraction == "metadata"`` always returns ``True``, even with no graph store.
    - Otherwise, without a graph store configured, returns ``False``.
    - With a graph store, returns ``True`` unless ``entity_extraction`` is ``False`` or
      ``"disabled"`` (a missing profile or ``None`` setting counts as enabled).
    """
    mode = None if profile is None else profile.entity_extraction
    if mode == "metadata":
        return True
    if not graph_store_configured:
        return False
    return not (mode is False or mode == "disabled")

327 

328 

def merge_tags(base: list[str] | None, extra: list[str] | None) -> list[str] | None:
    """Concatenate ``base`` and ``extra``, dropping empty tags and duplicates.

    First occurrence order is preserved (``base`` before ``extra``).

    Returns:
        The merged tag list, or ``None`` when it would be empty.
    """
    if not base and not extra:
        return None
    # dict.fromkeys dedups in O(n) while keeping first-seen order; the previous
    # ``t not in ordered`` list scan made the merge O(n^2).
    ordered = [t for t in dict.fromkeys([*(base or []), *(extra or [])]) if t]
    return ordered or None

337 

338 

def merge_metadata(
    mapped: Metadata | None,
    request_meta: Metadata | None,
) -> Metadata | None:
    """Merge profile-derived metadata with request metadata; **request wins** on key conflicts.

    Always returns a new dict (inputs are not mutated), or ``None`` when both are empty.
    """
    if not (mapped or request_meta):
        return None
    combined: Metadata = dict(mapped or {})
    combined.update(request_meta or {})
    return combined or None

348 

349 

@dataclasses.dataclass(frozen=True)
class PreparedRetainInput:
    """Immutable output of Raw → normalizer → metadata/tags merge, consumed before chunk/embed."""

    text: str  # normalized content
    metadata: Metadata | None  # profile-mapped metadata merged with request metadata
    tags: list[str] | None  # request tags merged with tag-rule tags
    extract_entities: bool  # whether entity extraction should run
    effective_content_type: str  # resolved, lower-cased content type
    fact_type: str  # fact type for stored chunks

360 

361 

def resolve_retain_fact_type(profile: ExtractionProfileConfig | None) -> str:
    """Return the profile's stripped ``fact_type`` when non-blank, else ``"world"``."""
    configured = None if profile is None else profile.fact_type
    cleaned = str(configured).strip() if configured else ""
    return cleaned or "world"

367 

368 

def prepare_retain_input(
    request: RetainRequest,
    profile: ExtractionProfileConfig | None,
    *,
    graph_store_configured: bool,
) -> PreparedRetainInput:
    """Single entrypoint for the pre-chunk extraction chain (normalizer + profile fields).

    Resolves the content type, normalizes the text, then derives metadata and tags from the
    *normalized* text and merges them with the request's own values (request metadata wins).
    """
    content_type = effective_content_type(request.content_type, profile)
    text = normalize_content(request.content, content_type, profile=profile)
    # Keyword arguments are evaluated left to right, preserving the original call order.
    return PreparedRetainInput(
        text=text,
        metadata=merge_metadata(apply_metadata_mapping(text, profile), request.metadata),
        tags=merge_tags(request.tags, apply_tag_rules(text, profile)),
        extract_entities=should_extract_entities(profile, graph_store_configured=graph_store_configured),
        effective_content_type=content_type,
        fact_type=resolve_retain_fact_type(profile),
    )