Coverage for astrocyte/pipeline/document_postprocess.py: 88%
68 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Document-level retain post-processing — core entry point.
3After all per-section fact extraction completes for a document, run
4zero-or-more document-wide passes:
6 - **episodic_extract.tag_episodic_facts** — tag facts matching
7 episodic-verb patterns with the ``EPISODIC_MARKER`` entity, so the
8 recall path can surface them via ``search_facts_by_entity``.
9 - **preference_compile.compile_preferences_for_document** — distill
10 ``fact_type='preference'`` facts into ``MentalModel(kind='preference')``
11 rows for advisory recall.
12 - **directive_compile.compile_directives_for_document** — further
13 distill preferences into ≤5 imperative directives stored as
14 ``MentalModel(kind='directive')`` for hard-rule surface.
16Each pass is gated by its config flag (``enabled: bool``). The function
17is the single core call site any retain caller (bench harness today,
18orchestrator hook tomorrow) uses to opt into these features.
20Why a single function rather than 3 separate calls in each caller:
22- Callers stay declarative: pass the config, get whatever passes are
23 enabled. No re-implementing the gating logic per caller.
24- New post-processors are added here; callers benefit automatically.
25- The implicit pipeline order (tag → preference → directive) is
26 encoded in one place so dependent passes (directive needs the
27 preference compilation to have produced facts to read) are
28 guaranteed to run in the right sequence.
30Each pass is failure-isolated — a crash in one doesn't prevent the
31others from running; the failure is logged and surfaced in the result.
33Public API:
34 run_document_postprocess(*, facts, store, mental_model_store,
35 provider, bank_id, document_id, config,
36 model=None, n_sessions=None)
37 -> DocumentPostprocessResult
38"""
40from __future__ import annotations
42import logging
43from dataclasses import dataclass, field
44from typing import TYPE_CHECKING, Any
46if TYPE_CHECKING:
47 from astrocyte.config import AstrocyteConfig
48 from astrocyte.types import PageIndexFact
50_logger = logging.getLogger("astrocyte.pipeline.document_postprocess")
@dataclass
class DocumentPostprocessResult:
    """Outcome of a single document post-processing run.

    Collects the per-pass counters plus a record of which passes ran,
    which were skipped (with the reason in parentheses), and which
    raised. ``ok`` is True exactly when ``failures`` is empty.
    """

    # Value returned by the episodic tagging pass (0 if it did not run).
    episodic_tags_applied: int = 0
    # Number of ids returned by the preference compile pass.
    preferences_compiled: int = 0
    # Number of ids returned by the directive compile pass.
    directives_compiled: int = 0
    # Names of the passes that completed without raising.
    passes_run: list[str] = field(default_factory=list)
    # Entries of the form "<pass> (<reason>)" for passes that did not run.
    passes_skipped: list[str] = field(default_factory=list)
    # One ``{"pass": <name>, "error": <message>}`` dict per failing pass.
    failures: list[dict[str, Any]] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        """Return True when no pass recorded a failure."""
        if self.failures:
            return False
        return True
async def run_document_postprocess(
    *,
    facts: list[PageIndexFact],
    store: Any,
    mental_model_store: Any | None,
    provider: Any,
    bank_id: str | None,
    document_id: str,
    config: AstrocyteConfig,
    model: str | None = None,
    n_sessions: int | None = None,
) -> DocumentPostprocessResult:
    """Run document-level retain post-processing.

    Each pass is independently gated by its config flag. Order is fixed:
      1. ``config.episodic_extract.enabled`` → tag episodic facts in-place
      2. ``config.preference_compile.enabled`` → compile preference MentalModels
      3. ``config.directive_compile.enabled`` → compile directive MentalModels

    Order matters: tagging (in-place on facts) must happen BEFORE the
    caller persists facts so the EPISODIC_MARKER entity is included in
    ``save_facts``. The compile passes operate on the in-memory
    ``facts`` list directly (no store read), so they can run before or
    after the caller's save — but tag-then-save-then-compile is the
    expected lifecycle for retain callers.

    Every pass is failure-isolated: an exception raised by one pass is
    logged (with traceback) and appended to ``result.failures``; the
    remaining passes still run.

    Args:
        facts: All extracted facts for the document. Tagged in-place
            when episodic_extract.enabled.
        store: PageIndexStore SPI handle. Not read by the current body;
            kept in the signature so callers stay unchanged.
        mental_model_store: Required when preference_compile or
            directive_compile are enabled; pass None when neither is.
        provider: LLM provider for compile passes. Required when either
            compile pass is enabled.
        bank_id: Bank scoping. Required (non-empty) when compile passes
            are enabled.
        document_id: The document being post-processed.
        config: AstrocyteConfig. The function reads its
            ``episodic_extract``, ``preference_compile`` (if present),
            and ``directive_compile`` sub-configs.
        model: LLM model for compile passes. Defaults to None (caller's
            provider default).
        n_sessions: Optional hint to directive_compile so it lowers its
            ≥2-mentions threshold for single-session docs.

    Returns:
        A ``DocumentPostprocessResult`` listing which passes ran, which
        were skipped and why (``disabled``, ``missing deps`` or
        ``empty facts``), and any per-pass failures.
    """
    result = DocumentPostprocessResult()

    # ─── 1. episodic_extract.tag_episodic_facts (in-place on facts list) ───
    episodic_enabled = _is_enabled(config, "episodic_extract")
    if episodic_enabled and facts:
        try:
            from astrocyte.pipeline.episodic_extract import (  # noqa: PLC0415
                tag_episodic_facts,
            )

            result.episodic_tags_applied = tag_episodic_facts(facts)
            result.passes_run.append("episodic_extract")
        except Exception as exc:  # noqa: BLE001
            # exc_info keeps the traceback; the message alone rarely
            # identifies where a pass blew up.
            _logger.warning(
                "document_postprocess: episodic_extract failed doc=%s: %s",
                document_id, exc,
                exc_info=True,
            )
            result.failures.append({"pass": "episodic_extract", "error": str(exc)})
    elif episodic_enabled:
        result.passes_skipped.append("episodic_extract (empty facts)")
    else:
        result.passes_skipped.append("episodic_extract (disabled)")

    # Both compile passes need the same three collaborators; evaluate once.
    compile_deps_ready = (
        mental_model_store is not None
        and provider is not None
        and bool(bank_id)
    )

    # ─── 2. preference_compile.compile_preferences_for_document ───
    # (Operates on the in-memory ``facts`` list; no store read needed.)
    pref_enabled = _is_enabled_pref(config)
    if pref_enabled and compile_deps_ready:
        try:
            from astrocyte.pipeline.preference_compile import (  # noqa: PLC0415
                compile_preferences_for_document,
            )

            pref_ids = await compile_preferences_for_document(
                mental_model_store=mental_model_store,
                bank_id=bank_id,
                document_id=document_id,
                facts=facts,
                provider=provider,
                model=model,
            )
            result.preferences_compiled = len(pref_ids)
            result.passes_run.append("preference_compile")
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "document_postprocess: preference_compile failed doc=%s: %s",
                document_id, exc,
                exc_info=True,
            )
            result.failures.append({"pass": "preference_compile", "error": str(exc)})
    elif pref_enabled:
        result.passes_skipped.append("preference_compile (missing deps)")
    else:
        result.passes_skipped.append("preference_compile (disabled)")

    # ─── 3. directive_compile.compile_directives_for_document ───
    # DEPRECATED (M19, 2026-05-18): bench evidence (M18b B2 × 2 runs)
    # showed replicated -30pp SSP regression — the compressed directives
    # override the answerer's access to original preference nuance.
    # Hindsight architecture has directives as USER-AUTHORED via the
    # create_directive MCP tool, not LLM-compiled. Flag stays gated OFF
    # by default; setting True emits a runtime warning here.
    directive_enabled = _is_enabled(config, "directive_compile")
    if directive_enabled:
        _logger.warning(
            "directive_compile.enabled=True is DEPRECATED — bench evidence "
            "shows -30pp SSP regression; Hindsight architecture uses "
            "user-authored directives via create_directive MCP tool. "
            "See docs/_design/m19-prompt-routing.md.",
        )
    if directive_enabled and compile_deps_ready and facts:
        try:
            from astrocyte.pipeline.directive_compile import (  # noqa: PLC0415
                compile_directives_for_document,
            )

            directive_ids = await compile_directives_for_document(
                mental_model_store=mental_model_store,
                bank_id=bank_id,
                document_id=document_id,
                facts=facts,
                provider=provider,
                model=model,
                n_sessions=n_sessions,
            )
            result.directives_compiled = len(directive_ids)
            result.passes_run.append("directive_compile")
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "document_postprocess: directive_compile failed doc=%s: %s",
                document_id, exc,
                exc_info=True,
            )
            result.failures.append({"pass": "directive_compile", "error": str(exc)})
    elif directive_enabled and not compile_deps_ready:
        result.passes_skipped.append("directive_compile (missing deps)")
    elif directive_enabled:
        # Deps are all present, so the only unmet condition is an empty
        # ``facts`` list. Report it as such (same wording as the episodic
        # pass) instead of mislabelling it as a missing dependency.
        result.passes_skipped.append("directive_compile (empty facts)")
    else:
        result.passes_skipped.append("directive_compile (disabled)")

    return result
def _is_enabled(config: AstrocyteConfig, sub: str) -> bool:
    """Report whether ``config.<sub>.enabled`` is truthy.

    Defensive lookup: a missing (or ``None``) sub-config, or a sub-config
    without an ``enabled`` attribute, counts as disabled.
    """
    section = getattr(config, sub, None)
    return False if section is None else bool(getattr(section, "enabled", False))
def _is_enabled_pref(config: AstrocyteConfig) -> bool:
    """Gate for the preference-compile pass.

    Unlike ``_is_enabled``, this defaults to True: when ``config`` has no
    ``preference_compile`` sub-config (or it lacks ``enabled``), the pass
    is treated as on, for backward compatibility with the current
    always-on bench behavior.
    """
    pref_cfg = getattr(config, "preference_compile", None)
    if pref_cfg is not None:
        return bool(getattr(pref_cfg, "enabled", True))
    return True