Coverage for astrocyte/conversations/ingestor.py: 100%
29 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""ConversationIngestor — bridges Conversation Engine output to Memory Engine retain.
3Chunks a ``Conversation`` at turn boundaries (Hindsight-parity via
4``chunk_conversation``) and calls a Memory Engine retain function once
5per chunk. Each retain call carries:
7 - ``content``: the rendered chunk text (``**{role}**: {content}`` per
8 turn, separated by blank lines)
9 - ``metadata``: opaque dict with source attribution + turn-range
10 identifiers + timestamps when available
12Public API:
13 ingestor = ConversationIngestor(retain=memory_engine.retain_text)
14 result = await ingestor.ingest(conversation, bank_id="my-bank")
15"""
17from __future__ import annotations
19import logging
20from typing import Any
22from astrocyte._ingest_spi import IngestResult, MemoryRetainFn
23from astrocyte.conversations.chunking import (
24 DEFAULT_MAX_CHARS_PER_CHUNK,
25 chunk_conversation,
26)
27from astrocyte.conversations.types import Conversation
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Identifier for this ingestion source: written to every chunk's ``source``
# metadata field and reported as ``source_kind`` on the returned IngestResult.
SOURCE_KIND = "astrocyte.conversations"
class ConversationIngestor:
    """Feed a ``Conversation`` into a Memory Engine, one retain call per chunk.

    The conversation is split with ``chunk_conversation`` and each resulting
    chunk is handed to the injected ``retain`` callable together with a
    metadata dict describing where the chunk came from.
    """

    def __init__(
        self,
        retain: MemoryRetainFn,
        *,
        max_chars_per_chunk: int = DEFAULT_MAX_CHARS_PER_CHUNK,
    ) -> None:
        """Remember the retain callable and the chunk size limit.

        Args:
            retain: Awaitable callable invoked with ``bank_id``, ``content``
                and ``metadata`` keyword arguments, once per chunk.
            max_chars_per_chunk: Passed to ``chunk_conversation`` as
                ``max_chars``.
        """
        self._max_chars = max_chars_per_chunk
        self._retain = retain

    async def ingest(
        self,
        conversation: Conversation,
        *,
        bank_id: str,
        extra_metadata: dict[str, Any] | None = None,
    ) -> IngestResult:
        """Split *conversation* into chunks and retain each one.

        An exception raised while preparing or retaining a chunk does not
        stop the run: it is logged as a warning, recorded in the result's
        ``failures`` list, and the loop moves on to the next chunk.

        Args:
            conversation: The conversation to chunk and ingest.
            bank_id: Forwarded unchanged to every retain call and to the
                returned result.
            extra_metadata: Optional caller-supplied fields merged over the
                source-attribution fields. Per-chunk fields are merged last
                and therefore win on key collisions.

        Returns:
            An ``IngestResult`` with the number of chunks retained
            successfully, the per-chunk failure records, and the overall
            turn and chunk counts.
        """
        chunks = chunk_conversation(conversation, max_chars=self._max_chars)
        problems: list[dict[str, Any]] = []
        retained = 0

        # Fields shared by every chunk of this conversation; caller extras
        # are unpacked last so they can override the attribution defaults.
        caller_fields = extra_metadata if extra_metadata else {}
        shared_fields = {
            "source": SOURCE_KIND,
            "source_conversation_id": conversation.id,
            "source_uri": conversation.source_uri,
            "conversation_title": conversation.title,
            **caller_fields,
        }

        def _iso_or_none(moment: Any) -> Any:
            # Timestamps are optional; a falsy value is passed on as None.
            return moment.isoformat() if moment else None

        for piece in chunks:
            try:
                # Session boundary identifier for the chunk. Per the
                # chunker's contract all turns of a chunk belong to the
                # same session, so the first turn's ``session_id`` stands
                # for the whole chunk. It stays ``None`` when the chunk has
                # no turns or the turn carries no session metadata.
                session_id: str | None = None
                if piece.turns:
                    session_id = (piece.turns[0].metadata or {}).get("session_id")

                text = piece.rendered_text
                chunk_fields = {
                    **shared_fields,
                    "chunk_index": piece.chunk_index,
                    "turn_count": piece.turn_count,
                    "turn_ids": [turn.id for turn in piece.turns],
                    # Roles are kept per turn so downstream consumers can
                    # tell which speakers appear in the chunk.
                    "turn_roles": [turn.role for turn in piece.turns],
                    "earliest_timestamp": _iso_or_none(piece.earliest_timestamp),
                    "latest_timestamp": _iso_or_none(piece.latest_timestamp),
                    # Passed through to the retain callable so adapters can
                    # persist it for session-scoped filtering at query time.
                    "session_id": session_id,
                }
                await self._retain(
                    bank_id=bank_id,
                    content=text,
                    metadata=chunk_fields,
                )
            except Exception as exc:  # noqa: BLE001
                # Deliberate best-effort: one bad chunk must not abort the
                # remaining chunks of the conversation.
                logger.warning(
                    "ConversationIngestor: retain failed for chunk=%d of conv=%s: %s",
                    piece.chunk_index,
                    conversation.id,
                    exc,
                )
                failure_record = {
                    "chunk_index": piece.chunk_index,
                    "turn_count": piece.turn_count,
                    "error": str(exc),
                }
                problems.append(failure_record)
            else:
                retained += 1

        summary = {
            "turn_count": conversation.turn_count(),
            "chunk_count": len(chunks),
        }
        return IngestResult(
            bank_id=bank_id,
            source_kind=SOURCE_KIND,
            source_id=conversation.id,
            segments_emitted=retained,
            failures=problems,
            metadata=summary,
        )