Coverage for astrocyte/pipeline/document_postprocess.py: 88%

68 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Document-level retain post-processing — core entry point. 

2 

3After all per-section fact extraction completes for a document, run 

4zero-or-more document-wide passes: 

5 

6 - **episodic_extract.tag_episodic_facts** — tag facts matching 

7 episodic-verb patterns with the ``EPISODIC_MARKER`` entity, so the 

8 recall path can surface them via ``search_facts_by_entity``. 

9 - **preference_compile.compile_preferences_for_document** — distill 

10 ``fact_type='preference'`` facts into ``MentalModel(kind='preference')`` 

11 rows for advisory recall. 

12 - **directive_compile.compile_directives_for_document** — further 

13 distill preferences into ≤5 imperative directives stored as 

14 ``MentalModel(kind='directive')`` for hard-rule surface. DEPRECATED (M19): off by default; enabling it logs a runtime warning.

15 

16Each pass is gated by its config flag (``enabled: bool``). The function 

17is the single core call site any retain caller (bench harness today, 

18orchestrator hook tomorrow) uses to opt into these features. 

19 

20Why a single function rather than 3 separate calls in each caller: 

21 

22- Callers stay declarative: pass the config, get whatever passes are 

23 enabled. No re-implementing the gating logic per caller. 

24- New post-processors are added here; callers benefit automatically. 

25- The implicit pipeline order (tag → preference → directive) is 

26 encoded in one place so dependent passes (directive needs the 

27 preference compilation to have produced facts to read) are 

28 guaranteed to run in the right sequence. 

29 

30Each pass is failure-isolated — a crash in one doesn't prevent the 

31others from running; the failure is logged and surfaced in the result. 

32 

33Public API: 

34 run_document_postprocess(*, facts, store, mental_model_store, 

35 provider, bank_id, document_id, config, 

36 model, n_sessions=None) 

37 -> DocumentPostprocessResult 

