Coverage for astrocyte/pipeline/fact_extraction.py: 76%
276 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Structured fact extraction at retain time.
3Replaces the legacy two-pass approach (text chunking → separate
4entity-extraction LLM call → separate fact-causal LLM call) with a
5single LLM call that produces a list of structured facts. Each fact
6carries five dimensions plus its embedded entities and intra-batch
7causal relations:
9- ``what`` — the core factual content (1–2 sentences)
10- ``when`` — temporal expression in natural language plus optional
11 ISO ``occurred_start`` / ``occurred_end``
12- ``where`` — location or N/A
13- ``who`` — people involved + relationships
14- ``why`` — context or N/A
15- ``fact_type`` — ``"world"`` (objective external fact) vs
16 ``"experience"`` (first-person event)
17- ``entities`` — named entities embedded in the fact
18- ``causal_relations`` — directional ``caused_by`` references to other
19 facts in the SAME batch by index
21Why this matters: every downstream signal (semantic embedding, cross-
22encoder rerank, link expansion, reflect synthesis) gets cleaner inputs
23when ``who`` and ``when`` are first-class fields rather than buried in
24free text. Multi-hop and temporal questions especially benefit because
25the structured fields can be filtered / matched directly.
27Cost: single LLM call per retain text replaces two (entity + causal).
28Net cost approximately equal; output substantially richer.
30The module also exposes :func:`materialize_facts` which converts a
31list of :class:`ExtractedFact` objects into the three artefacts the
32orchestrator needs to persist:
34- :class:`VectorItem` per fact (replaces chunk-based VectorItems)
35- :class:`Entity` per unique entity mentioned across the batch
36- :class:`MemoryLink` per causal relation (memory-to-memory edges)
37"""
39from __future__ import annotations
41import asyncio
42import json
43import logging
44import re
45import uuid
46from dataclasses import dataclass, field
47from datetime import datetime, timezone
49from astrocyte.types import (
50 Entity,
51 MemoryLink,
52 Message,
53 VectorItem,
54)
56_logger = logging.getLogger("astrocyte.fact_extraction")
59# ---------------------------------------------------------------------------
60# Data shapes
61# ---------------------------------------------------------------------------
@dataclass
class FactEntity:
    """A named entity mentioned within a single extracted fact.

    Instances are built from the ``entities`` list of the LLM metadata
    and later collapsed into batch-wide ``Entity`` rows by
    :func:`materialize_facts`.
    """

    # Surface form of the entity as it appeared in the LLM output.
    name: str
    # Coarse category label. The JSON schemas in this module allow
    # PERSON, ORG, LOCATION, PRODUCT, CONCEPT and OTHER.
    entity_type: str = "OTHER"  # PERSON, ORG, LOCATION, PRODUCT, CONCEPT, OTHER
@dataclass
class FactCausalRelation:
    """``caused_by`` reference from THIS fact to another fact in the
    same batch, identified by its index in that batch.

    Consumers in this module drop self-loops and out-of-range indices
    when turning relations into MemoryLinks; no ordering constraint
    between source and target index is applied here."""

    # Index (within the same batch) of the fact that caused this one.
    target_fact_index: int
    # Link strength; materialize_facts clamps it into [0, 1].
    strength: float = 1.0  # 0..1, defaults to strong
@dataclass
class ExtractedFact:
    """A single structured fact extracted from retain text.

    The 5-dimension schema (what/when/where/who/why) materializes the
    semantic content the LLM extracted, plus embedded entities and
    causal links between facts in the same batch.

    In the verbatim extraction paths of this module ``what`` holds the
    original chunk text rather than an LLM paraphrase, and the other
    fields carry the parsed metadata, with ``"N/A"`` / ``None`` / empty
    lists standing in for absent values.
    """

    what: str  # core content
    when: str = "N/A"  # natural-language time expression
    where: str = "N/A"  # location, or "N/A" when absent
    who: str = "N/A"  # people involved, or "N/A" when absent
    why: str = "N/A"  # reason / motivation, or "N/A" when absent
    fact_type: str = "experience"  # "world" | "experience"
    # Parsed ISO timestamps for the fact's time range; None when the
    # metadata carried no resolvable date.
    occurred_start: datetime | None = None
    occurred_end: datetime | None = None
    # Entities mentioned in this fact.
    entities: list[FactEntity] = field(default_factory=list)
    # ``caused_by`` references to other facts of the same batch.
    causal_relations: list[FactCausalRelation] = field(default_factory=list)
104# ---------------------------------------------------------------------------
105# Prompt
106# ---------------------------------------------------------------------------
# System prompt for the batched verbatim path (one LLM call covering
# every chunk). It asks for a {"facts": [...]} object with exactly one
# metadata entry per input chunk, in input order. The trailing
# backslashes join physical lines so each bullet reaches the model as a
# single line.
_VERBATIM_SYSTEM_PROMPT = """\
You enrich PRE-CHUNKED text with structured metadata. Do NOT rewrite \
or paraphrase the chunk text — the caller will use the original chunk \
text verbatim. Your job is to produce per-chunk metadata.

