Coverage for astrocyte/pipeline/fact_extraction.py: 76%

276 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Structured fact extraction at retain time. 

2 

3Replaces the legacy two-pass approach (text chunking → separate 

4entity-extraction LLM call → separate fact-causal LLM call) with a 

5single LLM call that produces a list of structured facts. Each fact 

6carries five dimensions plus its embedded entities and intra-batch 

7causal relations: 

8 

9- ``what`` — the core factual content (1–2 sentences) 

10- ``when`` — temporal expression in natural language plus optional 

11 ISO ``occurred_start`` / ``occurred_end`` 

12- ``where`` — location or N/A 

13- ``who`` — people involved + relationships 

14- ``why`` — context or N/A 

15- ``fact_type`` — ``"world"`` (objective external fact) vs 

16 ``"experience"`` (first-person event) 

17- ``entities`` — named entities embedded in the fact 

18- ``causal_relations`` — directional ``caused_by`` references to other 

19 facts in the SAME batch by index 

20 

21Why this matters: every downstream signal (semantic embedding, cross- 

22encoder rerank, link expansion, reflect synthesis) gets cleaner inputs 

23when ``who`` and ``when`` are first-class fields rather than buried in 

24free text. Multi-hop and temporal questions especially benefit because 

25the structured fields can be filtered / matched directly. 

26 

27Cost: single LLM call per retain text replaces two (entity + causal). 

28Net cost approximately equal; output substantially richer. 

29 

30The module also exposes :func:`materialize_facts` which converts a 

31list of :class:`ExtractedFact` objects into the three artefacts the 

32orchestrator needs to persist: 

33 

34- :class:`VectorItem` per fact (replaces chunk-based VectorItems) 

35- :class:`Entity` per unique entity mentioned across the batch 

36- :class:`MemoryLink` per causal relation (memory-to-memory edges) 

37""" 

38 

39from __future__ import annotations 

40 

41import asyncio 

42import json 

43import logging 

44import re 

45import uuid 

46from dataclasses import dataclass, field 

47from datetime import datetime, timezone 

48 

49from astrocyte.types import ( 

50 Entity, 

51 MemoryLink, 

52 Message, 

53 VectorItem, 

54) 

55 

56_logger = logging.getLogger("astrocyte.fact_extraction") 

57 

58 

59# --------------------------------------------------------------------------- 

60# Data shapes 

61# --------------------------------------------------------------------------- 

62 

63 

@dataclass
class FactEntity:
    """A named entity that a single fact mentions.

    Both attributes are plain strings; nothing is validated or
    normalized at construction time.
    """

    # Surface form of the entity as reported by the extractor.
    name: str
    # Category label. The extraction prompts in this module offer
    # PERSON, ORG, LOCATION, PRODUCT, CONCEPT and OTHER.
    entity_type: str = "OTHER"

70 

71 

@dataclass
class FactCausalRelation:
    """A ``caused_by`` pointer from the owning fact to another fact of
    the same batch, addressed by list index.

    The record itself performs no validation. The consumers in this
    module drop self-references and out-of-range indices when they
    parse relations and when they build MemoryLinks.
    """

    # Index, within the batch, of the fact that is the cause.
    target_fact_index: int
    # Link strength, intended range 0..1; defaults to a strong link.
    strength: float = 1.0

81 

82 

@dataclass
class ExtractedFact:
    """One structured fact produced from retain text.

    Carries the five what/when/where/who/why dimensions, a fact-type
    label, an optional resolved time range, the entities mentioned in
    the fact, and causal pointers to other facts of the same batch.
    Every dimension except ``what`` defaults to ``"N/A"``.
    """

    # Core content of the fact (the only required field).
    what: str
    # Time expression in natural language.
    when: str = "N/A"
    where: str = "N/A"
    who: str = "N/A"
    why: str = "N/A"
    # Either "world" or "experience".
    fact_type: str = "experience"
    # Resolved bounds of the time range, when known.
    occurred_start: datetime | None = None
    occurred_end: datetime | None = None
    # Fresh list per instance (default_factory), never shared.
    entities: list[FactEntity] = field(default_factory=list)
    causal_relations: list[FactCausalRelation] = field(default_factory=list)

102 

103 

104# --------------------------------------------------------------------------- 

105# Prompt 

106# --------------------------------------------------------------------------- 

107 

108 

109_VERBATIM_SYSTEM_PROMPT = """\ 

110You enrich PRE-CHUNKED text with structured metadata. Do NOT rewrite \ 

111or paraphrase the chunk text — the caller will use the original chunk \ 

112text verbatim. Your job is to produce per-chunk metadata. 

