Coverage for astrocyte/pipeline/chunking.py: 85%

109 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Text chunking — split content into memory-sized pieces. 

2 

3Sync, pure computation — Rust migration candidate. 

4See docs/_design/built-in-pipeline.md section 2. 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10 

11#: Default maximum characters per chunk. 

12DEFAULT_CHUNK_SIZE = 512 

13 

14#: Default character overlap between consecutive chunks. 

15DEFAULT_CHUNK_OVERLAP = 50 

16 

17 

def chunk_text(
    text: str,
    strategy: str = "sentence",
    max_chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> list[str]:
    """Break ``text`` into chunks with the requested strategy.

    Supported strategies:
    - "sentence": split on sentence boundaries (.!?)
    - "paragraph": split on double newlines
    - "dialogue": split on speaker turn boundaries (``speaker: text`` format)
    - "fixed": fixed character count with overlap

    Args:
        text: Content to split.
        strategy: Name of the chunking strategy to apply.
        max_chunk_size: Size limit, in characters, handed to the strategy.
        overlap: Character overlap handed to the strategy.

    Returns:
        The chunks produced by the chosen strategy; an empty list when
        ``text`` is empty or whitespace-only.

    Raises:
        ValueError: If ``strategy`` is not one of the supported names.
    """
    # Blank input short-circuits before the strategy name is even looked at.
    if not text.strip():
        return []

    # Checked in the same order as the strategies are tried.
    dispatch = (
        ("sentence", _chunk_sentences),
        ("paragraph", _chunk_paragraphs),
        ("dialogue", _chunk_dialogue),
        ("fixed", _chunk_fixed),
    )
    for name, splitter in dispatch:
        if strategy == name:
            return splitter(text, max_chunk_size, overlap)

    raise ValueError(f"Unknown chunking strategy: {strategy}")

47 

48 

def _chunk_sentences(text: str, max_size: int, overlap: int) -> list[str]:
    """Pack whole sentences into chunks of at most ``max_size`` characters.

    Sentences are found by splitting after ``.``, ``!`` or ``?`` when followed
    by whitespace. Consecutive sentences are joined with a single space while
    they fit. A sentence that is longer than ``max_size`` on its own is handed
    to ``_chunk_fixed``; ``overlap`` is only used for that fallback.

    Returns:
        The non-empty chunks, in document order.
    """
    pieces: list[str] = []
    buffer = ""

    for raw in re.split(r"(?<=[.!?])\s+", text.strip()):
        part = raw.strip()
        if not part:
            continue

        if len(part) > max_size:
            # Oversized sentence: flush what is pending, then hard-split it.
            if buffer.strip():
                pieces.append(buffer.strip())
            buffer = ""
            pieces.extend(_chunk_fixed(part, max_size, overlap=overlap))
        elif not buffer:
            buffer = part
        elif len(buffer) + len(part) + 1 > max_size:
            # The +1 accounts for the joining space.
            pieces.append(buffer.strip())
            buffer = part
        else:
            buffer = f"{buffer} {part}".strip()

    if buffer.strip():
        pieces.append(buffer.strip())

    return [piece for piece in pieces if piece]

79 

80 

def _chunk_paragraphs(text: str, max_size: int, overlap: int) -> list[str]:
    """Pack whole paragraphs into chunks of at most ``max_size`` characters.

    Paragraphs are separated by a blank line (a newline, optional whitespace,
    and another newline). Consecutive paragraphs are joined with a blank line
    while they fit. A paragraph that is longer than ``max_size`` on its own is
    handed to ``_chunk_fixed``; ``overlap`` is only used for that fallback.

    Returns:
        The non-empty chunks, in document order.
    """
    pieces: list[str] = []
    buffer = ""

    for raw in re.split(r"\n\s*\n", text.strip()):
        block = raw.strip()
        if not block:
            continue

        if len(block) > max_size:
            # Oversized paragraph: flush what is pending, then hard-split it.
            if buffer.strip():
                pieces.append(buffer.strip())
            buffer = ""
            pieces.extend(_chunk_fixed(block, max_size, overlap=overlap))
        elif not buffer:
            buffer = block
        elif len(buffer) + len(block) + 2 > max_size:
            # The +2 accounts for the two-newline separator.
            pieces.append(buffer.strip())
            buffer = block
        else:
            buffer = f"{buffer}\n\n{block}".strip()

    if buffer.strip():
        pieces.append(buffer.strip())

    return [piece for piece in pieces if piece]

110 

111 

112def _chunk_dialogue(text: str, max_size: int, overlap: int) -> list[str]: 

113 """Split on speaker turn boundaries, keeping complete turns together. 

114 

115 Expects the ``speaker: text`` format (one turn per line). Groups consecutive 

116 turns into chunks up to ``max_size`` without splitting a turn across chunks. 

117 Falls back to sentence chunking for turns that exceed ``max_size``. 

118 """ 

119 # Split into individual turns at line boundaries where a speaker label starts 

120 lines = text.strip().split("\n") 

121 turns: list[str] = [] 

122 current_turn = "" 

123 

124 for line in lines: 

125 line = line.rstrip() 

126 if not line: 

127 continue 

128 # New turn starts when line matches "word(s): text" pattern 

129 if re.match(r"^[A-Za-z][\w\s]*:", line) and current_turn: 

130 turns.append(current_turn.strip()) 

131 current_turn = line 

132 else: 

133 # Continuation of current turn (or first line) 

134 current_turn = f"{current_turn}\n{line}" if current_turn else line 

135 

136 if current_turn.strip(): 

137 turns.append(current_turn.strip()) 

138 

139 if not turns: 

140 return _chunk_sentences(text, max_size, overlap) 

141 

142 # Group turns into chunks up to max_size 

143 chunks: list[str] = [] 

144 current = "" 

145 

146 for turn in turns: 

147 # If a single turn exceeds max_size, split it but keep speaker label 

148 if len(turn) > max_size: 

149 if current.strip(): 

150 chunks.append(current.strip()) 

151 current = "" 

152 # Extract speaker label and split the rest 

153 match = re.match(r"^([A-Za-z][\w\s]*:)\s*", turn) 

154 if match: 

155 speaker_prefix = match.group(1) + " " 

156 turn_text = turn[match.end() :] 

157 sub_chunks = _chunk_sentences(turn_text, max_size - len(speaker_prefix), overlap) 

158 chunks.extend(f"{speaker_prefix}{sc}" for sc in sub_chunks) 

159 else: 

160 chunks.extend(_chunk_sentences(turn, max_size, overlap)) 

161 continue 

162 

163 if current and len(current) + len(turn) + 1 > max_size: 

164 chunks.append(current.strip()) 

165 current = turn 

166 else: 

167 current = f"{current}\n{turn}" if current else turn 

168 

169 if current.strip(): 

170 chunks.append(current.strip()) 

171 

172 return [c for c in chunks if c] 

173 

174 

175def _chunk_fixed(text: str, max_size: int, overlap: int) -> list[str]: 

176 """Fixed-size chunks with overlap.""" 

177 if len(text) <= max_size: 

178 return [text.strip()] if text.strip() else [] 

179 

180 chunks: list[str] = [] 

181 start = 0 

182 step = max(1, max_size - overlap) 

183 

184 while start < len(text): 

185 end = min(start + max_size, len(text)) 

186 chunk = text[start:end].strip() 

187 if chunk: 

188 chunks.append(chunk) 

189 start += step 

190 

191 return chunks