Output a JSON object: {"facts": [...]}. The facts list MUST be the \
same length as the input chunks list, in the same order. For each \
chunk, produce ONE entry with:
- "when" (string, default "N/A"): natural-language time expression \
present in this chunk; "N/A" otherwise.
- "where" (string, default "N/A"): location.
- "who" (string, default "N/A"): people involved.
- "why" (string, default "N/A"): reason / motivation if explicit.
- "fact_type" ("world" | "experience"): classify the chunk.
- "occurred_start" (ISO 8601 string or null): resolve "when" to an \
absolute date when possible; null otherwise.
- "occurred_end" (ISO 8601 string or null): instant or end of range.
- "entities" (list): each entity mentioned in this chunk, as \
{"name": str, "entity_type": str}. Types: PERSON, ORG, LOCATION, \
PRODUCT, CONCEPT, OTHER.

Rules:
1. Output exactly one entry per input chunk, in the same order.
2. Don't invent. Use "N/A" / null / [] for absent metadata.
3. Output JSON only.
"""
137# JSON Schema for OpenAI structured outputs (Phase 2 of cost-control
138# port). When the provider supports ``response_format=json_schema`` (set
139# at the call site), the model is decode-time constrained to this shape
140# and malformed-JSON parse failures become impossible. Strict mode
141# requires every property in ``required`` and ``additionalProperties:
142# false`` at every level — keep it that way when extending.
143#
144# ``causal_relations`` is intentionally omitted from the schema:
145# ``causal_links.enabled`` defaults to false in our research configs,
146# and OpenAI strict-mode JSON schema enforces required-everywhere which
147# makes optional cross-chunk index references awkward. If we re-enable
148# causal extraction, add a separate schema variant rather than bolting
149# it on here.
# Structured-output schema for the batched verbatim call. It is passed
# as the ``json_schema`` member of ``response_format`` by
# ``extract_facts_verbatim``. The per-fact object mirrors the fields
# listed in ``_VERBATIM_SYSTEM_PROMPT``.
_VERBATIM_JSON_SCHEMA: dict = {
    "name": "verbatim_facts",
    "strict": True,
    "schema": {
        "type": "object",
        "additionalProperties": False,
        "required": ["facts"],
        "properties": {
            # One entry per input chunk, in input order.
            "facts": {
                "type": "array",
                "items": {
                    "type": "object",
                    "additionalProperties": False,
                    # Strict mode: every property must be listed here.
                    "required": [
                        "when",
                        "where",
                        "who",
                        "why",
                        "fact_type",
                        "occurred_start",
                        "occurred_end",
                        "entities",
                    ],
                    "properties": {
                        "when": {"type": "string"},
                        "where": {"type": "string"},
                        "who": {"type": "string"},
                        "why": {"type": "string"},
                        "fact_type": {
                            "type": "string",
                            "enum": ["world", "experience"],
                        },
                        # Nullable: the prompt asks for null when no
                        # absolute date can be resolved.
                        "occurred_start": {"type": ["string", "null"]},
                        "occurred_end": {"type": ["string", "null"]},
                        "entities": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "additionalProperties": False,
                                "required": ["name", "entity_type"],
                                "properties": {
                                    "name": {"type": "string"},
                                    "entity_type": {
                                        "type": "string",
                                        "enum": [
                                            "PERSON",
                                            "ORG",
                                            "LOCATION",
                                            "PRODUCT",
                                            "CONCEPT",
                                            "OTHER",
                                        ],
                                    },
                                },
                            },
                        },
                    },
                },
            }
        },
    },
}
214def _build_verbatim_user_prompt(
215 chunk_texts: list[str],
216 event_date: datetime | None = None,
217) -> str:
218 lines = []
219 if event_date is not None:
220 lines.append(f"Reference date for relative time expressions: {event_date.isoformat()}")
221 lines.append("")
222 lines.append(f"Chunks ({len(chunk_texts)} total, indexed):")
223 for i, text in enumerate(chunk_texts):
224 snippet = text.strip()
225 if len(snippet) > 800:
226 snippet = snippet[:797] + "..."
227 lines.append(f"[{i}] {snippet}")
228 lines.append("")
229 lines.append("Per-chunk metadata (JSON, same order, same length):")
230 return "\n".join(lines)
233# ---------------------------------------------------------------------------
234# Per-chunk parallel verbatim extraction (Phase 3 of cost-control port)
235# ---------------------------------------------------------------------------
236#
237# Splits the verbatim metadata extraction into one LLM call per chunk,
238# dispatched in parallel via ``asyncio.gather`` with a ``Semaphore``
239# bounding in-flight calls per session. Trade-offs vs the batched
240# ``extract_facts_verbatim``:
241#
242# - Output tokens per call are tiny (~150 tokens for one entry vs ~150 *
243# N for N-chunk batched), so per-call latency drops sharply on
244# gpt-4o-mini (latency is roughly linear in output tokens).
245# - More API calls overall, but they run in parallel, so wall time drops.
246# - Cross-chunk ``causal_relations`` index references are dropped — they
247# require co-located chunks in one prompt to reference. Acceptable
248# default because ``causal_links.enabled`` is false in our research
249# configs; if you need causal extraction, use the batched path.
# System prompt for the per-chunk parallel path: one LLM call per
# chunk, so the expected output is a single metadata object rather than
# a {"facts": [...]} wrapper. Field list matches the batched prompt.
_VERBATIM_SINGLE_SYSTEM_PROMPT = """\
You enrich a single PRE-CHUNKED text excerpt with structured metadata. \
Do NOT rewrite or paraphrase the chunk — produce metadata only.