113 

114Output a JSON object: {"facts": [...]}. The facts list MUST be the \ 

115same length as the input chunks list, in the same order. For each \ 

116chunk, produce ONE entry with: 

117- "when" (string, default "N/A"): natural-language time expression \ 

118present in this chunk; "N/A" otherwise. 

119- "where" (string, default "N/A"): location. 

120- "who" (string, default "N/A"): people involved. 

121- "why" (string, default "N/A"): reason / motivation if explicit. 

122- "fact_type" ("world" | "experience"): classify the chunk. 

123- "occurred_start" (ISO 8601 string or null): resolve "when" to an \ 

124absolute date when possible; null otherwise. 

125- "occurred_end" (ISO 8601 string or null): instant or end of range. 

126- "entities" (list): each entity mentioned in this chunk, as \ 

127{"name": str, "entity_type": str}. Types: PERSON, ORG, LOCATION, \ 

128PRODUCT, CONCEPT, OTHER. 

129 

130Rules: 

1311. Output exactly one entry per input chunk, in the same order. 

1322. Don't invent. Use "N/A" / null / [] for absent metadata. 

1333. Output JSON only. 

134""" 

135 

136 

137# JSON Schema for OpenAI structured outputs (Phase 2 of cost-control 

138# port). When the provider supports ``response_format=json_schema`` (set 

139# at the call site), the model is decode-time constrained to this shape 

140# and malformed-JSON parse failures become impossible. Strict mode 

141# requires every property in ``required`` and ``additionalProperties: 

142# false`` at every level — keep it that way when extending. 

143# 

144# ``causal_relations`` is intentionally omitted from the schema: 

145# ``causal_links.enabled`` defaults to false in our research configs, 

146# and OpenAI strict-mode JSON schema enforces required-everywhere which 

147# makes optional cross-chunk index references awkward. If we re-enable 

148# causal extraction, add a separate schema variant rather than bolting 

149# it on here. 

150_VERBATIM_JSON_SCHEMA: dict = { 

151 "name": "verbatim_facts", 

152 "strict": True, 

153 "schema": { 

154 "type": "object", 

155 "additionalProperties": False, 

156 "required": ["facts"], 

157 "properties": { 

158 "facts": { 

159 "type": "array", 

160 "items": { 

161 "type": "object", 

162 "additionalProperties": False, 

163 "required": [ 

164 "when", 

165 "where", 

166 "who", 

167 "why", 

168 "fact_type", 

169 "occurred_start", 

170 "occurred_end", 

171 "entities", 

172 ], 

173 "properties": { 

174 "when": {"type": "string"}, 

175 "where": {"type": "string"}, 

176 "who": {"type": "string"}, 

177 "why": {"type": "string"}, 

178 "fact_type": { 

179 "type": "string", 

180 "enum": ["world", "experience"], 

181 }, 

182 "occurred_start": {"type": ["string", "null"]}, 

183 "occurred_end": {"type": ["string", "null"]}, 

184 "entities": { 

185 "type": "array", 

186 "items": { 

187 "type": "object", 

188 "additionalProperties": False, 

189 "required": ["name", "entity_type"], 

190 "properties": { 

191 "name": {"type": "string"}, 

192 "entity_type": { 

193 "type": "string", 

194 "enum": [ 

195 "PERSON", 

196 "ORG", 

197 "LOCATION", 

198 "PRODUCT", 

199 "CONCEPT", 

200 "OTHER", 

201 ], 

202 }, 

203 }, 

204 }, 

205 }, 

206 }, 

207 }, 

208 } 

209 }, 

210 }, 

211} 

212 

213 

214def _build_verbatim_user_prompt( 

215 chunk_texts: list[str], 

216 event_date: datetime | None = None, 

217) -> str: 

218 lines = [] 

219 if event_date is not None: 

220 lines.append(f"Reference date for relative time expressions: {event_date.isoformat()}") 

221 lines.append("") 

222 lines.append(f"Chunks ({len(chunk_texts)} total, indexed):") 

223 for i, text in enumerate(chunk_texts): 

224 snippet = text.strip() 

225 if len(snippet) > 800: 

226 snippet = snippet[:797] + "..." 

227 lines.append(f"[{i}] {snippet}") 

228 lines.append("") 

229 lines.append("Per-chunk metadata (JSON, same order, same length):") 

230 return "\n".join(lines) 

231 

232 

233# --------------------------------------------------------------------------- 

234# Per-chunk parallel verbatim extraction (Phase 3 of cost-control port) 

235# --------------------------------------------------------------------------- 

236# 

237# Splits the verbatim metadata extraction into one LLM call per chunk, 

238# dispatched in parallel via ``asyncio.gather`` with a ``Semaphore`` 

239# bounding in-flight calls per session. Trade-offs vs the batched 

240# ``extract_facts_verbatim``: 

241# 

242# - Output tokens per call are tiny (~150 tokens for one entry vs ~150 * 

243# N for N-chunk batched), so per-call latency drops sharply on 

244# gpt-4o-mini (latency is roughly linear in output tokens). 

245# - More API calls overall, but they run in parallel, so wall time drops. 

246# - Cross-chunk ``causal_relations`` index references are dropped — they 

247# require co-located chunks in one prompt to reference. Acceptable 

248# default because ``causal_links.enabled`` is false in our research 

249# configs; if you need causal extraction, use the batched path. 

250 

251_VERBATIM_SINGLE_SYSTEM_PROMPT = """\ 

