Coverage for astrocyte/pipeline/delta_ops.py: 95%
151 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Delta operations for structured mental models (M21).
3The LLM's job during a mental-model refresh is to emit a list of these
4operations, each targeting an existing section (by id) or referencing a
5position relative to one. :func:`apply_operations` validates and
6applies each op in turn against a copy of the document; invalid ops
7(unknown ``section_id``, out-of-range ``index``, malformed payloads)
8are dropped with a debug-friendly reason — the document never gets
9worse than its input.
11Sections and blocks not mentioned by any op are physically copied
12through unchanged — there is no LLM-mediated re-emission of unchanged
13text, so prose drift is structurally impossible.
15Why operations and not "output the new structured doc":
17- "Output the new doc" still asks the LLM to *generate* every section's
18 blocks, including ones it didn't intend to modify, which gives it
19 the same opportunity to drift.
20- Operations make the no-change case mechanical: zero ops → identical
21 doc.
22- Operations are auditable: each refresh produces a log of exactly
23 what changed, useful for debugging the LLM's behaviour and
24 explaining diffs.
26Failure modes are by design conservative: an operation list that fails
27to parse against the Pydantic schema, or an LLM that returns invalid
28ops, results in zero changes — the document stays as-is. The
29structure can only get better or stay the same per refresh, never get
30worse.
32Ported from Hindsight ``hindsight_api/engine/reflect/delta_ops.py``
33under the project's MIT licence.
34"""
36from __future__ import annotations
38import logging
39from typing import Annotated, Any, Literal, Union
41from pydantic import BaseModel, ConfigDict, Field
43from astrocyte.pipeline.structured_doc import (
44 Block,
45 Section,
46 StructuredDocument,
47 make_unique_id,
48 slugify_heading,
49)
51_logger = logging.getLogger("astrocyte.pipeline.delta_ops")
54# Op payloads ---------------------------------------------------------------
class _OpBase(BaseModel):
    # Shared base for every delta-operation payload model in this module.
    # extra="forbid" makes pydantic reject a payload that carries fields the
    # concrete op model does not declare, rather than silently ignoring them.
    model_config = ConfigDict(extra="forbid")
class AppendBlockOp(_OpBase):
    """Add a new block at the end of an existing section."""

    # Discriminator tag that selects this model inside the ``Operation`` union.
    op: Literal["append_block"] = "append_block"
    # Id of the section to extend. ``apply_operations`` skips the op when no
    # section in the document carries this id.
    section_id: str
    # Block placed after the section's current last block.
    block: Block
class InsertBlockOp(_OpBase):
    """Insert a new block at ``index`` in an existing section.

    ``index`` may equal ``len(section.blocks)`` (append) but not be greater.
    """

    # Discriminator tag that selects this model inside the ``Operation`` union.
    op: Literal["insert_block"] = "insert_block"
    # Id of the section to edit; an unknown id makes ``apply_operations`` skip
    # the op.
    section_id: str
    # Negative values are rejected at validation time (ge=0). The upper bound
    # depends on the live document, so it is checked when the op is applied.
    index: int = Field(ge=0)
    # Block inserted at ``index``.
    block: Block
class ReplaceBlockOp(_OpBase):
    """Replace the block at ``index`` of an existing section."""

    # Discriminator tag that selects this model inside the ``Operation`` union.
    op: Literal["replace_block"] = "replace_block"
    # Id of the section to edit; an unknown id makes ``apply_operations`` skip
    # the op.
    section_id: str
    # Must address an existing block: ge=0 is enforced at validation time and
    # ``index < len(section.blocks)`` is checked when the op is applied.
    index: int = Field(ge=0)
    # Block that takes the place of the one currently at ``index``.
    block: Block
class RemoveBlockOp(_OpBase):
    """Remove the block at ``index`` of an existing section."""

    # Discriminator tag that selects this model inside the ``Operation`` union.
    op: Literal["remove_block"] = "remove_block"
    # Id of the section to edit; an unknown id makes ``apply_operations`` skip
    # the op.
    section_id: str
    # Must address an existing block: ge=0 is enforced at validation time and
    # ``index < len(section.blocks)`` is checked when the op is applied.
    index: int = Field(ge=0)
class AddSectionOp(_OpBase):
    """Add a brand-new section.

    ``after_section_id`` is optional; when omitted the new section is
    appended at the end. ``new_id`` is optional; when omitted we
    slugify the heading and disambiguate against existing IDs.
    """

    # Discriminator tag that selects this model inside the ``Operation`` union.
    op: Literal["add_section"] = "add_section"
    # Heading text of the new section; also the source of the id slug when
    # ``new_id`` is not supplied.
    heading: str
    # Constrained to 1..6, defaulting to 2.
    # NOTE(review): presumably the heading depth (h1-h6) -- confirm against
    # ``structured_doc.Section``.
    level: int = Field(default=2, ge=1, le=6)
    # Initial blocks of the section; a fresh empty list when omitted.
    blocks: list[Block] = Field(default_factory=list)
    # Anchor section: the new section is inserted directly after it. An id
    # that matches no section makes ``apply_operations`` skip the op.
    after_section_id: str | None = None
    # Requested id. ``apply_operations`` falls back to the slugified heading
    # when this is falsy (``None`` or ``""``) and passes either candidate
    # through ``make_unique_id`` together with the ids already in the document.
    new_id: str | None = None
class RemoveSectionOp(_OpBase):
    """Remove an entire section by id."""

    # Discriminator tag that selects this model inside the ``Operation`` union.
    op: Literal["remove_section"] = "remove_section"
    # Id of the section to drop; an unknown id makes ``apply_operations`` skip
    # the op.
    section_id: str
class ReplaceSectionBlocksOp(_OpBase):
    """Replace all blocks of a section in one go.

    Used when most of a section's contents are stale and rebuilding it
    as a unit is clearer than emitting many block-level ops. The
    section's heading and id are preserved.
    """

    # Discriminator tag that selects this model inside the ``Operation`` union.
    op: Literal["replace_section_blocks"] = "replace_section_blocks"
    # Id of the section whose blocks are swapped out; an unknown id makes
    # ``apply_operations`` skip the op.
    section_id: str
    # Complete new block list. Omitting it yields an empty list, which leaves
    # the section with no blocks.
    blocks: list[Block] = Field(default_factory=list)
class RenameSectionOp(_OpBase):
    """Rename a section's heading. The id is unchanged so future ops still resolve."""

    # Discriminator tag that selects this model inside the ``Operation`` union.
    op: Literal["rename_section"] = "rename_section"
    # Id of the section to rename; an unknown id makes ``apply_operations``
    # skip the op.
    section_id: str
    # Replacement heading text. Only ``Section.heading`` is assigned when the
    # op is applied; the section id is not recomputed from it.
    new_heading: str