Output a JSON object with these fields:
- "when" (string, default "N/A"): natural-language time expression in \
the chunk; "N/A" otherwise.
- "where" (string, default "N/A"): location.
- "who" (string, default "N/A"): people involved.
- "why" (string, default "N/A"): reason / motivation if explicit.
- "fact_type" ("world" | "experience"): classify the chunk.
- "occurred_start" (ISO 8601 string or null): resolve "when" to an \
absolute date when possible; null otherwise.
- "occurred_end" (ISO 8601 string or null): instant or end of range.
- "entities" (list): each entity as {"name": str, "entity_type": str}. \
Types: PERSON, ORG, LOCATION, PRODUCT, CONCEPT, OTHER.

Don't invent. Use "N/A" / null / [] for absent metadata. Output JSON \
only.
"""
# Structured-output schema for the per-chunk call: the same per-fact
# object as the items of ``_VERBATIM_JSON_SCHEMA``, but at the top
# level (no ``facts`` array). Passed as ``json_schema`` by
# ``_extract_one_chunk_verbatim_attempt``.
_VERBATIM_SINGLE_JSON_SCHEMA: dict = {
    "name": "verbatim_chunk_metadata",
    "strict": True,
    "schema": {
        "type": "object",
        "additionalProperties": False,
        # Every property is listed as required.
        "required": [
            "when",
            "where",
            "who",
            "why",
            "fact_type",
            "occurred_start",
            "occurred_end",
            "entities",
        ],
        "properties": {
            "when": {"type": "string"},
            "where": {"type": "string"},
            "who": {"type": "string"},
            "why": {"type": "string"},
            "fact_type": {
                "type": "string",
                "enum": ["world", "experience"],
            },
            # Nullable: the prompt asks for null when no absolute date
            # can be resolved.
            "occurred_start": {"type": ["string", "null"]},
            "occurred_end": {"type": ["string", "null"]},
            "entities": {
                "type": "array",
                "items": {
                    "type": "object",
                    "additionalProperties": False,
                    "required": ["name", "entity_type"],
                    "properties": {
                        "name": {"type": "string"},
                        "entity_type": {
                            "type": "string",
                            "enum": [
                                "PERSON",
                                "ORG",
                                "LOCATION",
                                "PRODUCT",
                                "CONCEPT",
                                "OTHER",
                            ],
                        },
                    },
                },
            },
        },
    },
}
327def _build_verbatim_single_user_prompt(
328 chunk: str,
329 event_date: datetime | None = None,
330) -> str:
331 lines = []
332 if event_date is not None:
333 lines.append(f"Reference date for relative time expressions: {event_date.isoformat()}")
334 lines.append("")
335 lines.append("Chunk:")
336 snippet = chunk.strip()
337 # Larger cap than batched (which truncates at 800 chars per chunk
338 # to fit many in one prompt) — single-chunk path can afford the
339 # full chunk text up to chunk_max_size.
340 if len(snippet) > 8000:
341 snippet = snippet[:7997] + "..."
342 lines.append(snippet)
343 lines.append("")
344 lines.append("Metadata (JSON):")
345 return "\n".join(lines)
@dataclass(slots=True, frozen=True)
class _ChunkResult:
    """Result of one parallel verbatim-extraction task.

    Internal record paired with each input chunk. ``raw`` is the
    parsed metadata dict from the LLM (vendor-shaped JSON, no fixed
    schema beyond the constraints in ``_VERBATIM_SINGLE_JSON_SCHEMA``)
    or ``{}`` when extraction fell through after retries.
    """

    # Position of the chunk in the caller's ``chunk_texts`` list; used
    # to re-associate the result with its chunk.
    idx: int
    # Parsed metadata for that chunk, or an empty dict on failure.
    raw: dict
class _VerbatimChunkError(Exception):
    """Raised by ``_extract_one_chunk_verbatim_attempt`` on any failure
    that should be retried — LLM-side exception, malformed JSON, etc.

    ``_extract_one_chunk_verbatim`` catches it, backs off and retries;
    once the attempts are used up it returns ``{}`` instead of
    re-raising, so the error is internal to the retry wrapper and never
    escapes the public surface."""
async def _extract_one_chunk_verbatim_attempt(
    chunk: str,
    llm_provider,
    *,
    event_date: datetime | None = None,
) -> dict:
    """Make a single attempt at extracting verbatim metadata for ONE chunk.

    The provider is first called with a ``json_schema`` response format.
    If that call raises ``TypeError`` (provider's ``complete()``
    pre-dates the ``response_format`` kwarg) the call is repeated once
    without it; the prompt alone still asks for JSON.

    Args:
        chunk: Chunk text to annotate.
        llm_provider: Object exposing an awaitable
            ``complete(messages, max_tokens=..., temperature=..., ...)``
            whose result has a ``text`` attribute.
        event_date: Optional anchor for relative time expressions.

    Returns:
        The parsed metadata object.

    Raises:
        _VerbatimChunkError: On any retryable failure — the LLM call
            raised, the completion carried no string ``text``, or the
            text did not contain a JSON object.
    """
    messages_in = [
        Message(role="system", content=_VERBATIM_SINGLE_SYSTEM_PROMPT),
        Message(
            role="user",
            content=_build_verbatim_single_user_prompt(chunk, event_date),
        ),
    ]
    try:
        try:
            completion = await llm_provider.complete(
                messages_in,
                # Per-chunk output is one metadata object; tiny in absolute
                # terms, but at chunk_max_size=2048 with dialogue-dense
                # chunks the LLM can emit dozens of entities and the JSON
                # blows past 512 tokens, getting truncated mid-emit.
                # Structured outputs can't recover from a hard max_tokens
                # cap, so we get parse failures even with json_schema mode.
                # 2048 gives ~10× headroom and is still tiny vs the
                # batched path's 4096 ceiling.
                max_tokens=2048,
                temperature=0.0,
                response_format={
                    "type": "json_schema",
                    "json_schema": _VERBATIM_SINGLE_JSON_SCHEMA,
                },
            )
        except TypeError:
            # Provider's complete() pre-dates response_format kwarg —
            # retry once without it. A failure of this fallback is
            # wrapped by the outer handler like any other LLM error.
            completion = await llm_provider.complete(
                messages_in,
                max_tokens=2048,
                temperature=0.0,
            )
    except Exception as exc:
        raise _VerbatimChunkError(f"LLM call failed: {exc}") from exc

    # A completion without usable text must surface as a retryable
    # error; handing a non-string to the parser would raise an
    # unrelated exception that bypasses the retry loop.
    text = getattr(completion, "text", None)
    if not isinstance(text, str):
        raise _VerbatimChunkError("LLM response carried no text")

    parsed = _parse_json_object(text)
    if parsed is None:
        raise _VerbatimChunkError("malformed JSON response")
    return parsed
423async def _extract_one_chunk_verbatim(
424 chunk: str,
425 llm_provider,
426 *,
427 event_date: datetime | None = None,
428 max_retries: int = 3,
429 base_retry_delay: float = 2.0,
430) -> dict:
431 """Extract verbatim metadata for ONE chunk with retries.
433 Phase 4 of cost-control port — wraps
434 :func:`_extract_one_chunk_verbatim_attempt` with the same retry
435 policy Hindsight uses for ``_extract_chunk_with_retry``: up to
436 ``max_retries`` attempts with exponential backoff
437 (``base_retry_delay`` × 2^attempt). On final exhaustion returns
438 ``{}`` so the caller falls through to a metadata-less
439 ExtractedFact (chunk text still preserved).
441 Tests that need to exercise the bare single-attempt failure path
442 can pass ``max_retries=1``.
443 """
444 if not chunk.strip():
445 return {}
446 last_exc: BaseException | None = None
447 for attempt in range(max(1, max_retries)):
448 try:
449 return await _extract_one_chunk_verbatim_attempt(
450 chunk,
451 llm_provider,
452 event_date=event_date,
453 )
454 except _VerbatimChunkError as exc:
455 last_exc = exc
456 if attempt < max_retries - 1:
457 delay = base_retry_delay * (2**attempt)
458 _logger.warning(
459 "fact_extraction (verbatim/parallel) chunk attempt %d/%d failed (%s); retrying in %.0fs",
460 attempt + 1,
461 max_retries,
462 exc,
463 delay,
464 )
465 await asyncio.sleep(delay)
466 _logger.warning(
467 "fact_extraction (verbatim/parallel) chunk exhausted %d retries (%s); "
468 "falling through to metadata-less ExtractedFact",
469 max_retries,
470 last_exc,
471 )
472 return {}
def _build_extracted_fact_from_raw(
    chunk: str,
    raw: dict,
) -> ExtractedFact:
    """Combine one parsed metadata dict with its original chunk text.

    Shared by the batched and per-chunk parallel paths.

    Args:
        chunk: Original chunk text; stored unchanged as ``what``.
        raw: Metadata dict parsed from the LLM response. Missing,
            empty or malformed values fall back to ``"N/A"``, ``None``,
            ``"experience"`` or an empty entity list.

    Returns:
        An :class:`ExtractedFact` with empty ``causal_relations``; a
        caller that has cross-chunk relations can set them afterwards.
    """

    def _text_or_na(key: str) -> str:
        # Falsy or whitespace-only values collapse to the "N/A" marker.
        return str(raw.get(key) or "N/A").strip() or "N/A"

    found: list[FactEntity] = []
    for candidate in raw.get("entities") or []:
        if isinstance(candidate, dict):
            label = str(candidate.get("name") or "").strip()
            if label:
                kind = str(candidate.get("entity_type") or "OTHER").strip().upper() or "OTHER"
                found.append(FactEntity(name=label, entity_type=kind))

    # Anything other than the two known labels is treated as "experience".
    classification = str(raw.get("fact_type") or "experience").strip().lower()
    if classification not in {"world", "experience"}:
        classification = "experience"

    return ExtractedFact(
        what=chunk,
        when=_text_or_na("when"),
        where=_text_or_na("where"),
        who=_text_or_na("who"),
        why=_text_or_na("why"),
        fact_type=classification,
        occurred_start=_parse_iso_datetime(raw.get("occurred_start")),
        occurred_end=_parse_iso_datetime(raw.get("occurred_end")),
        entities=found,
        # Per-chunk parallel path drops cross-chunk causal_relations.
        # Caller using this helper from the batched path can override.
        causal_relations=[],
    )
async def extract_facts_verbatim_parallel(
    chunk_texts: list[str],
    llm_provider,
    *,
    event_date: datetime | None = None,
    max_concurrency: int = 6,
    max_retries: int = 3,
    base_retry_delay: float = 2.0,
) -> list[ExtractedFact]:
    """Per-chunk parallel verbatim extraction.

    Phase 3 of the Hindsight cost-control port. Issues one LLM call per
    chunk, run concurrently with at most ``max_concurrency`` in flight
    for this invocation, and returns one :class:`ExtractedFact` per
    input chunk in input order. A chunk whose extraction failed still
    yields a fact carrying the chunk text, just without metadata.

    Choosing between this and :func:`extract_facts_verbatim`:
    - Many small chunks (LME-shaped traffic, ~6 chunks after Phase 1
      chunk_size=2048): this path is typically ~2× faster in wall time
      because each call's output is small and the calls overlap.
    - Very few chunks (1–2): the batched call is comparable; the extra
      round trips don't pay for themselves.

    Cross-chunk ``causal_relations`` are not produced here — an index
    reference to another chunk is meaningless when each call sees one
    chunk. ``causal_links.enabled=false`` in our research configs, so
    nothing is lost there; prefer the batched path when causal
    extraction is on.

    Args:
        chunk_texts: Pre-chunked source texts.
        llm_provider: Provider forwarded to the per-chunk extractor.
        event_date: Optional anchor for relative time expressions.
        max_concurrency: Upper bound on simultaneous calls; values
            below 1 are treated as 1.
        max_retries: Attempts per chunk.
        base_retry_delay: Initial back-off in seconds per chunk.

    Returns:
        One fact per chunk, or ``[]`` when the input is empty or every
        chunk is blank.
    """
    if not chunk_texts or not any(t.strip() for t in chunk_texts):
        return []

    limiter = asyncio.Semaphore(max(1, max_concurrency))

    async def _run(position: int, text: str) -> _ChunkResult:
        # Hold the slot for the whole extraction, retries included.
        async with limiter:
            metadata = await _extract_one_chunk_verbatim(
                text,
                llm_provider,
                event_date=event_date,
                max_retries=max_retries,
                base_retry_delay=base_retry_delay,
            )
            return _ChunkResult(idx=position, raw=metadata)

    outcomes = await asyncio.gather(
        *(_run(position, text) for position, text in enumerate(chunk_texts)),
        return_exceptions=True,
    )

    # gather() already keeps input order; keying by the explicit index
    # carried in each record keeps reassembly correct even if the
    # dispatch pattern changes later.
    metadata_by_position: dict[int, dict] = {}
    for outcome in outcomes:
        if not isinstance(outcome, BaseException):
            metadata_by_position[outcome.idx] = outcome.raw if isinstance(outcome.raw, dict) else {}
            continue
        # The per-chunk extractor converts its own failures into {},
        # so an exception here points at a programming error or a
        # cancelled task. Log it; the chunk falls back to empty
        # metadata below.
        _logger.warning(
            "fact_extraction (verbatim/parallel) task raised: %r",
            outcome,
        )

    facts: list[ExtractedFact] = []
    for position, chunk in enumerate(chunk_texts):
        facts.append(_build_extracted_fact_from_raw(chunk, metadata_by_position.get(position, {})))
    return facts
584# ---------------------------------------------------------------------------
585# JSON parsing helpers
586# ---------------------------------------------------------------------------
589def _parse_json_object(raw: str) -> dict | None:
590 text = raw.strip()
591 if text.startswith("```"):
592 text = re.sub(r"^```(?:json)?\s*", "", text)
593 text = re.sub(r"\s*```$", "", text)
594 match = re.search(r"\{.*\}", text, re.DOTALL)
595 if match is None:
596 return None
597 try:
598 payload = json.loads(match.group(0))
599 except json.JSONDecodeError:
600 return None
601 return payload if isinstance(payload, dict) else None
604def _parse_iso_datetime(value) -> datetime | None:
605 if not value:
606 return None
607 if isinstance(value, datetime):
608 return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
609 if not isinstance(value, str):
610 return None
611 s = value.strip()
612 if not s:
613 return None
614 # Tolerate trailing 'Z' (Python 3.11+ handles it, but be defensive).
615 if s.endswith("Z"):
616 s = s[:-1] + "+00:00"
617 try:
618 dt = datetime.fromisoformat(s)
619 except ValueError:
620 return None
621 return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
624# ---------------------------------------------------------------------------
625# Extraction
626# ---------------------------------------------------------------------------
def _parse_causal_relations(
    raw: dict,
    source_index: int,
    batch_size: int,
) -> list[FactCausalRelation]:
    """Read the ``causal_relations`` of one metadata entry, keeping only
    references to a different, in-range chunk index."""
    relations: list[FactCausalRelation] = []
    for rel in raw.get("causal_relations") or []:
        if not isinstance(rel, dict):
            continue
        try:
            target = int(rel.get("target_fact_index"))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: json.loads accepts Infinity, and
            # int(float("inf")) raises it.
            continue
        if target == source_index or not (0 <= target < batch_size):
            continue
        try:
            strength = float(rel.get("strength", 1.0))
        except (TypeError, ValueError):
            strength = 1.0
        relations.append(FactCausalRelation(target_fact_index=target, strength=strength))
    return relations


async def extract_facts_verbatim(
    chunk_texts: list[str],
    llm_provider,
    *,
    event_date: datetime | None = None,
) -> list[ExtractedFact]:
    """Extract per-chunk metadata WITHOUT paraphrasing the chunk text.

    The "what" field of each returned :class:`ExtractedFact` is set to
    the original chunk text — not an LLM-generated summary. The LLM's
    job here is only to produce structured metadata (entities,
    causal_relations, temporal range, fact_type, where/who/why
    annotations) per chunk.

    Why this exists (the design lesson from 2026-05-02):
    The "concise" mode :func:`extract_facts` replaces conversation text
    with structured paraphrases like "Caroline went hiking | Involving:
    Caroline | When: yesterday". That paraphrase loses the surface
    vocabulary of the original conversation, which question embeddings
    typically share — causing severe recall_hit_rate degradation. The
    verbatim mode preserves the original vocabulary while still
    enriching each chunk with the structured metadata needed for
    causal/temporal/per-fact retrieval signals.

    Returns one :class:`ExtractedFact` per input chunk, in the same
    order, with ``what`` = the chunk text. Returns ``[]`` on any
    failure so retain falls back to legacy chunking.

    Args:
        chunk_texts: List of pre-chunked source texts; one per memory.
        llm_provider: Producer of the metadata extraction.
        event_date: Reference for resolving relative time expressions.
    """
    if not chunk_texts:
        return []
    # Nothing to annotate when every chunk is blank. Blank chunks in a
    # mixed batch are still sent, so LLM indices line up with
    # ``chunk_texts``.
    if not any(t.strip() for t in chunk_texts):
        return []

    # Phase 2 of cost-control port: prefer structured outputs so the
    # decoder is constrained to ``_VERBATIM_JSON_SCHEMA`` and the
    # "malformed JSON" failure mode becomes impossible. Falls back
    # transparently to the legacy free-form path when the provider
    # doesn't accept the kwarg (legacy fakes / non-OpenAI providers
    # that haven't ported the SPI extension yet).
    messages_in = [
        Message(role="system", content=_VERBATIM_SYSTEM_PROMPT),
        Message(
            role="user",
            content=_build_verbatim_user_prompt(chunk_texts, event_date),
        ),
    ]
    try:
        try:
            completion = await llm_provider.complete(
                messages_in,
                max_tokens=4096,
                temperature=0.0,
                response_format={"type": "json_schema", "json_schema": _VERBATIM_JSON_SCHEMA},
            )
        except TypeError:
            # Provider's complete() pre-dates the response_format kwarg —
            # retry without it. Free-form path is still resilient via the
            # _parse_json_object fallback below.
            completion = await llm_provider.complete(
                messages_in,
                max_tokens=4096,
                temperature=0.0,
            )
    except Exception as exc:
        _logger.warning("fact_extraction (verbatim) LLM call failed (%s)", exc)
        return []

    # A completion without string text is handled as a malformed
    # response, keeping the "[] on any failure" contract instead of
    # raising from the parser.
    text = getattr(completion, "text", None)
    parsed = _parse_json_object(text) if isinstance(text, str) else None
    if parsed is None:
        _logger.warning("fact_extraction (verbatim): malformed JSON response")
        return []
    raw_metadata = parsed.get("facts")
    if not isinstance(raw_metadata, list):
        _logger.warning("fact_extraction (verbatim): 'facts' is not a list")
        return []

    out: list[ExtractedFact] = []
    batch_size = len(chunk_texts)
    for idx, chunk in enumerate(chunk_texts):
        # Pull the matching metadata entry by index. When the LLM
        # returned fewer entries than chunks, the trailing chunks get
        # bare metadata-less ExtractedFacts (still preserves chunk text).
        raw = raw_metadata[idx] if idx < len(raw_metadata) else {}
        if not isinstance(raw, dict):
            raw = {}

        # KEY: "what" is the ORIGINAL chunk text, not a paraphrase. The
        # shared helper does the field/entity parsing so both verbatim
        # paths normalise metadata identically.
        fact = _build_extracted_fact_from_raw(chunk, raw)
        # Causal relations — indices reference the chunk position
        # (which IS the memory position); the helper leaves them empty.
        fact.causal_relations = _parse_causal_relations(raw, idx, batch_size)
        out.append(fact)
    return out
774# ---------------------------------------------------------------------------
775# Materialization
776# ---------------------------------------------------------------------------
779def _slug(value: str) -> str:
780 return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") or "unknown"
@dataclass
class MaterializedFacts:
    """The retain-pipeline artefacts produced from a fact-extraction batch.

    - ``vector_items`` — one VectorItem per fact, ready for store_vectors.
      The item's ``id`` is also the fact's identifier for causal/entity
      linkage. Order matches the input ``ExtractedFact`` list.
    - ``entities`` — deduplicated Entity list across the batch (one
      entity per unique (name, type) tuple). Each entity uses a
      deterministic ID ``{type}:{slug}`` so subsequent retain calls
      collapse repeated mentions to the same canonical row.
    - ``memory_entity_associations`` — list of (vector_item_id,
      entity_id) tuples for the orchestrator's
      link_memories_to_entities call.
    - ``memory_links`` — directional caused_by edges; source is the
      effect's memory ID, target is the cause's memory ID.
    """

    # One item per input fact, in input order.
    vector_items: list[VectorItem]
    # Batch-wide entity rows.
    entities: list[Entity]
    # (vector_item_id, entity_id) pairs.
    memory_entity_associations: list[tuple[str, str]]
    # ``caused_by`` edges between items of this batch.
    memory_links: list[MemoryLink]
def materialize_facts(
    facts: list[ExtractedFact],
    *,
    bank_id: str,
    tags: list[str] | None = None,
    metadata: dict | None = None,
    occurred_at: datetime | None = None,
    embeddings: list[list[float]] | None = None,
) -> MaterializedFacts:
    """Convert extracted facts to retain-pipeline artefacts.

    The orchestrator wires this output into its existing storage path
    (store_vectors → store_entities → link_memories_to_entities →
    store_memory_links) without further translation.

    The VectorItem's ``text`` is always the raw chunk content
    (``fact.what``) — verbatim mode is now the only supported flow
    after the M9 concise-path removal. Storing the original vocabulary
    is what makes question embeddings match against retained memories;
    decorated text (``what | Involving: ... | When: ...``) caused
    severe recall_hit_rate degradation (2026-05-02 finding).

    Entities are collapsed by their deterministic ID
    (``{type}:{slug(name)}``), so surface forms that slug to the same
    ID (e.g. "New York" and "new-york") yield ONE entity whose
    ``aliases`` list every distinct surface form seen in the batch.
    Each (memory, entity) association is emitted at most once per fact.

    Args:
        facts: Output of :func:`extract_facts_verbatim` /
            :func:`extract_facts_verbatim_parallel`.
        bank_id: Target bank.
        tags: Tags applied to every produced VectorItem.
        metadata: Base metadata merged onto every VectorItem; the
            fact's structured fields (when/where/who/why/fact_type)
            are written under ``_fact_*`` prefixed keys for downstream
            queries that want to filter/promote on them.
        occurred_at: Default timestamp when a fact lacks
            ``occurred_start``. Typically the retain request's
            ``occurred_at``.
        embeddings: Optional pre-computed embeddings per fact (must
            match ``len(facts)``). When omitted, items are created
            with empty vectors — caller is responsible for embedding.
            On a length mismatch a warning is logged and facts without
            a matching embedding get an empty vector.

    Returns:
        A :class:`MaterializedFacts` bundle; ``vector_items`` follows
        the order of ``facts``.
    """
    base_metadata = dict(metadata or {})
    base_tags = list(tags or [])
    now = datetime.now(timezone.utc)

    if embeddings is not None and len(embeddings) != len(facts):
        # Best-effort fallback is kept, but the contract violation is
        # surfaced instead of passing silently.
        _logger.warning(
            "materialize_facts: embeddings length %d does not match facts length %d; "
            "missing embeddings fall back to empty vectors",
            len(embeddings),
            len(facts),
        )

    def _entity_key(fent: FactEntity) -> tuple[str, str]:
        # Case/whitespace-insensitive identity of a mention.
        return (fent.name.strip().lower(), fent.entity_type.strip().upper())

    # Step 1: deduplicate entities across the batch. Mentions are first
    # collapsed by (name, type); distinct keys that produce the same
    # deterministic ID are then merged into a single entity so the
    # output never carries two rows with one ID.
    entity_id_by_key: dict[tuple[str, str], str] = {}
    first_mention_by_id: dict[str, FactEntity] = {}
    aliases_by_id: dict[str, list[str]] = {}
    for fact in facts:
        for fent in fact.entities:
            key = _entity_key(fent)
            if key in entity_id_by_key:
                continue
            eid = f"{key[1].lower()}:{_slug(fent.name)}"
            entity_id_by_key[key] = eid
            if eid not in first_mention_by_id:
                first_mention_by_id[eid] = fent
                aliases_by_id[eid] = [fent.name]
            elif fent.name not in aliases_by_id[eid]:
                aliases_by_id[eid].append(fent.name)
    entities = [
        Entity(
            id=eid,
            name=fent.name,
            entity_type=fent.entity_type,
            aliases=aliases_by_id[eid],
            metadata={"source": "fact_extraction"},
        )
        for eid, fent in first_mention_by_id.items()
    ]

    # Step 2: build VectorItems, one per fact. The IDs flow through
    # causal_relations resolution below.
    items: list[VectorItem] = []
    associations: list[tuple[str, str]] = []
    for idx, fact in enumerate(facts):
        item_id = uuid.uuid4().hex
        fact_metadata = dict(base_metadata)
        # Promote structured dimensions into metadata so downstream
        # consumers can filter/rerank on them. Use the ``_fact_*``
        # prefix to avoid colliding with caller-supplied keys.
        for meta_key, value in (
            ("_fact_when", fact.when),
            ("_fact_where", fact.where),
            ("_fact_who", fact.who),
            ("_fact_why", fact.why),
        ):
            if value and value.upper() != "N/A":
                fact_metadata[meta_key] = value
        fact_metadata["_fact_type"] = fact.fact_type
        if fact.occurred_start is not None:
            fact_metadata["_fact_occurred_start"] = fact.occurred_start.isoformat()
        if fact.occurred_end is not None:
            fact_metadata["_fact_occurred_end"] = fact.occurred_end.isoformat()

        vector = embeddings[idx] if embeddings is not None and idx < len(embeddings) else []

        items.append(
            VectorItem(
                id=item_id,
                bank_id=bank_id,
                vector=vector,
                # Always store the raw chunk text so question embeddings
                # match against the original vocabulary. The decorated-
                # text path (concise mode) was removed in M9.
                text=fact.what,
                metadata=fact_metadata,
                tags=list(base_tags),
                fact_type=fact.fact_type,
                memory_layer="raw",
                occurred_at=fact.occurred_start or occurred_at,
                retained_at=now,
            )
        )

        # Association: link this memory to each entity it mentions,
        # once per entity even if the fact repeats the mention.
        linked: set[str] = set()
        for fent in fact.entities:
            ent_id = entity_id_by_key.get(_entity_key(fent))
            if ent_id is not None and ent_id not in linked:
                linked.add(ent_id)
                associations.append((item_id, ent_id))

    # Step 3: causal relations → MemoryLinks. Source is the EFFECT
    # (the fact making the claim); target is the CAUSE.
    memory_links: list[MemoryLink] = []
    for src_idx, fact in enumerate(facts):
        for rel in fact.causal_relations:
            tgt_idx = rel.target_fact_index
            if tgt_idx == src_idx:
                continue  # self-loop
            if not (0 <= tgt_idx < len(facts)):
                continue
            try:
                strength = float(rel.strength)
            except (TypeError, ValueError):
                strength = 1.0
            memory_links.append(
                MemoryLink(
                    source_memory_id=items[src_idx].id,
                    target_memory_id=items[tgt_idx].id,
                    link_type="caused_by",
                    # Clamp into [0, 1].
                    confidence=min(1.0, max(0.0, strength)),
                    weight=1.0,
                    created_at=now,
                    metadata={"bank_id": bank_id, "source": "fact_extraction"},
                )
            )

    return MaterializedFacts(
        vector_items=items,
        entities=entities,
        memory_entity_associations=associations,
        memory_links=memory_links,
    )