252You enrich a single PRE-CHUNKED text excerpt with structured metadata. \ 

253Do NOT rewrite or paraphrase the chunk — produce metadata only. 

254 

255Output a JSON object with these fields: 

256- "when" (string, default "N/A"): natural-language time expression in \ 

257the chunk; "N/A" otherwise. 

258- "where" (string, default "N/A"): location. 

259- "who" (string, default "N/A"): people involved. 

260- "why" (string, default "N/A"): reason / motivation if explicit. 

261- "fact_type" ("world" | "experience"): classify the chunk. 

262- "occurred_start" (ISO 8601 string or null): resolve "when" to an \ 

263absolute date when possible; null otherwise. 

264- "occurred_end" (ISO 8601 string or null): instant or end of range. 

265- "entities" (list): each entity as {"name": str, "entity_type": str}. \ 

266Types: PERSON, ORG, LOCATION, PRODUCT, CONCEPT, OTHER. 

267 

268Don't invent. Use "N/A" / null / [] for absent metadata. Output JSON \ 

269only. 

270""" 

271 

272 

273_VERBATIM_SINGLE_JSON_SCHEMA: dict = { 

274 "name": "verbatim_chunk_metadata", 

275 "strict": True, 

276 "schema": { 

277 "type": "object", 

278 "additionalProperties": False, 

279 "required": [ 

280 "when", 

281 "where", 

282 "who", 

283 "why", 

284 "fact_type", 

285 "occurred_start", 

286 "occurred_end", 

287 "entities", 

288 ], 

289 "properties": { 

290 "when": {"type": "string"}, 

291 "where": {"type": "string"}, 

292 "who": {"type": "string"}, 

293 "why": {"type": "string"}, 

294 "fact_type": { 

295 "type": "string", 

296 "enum": ["world", "experience"], 

297 }, 

298 "occurred_start": {"type": ["string", "null"]}, 

299 "occurred_end": {"type": ["string", "null"]}, 

300 "entities": { 

301 "type": "array", 

302 "items": { 

303 "type": "object", 

304 "additionalProperties": False, 

305 "required": ["name", "entity_type"], 

306 "properties": { 

307 "name": {"type": "string"}, 

308 "entity_type": { 

309 "type": "string", 

310 "enum": [ 

311 "PERSON", 

312 "ORG", 

313 "LOCATION", 

314 "PRODUCT", 

315 "CONCEPT", 

316 "OTHER", 

317 ], 

318 }, 

319 }, 

320 }, 

321 }, 

322 }, 

323 }, 

324} 

325 

326 

327def _build_verbatim_single_user_prompt( 

328 chunk: str, 

329 event_date: datetime | None = None, 

330) -> str: 

331 lines = [] 

332 if event_date is not None: 

333 lines.append(f"Reference date for relative time expressions: {event_date.isoformat()}") 

334 lines.append("") 

335 lines.append("Chunk:") 

336 snippet = chunk.strip() 

337 # Larger cap than batched (which truncates at 800 chars per chunk 

338 # to fit many in one prompt) — single-chunk path can afford the 

339 # full chunk text up to chunk_max_size. 

340 if len(snippet) > 8000: 

341 snippet = snippet[:7997] + "..." 

342 lines.append(snippet) 

343 lines.append("") 

344 lines.append("Metadata (JSON):") 

345 return "\n".join(lines) 

346 

347 

348@dataclass(slots=True, frozen=True) 

349class _ChunkResult: 

350 """Result of one parallel verbatim-extraction task. 

351 

352 Internal record paired with each input chunk. ``raw`` is the 

353 parsed metadata dict from the LLM (vendor-shaped JSON, no fixed 

354 schema beyond the constraints in ``_VERBATIM_SINGLE_JSON_SCHEMA``) 