# Tagged union of every operation payload. ``Field(discriminator="op")`` tells
# pydantic to read the ``op`` literal of an incoming item and validate it
# against the single matching model instead of trying each member in turn.
Operation = Annotated[
    Union[
        AppendBlockOp,
        InsertBlockOp,
        ReplaceBlockOp,
        RemoveBlockOp,
        AddSectionOp,
        RemoveSectionOp,
        ReplaceSectionBlocksOp,
        RenameSectionOp,
    ],
    Field(discriminator="op"),
]
class DeltaOperationList(BaseModel):
    """Container for the operations produced by an LLM delta call."""

    # Unknown top-level keys are a validation error rather than being ignored.
    model_config = ConfigDict(extra="forbid")
    # Ops in the order they should be applied; an empty list when omitted.
    operations: list[Operation] = Field(default_factory=list)
164# Application ---------------------------------------------------------------
class AppliedDelta(BaseModel):
    """Outcome of applying a list of operations to a document.

    ``applied`` is a per-op summary (audit trail). ``skipped`` is a
    parallel list for ops the validator dropped, each carrying a
    ``reason`` so the refresh path can log why the LLM's output was
    rejected without spinning the cycle of "no idea what failed".
    """

    model_config = ConfigDict(extra="forbid")

    # Document state after every accepted op has been applied.
    document: StructuredDocument
    # One summary dict per accepted op, in application order.
    applied: list[dict[str, Any]] = Field(default_factory=list)
    # One summary dict per rejected op, each with an added ``reason`` key.
    skipped: list[dict[str, Any]] = Field(default_factory=list)

    @property
    def changed(self) -> bool:
        """Whether at least one operation was accepted."""
        return bool(self.applied)
187def _op_summary(op: Operation) -> dict[str, Any]:
188 """Compact dict suitable for the audit trail.
190 Drops the verbose ``block`` / ``blocks`` payloads — the audit
191 cares about which sections were modified, not what content was
192 inserted. Use the full :class:`AppliedDelta.document` to inspect
193 the new state.
194 """
195 data = op.model_dump()
196 return {k: v for k, v in data.items() if k != "block" and k != "blocks"} | {
197 "op": data["op"],
198 }
# Ops that address exactly one existing section through ``section_id`` and
# edit that section in place.
_SECTION_EDIT_OPS = (
    AppendBlockOp,
    InsertBlockOp,
    ReplaceBlockOp,
    RemoveBlockOp,
    ReplaceSectionBlocksOp,
    RenameSectionOp,
)