38""" 

39 

40from __future__ import annotations 

41 

42import logging 

43from dataclasses import dataclass, field 

44from typing import TYPE_CHECKING, Any 

45 

46if TYPE_CHECKING: 

47 from astrocyte.config import AstrocyteConfig 

48 from astrocyte.types import PageIndexFact 

49 

50_logger = logging.getLogger("astrocyte.pipeline.document_postprocess") 

51 

52 

@dataclass
class DocumentPostprocessResult:
    """Outcome report for one document-level post-processing run.

    The counters hold what each pass produced; the three lists record,
    pass by pass, whether it completed, was skipped, or raised. A pass
    that raised contributes one dict to ``failures`` carrying the pass
    name and the error message.
    """

    # Per-pass output counters; each keeps its 0 default unless set.
    episodic_tags_applied: int = 0
    preferences_compiled: int = 0
    directives_compiled: int = 0
    # Names of the passes that completed.
    passes_run: list[str] = field(default_factory=list)
    # Labels of the passes that did not run, with the reason in parentheses.
    passes_skipped: list[str] = field(default_factory=list)
    # One entry per pass that raised.
    failures: list[dict[str, Any]] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        """True when ``failures`` is empty; skipped passes do not count against it."""
        if self.failures:
            return False
        return True

72 

73 

async def run_document_postprocess(
    *,
    facts: list[PageIndexFact],
    store: Any,
    mental_model_store: Any | None,
    provider: Any,
    bank_id: str | None,
    document_id: str,
    config: AstrocyteConfig,
    model: str | None = None,
    n_sessions: int | None = None,
) -> DocumentPostprocessResult:
    """Run document-level retain post-processing.

    Each pass is independently gated by its config flag. Order is fixed:
      1. ``config.episodic_extract.enabled`` → tag episodic facts in-place
      2. ``config.preference_compile.enabled`` → compile preference MentalModels
      3. ``config.directive_compile.enabled`` → compile directive MentalModels

    Order matters: tagging (in-place on facts) must happen BEFORE the
    caller persists facts so the EPISODIC_MARKER entity is included in
    ``save_facts``. The compile passes operate on the in-memory
    ``facts`` list directly (no store read), so they can run before or
    after the caller's save — but tag-then-save-then-compile is the
    expected lifecycle for retain callers.

    Every pass ends up in exactly one place on the result: its name in
    ``passes_run``, a ``"<name> (<reason>)"`` label in ``passes_skipped``
    (reasons: ``disabled``, ``missing deps``, ``empty facts``), or an
    entry in ``failures``.

    Args:
        facts: All extracted facts for the document. Tagged in-place
            when episodic_extract.enabled.
        store: PageIndexStore SPI handle. Currently not read by this
            function.
        mental_model_store: Required when preference_compile or
            directive_compile are enabled; pass None when neither is.
        provider: LLM provider for compile passes. Required when either
            compile pass is enabled.
        bank_id: Bank scoping. Required when compile passes are enabled.
        document_id: The document being post-processed.
        config: AstrocyteConfig. The function reads its
            ``episodic_extract``, ``preference_compile`` (if exists),
            ``directive_compile`` sub-configs.
        model: LLM model for compile passes. Defaults to None (caller's
            provider default).
        n_sessions: Optional hint to directive_compile so it lowers its
            ≥2-mentions threshold for single-session docs.

    Returns:
        A ``DocumentPostprocessResult`` summarising what ran, what was
        skipped and what failed. An ``Exception`` raised inside a pass is
        caught, logged with its traceback and recorded in ``failures``
        instead of propagating, so one failing pass never blocks the next.
    """
    result = DocumentPostprocessResult()

    # Both compile passes need the same three collaborators; evaluate once.
    compile_deps_ready = (
        mental_model_store is not None and provider is not None and bool(bank_id)
    )

    # ─── 1. episodic_extract.tag_episodic_facts (in-place on facts list) ───
    episodic_enabled = _is_enabled(config, "episodic_extract")
    if episodic_enabled and facts:
        try:
            from astrocyte.pipeline.episodic_extract import (  # noqa: PLC0415
                tag_episodic_facts,
            )

            tagged = tag_episodic_facts(facts)
            result.episodic_tags_applied = tagged
            result.passes_run.append("episodic_extract")
        except Exception as exc:  # noqa: BLE001
            # exc_info keeps the traceback: the exception is swallowed here,
            # so the log is the only place it can be diagnosed from.
            _logger.warning(
                "document_postprocess: episodic_extract failed doc=%s: %s",
                document_id, exc,
                exc_info=True,
            )
            result.failures.append({"pass": "episodic_extract", "error": str(exc)})
    elif episodic_enabled:
        result.passes_skipped.append("episodic_extract (empty facts)")
    else:
        result.passes_skipped.append("episodic_extract (disabled)")

    # ─── 2. preference_compile.compile_preferences_for_document ───
    # (Operates on the in-memory ``facts`` list; no store read needed.)
    pref_enabled = _is_enabled_pref(config)
    if pref_enabled and compile_deps_ready:
        try:
            from astrocyte.pipeline.preference_compile import (  # noqa: PLC0415
                compile_preferences_for_document,
            )

            pref_ids = await compile_preferences_for_document(
                mental_model_store=mental_model_store,
                bank_id=bank_id,
                document_id=document_id,
                facts=facts,
                provider=provider,
                model=model,
            )
            result.preferences_compiled = len(pref_ids)
            result.passes_run.append("preference_compile")
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "document_postprocess: preference_compile failed doc=%s: %s",
                document_id, exc,
                exc_info=True,
            )
            result.failures.append({"pass": "preference_compile", "error": str(exc)})
    elif pref_enabled:
        result.passes_skipped.append("preference_compile (missing deps)")
    else:
        result.passes_skipped.append("preference_compile (disabled)")

    # ─── 3. directive_compile.compile_directives_for_document ───
    # DEPRECATED (M19, 2026-05-18): bench evidence (M18b B2 × 2 runs)
    # showed replicated -30pp SSP regression — the compressed directives
    # override the answerer's access to original preference nuance.
    # Hindsight architecture has directives as USER-AUTHORED via the
    # create_directive MCP tool, not LLM-compiled. Flag stays gated OFF
    # by default; setting True emits a runtime warning here.
    directive_enabled = _is_enabled(config, "directive_compile")
    if directive_enabled:
        _logger.warning(
            "directive_compile.enabled=True is DEPRECATED — bench evidence "
            "shows -30pp SSP regression; Hindsight architecture uses "
            "user-authored directives via create_directive MCP tool. "
            "See docs/_design/m19-prompt-routing.md.",
        )

    if not directive_enabled:
        result.passes_skipped.append("directive_compile (disabled)")
    elif not compile_deps_ready:
        result.passes_skipped.append("directive_compile (missing deps)")
    elif not facts:
        # Dependencies are all present; only the input is empty. Report that
        # precisely instead of blaming missing dependencies.
        result.passes_skipped.append("directive_compile (empty facts)")
    else:
        try:
            from astrocyte.pipeline.directive_compile import (  # noqa: PLC0415
                compile_directives_for_document,
            )

            directive_ids = await compile_directives_for_document(
                mental_model_store=mental_model_store,
                bank_id=bank_id,
                document_id=document_id,
                facts=facts,
                provider=provider,
                model=model,
                n_sessions=n_sessions,
            )
            result.directives_compiled = len(directive_ids)
            result.passes_run.append("directive_compile")
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "document_postprocess: directive_compile failed doc=%s: %s",
                document_id, exc,
                exc_info=True,
            )
            result.failures.append({"pass": "directive_compile", "error": str(exc)})

    return result

220 

221 

222def _is_enabled(config: AstrocyteConfig, sub: str) -> bool: 

223 """Return True if ``config.<sub>.enabled`` is True. Defensive: returns False if missing.""" 

224 sub_cfg = getattr(config, sub, None) 

225 if sub_cfg is None: 

226 return False 

227 return bool(getattr(sub_cfg, "enabled", False)) 

228 

229 

230def _is_enabled_pref(config: AstrocyteConfig) -> bool: 

231 """Preference-compile gate — defaults to True if PreferenceCompileConfig 

232 doesn't exist yet (backward compat with current always-on bench behavior).""" 

233 sub_cfg = getattr(config, "preference_compile", None) 

234 if sub_cfg is None: 

235 return True 

236 return bool(getattr(sub_cfg, "enabled", True))