Coverage for astrocyte/pipeline/delta_ops.py: 95%

151 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Delta operations for structured mental models (M21). 

2 

3The LLM's job during a mental-model refresh is to emit a list of these 

4operations, each targeting an existing section (by id) or referencing a 

5position relative to one. :func:`apply_operations` validates and 

6applies each op in turn against a copy of the document; invalid ops 

7(unknown ``section_id``, out-of-range ``index``, malformed payloads) 

8are dropped with a debug-friendly reason — the document never gets 

9worse than its input. 

10 

11Sections and blocks not mentioned by any op are physically copied 

12through unchanged — there is no LLM-mediated re-emission of unchanged 

13text, so prose drift is structurally impossible. 

14 

15Why operations and not "output the new structured doc": 

16 

17- "Output the new doc" still asks the LLM to *generate* every section's 

18 blocks, including ones it didn't intend to modify, which gives it 

19 the same opportunity to drift. 

20- Operations make the no-change case mechanical: zero ops → identical 

21 doc. 

22- Operations are auditable: each refresh produces a log of exactly 

23 what changed, useful for debugging the LLM's behaviour and 

24 explaining diffs. 

25 

26Failure modes are by design conservative: an operation list that fails 

27to parse against the Pydantic schema, or an LLM that returns invalid 

28ops, results in zero changes — the document stays as-is. The 

29structure can only get better or stay the same per refresh, never get 

30worse. 

31 

32Ported from Hindsight ``hindsight_api/engine/reflect/delta_ops.py`` 

33under the project's MIT licence. 

34""" 

35 

36from __future__ import annotations 

37 

38import logging 

39from typing import Annotated, Any, Literal, Union 

40 

41from pydantic import BaseModel, ConfigDict, Field 

42 

43from astrocyte.pipeline.structured_doc import ( 

44 Block, 

45 Section, 

46 StructuredDocument, 

47 make_unique_id, 

48 slugify_heading, 

49) 

50 

51_logger = logging.getLogger("astrocyte.pipeline.delta_ops") 

52 

53 

54# Op payloads --------------------------------------------------------------- 

55 

56 

57class _OpBase(BaseModel): 

58 model_config = ConfigDict(extra="forbid") 

59 

60 

61class AppendBlockOp(_OpBase): 

62 """Add a new block at the end of an existing section.""" 

63 

64 op: Literal["append_block"] = "append_block" 

65 section_id: str 

66 block: Block 

67 

68 

69class InsertBlockOp(_OpBase): 

70 """Insert a new block at ``index`` in an existing section. 

71 

72 ``index`` may equal ``len(section.blocks)`` (append) but not be greater. 

73 """ 

74 

75 op: Literal["insert_block"] = "insert_block" 

76 section_id: str 

77 index: int = Field(ge=0) 

78 block: Block 

79 

80 

81class ReplaceBlockOp(_OpBase): 

82 """Replace the block at ``index`` of an existing section.""" 

83 

84 op: Literal["replace_block"] = "replace_block" 

85 section_id: str 

86 index: int = Field(ge=0) 

87 block: Block 

88 

89 

90class RemoveBlockOp(_OpBase): 

91 """Remove the block at ``index`` of an existing section.""" 

92 

93 op: Literal["remove_block"] = "remove_block" 

94 section_id: str 

95 index: int = Field(ge=0) 

96 

97 

98class AddSectionOp(_OpBase): 

99 """Add a brand-new section. 

100 

101 ``after_section_id`` is optional; when omitted the new section is 

102 appended at the end. ``new_id`` is optional; when omitted we 

103 slugify the heading and disambiguate against existing IDs. 

104 """ 

105 

106 op: Literal["add_section"] = "add_section" 

107 heading: str 

108 level: int = Field(default=2, ge=1, le=6) 

109 blocks: list[Block] = Field(default_factory=list) 

110 after_section_id: str | None = None 

111 new_id: str | None = None 

112 

113 

114class RemoveSectionOp(_OpBase): 

115 """Remove an entire section by id.""" 

116 

117 op: Literal["remove_section"] = "remove_section" 

118 section_id: str 

119 

120 

121class ReplaceSectionBlocksOp(_OpBase): 

122 """Replace all blocks of a section in one go. 

123 

124 Used when most of a section's contents are stale and rebuilding it 

125 as a unit is clearer than emitting many block-level ops. The 

126 section's heading and id are preserved. 

127 """ 

128 

129 op: Literal["replace_section_blocks"] = "replace_section_blocks" 

130 section_id: str 

131 blocks: list[Block] = Field(default_factory=list) 

132 

133 

134class RenameSectionOp(_OpBase): 

135 """Rename a section's heading. The id is unchanged so future ops still resolve.""" 

136 

137 op: Literal["rename_section"] = "rename_section" 

138 section_id: str 

139 new_heading: str 

140 

141 

142Operation = Annotated[ 

143 Union[ 

144 AppendBlockOp, 

145 InsertBlockOp, 

146 ReplaceBlockOp, 

147 RemoveBlockOp, 

148 AddSectionOp, 

149 RemoveSectionOp, 

150 ReplaceSectionBlocksOp, 

151 RenameSectionOp, 

152 ], 

153 Field(discriminator="op"), 

154] 

155 

156 

157class DeltaOperationList(BaseModel): 

158 """Container for the operations produced by an LLM delta call.""" 

159 

160 model_config = ConfigDict(extra="forbid") 

161 operations: list[Operation] = Field(default_factory=list) 

162 

163 

164# Application --------------------------------------------------------------- 

165 

166 

167class AppliedDelta(BaseModel): 

