Coverage for astrocyte/conversations/chunking.py: 98%
65 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Turn-aware conversation chunking (Hindsight parity).
3Splits a Conversation into chunks suitable for downstream fact
4extraction. Key properties:
6- **Never splits a single turn** — turn boundaries are inviolable;
7 speaker context stays intact within a chunk.
8- **Never spans two sessions** — when consecutive turns carry
9 different ``metadata['session_id']`` values, the current chunk
10 flushes before the new session's turns begin. Each session
11 becomes at least one chunk; long sessions split within
12 themselves but never across.
13- **Greedy fill within a session** — packs as many consecutive
14 same-session turns into a chunk as fit under ``max_chars``, then
15 starts a new chunk at the next turn (or session boundary).
16- **Single-turn overflow** — if a single turn exceeds ``max_chars``
17 by itself, it becomes its own chunk (and downstream extraction
18 must handle the oversize input). We don't try to split mid-turn
19 because doing so loses speaker attribution and corrupts the
20 conversation semantics.
22Output format: each ``ConversationChunk`` carries the original turns
23plus a rendered markdown text representation suitable for feeding to
24``memory.retain()`` directly. The rendering uses ``**{role}**:
25{content}`` per turn, matching the convention LME/LoCoMo benches and
26Hindsight's ``_chunk_conversation`` already use.
28Public API:
29 chunk_conversation(conversation, max_chars=12_000) -> list[ConversationChunk]
30"""
32from __future__ import annotations
34from dataclasses import dataclass, field
35from datetime import datetime
36from typing import Any
38from astrocyte.conversations.types import Conversation, ConversationTurn
# Default per-chunk character budget.
#
# Hindsight defaults to a much larger conversation chunk (~120k chars,
# roughly 30k tokens) since its fact-extraction model copes with long
# contexts. The default here is deliberately smaller -- 12k chars,
# roughly 3k tokens -- so that extraction prompts still fit the context
# window of smaller models. Callers can override it on each call.
DEFAULT_MAX_CHARS_PER_CHUNK = 12_000
@dataclass
class ConversationChunk:
    """A run of consecutive conversation turns packed into one chunk.

    ``turns`` keeps the raw ``ConversationTurn`` objects so downstream
    callers can cross-reference IDs / metadata, while ``rendered_text``
    is the markdown form of those same turns, suitable for passing
    straight to ``memory.retain(content=...)``.
    """

    # Identifier of the conversation this chunk was cut from.
    conversation_id: str
    # The underlying turns, in conversation order.
    turns: list[ConversationTurn]
    # Markdown rendering of ``turns``.
    rendered_text: str
    # Character size recorded for the chunk by whoever built it.
    char_count: int
    # Position of this chunk within its conversation (0-based by default).
    chunk_index: int = 0
    # Free-form extra data; every instance gets its own fresh dict.
    metadata: dict = field(default_factory=dict)

    def _known_timestamps(self):
        """Yield the timestamp of every turn that has one (skips ``None``)."""
        return (
            turn.timestamp
            for turn in self.turns
            if turn.timestamp is not None
        )

    @property
    def turn_count(self) -> int:
        """Number of turns held by this chunk."""
        return len(self.turns)

    @property
    def earliest_timestamp(self) -> datetime | None:
        """Smallest non-``None`` turn timestamp, or ``None`` if there is none."""
        return min(self._known_timestamps(), default=None)

    @property
    def latest_timestamp(self) -> datetime | None:
        """Largest non-``None`` turn timestamp, or ``None`` if there is none."""
        return max(self._known_timestamps(), default=None)
80# ─── rendering ─────────────────────────────────────────────────────────
def _render_turn(turn: ConversationTurn) -> str:
    """Format a single turn as the markdown line ``**{role}**: {content}``.

    The content has Windows line endings (CRLF) converted to LF and
    surrounding whitespace trimmed before it is placed after the bold
    role label. This is the same layout the LME/LoCoMo bench harnesses
    and Hindsight's ``_chunk_conversation`` use.
    """
    unix_newlines = turn.content.replace("\r\n", "\n")
    body = unix_newlines.strip()
    return "**{}**: {}".format(turn.role, body)
def _render_chunk(turns: list[ConversationTurn]) -> str:
    """Render every turn and separate the results with one blank line."""
    rendered_turns = [_render_turn(turn) for turn in turns]
    return "\n\n".join(rendered_turns)
def _rendered_char_estimate(turns: list[ConversationTurn]) -> int:
    """Estimate the rendered size of ``turns`` without rendering them.

    Each turn contributes ``turn.char_count()`` plus 2 characters for
    the blank-line separator; the separator is counted for every turn,
    including the last one. The packing loop uses the same per-turn
    figure to decide whether one more turn still fits in the budget.

    NOTE(review): the role label and ``**`` / ``: `` markup are not
    added here -- presumably ``char_count()`` already accounts for
    them; confirm against ``ConversationTurn``.
    """
    separator_len = 2
    total = 0
    for turn in turns:
        total += turn.char_count() + separator_len
    return total
109# ─── chunking ──────────────────────────────────────────────────────────
# Unique marker standing in for "this turn has no session".
_SESSION_SENTINEL = object()


def _session_key(turn: ConversationTurn) -> Any:
    """Return the value that identifies which session ``turn`` belongs to.

    Sessions are keyed by ``metadata['session_id']``. A turn whose
    metadata is ``None``, lacks the key, or stores ``None`` under it
    gets the shared ``_SESSION_SENTINEL`` instead, so all session-less
    turns land in one common group. Conversations that never carry
    session markers therefore behave like the pre-M17 chunker (one
    greedy pack across the whole conversation).
    """
    meta = turn.metadata
    if meta is not None:
        session_id = meta.get("session_id")
        if session_id is not None:
            return session_id
    return _SESSION_SENTINEL
def chunk_conversation(
    conversation: Conversation,
    *,
    max_chars: int = DEFAULT_MAX_CHARS_PER_CHUNK,
) -> list[ConversationChunk]:
    """Split a Conversation into chunks at turn boundaries.

    Two boundaries trigger a flush:

    1. **Session boundary** — the next turn's ``metadata['session_id']``
       differs (by value) from the current chunk's. A chunk never spans
       two sessions; each session becomes at least one chunk.
    2. **Size boundary within a session** — adding the next turn
       would exceed ``max_chars``.

    A single turn larger than ``max_chars`` becomes its own chunk (we
    don't split mid-turn — that would lose speaker attribution).

    Empty conversation → empty list. Single-turn conversation → single
    chunk (regardless of turn size). Conversations whose turns carry
    no ``session_id`` behave like the pre-session-aware chunker: one
    greedy pack across the whole conversation.

    Args:
        conversation: The conversation to split; its ``turns`` are
            consumed in order and its ``id`` is copied onto each chunk.
        max_chars: Per-chunk budget, measured with the same estimate
            as ``_rendered_char_estimate`` (``char_count() + 2`` per turn).

    Returns:
        The chunks in conversation order, with ``chunk_index`` counting
        up from 0. Each chunk's ``char_count`` is the estimate from
        ``_rendered_char_estimate``, not ``len(rendered_text)``.
    """
    if not conversation.turns:
        return []

    chunks: list[ConversationChunk] = []
    current: list[ConversationTurn] = []
    current_size = 0
    current_session: Any = _SESSION_SENTINEL
    chunk_index = 0

    def _flush() -> None:
        """Emit the pending turns as one chunk and reset the accumulator."""
        nonlocal current, current_size, chunk_index
        chunks.append(
            ConversationChunk(
                conversation_id=conversation.id,
                turns=current,
                rendered_text=_render_chunk(current),
                char_count=_rendered_char_estimate(current),
                chunk_index=chunk_index,
            ),
        )
        chunk_index += 1
        # Rebind rather than clear(): the emitted chunk keeps the old list.
        current = []
        current_size = 0

    for turn in conversation.turns:
        turn_size = turn.char_count() + 2  # +2 for "\n\n" separator
        turn_session = _session_key(turn)

        # Only consider flushing when something is pending; this is what
        # lets an oversize turn become a chunk on its own.
        if current:
            # Compare by value, not identity: equal session ids are often
            # distinct objects (e.g. strings parsed from JSON), and an
            # identity check would flush after every turn. The sentinel
            # is a plain object(), so it still only equals itself.
            session_changed = turn_session != current_session
            size_overflow = current_size + turn_size > max_chars
            if session_changed or size_overflow:
                _flush()

        current.append(turn)
        current_size += turn_size
        current_session = turn_session

    if current:
        _flush()

    return chunks