def _edit_section(section: Section, op: Operation) -> str | None:
    """Apply one section-scoped op to ``section`` in place.

    Returns ``None`` when the op was applied, or the skip reason when the
    op's ``index`` is out of range (``section`` is left untouched then).
    """
    if isinstance(op, AppendBlockOp):
        section.blocks.append(op.block)
    elif isinstance(op, InsertBlockOp):
        size = len(section.blocks)
        # index == size is allowed: inserting at the end is an append.
        if op.index > size:
            return f"index out of range: {op.index} > {size}"
        section.blocks.insert(op.index, op.block)
    elif isinstance(op, (ReplaceBlockOp, RemoveBlockOp)):
        size = len(section.blocks)
        # Both ops must address a block that already exists.
        if op.index >= size:
            return f"index out of range: {op.index} >= {size}"
        if isinstance(op, ReplaceBlockOp):
            section.blocks[op.index] = op.block
        else:
            section.blocks.pop(op.index)
    elif isinstance(op, ReplaceSectionBlocksOp):
        # Fresh list so the section does not share its list with the op.
        section.blocks = list(op.blocks)
    elif isinstance(op, RenameSectionOp):
        section.heading = op.new_heading
    return None


def apply_operations(
    doc: StructuredDocument,
    operations: list[Operation],
) -> AppliedDelta:
    """Apply a list of operations to a document, returning a new document.

    ``doc`` is deep-copied up front and only the copy is edited, so the
    caller's document is never mutated. Ops are applied in order, each
    against the state left by the ops before it.

    An op is skipped -- recorded in ``skipped`` with a ``reason`` string and
    logged at debug level -- when it names an unknown ``section_id`` /
    ``after_section_id`` or carries an out-of-range ``index``. Adding a
    section is not skipped on an id collision: the requested (or
    slug-derived) id is passed through ``make_unique_id`` together with the
    ids already present, and the id actually used is reported as
    ``assigned_id`` in the audit entry.

    Block objects carried by the ops are placed into the new document by
    reference; they are not copied.

    Args:
        doc: Document to use as the starting state.
        operations: Validated operations, in application order.

    Returns:
        An :class:`AppliedDelta` holding the edited copy plus the
        ``applied`` / ``skipped`` audit lists.
    """
    new_doc = doc.model_copy(deep=True)
    applied: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []

    def skip(op: Operation, reason: str) -> None:
        entry = _op_summary(op)
        entry["reason"] = reason
        skipped.append(entry)
        _logger.debug("delta_ops: skipping op %s", entry)

    for op in operations:
        if isinstance(op, _SECTION_EDIT_OPS):
            section = new_doc.section_by_id(op.section_id)
            if section is None:
                skip(op, f"unknown section_id: {op.section_id}")
                continue
            reason = _edit_section(section, op)
            if reason is not None:
                skip(op, reason)
                continue
            applied.append(_op_summary(op))
        elif isinstance(op, AddSectionOp):
            # Resolve the insertion point first so an op with a bad anchor is
            # rejected before any id is allocated or section is built.
            if op.after_section_id is None:
                position = len(new_doc.sections)
            else:
                anchor = new_doc.section_index(op.after_section_id)
                if anchor is None:
                    skip(op, f"unknown after_section_id: {op.after_section_id}")
                    continue
                position = anchor + 1
            existing_ids = {s.id for s in new_doc.sections}
            # A falsy ``new_id`` (None or "") falls back to the heading slug.
            base_id = op.new_id or slugify_heading(op.heading)
            section_id = make_unique_id(base_id, existing_ids)
            new_doc.sections.insert(
                position,
                Section(
                    id=section_id,
                    heading=op.heading,
                    level=op.level,
                    blocks=list(op.blocks),
                ),
            )
            entry = _op_summary(op)
            # Surface the final id so later ops / logs can refer to it.
            entry["assigned_id"] = section_id
            applied.append(entry)
        elif isinstance(op, RemoveSectionOp):
            idx = new_doc.section_index(op.section_id)
            if idx is None:
                skip(op, f"unknown section_id: {op.section_id}")
                continue
            new_doc.sections.pop(idx)
            applied.append(_op_summary(op))
        else:
            skip(op, f"unhandled op type: {type(op).__name__}")  # pragma: no cover

    return AppliedDelta(document=new_doc, applied=applied, skipped=skipped)