Coverage for astrocyte/documents/ingestor.py: 94%

35 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""DocumentIngestor — bridges Document Engine output to Memory Engine retain. 

2 

3Walks a ``DocumentTree`` and calls a Memory Engine retain function once 

4per tree node. Each retain call carries: 

5 

6 - ``content``: the node's text (or summary if summary_kind='llm' and 

7 the raw text is too large — adaptive per locked policy) 

8 - ``metadata``: opaque dict with source attribution + node identifiers 

9 

10Cross-engine references stay opaque (strings, not FKs). The Memory 

11Engine doesn't know what a tree is; the Document Engine doesn't know 

12what a Memory is. 

13 

14Public API: 

15 ingestor = DocumentIngestor(retain=memory_engine.retain_text) 

16 result = await ingestor.ingest(tree, document, bank_id="my-bank") 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22from typing import Any 

23 

24from astrocyte._ingest_spi import IngestResult, MemoryRetainFn 

25from astrocyte.documents.types import Document, DocumentTree, TreeNode 

26 

logger = logging.getLogger(__name__)

# Written to the ``source`` key of every retain call's metadata and to
# ``IngestResult.source_kind``.
SOURCE_KIND = "astrocyte.documents"

# Beyond this size, prefer the node's summary over its raw text to avoid
# multi-thousand-token retain inputs choking downstream extraction.
# Adaptive summarizer's threshold default (200) is per-node; we use a
# higher value here so most nodes still feed their full text and only
# truly large nodes fall back to summary.
DEFAULT_PREFER_SUMMARY_OVER_CHARS = 4_000


class DocumentIngestor:
    """Walks a DocumentTree and calls retain() once per node.

    Args:
        retain: Awaitable callable invoked with the keyword arguments
            ``bank_id``, ``content`` and ``metadata`` for each emitted node.
        prefer_summary_over_chars: Raw-text length (in characters) above
            which an LLM-generated summary is sent instead of the raw text.
        skip_empty_text: When true, nodes whose selected content is empty
            or whitespace-only are not passed to ``retain``.
    """

    def __init__(
        self,
        retain: MemoryRetainFn,
        *,
        prefer_summary_over_chars: int = DEFAULT_PREFER_SUMMARY_OVER_CHARS,
        skip_empty_text: bool = True,
    ) -> None:
        self._retain = retain
        self._prefer_summary_over_chars = prefer_summary_over_chars
        self._skip_empty_text = skip_empty_text

    async def ingest(
        self,
        tree: DocumentTree,
        document: Document,
        *,
        bank_id: str,
        extra_metadata: dict[str, Any] | None = None,
    ) -> IngestResult:
        """Emit one retain() call per non-empty node of ``tree``.

        Nodes are visited in the order produced by ``tree.all_nodes()``.
        Exceptions raised by an individual retain call are logged and
        recorded in the result instead of propagating, so one bad node
        doesn't abort the whole ingest.

        Args:
            tree: Tree whose nodes are ingested.
            document: Source document; its ``id``, ``source_uri`` and
                ``mime_type`` are attached to every retain call.
            bank_id: Forwarded unchanged to every retain call and to the
                returned result.
            extra_metadata: Optional extra keys merged over the document
                attribution keys. Per-node ``tree_node_*`` and
                ``summary_kind`` keys are applied last and win on conflict.

        Returns:
            An ``IngestResult`` carrying the number of successful retain
            calls, one failure record per failed node, and the tree's
            node count under ``metadata["node_count"]``.
        """
        failures: list[dict[str, Any]] = []
        emitted = 0
        base_metadata: dict[str, Any] = {
            "source": SOURCE_KIND,
            "source_document_id": document.id,
            "source_uri": document.source_uri,
            "mime_type": document.mime_type,
            **(extra_metadata or {}),
        }

        for node in tree.all_nodes():
            content = self._pick_content(node)
            if self._skip_empty_text and not content.strip():
                continue
            try:
                await self._retain(
                    bank_id=bank_id,
                    content=content,
                    metadata=self._node_metadata(base_metadata, node),
                )
            except Exception as exc:  # noqa: BLE001
                # Deliberate best-effort: record the failure and keep going.
                logger.warning(
                    "DocumentIngestor: retain failed for node=%s title=%r: %s",
                    node.id,
                    node.title,
                    exc,
                )
                failures.append(
                    {
                        "tree_node_id": node.id,
                        "title": node.title,
                        "error": str(exc),
                    }
                )
            else:
                emitted += 1

        return IngestResult(
            bank_id=bank_id,
            source_kind=SOURCE_KIND,
            source_id=document.id,
            segments_emitted=emitted,
            failures=failures,
            metadata={"node_count": tree.node_count()},
        )

    # ── metadata assembly ─────────────────────────────────────────────

    @staticmethod
    def _node_metadata(
        base_metadata: dict[str, Any], node: TreeNode
    ) -> dict[str, Any]:
        """Return a new dict: ``base_metadata`` plus this node's identifiers."""
        return {
            **base_metadata,
            "tree_node_id": node.id,
            "tree_node_parent_id": node.parent_id,
            "tree_node_depth": node.depth,
            "tree_node_title": node.title,
            "tree_node_line_start": node.line_start,
            "tree_node_line_end": node.line_end,
            "summary_kind": node.summary.kind if node.summary else None,
        }

    # ── content selection ─────────────────────────────────────────────

    def _pick_content(self, node: TreeNode) -> str:
        """Pick the text we feed to the Memory Engine for this node.

        Rule: if the node's raw text is short enough, use it verbatim
        (Memory Engine fact extraction sees full context). If it's
        bigger than ``prefer_summary_over_chars`` AND the summarizer
        produced a non-blank LLM-generated summary, use the summary
        instead (avoids feeding multi-thousand-char nodes through
        downstream extraction). Otherwise fall back to raw text — better
        to feed too much than too little.

        Always returns a ``str``; a missing ``node.text`` yields ``""``.
        """
        text = node.text or ""
        if len(text) <= self._prefer_summary_over_chars:
            return text
        summary = node.summary
        if summary is not None and summary.kind == "llm":
            summary_text = summary.text or ""
            # A blank summary would get the node skipped as "empty" even
            # though raw text exists, so only use a summary with content.
            if summary_text.strip():
                return summary_text
        return text