355 or ``{}`` when extraction fell through after retries. 

356 """ 

357 

358 idx: int 

359 raw: dict 

360 

361 

362class _VerbatimChunkError(Exception): 

363 """Raised by ``_extract_one_chunk_verbatim_attempt`` on any failure 

364 that should be retried — LLM-side exception, malformed JSON, etc. 

365 Internal to the retry wrapper; never escapes the public surface.""" 

366 

367 

async def _extract_one_chunk_verbatim_attempt(
    chunk: str,
    llm_provider,
    *,
    event_date: datetime | None = None,
) -> dict:
    """Make ONE attempt at extracting verbatim metadata for one chunk.

    Args:
        chunk: Chunk text to describe.
        llm_provider: Object exposing an awaitable ``complete(messages,
            **kwargs)`` whose result carries the reply in ``.text``.
        event_date: Optional reference date forwarded to the prompt.

    Returns:
        The parsed metadata dict.

    Raises:
        _VerbatimChunkError: On every failure mode — the LLM call
            raising, the completion carrying no string ``text``, or the
            text not containing a JSON object — so the retry wrapper
            can back off and try again.
    """
    messages_in = [
        Message(role="system", content=_VERBATIM_SINGLE_SYSTEM_PROMPT),
        Message(
            role="user",
            content=_build_verbatim_single_user_prompt(chunk, event_date),
        ),
    ]
    # Per-chunk output is one metadata object, but at
    # chunk_max_size=2048 a dialogue-dense chunk can yield dozens of
    # entities and overflow a 512-token cap mid-emit. Structured outputs
    # cannot recover from a hard max_tokens cut-off, so 2048 is used for
    # headroom; it is still well below the batched path's 4096.
    call_kwargs = {"max_tokens": 2048, "temperature": 0.0}
    try:
        try:
            completion = await llm_provider.complete(
                messages_in,
                response_format={
                    "type": "json_schema",
                    "json_schema": _VERBATIM_SINGLE_JSON_SCHEMA,
                },
                **call_kwargs,
            )
        except TypeError:
            # Provider's complete() pre-dates the response_format kwarg:
            # retry once without it (the prompt alone still asks for
            # JSON). A failure of this fallback is wrapped below.
            completion = await llm_provider.complete(messages_in, **call_kwargs)
    except Exception as exc:
        raise _VerbatimChunkError(f"LLM call failed: {exc}") from exc

    # A completion without string text used to raise AttributeError in
    # the parser and bypass the retry loop; treat it as a retryable
    # failure like any other bad response.
    text = getattr(completion, "text", None)
    if not isinstance(text, str):
        raise _VerbatimChunkError("LLM response carried no text")
    parsed = _parse_json_object(text)
    if parsed is None:
        raise _VerbatimChunkError("malformed JSON response")
    return parsed

421 

422 

423async def _extract_one_chunk_verbatim( 

424 chunk: str, 

425 llm_provider, 

426 *, 

427 event_date: datetime | None = None, 

428 max_retries: int = 3, 

429 base_retry_delay: float = 2.0, 

430) -> dict: 

431 """Extract verbatim metadata for ONE chunk with retries. 

432 

433 Phase 4 of cost-control port — wraps 

434 :func:`_extract_one_chunk_verbatim_attempt` with the same retry 

435 policy Hindsight uses for ``_extract_chunk_with_retry``: up to 

436 ``max_retries`` attempts with exponential backoff 

437 (``base_retry_delay`` × 2^attempt). On final exhaustion returns 

438 ``{}`` so the caller falls through to a metadata-less 

439 ExtractedFact (chunk text still preserved). 

440 

441 Tests that need to exercise the bare single-attempt failure path 

442 can pass ``max_retries=1``. 

