Coverage for astrocyte/conversations/ingestor.py: 100%

29 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

"""ConversationIngestor — bridges Conversation Engine output to Memory Engine retain.

Chunks a ``Conversation`` at turn boundaries (Hindsight-parity via
``chunk_conversation``) and calls a Memory Engine retain function once
per chunk. Each retain call carries:

  - ``content``: the rendered chunk text (``**{role}**: {content}`` per
    turn, separated by blank lines)
  - ``metadata``: opaque dict with source attribution + turn-range
    identifiers + timestamps when available

Public API:
    ingestor = ConversationIngestor(retain=memory_engine.retain_text)
    result = await ingestor.ingest(conversation, bank_id="my-bank")
"""

16 

17from __future__ import annotations 

18 

19import logging 

20from typing import Any 

21 

22from astrocyte._ingest_spi import IngestResult, MemoryRetainFn 

23from astrocyte.conversations.chunking import ( 

24 DEFAULT_MAX_CHARS_PER_CHUNK, 

25 chunk_conversation, 

26) 

27from astrocyte.conversations.types import Conversation 

28 

# Module-level logger named after this module; the ingestor uses it to
# report per-chunk retain failures as warnings.
logger = logging.getLogger(__name__)

# Source-kind tag for this ingestion path. It is written to the ``source``
# key of every retain call's metadata and to ``IngestResult.source_kind``.
SOURCE_KIND = "astrocyte.conversations"

32 

33 

class ConversationIngestor:
    """Feed a ``Conversation`` into a retain callable, one chunk at a time.

    The conversation is split by ``chunk_conversation`` (bounded by
    ``max_chars_per_chunk``) and each resulting chunk is handed to the
    ``retain`` callable supplied at construction time.
    """

    def __init__(
        self,
        retain: MemoryRetainFn,
        *,
        max_chars_per_chunk: int = DEFAULT_MAX_CHARS_PER_CHUNK,
    ) -> None:
        """Remember the retain callable and the chunk size limit.

        Args:
            retain: Awaitable callable invoked once per chunk with the
                keyword arguments ``bank_id``, ``content`` and ``metadata``.
            max_chars_per_chunk: Passed to ``chunk_conversation`` as
                ``max_chars``.
        """
        self._max_chars = max_chars_per_chunk
        self._retain = retain

    @staticmethod
    def _session_id_of(chunk: Any) -> str | None:
        """Return the ``session_id`` found in the chunk's first turn metadata.

        M31 Fix 2: the chunker (``astrocyte.conversations.chunking``) groups
        turns by ``metadata['session_id']``, so every turn in a chunk shares
        one session boundary and the first turn's value stands for the whole
        chunk. ``None`` is the back-compat result for chunks without turns or
        turns recorded without session metadata (pre-M31 callers).
        """
        if not chunk.turns:
            return None
        head_metadata = chunk.turns[0].metadata or {}
        return head_metadata.get("session_id")

    @staticmethod
    def _iso_or_none(stamp: Any) -> str | None:
        """Return ``stamp.isoformat()`` for a truthy ``stamp``, else ``None``."""
        if stamp:
            return stamp.isoformat()
        return None

    @staticmethod
    def _metadata_for(
        chunk: Any,
        shared: dict[str, Any],
        session_id: str | None,
    ) -> dict[str, Any]:
        """Build the metadata dict for one retain call.

        Starts from a copy of ``shared`` and then sets the per-chunk keys, so
        a per-chunk key always wins over a same-named key in ``shared``.
        """
        payload = dict(shared)
        payload["chunk_index"] = chunk.chunk_index
        payload["turn_count"] = chunk.turn_count
        payload["turn_ids"] = [turn.id for turn in chunk.turns]
        # Per-turn roles are kept so downstream callers can derive a
        # section-grain speaker tag. Hindsight-parity: the speaker filter at
        # retrieval time relies on knowing which roles appear in each chunk.
        payload["turn_roles"] = [turn.role for turn in chunk.turns]
        payload["earliest_timestamp"] = ConversationIngestor._iso_or_none(
            chunk.earliest_timestamp
        )
        payload["latest_timestamp"] = ConversationIngestor._iso_or_none(
            chunk.latest_timestamp
        )
        # M31 Fix 2: session boundary identifier, passed through to the
        # retain SPI so adapters can persist it onto section/fact rows for
        # query-time ``session_filter``.
        payload["session_id"] = session_id
        return payload

    async def ingest(
        self,
        conversation: Conversation,
        *,
        bank_id: str,
        extra_metadata: dict[str, Any] | None = None,
    ) -> IngestResult:
        """Chunk ``conversation`` and await one retain call per chunk.

        A chunk whose retain call (or metadata construction) raises an
        ``Exception`` does not abort the ingest: the error is logged as a
        warning and recorded in ``IngestResult.failures``, and processing
        continues with the next chunk.

        Args:
            conversation: The conversation to chunk and retain.
            bank_id: Forwarded to every retain call and to the result.
            extra_metadata: Optional keys merged over the source-attribution
                metadata; per-chunk keys are applied afterwards and win on
                a name clash.

        Returns:
            An ``IngestResult`` carrying the number of chunks retained
            successfully, the per-chunk failure records, and a metadata dict
            with the conversation's ``turn_count`` and the ``chunk_count``.
        """
        pieces = chunk_conversation(conversation, max_chars=self._max_chars)
        failed: list[dict[str, Any]] = []
        succeeded = 0

        # Attribution shared by every chunk of this conversation; caller
        # supplied keys are spread last so they can override the defaults.
        shared_metadata = {
            "source": SOURCE_KIND,
            "source_conversation_id": conversation.id,
            "source_uri": conversation.source_uri,
            "conversation_title": conversation.title,
            **(extra_metadata or {}),
        }

        for piece in pieces:
            try:
                # Everything that can raise for this chunk stays inside the
                # ``try`` so one bad chunk is recorded rather than fatal.
                session_id = self._session_id_of(piece)
                await self._retain(
                    bank_id=bank_id,
                    content=piece.rendered_text,
                    metadata=self._metadata_for(piece, shared_metadata, session_id),
                )
            except Exception as err:  # noqa: BLE001
                logger.warning(
                    "ConversationIngestor: retain failed for chunk=%d of conv=%s: %s",
                    piece.chunk_index,
                    conversation.id,
                    err,
                )
                failure_record = {
                    "chunk_index": piece.chunk_index,
                    "turn_count": piece.turn_count,
                    "error": str(err),
                }
                failed.append(failure_record)
            else:
                succeeded += 1

        return IngestResult(
            bank_id=bank_id,
            source_kind=SOURCE_KIND,
            source_id=conversation.id,
            segments_emitted=succeeded,
            failures=failed,
            metadata={
                "turn_count": conversation.turn_count(),
                "chunk_count": len(pieces),
            },
        )