Coverage for astrocyte/pipeline/chunking.py: 85%
109 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Text chunking — split content into memory-sized pieces.
3Sync, pure computation — Rust migration candidate.
4See docs/_design/built-in-pipeline.md section 2.
5"""
7from __future__ import annotations
9import re
#: Default maximum characters per chunk; used as ``chunk_text``'s
#: ``max_chunk_size`` default.
DEFAULT_CHUNK_SIZE = 512

#: Default character overlap between consecutive chunks; used as
#: ``chunk_text``'s ``overlap`` default.
DEFAULT_CHUNK_OVERLAP = 50
def chunk_text(
    text: str,
    strategy: str = "sentence",
    max_chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> list[str]:
    """Break ``text`` into chunks using the requested strategy.

    Strategies:
    - "sentence": split on sentence boundaries (.!?)
    - "paragraph": split on double newlines
    - "dialogue": split on speaker turn boundaries (``speaker: text`` format)
    - "fixed": fixed character count with overlap

    Args:
        text: The content to split.
        strategy: Name of the splitting strategy (see the list above).
        max_chunk_size: Size limit, in characters, handed to the strategy.
        overlap: Character overlap handed to the strategy.

    Returns:
        The chunks produced by the strategy; an empty list when ``text`` is
        empty or whitespace-only.

    Raises:
        ValueError: If ``strategy`` is not one of the known names.
    """
    # Blank input short-circuits before the strategy name is even inspected.
    if text.strip() == "":
        return []

    # Strategy name -> helper, probed in the same order as the documented list.
    known_strategies = (
        ("sentence", _chunk_sentences),
        ("paragraph", _chunk_paragraphs),
        ("dialogue", _chunk_dialogue),
        ("fixed", _chunk_fixed),
    )
    for name, splitter in known_strategies:
        if strategy == name:
            return splitter(text, max_chunk_size, overlap)

    raise ValueError(f"Unknown chunking strategy: {strategy}")
49def _chunk_sentences(text: str, max_size: int, overlap: int) -> list[str]:
50 """Split on sentence boundaries, merging short sentences up to max_size."""
51 # Split on sentence-ending punctuation followed by whitespace
52 sentences = re.split(r"(?<=[.!?])\s+", text.strip())
53 chunks: list[str] = []
54 current = ""
56 for sentence in sentences:
57 sentence = sentence.strip()
58 if not sentence:
59 continue
61 # If a single sentence exceeds max_size, split it with fixed-size chunking
62 if len(sentence) > max_size:
63 if current.strip():
64 chunks.append(current.strip())
65 current = ""
66 chunks.extend(_chunk_fixed(sentence, max_size, overlap=overlap))
67 continue
69 if current and len(current) + len(sentence) + 1 > max_size:
70 chunks.append(current.strip())
71 current = sentence
72 else:
73 current = f"{current} {sentence}".strip() if current else sentence
75 if current.strip():
76 chunks.append(current.strip())
78 return [c for c in chunks if c]
81def _chunk_paragraphs(text: str, max_size: int, overlap: int) -> list[str]:
82 """Split on double newlines, merging short paragraphs up to max_size."""
83 paragraphs = re.split(r"\n\s*\n", text.strip())
84 chunks: list[str] = []
85 current = ""
87 for para in paragraphs:
88 para = para.strip()
89 if not para:
90 continue
92 # If a single paragraph exceeds max_size, split it with fixed-size chunking
93 if len(para) > max_size:
94 if current.strip():
95 chunks.append(current.strip())
96 current = ""
97 chunks.extend(_chunk_fixed(para, max_size, overlap=overlap))
98 continue
100 if current and len(current) + len(para) + 2 > max_size:
101 chunks.append(current.strip())
102 current = para
103 else:
104 current = f"{current}\n\n{para}".strip() if current else para
106 if current.strip():
107 chunks.append(current.strip())
109 return [c for c in chunks if c]
112def _chunk_dialogue(text: str, max_size: int, overlap: int) -> list[str]:
113 """Split on speaker turn boundaries, keeping complete turns together.
115 Expects the ``speaker: text`` format (one turn per line). Groups consecutive
116 turns into chunks up to ``max_size`` without splitting a turn across chunks.
117 Falls back to sentence chunking for turns that exceed ``max_size``.
118 """
119 # Split into individual turns at line boundaries where a speaker label starts
120 lines = text.strip().split("\n")
121 turns: list[str] = []
122 current_turn = ""
124 for line in lines:
125 line = line.rstrip()
126 if not line:
127 continue
128 # New turn starts when line matches "word(s): text" pattern
129 if re.match(r"^[A-Za-z][\w\s]*:", line) and current_turn:
130 turns.append(current_turn.strip())
131 current_turn = line
132 else:
133 # Continuation of current turn (or first line)
134 current_turn = f"{current_turn}\n{line}" if current_turn else line
136 if current_turn.strip():
137 turns.append(current_turn.strip())
139 if not turns:
140 return _chunk_sentences(text, max_size, overlap)
142 # Group turns into chunks up to max_size
143 chunks: list[str] = []
144 current = ""
146 for turn in turns:
147 # If a single turn exceeds max_size, split it but keep speaker label
148 if len(turn) > max_size:
149 if current.strip():
150 chunks.append(current.strip())
151 current = ""
152 # Extract speaker label and split the rest
153 match = re.match(r"^([A-Za-z][\w\s]*:)\s*", turn)
154 if match:
155 speaker_prefix = match.group(1) + " "
156 turn_text = turn[match.end() :]
157 sub_chunks = _chunk_sentences(turn_text, max_size - len(speaker_prefix), overlap)
158 chunks.extend(f"{speaker_prefix}{sc}" for sc in sub_chunks)
159 else:
160 chunks.extend(_chunk_sentences(turn, max_size, overlap))
161 continue
163 if current and len(current) + len(turn) + 1 > max_size:
164 chunks.append(current.strip())
165 current = turn
166 else:
167 current = f"{current}\n{turn}" if current else turn
169 if current.strip():
170 chunks.append(current.strip())
172 return [c for c in chunks if c]
175def _chunk_fixed(text: str, max_size: int, overlap: int) -> list[str]:
176 """Fixed-size chunks with overlap."""
177 if len(text) <= max_size:
178 return [text.strip()] if text.strip() else []
180 chunks: list[str] = []
181 start = 0
182 step = max(1, max_size - overlap)
184 while start < len(text):
185 end = min(start + max_size, len(text))
186 chunk = text[start:end].strip()
187 if chunk:
188 chunks.append(chunk)
189 start += step
191 return chunks