168 """Outcome of applying a list of operations to a document. 

169 

170 ``applied`` is a per-op summary (audit trail). ``skipped`` is a 

171 parallel list for ops the validator dropped, each carrying a 

172 ``reason`` so the refresh path can log why the LLM's output was 

173 rejected without spinning the cycle of "no idea what failed". 

174 """ 

175 

176 model_config = ConfigDict(extra="forbid") 

177 

178 document: StructuredDocument 

179 applied: list[dict[str, Any]] = Field(default_factory=list) 

180 skipped: list[dict[str, Any]] = Field(default_factory=list) 

181 

182 @property 

183 def changed(self) -> bool: 

184 return len(self.applied) > 0 

185 

186 

187def _op_summary(op: Operation) -> dict[str, Any]: 

188 """Compact dict suitable for the audit trail. 

189 

190 Drops the verbose ``block`` / ``blocks`` payloads — the audit 

191 cares about which sections were modified, not what content was 

192 inserted. Use the full :class:`AppliedDelta.document` to inspect 

193 the new state. 

194 """ 

195 data = op.model_dump() 

196 return {k: v for k, v in data.items() if k != "block" and k != "blocks"} | { 

197 "op": data["op"], 

198 } 

199 

200 

201def apply_operations( 

202 doc: StructuredDocument, 

203 operations: list[Operation], 

204) -> AppliedDelta: 

205 """Apply a list of operations to a document, returning a new document. 

206 

207 The original document is never mutated. Invalid operations (unknown 

208 section, out-of-range index, name collision when adding a section) 

209 are skipped and recorded in ``skipped`` with a ``reason`` string. 

210 """ 

211 new_doc = doc.model_copy(deep=True) 

212 applied: list[dict[str, Any]] = [] 

213 skipped: list[dict[str, Any]] = [] 

214 

215 def skip(op: Operation, reason: str) -> None: 

216 entry = _op_summary(op) 

217 entry["reason"] = reason 

218 skipped.append(entry) 

219 _logger.debug("delta_ops: skipping op %s", entry) 

220 

221 for op in operations: 

222 if isinstance(op, AppendBlockOp): 

223 section = new_doc.section_by_id(op.section_id) 

224 if section is None: 

225 skip(op, f"unknown section_id: {op.section_id}") 

226 continue 

227 section.blocks.append(op.block) 

228 applied.append(_op_summary(op)) 

229 continue 

230 

231 if isinstance(op, InsertBlockOp): 

232 section = new_doc.section_by_id(op.section_id) 

233 if section is None: 

234 skip(op, f"unknown section_id: {op.section_id}") 

235 continue 

236 if op.index > len(section.blocks): 

237 skip(op, f"index out of range: {op.index} > {len(section.blocks)}") 

238 continue 

239 section.blocks.insert(op.index, op.block) 

240 applied.append(_op_summary(op)) 

241 continue 

242 

243 if isinstance(op, ReplaceBlockOp): 

244 section = new_doc.section_by_id(op.section_id) 

245 if section is None: 

246 skip(op, f"unknown section_id: {op.section_id}") 

247 continue 

248 if op.index >= len(section.blocks): 

249 skip(op, f"index out of range: {op.index} >= {len(section.blocks)}") 

250 continue 

251 section.blocks[op.index] = op.block 

252 applied.append(_op_summary(op)) 

253 continue 

254 

255 if isinstance(op, RemoveBlockOp): 

256 section = new_doc.section_by_id(op.section_id) 

257 if section is None: 

258 skip(op, f"unknown section_id: {op.section_id}") 

259 continue 

260 if op.index >= len(section.blocks): 

261 skip(op, f"index out of range: {op.index} >= {len(section.blocks)}") 

262 continue 

263 section.blocks.pop(op.index) 

264 applied.append(_op_summary(op)) 

265 continue 

266 

267 if isinstance(op, AddSectionOp): 

268 existing_ids = {s.id for s in new_doc.sections} 

269 base_id = op.new_id or slugify_heading(op.heading) 

270 section_id = make_unique_id(base_id, existing_ids) 

271 new_section = Section( 

272 id=section_id, 

273 heading=op.heading, 

274 level=op.level, 

275 blocks=list(op.blocks), 

276 ) 

277 if op.after_section_id is None: 

278 new_doc.sections.append(new_section) 

279 else: 

280 idx = new_doc.section_index(op.after_section_id) 

281 if idx is None: 

282 skip(op, f"unknown after_section_id: {op.after_section_id}") 

283 continue 

284 new_doc.sections.insert(idx + 1, new_section) 

285 entry = _op_summary(op) 

286 entry["assigned_id"] = section_id 

287 applied.append(entry) 

288 continue 

289 

290 if isinstance(op, RemoveSectionOp): 

291 idx = new_doc.section_index(op.section_id) 

292 if idx is None: 

293 skip(op, f"unknown section_id: {op.section_id}") 

294 continue 

295 new_doc.sections.pop(idx) 

296 applied.append(_op_summary(op)) 

297 continue 

298 

299 if isinstance(op, ReplaceSectionBlocksOp): 

300 section = new_doc.section_by_id(op.section_id) 

301 if section is None: 

302 skip(op, f"unknown section_id: {op.section_id}") 

303 continue 

304 section.blocks = list(op.blocks) 

305 applied.append(_op_summary(op)) 

306 continue 

307 

308 if isinstance(op, RenameSectionOp): 

309 section = new_doc.section_by_id(op.section_id) 

310 if section is None: 

311 skip(op, f"unknown section_id: {op.section_id}") 

312 continue 

313 section.heading = op.new_heading 

314 applied.append(_op_summary(op)) 

315 continue 

316 

317 skip(op, f"unhandled op type: {type(op).__name__}") # pragma: no cover 

318 

319 return AppliedDelta(document=new_doc, applied=applied, skipped=skipped)