Coverage for astrocyte/documents/ingestor.py: 94%
35 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""DocumentIngestor — bridges Document Engine output to Memory Engine retain.
3Walks a ``DocumentTree`` and calls a Memory Engine retain function once
4per tree node. Each retain call carries:
6 - ``content``: the node's text (or summary if summary_kind='llm' and
7 the raw text is too large — adaptive per locked policy)
8 - ``metadata``: opaque dict with source attribution + node identifiers
10Cross-engine references stay opaque (strings, not FKs). The Memory
11Engine doesn't know what a tree is; the Document Engine doesn't know
12what a Memory is.
14Public API:
15 ingestor = DocumentIngestor(retain=memory_engine.retain_text)
16 result = await ingestor.ingest(tree, document, bank_id="my-bank")
17"""
19from __future__ import annotations
21import logging
22from typing import Any
24from astrocyte._ingest_spi import IngestResult, MemoryRetainFn
25from astrocyte.documents.types import Document, DocumentTree, TreeNode
# Module-scoped logger; per-node retain failures are reported through it.
logger: logging.Logger = logging.getLogger(__name__)

# Identifier stamped on every retain call (``metadata["source"]``) and on the
# returned ``IngestResult.source_kind``.
SOURCE_KIND: str = "astrocyte.documents"

# Raw-text length (in characters) above which a node's summary is preferred
# over its full text, so very large nodes do not push multi-thousand-token
# inputs into downstream extraction. The adaptive summarizer's own per-node
# threshold defaults to 200; this value is deliberately higher so that most
# nodes still contribute their full text and only truly large ones fall back
# to the summary.
DEFAULT_PREFER_SUMMARY_OVER_CHARS: int = 4_000
class DocumentIngestor:
    """Walks a DocumentTree and calls retain() per node.

    Args:
        retain: Awaitable callable invoked once per emitted node with the
            keyword arguments ``bank_id``, ``content`` and ``metadata``.
        prefer_summary_over_chars: Raw-text length (in characters) above
            which a non-blank LLM-generated summary is sent instead of the
            node's raw text.
        skip_empty_text: When true, nodes whose selected content is empty
            or whitespace-only are skipped (no retain call is made).
    """

    def __init__(
        self,
        retain: MemoryRetainFn,
        *,
        prefer_summary_over_chars: int = DEFAULT_PREFER_SUMMARY_OVER_CHARS,
        skip_empty_text: bool = True,
    ) -> None:
        self._retain = retain
        self._prefer_summary_over_chars = prefer_summary_over_chars
        self._skip_empty_text = skip_empty_text

    async def ingest(
        self,
        tree: DocumentTree,
        document: Document,
        *,
        bank_id: str,
        extra_metadata: dict[str, Any] | None = None,
    ) -> IngestResult:
        """Walk all nodes pre-order; emit one retain() per non-empty node.

        Nodes are visited in the order produced by ``tree.all_nodes()``.
        Per-node retain failures are swallowed and logged so one bad node
        doesn't abort the whole ingest.

        Args:
            tree: Tree whose nodes are retained.
            document: Source document; its ``id``, ``source_uri`` and
                ``mime_type`` are attached to every retain call.
            bank_id: Passed through unchanged to every retain call and to
                the returned result.
            extra_metadata: Optional entries merged into each call's
                metadata. They may override the document-level keys; the
                per-node ``tree_node_*`` / ``summary_kind`` keys are applied
                afterwards and therefore always win.

        Returns:
            An ``IngestResult`` with the number of successful retain calls
            (``segments_emitted``), one failure record per node whose retain
            call raised (``tree_node_id``, ``title``, ``error``), and
            ``metadata["node_count"]`` taken from ``tree.node_count()``.
            Skipped (blank) nodes are counted in neither.
        """
        failures: list[dict[str, Any]] = []
        emitted = 0
        base_metadata = {
            "source": SOURCE_KIND,
            "source_document_id": document.id,
            "source_uri": document.source_uri,
            "mime_type": document.mime_type,
            **(extra_metadata or {}),
        }

        for node in tree.all_nodes():
            content = self._pick_content(node)
            if self._skip_empty_text and not content.strip():
                continue
            try:
                await self._retain(
                    bank_id=bank_id,
                    content=content,
                    metadata={
                        **base_metadata,
                        "tree_node_id": node.id,
                        "tree_node_parent_id": node.parent_id,
                        "tree_node_depth": node.depth,
                        "tree_node_title": node.title,
                        "tree_node_line_start": node.line_start,
                        "tree_node_line_end": node.line_end,
                        "summary_kind": node.summary.kind if node.summary else None,
                    },
                )
                emitted += 1
            except Exception as exc:  # noqa: BLE001
                # Deliberate best-effort: record the failure and keep going.
                logger.warning(
                    "DocumentIngestor: retain failed for node=%s title=%r: %s",
                    node.id,
                    node.title,
                    exc,
                )
                failures.append(
                    {
                        "tree_node_id": node.id,
                        "title": node.title,
                        "error": str(exc),
                    }
                )

        return IngestResult(
            bank_id=bank_id,
            source_kind=SOURCE_KIND,
            source_id=document.id,
            segments_emitted=emitted,
            failures=failures,
            metadata={"node_count": tree.node_count()},
        )

    # ── content selection ─────────────────────────────────────────────

    def _pick_content(self, node: TreeNode) -> str:
        """Pick the text we feed to the Memory Engine for this node.

        Rule: if the node's raw text is short enough, use it verbatim
        (Memory Engine fact extraction sees full context). If it's
        bigger than ``prefer_summary_over_chars`` AND the summarizer
        produced a non-blank LLM-generated summary, use the summary
        instead (avoids feeding multi-thousand-char nodes through
        downstream extraction). Otherwise fall back to raw text — better
        to feed too much than too little.

        Args:
            node: Tree node to select content for. A missing (``None``)
                ``text`` is treated as the empty string.

        Returns:
            The raw text or the LLM summary text, per the rule above.
        """
        text = node.text or ""
        if len(text) <= self._prefer_summary_over_chars:
            return text
        summary = node.summary
        if summary is not None and summary.kind == "llm":
            summary_text = summary.text or ""
            # A blank summary must not replace real content: it would make
            # a large node look empty and get it silently skipped.
            if summary_text.strip():
                return summary_text
        return text