Coverage for astrocyte/conversations/chunking.py: 98%

65 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Turn-aware conversation chunking (Hindsight parity). 

2 

3Splits a Conversation into chunks suitable for downstream fact 

4extraction. Key properties: 

5 

6- **Never splits a single turn** — turn boundaries are inviolable; 

7 speaker context stays intact within a chunk. 

8- **Never spans two sessions** — when consecutive turns carry 

9 different ``metadata['session_id']`` values, the current chunk 

10 flushes before the new session's turns begin. Each session 

11 becomes at least one chunk; long sessions split within 

12 themselves but never across. 

13- **Greedy fill within a session** — packs as many consecutive 

14 same-session turns into a chunk as fit under ``max_chars``, then 

15 starts a new chunk at the next turn (or session boundary). 

16- **Single-turn overflow** — if a single turn exceeds ``max_chars`` 

17 by itself, it becomes its own chunk (and downstream extraction 

18 must handle the oversize input). We don't try to split mid-turn 

19 because doing so loses speaker attribution and corrupts the 

20 conversation semantics. 

21 

22Output format: each ``ConversationChunk`` carries the original turns 

23plus a rendered markdown text representation suitable for feeding to 

24``memory.retain()`` directly. The rendering uses ``**{role}**: 

25{content}`` per turn, matching the convention LME/LoCoMo benches and 

26Hindsight's ``_chunk_conversation`` already use. 

27 

28Public API: 

29 chunk_conversation(conversation, max_chars=12_000) -> list[ConversationChunk] 

30""" 

31 

32from __future__ import annotations 

33 

34from dataclasses import dataclass, field 

35from datetime import datetime 

36from typing import Any 

37 

38from astrocyte.conversations.types import Conversation, ConversationTurn 

39 

40# Hindsight's default conversation chunk size is large (~120k chars, 

41# ~30k tokens) because their fact-extraction model handles long 

42# contexts. We start more conservatively at 12k chars (~3k tokens) so 

43# extraction prompts stay within smaller-model context windows. Tunable 

44# per call. 

45DEFAULT_MAX_CHARS_PER_CHUNK = 12_000 

46 

47 

48@dataclass 

49class ConversationChunk: 

50 """One chunk of a conversation — consecutive turns under a char budget. 

51 

52 ``rendered_text`` is the markdown-formatted version suitable for 

53 passing directly to ``memory.retain(content=...)``. ``turns`` is the 

54 raw underlying ConversationTurn objects so downstream callers can 

55 cross-reference IDs / metadata. 

56 """ 

57 

58 conversation_id: str 

59 turns: list[ConversationTurn] 

60 rendered_text: str 

61 char_count: int 

62 chunk_index: int = 0 

63 metadata: dict = field(default_factory=dict) 

64 

65 @property 

66 def turn_count(self) -> int: 

67 return len(self.turns) 

68 

69 @property 

70 def earliest_timestamp(self) -> datetime | None: 

71 timestamps = [t.timestamp for t in self.turns if t.timestamp is not None] 

72 return min(timestamps) if timestamps else None 

73 

74 @property 

75 def latest_timestamp(self) -> datetime | None: 

76 timestamps = [t.timestamp for t in self.turns if t.timestamp is not None] 

77 return max(timestamps) if timestamps else None 

78 

79 

80# ─── rendering ───────────────────────────────────────────────────────── 

81 

82 

83def _render_turn(turn: ConversationTurn) -> str: 

84 """Render one turn as a markdown block: ``**{role}**: {content}``. 

85 

86 Matches the format LME/LoCoMo bench harnesses already use; matches 

87 what Hindsight's ``_chunk_conversation`` produces when serializing 

88 JSON arrays back to text. 

89 """ 

90 content = turn.content.replace("\r\n", "\n").strip() 

91 return f"**{turn.role}**: {content}" 

92 

93 

94def _render_chunk(turns: list[ConversationTurn]) -> str: 

95 """Join turns with blank lines between them.""" 

96 return "\n\n".join(_render_turn(t) for t in turns) 

97 

98 

99def _rendered_char_estimate(turns: list[ConversationTurn]) -> int: 

100 """Cheap estimate of rendered char count without doing the actual render. 

101 

102 Used in the packing loop to decide if adding one more turn would 

103 exceed the budget. Worst-case overhead per turn: ``len(role) + 4 

104 ("**" + ": ") + 2 ("\\n\\n" separator)``. 

105 """ 

106 return sum(t.char_count() + 2 for t in turns) 

107 

108 

109# ─── chunking ────────────────────────────────────────────────────────── 

110 

111 

112_SESSION_SENTINEL = object() 

113 

114 

115def _session_key(turn: ConversationTurn) -> Any: 

116 """Return the turn's session marker, or a shared sentinel when absent. 

117 

118 Turns are grouped into sessions by ``metadata['session_id']``. When a 

119 turn has no ``session_id``, it shares the sentinel with other 

120 session-less turns, so conversations that never carry session 

121 markers behave exactly like the pre-M17 chunker (one big greedy 

122 pack across the whole conversation). 

123 """ 

124 if turn.metadata is None: 

125 return _SESSION_SENTINEL 

126 val = turn.metadata.get("session_id", _SESSION_SENTINEL) 

127 return val if val is not None else _SESSION_SENTINEL 

128 

129 

130def chunk_conversation( 

131 conversation: Conversation, 

132 *, 

133 max_chars: int = DEFAULT_MAX_CHARS_PER_CHUNK, 

134) -> list[ConversationChunk]: 

135 """Split a Conversation into chunks at turn boundaries. 

136 

137 Two boundaries trigger a flush: 

138 1. **Session boundary** — the next turn's ``metadata['session_id']`` 

139 differs from the current chunk's. A chunk never spans two 

140 sessions; each session becomes at least one chunk. 

141 2. **Size boundary within a session** — adding the next turn 

142 would exceed ``max_chars``. 

143 

144 A single turn larger than ``max_chars`` becomes its own chunk (we 

145 don't split mid-turn — that would lose speaker attribution). 

146 

147 Empty conversation → empty list. Single-turn conversation → single 

148 chunk (regardless of turn size). Conversations whose turns carry 

149 no ``session_id`` behave like the pre-session-aware chunker: one 

150 greedy pack across the whole conversation. 

151 """ 

152 if not conversation.turns: 

153 return [] 

154 

155 chunks: list[ConversationChunk] = [] 

156 current: list[ConversationTurn] = [] 

157 current_size = 0 

158 current_session: Any = _SESSION_SENTINEL 

159 chunk_index = 0 

160 

161 def _flush() -> None: 

162 nonlocal current, current_size, chunk_index 

163 chunks.append( 

164 ConversationChunk( 

165 conversation_id=conversation.id, 

166 turns=current, 

167 rendered_text=_render_chunk(current), 

168 char_count=_rendered_char_estimate(current), 

169 chunk_index=chunk_index, 

170 ), 

171 ) 

172 chunk_index += 1 

173 current = [] 

174 current_size = 0 

175 

176 for turn in conversation.turns: 

177 turn_size = turn.char_count() + 2 # +2 for "\n\n" separator 

178 turn_session = _session_key(turn) 

179 

180 if current: 

181 session_changed = turn_session is not current_session 

182 size_overflow = current_size + turn_size > max_chars 

183 if session_changed or size_overflow: 

184 _flush() 

185 

186 current.append(turn) 

187 current_size += turn_size 

188 current_session = turn_session 

189 

190 if current: 

191 _flush() 

192 

193 return chunks