443 """ 

444 if not chunk.strip(): 

445 return {} 

446 last_exc: BaseException | None = None 

447 for attempt in range(max(1, max_retries)): 

448 try: 

449 return await _extract_one_chunk_verbatim_attempt( 

450 chunk, 

451 llm_provider, 

452 event_date=event_date, 

453 ) 

454 except _VerbatimChunkError as exc: 

455 last_exc = exc 

456 if attempt < max_retries - 1: 

457 delay = base_retry_delay * (2**attempt) 

458 _logger.warning( 

459 "fact_extraction (verbatim/parallel) chunk attempt %d/%d failed (%s); retrying in %.0fs", 

460 attempt + 1, 

461 max_retries, 

462 exc, 

463 delay, 

464 ) 

465 await asyncio.sleep(delay) 

466 _logger.warning( 

467 "fact_extraction (verbatim/parallel) chunk exhausted %d retries (%s); " 

468 "falling through to metadata-less ExtractedFact", 

469 max_retries, 

470 last_exc, 

471 ) 

472 return {} 

473 

474 

def _build_extracted_fact_from_raw(
    chunk: str,
    raw: dict,
) -> ExtractedFact:
    """Combine the original chunk text with one parsed metadata dict.

    Args:
        chunk: Original chunk text; stored unchanged as ``what``.
        raw: Metadata dict as parsed from the LLM reply. Missing, empty
            or blank dimensions become ``"N/A"``; an unrecognized
            ``fact_type`` becomes ``"experience"``; entity entries that
            are not dicts or have no name are skipped.

    Returns:
        An :class:`ExtractedFact` whose ``causal_relations`` is always
        empty — a caller that has causal data can assign it afterwards.
    """

    def _dimension(key: str) -> str:
        return str(raw.get(key) or "N/A").strip() or "N/A"

    mentioned: list[FactEntity] = []
    for candidate in raw.get("entities") or []:
        if isinstance(candidate, dict):
            label = str(candidate.get("name") or "").strip()
            if label:
                kind = str(candidate.get("entity_type") or "OTHER").strip().upper() or "OTHER"
                mentioned.append(FactEntity(name=label, entity_type=kind))

    category = str(raw.get("fact_type") or "experience").strip().lower()
    category = category if category in {"world", "experience"} else "experience"

    start = _parse_iso_datetime(raw.get("occurred_start"))
    end = _parse_iso_datetime(raw.get("occurred_end"))
    return ExtractedFact(
        what=chunk,
        when=_dimension("when"),
        where=_dimension("where"),
        who=_dimension("who"),
        why=_dimension("why"),
        fact_type=category,
        occurred_start=start,
        occurred_end=end,
        entities=mentioned,
        # The per-chunk parallel path has no cross-chunk references.
        causal_relations=[],
    )

508 

509 

async def extract_facts_verbatim_parallel(
    chunk_texts: list[str],
    llm_provider,
    *,
    event_date: datetime | None = None,
    max_concurrency: int = 6,
    max_retries: int = 3,
    base_retry_delay: float = 2.0,
) -> list[ExtractedFact]:
    """Extract verbatim metadata with one LLM call per chunk, in parallel.

    Phase 3 of the Hindsight cost-control port. Calls are dispatched
    together and bounded by a semaphore of ``max_concurrency`` (at
    least 1) for this invocation.

    Compared with :func:`extract_facts_verbatim`:
    - Many small chunks per session: usually the faster option, since
      each call emits little output and the calls overlap.
    - One or two chunks: roughly on par; the extra round trips do not
      pay for themselves.

    Cross-chunk ``causal_relations`` are not produced here because a
    single-chunk prompt has nothing to reference. Use the batched path
    when causal extraction is enabled.

    Args:
        chunk_texts: Pre-chunked source texts.
        llm_provider: Passed through to the per-chunk extractor.
        event_date: Optional reference date for relative time expressions.
        max_concurrency: Upper bound on in-flight calls.
        max_retries: Attempts per chunk.
        base_retry_delay: Base backoff in seconds per chunk.

    Returns:
        ``[]`` when the input is empty or every chunk is blank;
        otherwise one :class:`ExtractedFact` per input chunk, in input
        order. A chunk whose extraction failed still gets a fact with
        default metadata and its original text.
    """
    if not chunk_texts or not any(text.strip() for text in chunk_texts):
        return []

    limiter = asyncio.Semaphore(max(1, max_concurrency))

    async def _bounded(position: int, text: str) -> _ChunkResult:
        async with limiter:
            metadata = await _extract_one_chunk_verbatim(
                text,
                llm_provider,
                event_date=event_date,
                max_retries=max_retries,
                base_retry_delay=base_retry_delay,
            )
            return _ChunkResult(idx=position, raw=metadata)

    outcomes = await asyncio.gather(
        *(_bounded(position, text) for position, text in enumerate(chunk_texts)),
        return_exceptions=True,
    )

    # gather() already preserves input order; keying by the explicit
    # index keeps reassembly correct even if dispatch changes later.
    metadata_by_position: dict[int, dict] = {}
    for outcome in outcomes:
        if not isinstance(outcome, BaseException):
            metadata_by_position[outcome.idx] = outcome.raw if isinstance(outcome.raw, dict) else {}
            continue
        # The per-chunk extractor converts its retryable failures into
        # ``{}``, so an exception here is unexpected (programming error
        # or cancellation). Log it and leave the chunk without metadata.
        _logger.warning(
            "fact_extraction (verbatim/parallel) task raised: %r",
            outcome,
        )

    facts: list[ExtractedFact] = []
    for position, text in enumerate(chunk_texts):
        facts.append(_build_extracted_fact_from_raw(text, metadata_by_position.get(position, {})))
    return facts

582 

583 

584# --------------------------------------------------------------------------- 

585# JSON parsing helpers 

586# --------------------------------------------------------------------------- 

587 

588 

589def _parse_json_object(raw: str) -> dict | None: 

590 text = raw.strip() 

591 if text.startswith("```"): 

592 text = re.sub(r"^```(?:json)?\s*", "", text) 

593 text = re.sub(r"\s*```$", "", text) 

594 match = re.search(r"\{.*\}", text, re.DOTALL) 

595 if match is None: 

596 return None 

597 try: 

598 payload = json.loads(match.group(0)) 

599 except json.JSONDecodeError: 

600 return None 

601 return payload if isinstance(payload, dict) else None 

602 

603 

604def _parse_iso_datetime(value) -> datetime | None: 

605 if not value: 

606 return None 

607 if isinstance(value, datetime): 

608 return value if value.tzinfo else value.replace(tzinfo=timezone.utc) 

609 if not isinstance(value, str): 

610 return None 

611 s = value.strip() 

612 if not s: 

613 return None 

614 # Tolerate trailing 'Z' (Python 3.11+ handles it, but be defensive). 

615 if s.endswith("Z"): 

616 s = s[:-1] + "+00:00" 

617 try: 

618 dt = datetime.fromisoformat(s) 

619 except ValueError: 

620 return None 

621 return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) 

622 

623 

624# --------------------------------------------------------------------------- 

625# Extraction 

626# --------------------------------------------------------------------------- 

627 

628 

def _parse_causal_relations(
    raw: dict,
    source_index: int,
    fact_count: int,
) -> list[FactCausalRelation]:
    """Read ``causal_relations`` from one metadata entry, keeping only
    well-formed references to OTHER chunks inside the batch."""
    relations: list[FactCausalRelation] = []
    for rel in raw.get("causal_relations") or []:
        if not isinstance(rel, dict):
            continue
        try:
            target = int(rel.get("target_fact_index"))
        except (TypeError, ValueError):
            continue
        # Drop self-loops and indices outside the batch.
        if target == source_index or not (0 <= target < fact_count):
            continue
        try:
            strength = float(rel.get("strength", 1.0))
        except (TypeError, ValueError):
            strength = 1.0
        relations.append(FactCausalRelation(target_fact_index=target, strength=strength))
    return relations


async def extract_facts_verbatim(
    chunk_texts: list[str],
    llm_provider,
    *,
    event_date: datetime | None = None,
) -> list[ExtractedFact]:
    """Extract per-chunk metadata WITHOUT paraphrasing the chunk text.

    One batched LLM call describes all chunks. The ``what`` field of
    each returned :class:`ExtractedFact` is the original chunk text,
    never an LLM summary; the LLM only contributes structured metadata
    (entities, temporal range, fact_type, where/who/why and, when the
    reply happens to include them, causal_relations).

    Why this exists (the design lesson from 2026-05-02): an earlier
    "concise" mode replaced conversation text with paraphrases such as
    "Caroline went hiking | Involving: Caroline | When: yesterday".
    That loses the surface vocabulary question embeddings typically
    share with the conversation and severely degraded
    recall_hit_rate. Verbatim mode keeps the vocabulary while still
    attaching the metadata needed for causal/temporal/per-fact
    retrieval signals.

    Args:
        chunk_texts: Pre-chunked source texts; one per memory.
        llm_provider: Object exposing an awaitable ``complete(messages,
            **kwargs)`` whose result carries the reply in ``.text``.
        event_date: Reference for resolving relative time expressions.

    Returns:
        One :class:`ExtractedFact` per input chunk, in input order.
        ``[]`` when the input is empty or all blank, and on any failure
        (LLM error, missing or non-string reply text, malformed JSON,
        ``facts`` not a list) so retain can fall back to legacy
        chunking.
    """
    if not chunk_texts or not any(t.strip() for t in chunk_texts):
        return []

    # Phase 2 of cost-control port: prefer structured outputs so the
    # decoder is constrained to ``_VERBATIM_JSON_SCHEMA``. Providers
    # whose complete() does not accept ``response_format`` raise
    # TypeError and are retried once on the free-form path.
    messages_in = [
        Message(role="system", content=_VERBATIM_SYSTEM_PROMPT),
        Message(
            role="user",
            content=_build_verbatim_user_prompt(chunk_texts, event_date),
        ),
    ]
    call_kwargs = {"max_tokens": 4096, "temperature": 0.0}
    try:
        try:
            completion = await llm_provider.complete(
                messages_in,
                response_format={"type": "json_schema", "json_schema": _VERBATIM_JSON_SCHEMA},
                **call_kwargs,
            )
        except TypeError:
            completion = await llm_provider.complete(messages_in, **call_kwargs)
    except Exception as exc:
        _logger.warning("fact_extraction (verbatim) LLM call failed (%s)", exc)
        return []

    # A completion without string text must take the same "return []"
    # exit as malformed JSON rather than raise out of retain.
    text = getattr(completion, "text", None)
    parsed = _parse_json_object(text) if isinstance(text, str) else None
    if parsed is None:
        _logger.warning("fact_extraction (verbatim): malformed JSON response")
        return []
    raw_metadata = parsed.get("facts")
    if not isinstance(raw_metadata, list):
        _logger.warning("fact_extraction (verbatim): 'facts' is not a list")
        return []
    if len(raw_metadata) != len(chunk_texts):
        # Entries are matched to chunks purely by position, so a count
        # mismatch may mean misaligned metadata; make it visible.
        _logger.warning(
            "fact_extraction (verbatim): got %d metadata entries for %d chunks; matching by index",
            len(raw_metadata),
            len(chunk_texts),
        )

    out: list[ExtractedFact] = []
    for idx, chunk in enumerate(chunk_texts):
        # Chunks beyond the end of the reply, and entries that are not
        # dicts, get a metadata-less fact that still keeps the text.
        raw = raw_metadata[idx] if idx < len(raw_metadata) else {}
        if not isinstance(raw, dict):
            raw = {}
        # Same dimension/entity handling as the per-chunk path.
        fact = _build_extracted_fact_from_raw(chunk, raw)
        # Indices reference chunk positions, which are also the memory
        # positions after materialization.
        fact.causal_relations = _parse_causal_relations(raw, idx, len(chunk_texts))
        out.append(fact)
    return out

772 

773 

774# --------------------------------------------------------------------------- 

775# Materialization 

776# --------------------------------------------------------------------------- 

777 

778 

779def _slug(value: str) -> str: 

780 return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") or "unknown" 

781 

782 

@dataclass
class MaterializedFacts:
    """Bundle of retain-pipeline artefacts built from one batch of facts.

    All four fields are required and are plain lists.
    """

    # One VectorItem per fact, in the order of the input facts. Each
    # item's ``id`` doubles as the fact's identifier in the
    # associations and links below.
    vector_items: list[VectorItem]
    # Entities mentioned across the batch, deduplicated. IDs have the
    # deterministic form ``{type}:{slug}`` so repeated mentions map to
    # the same ID across retain calls.
    entities: list[Entity]
    # (vector_item_id, entity_id) pairs for the orchestrator's
    # link_memories_to_entities call.
    memory_entity_associations: list[tuple[str, str]]
    # Directional ``caused_by`` edges: source is the effect's memory
    # ID, target is the cause's memory ID.
    memory_links: list[MemoryLink]

805 

806 

def materialize_facts(
    facts: list[ExtractedFact],
    *,
    bank_id: str,
    tags: list[str] | None = None,
    metadata: dict | None = None,
    occurred_at: datetime | None = None,
    embeddings: list[list[float]] | None = None,
) -> MaterializedFacts:
    """Convert extracted facts into retain-pipeline artefacts.

    The orchestrator feeds the result into its storage path
    (store_vectors → store_entities → link_memories_to_entities →
    store_memory_links) without further translation.

    The VectorItem's ``text`` is always the raw chunk content
    (``fact.what``): verbatim mode is the only supported flow after the
    M9 concise-path removal. Storing the original vocabulary is what
    lets question embeddings match retained memories; decorated text
    (``what | Involving: ... | When: ...``) severely degraded
    recall_hit_rate (2026-05-02 finding).

    Args:
        facts: Output of :func:`extract_facts_verbatim` /
            :func:`extract_facts_verbatim_parallel`.
        bank_id: Target bank.
        tags: Tags applied to every produced VectorItem.
        metadata: Base metadata copied onto every VectorItem. The
            fact's structured fields are added under ``_fact_*`` keys
            for downstream filtering; "N/A" dimensions are omitted.
        occurred_at: Timestamp used when a fact has no
            ``occurred_start``. Typically the retain request's
            ``occurred_at``.
        embeddings: Optional pre-computed embeddings, one per fact.
            A length mismatch is logged; facts without a matching
            embedding get an empty vector and the caller must embed
            them.

    Returns:
        A :class:`MaterializedFacts` with one VectorItem per fact (in
        input order), entities unique by ID, unique
        (memory_id, entity_id) associations, and unique ``caused_by``
        links between memories of this batch.
    """
    base_metadata = dict(metadata or {})
    base_tags = list(tags or [])
    now = datetime.now(timezone.utc)

    if embeddings is not None and len(embeddings) != len(facts):
        _logger.warning(
            "fact_extraction: received %d embeddings for %d facts; unmatched facts get empty vectors",
            len(embeddings),
            len(facts),
        )

    def _entity_id(fent: FactEntity) -> str:
        # Deterministic ``{type}:{slug}`` ID, same derivation as before.
        type_part = fent.entity_type.strip().upper().lower()
        return f"{type_part}:{_slug(fent.name)}"

    # Step 1: one Entity per unique ID, first mention wins. Keying on
    # the ID (not on the raw name) matters because different spellings
    # can slug to the same ID ("New York" / "New-York"); keying on the
    # name produced several Entity rows sharing one ID.
    entity_by_id: dict[str, Entity] = {}
    for fact in facts:
        for fent in fact.entities:
            eid = _entity_id(fent)
            if eid in entity_by_id:
                continue
            entity_by_id[eid] = Entity(
                id=eid,
                name=fent.name,
                entity_type=fent.entity_type,
                aliases=[fent.name],
                metadata={"source": "fact_extraction"},
            )
    entities = list(entity_by_id.values())

    # Step 2: one VectorItem per fact. The generated IDs are what the
    # associations and the causal links refer to.
    items: list[VectorItem] = []
    associations: list[tuple[str, str]] = []
    for idx, fact in enumerate(facts):
        item_id = uuid.uuid4().hex
        fact_metadata = dict(base_metadata)
        # Promote the structured dimensions so consumers can
        # filter/rerank on them; the ``_fact_`` prefix avoids clashes
        # with caller-supplied keys.
        for dimension in ("when", "where", "who", "why"):
            value = getattr(fact, dimension)
            if value and value.upper() != "N/A":
                fact_metadata[f"_fact_{dimension}"] = value
        fact_metadata["_fact_type"] = fact.fact_type
        if fact.occurred_start is not None:
            fact_metadata["_fact_occurred_start"] = fact.occurred_start.isoformat()
        if fact.occurred_end is not None:
            fact_metadata["_fact_occurred_end"] = fact.occurred_end.isoformat()

        vector = embeddings[idx] if embeddings is not None and idx < len(embeddings) else []

        items.append(
            VectorItem(
                id=item_id,
                bank_id=bank_id,
                vector=vector,
                # Raw chunk text, so question embeddings match the
                # original vocabulary.
                text=fact.what,
                metadata=fact_metadata,
                tags=list(base_tags),
                fact_type=fact.fact_type,
                memory_layer="raw",
                occurred_at=fact.occurred_start or occurred_at,
                retained_at=now,
            )
        )

        # Link this memory to each entity it mentions, once per entity
        # even if the extractor listed the entity more than once.
        linked_ids: set[str] = set()
        for fent in fact.entities:
            eid = _entity_id(fent)
            if eid in linked_ids:
                continue
            linked_ids.add(eid)
            associations.append((item_id, eid))

    # Step 3: causal relations → MemoryLinks. Source is the EFFECT (the
    # fact carrying the relation); target is the CAUSE.
    memory_links: list[MemoryLink] = []
    seen_edges: set[tuple[int, int]] = set()
    for src_idx, fact in enumerate(facts):
        for rel in fact.causal_relations:
            tgt_idx = rel.target_fact_index
            if tgt_idx == src_idx:
                continue  # self-loop
            if not (0 <= tgt_idx < len(facts)):
                continue  # points outside the batch
            if (src_idx, tgt_idx) in seen_edges:
                continue  # repeated relation; keep the first
            seen_edges.add((src_idx, tgt_idx))
            try:
                strength = float(rel.strength)
            except (TypeError, ValueError):
                strength = 1.0
            memory_links.append(
                MemoryLink(
                    source_memory_id=items[src_idx].id,
                    target_memory_id=items[tgt_idx].id,
                    link_type="caused_by",
                    # Clamp into [0, 1].
                    confidence=min(1.0, max(0.0, strength)),
                    weight=1.0,
                    created_at=now,
                    metadata={"bank_id": bank_id, "source": "fact_extraction"},
                )
            )

    return MaterializedFacts(
        vector_items=items,
        entities=entities,
        memory_entity_associations=associations,
        memory_links=memory_links,
    )