Coverage for astrocyte/pipeline/directive_compile.py: 20%

89 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M17 follow-up — directive-style preference compile. 

2 

3Distils raw preference-type facts into a small set of imperative 

4directives stored as :class:`MentalModel` rows with ``kind="directive"``. 

5Hindsight-parity: their ``mental_models.subtype='directive'`` rows are 

6user-curated hard rules that the answerer treats as authoritative; 

7auto-extracted preferences (subtype='preference') are advisory and 

8rarely surfaced verbatim in the system prompt. 

9 

10Why a separate "directive" tier rather than re-enabling the M14.6 

11preference surface that M14.7 reverted: 

12 

13- M14.6 surfaced every consolidated preference (typically 8-12 rows 

14 per document) on EVERY question. That bloated the system prompt and 

15 caused cross-category regression on LME (single-session-user dropped 

16 5/5 → 1/5 as the answerer was overwhelmed by personalization context 

17 on non-preference questions). 

18- Hindsight's fix is to keep the consolidation pass (subtype='preference', 

19 advisory) AND add a tight "directive" tier capped at 3-5 rows of the 

20 most-confident, most-stable preferences. Only the directive tier 

21 surfaces verbatim. Result: the answerer gets sparse, high-confidence 

22 hard rules, and the verbose preference tier stays in the store for 

23 on-demand recall. 

24 

25Algorithm (one LLM call per document): 

26 

271. Pull all ``fact_type='preference'`` PageIndexFacts for the document. 

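282. If fewer than 2 preference facts, skip (no consolidation signal — 

29 single-mention preferences are too noisy to promote to directives). Exception: when the caller reports a single-session document (``n_sessions <= 1``) the threshold drops to 1, because no repeat signal is possible there. 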

303. Send to an LLM with a prompt asking for **up to 5** imperative 

31 directives, each with title + content phrased as a short rule the 

32 answerer can follow verbatim. The prompt explicitly rejects vague 

33 or single-mention preferences and asks the model to abstain when no 

34 stable signal exists. 

354. Save each as a :class:`MentalModel` with ``kind='directive'``, 

36 ``scope=f'document:{document_id}'``. Idempotent — skip when the 

37 document already has directive models. 

38 

39The bench's :meth:`AstrocyteClient.get_user_profile` surfaces both 

40``kind='general'`` and ``kind='directive'`` models in the 

41``## User Profile`` block. ``kind='preference'`` rows remain in the 

42store but are NOT surfaced in the profile (M14.7 revert preserved). 

43 

44Cost: 1 LLM call per document (gpt-4o-mini), ~$0.01. Marginal. 

45 

46See: 

47- docs/_design/m17-pageindex-ingestion.md (Conversation-Engine bench) 

48- ``astrocyte.pipeline.preference_compile`` — sibling that produces the 

49 larger ``kind='preference'`` pool this directive pass distils from 

50""" 

51 

52from __future__ import annotations 

53 

54import json 

55import logging 

56import re 

57from datetime import datetime, timezone 

58from typing import TYPE_CHECKING 

59 

60from astrocyte.types import MentalModel, Message 

61 

62if TYPE_CHECKING: 

63 from astrocyte.provider import LLMProvider, MentalModelStore 

64 from astrocyte.types import PageIndexFact 

65 

# Module-level logger. The logger name is spelled out literally rather than
# derived from ``__name__``.
_logger = logging.getLogger("astrocyte.pipeline.directive_compile")


# Prompt template for the single per-document LLM call. It is rendered with
# ``str.format`` and expects exactly three fields:
#   - ``min_facts_rule``   — one of the ``_MIN_FACTS_RULE_*`` fragments
#   - ``empty_input_rule`` — one of the ``_EMPTY_RULE_*`` fragments
#   - ``prefs_block``      — newline-joined preference facts
# A trailing backslash joins the following source line onto the same prompt
# line, so each bullet reaches the model as one unbroken line. The "AT MOST 5"
# cap is hard-coded in this text.
_COMPILE_PROMPT = """\
You are extracting a SMALL SET of HARD-RULE DIRECTIVES from a user's \
stated preferences in a conversation transcript. The directives will be \
injected verbatim into an answering assistant's system prompt — treat \
them as authoritative rules the assistant must honour when answering \
follow-up questions.

Output a JSON object with one key, ``directives``, containing an array \
of AT MOST 5 directive objects. Each directive has:
- "title": 2-5 word noun phrase naming the directive (e.g. \
"Breakfast preference", "Dance studio")
- "content": ONE imperative sentence the assistant can follow. \
Use the form "Prefer X over Y because Z." or "Avoid X; the user dislikes \
it." or "Recommend X for context Z." Always include the specific entity \
the user named. Keep under 25 words.
- "source_fact_ids": array of strings — the contributing raw fact_ids.

STRICT RULES:
- Emit AT MOST 5 directives. Quality over quantity. Sparse is better \
than verbose: surface only the most CONFIDENT, STABLE preferences.
- {min_facts_rule}
- Phrase content imperatively (do/don't, prefer/avoid). The assistant \
will read it as a rule.
- DO NOT include fleeting / weak statements ("kind of liked it"). \
Only stable preferences with explicit positive/negative sentiment.
- {empty_input_rule}

Input preferences (raw extractions):
{prefs_block}

OUTPUT MUST BE VALID JSON. No prose around it.
"""

101 

# Prompt fragments that fill the ``{min_facts_rule}`` and ``{empty_input_rule}``
# fields of ``_COMPILE_PROMPT``. The ``*_MULTI`` variants are used for
# multi-session documents, the ``*_SINGLE`` variants when the caller reports a
# single-session document.
#
# NOTE: these strings are passed to ``str.format`` as *arguments*; they are
# never used as format strings themselves. ``str.format`` only collapses
# ``{{`` / ``}}`` escapes inside the format string, so braces here must be
# written literally. Doubling them would leak ``{{"directives": []}}`` (which
# is not valid JSON) into the prompt shown to the model.
_MIN_FACTS_RULE_MULTI = (
    "A directive must be backed by AT LEAST 2 distinct raw preference "
    "facts in the input. Drop single-mention preferences entirely."
)
_MIN_FACTS_RULE_SINGLE = (
    "This document has only ONE session — single-mention preferences "
    "are admissible because no repeat signal is possible. Emit a "
    "directive for any clearly-stated preference even if it appears "
    "only once, as long as it carries explicit positive/negative "
    'sentiment (e.g. "no screens after 9:30pm", "avoid Y", '
    '"prefer X"). Skip only vague / fleeting statements.'
)
_EMPTY_RULE_MULTI = (
    "If the input has fewer than 2 distinct stable preferences, "
    'return ``{"directives": []}``. Returning zero is correct '
    "when the signal is absent."
)
_EMPTY_RULE_SINGLE = (
    "If the input has no clearly-stated preferences, return "
    '``{"directives": []}``. Returning zero is correct when the '
    "signal is absent."
)

124 

125 

# Default for the ``max_directives`` parameter of
# ``compile_directives_for_document``; at most this many LLM-returned items
# are considered for persistence.
_MAX_DIRECTIVES = 5
# Minimum number of preference facts a document needs before the compile pass
# runs. Documents reported as single-session use a threshold of 1 instead.
_MIN_FACTS_TO_COMPILE = 2

128 

129 

def _slugify(text: str) -> str:
    """Turn a directive title into a short ASCII slug for use in a model id.

    The title is lower-cased and stripped, every character other than
    ``a-z``, ``0-9``, whitespace and ``-`` is removed, each whitespace run
    becomes a single ``-``, and the result is cut to 60 characters. When
    nothing survives, the placeholder ``"dir"`` is returned instead of an
    empty string.
    """
    normalised = text.lower().strip()
    kept = re.sub(r"[^a-z0-9\s-]", "", normalised)
    hyphenated = re.sub(r"\s+", "-", kept)
    truncated = hyphenated[:60]
    if truncated:
        return truncated
    return "dir"

135 

136 

def _format_pref_for_prompt(fact: PageIndexFact) -> str:
    """Render one preference fact as a single prompt line.

    The line has the shape ``[id=<id>, entities=<e1,e2,...>] <text>``. The
    ``entities=`` part is left out when the fact has no entities, and at most
    the first six entities are listed.
    """
    header = [f"id={fact.id}"]
    if fact.entities:
        entity_list = ",".join(fact.entities[:6])
        header.append(f"entities={entity_list}")
    tag = ", ".join(header)
    return f"[{tag}] {fact.text}"

142 

143 

async def compile_directives_for_document(
    *,
    mental_model_store: MentalModelStore,
    bank_id: str,
    document_id: str,
    facts: list[PageIndexFact],
    provider: LLMProvider,
    model: str | None = None,
    max_directives: int = _MAX_DIRECTIVES,
    n_sessions: int | None = None,
) -> list[str]:
    """Consolidate preference facts into at most ``max_directives`` directives.

    Each directive is persisted as a :class:`MentalModel` with
    ``kind="directive"`` and ``scope=f"document:{document_id}"``. The call is
    idempotent: when directive models already exist for that scope, no LLM
    call is made and the existing ids are returned.

    Args:
        mental_model_store: Store used to list existing directives and to
            upsert new ones.
        bank_id: Bank the directive models belong to.
        document_id: Document whose preferences are being distilled. Its
            first 8 characters become part of each ``model_id``.
        facts: All facts for the document; only those whose ``fact_type`` is
            ``"preference"`` are used.
        provider: LLM provider; ``complete`` is awaited exactly once.
        model: Optional model name forwarded to the provider.
        max_directives: Upper bound on the number of LLM-returned items that
            are considered. Negative values are treated as 0.
        n_sessions: The document's session count, supplied by the caller so no
            store round-trip is needed. When it is ``<= 1`` the minimum-facts
            threshold drops from 2 to 1 and the prompt admits single-mention
            preferences, because a single-session document cannot produce a
            repeat signal. ``None`` means "unknown" and keeps the
            multi-session behaviour.

    Returns:
        The ``model_id`` values of the rows persisted by this call, or of the
        rows already present on a repeat call. An empty list is returned when
        there are too few preference facts, when the LLM call fails, or when
        its reply cannot be parsed into a ``{"directives": [...]}`` object.
    """
    pref_facts = [f for f in facts if f.fact_type == "preference"]

    # The >=2 threshold is a noise-reduction heuristic for multi-session
    # corpora; with a single session it would silently drop every preference
    # that was only stated once, which is all the signal such a document has.
    is_single_session = n_sessions is not None and n_sessions <= 1
    min_threshold = 1 if is_single_session else _MIN_FACTS_TO_COMPILE
    if len(pref_facts) < min_threshold:
        _logger.debug(
            "directive_compile: doc=%s has %d preference facts (<%d, single_session=%s), skip",
            document_id,
            len(pref_facts),
            min_threshold,
            is_single_session,
        )
        return []

    scope = f"document:{document_id}"

    # Idempotency — skip if directives already exist for this document scope.
    # Stores whose ``list`` does not accept ``kind`` raise TypeError; fall back
    # to listing the whole scope and filtering client-side.
    try:
        existing = await mental_model_store.list(
            bank_id,
            scope=scope,
            kind="directive",
        )
    except TypeError:
        all_for_scope = await mental_model_store.list(
            bank_id,
            scope=scope,
        )
        existing = [m for m in all_for_scope if getattr(m, "kind", None) == "directive"]
    if existing:
        return [m.model_id for m in existing]

    prefs_block = "\n".join(_format_pref_for_prompt(f) for f in pref_facts)
    msg = _COMPILE_PROMPT.format(
        prefs_block=prefs_block,
        min_facts_rule=(_MIN_FACTS_RULE_SINGLE if is_single_session else _MIN_FACTS_RULE_MULTI),
        empty_input_rule=(_EMPTY_RULE_SINGLE if is_single_session else _EMPTY_RULE_MULTI),
    )

    # Best-effort: a provider failure must not break ingestion, so it is
    # logged and reported as "no directives".
    try:
        completion = await provider.complete(
            [Message(role="user", content=msg)],
            model=model,
            max_tokens=1200,
            temperature=0.0,
            response_format={"type": "json_object"},
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "directive_compile.llm: call failed doc=%s (%s)",
            document_id,
            exc,
        )
        return []

    # ``getattr`` covers a completion object without ``.text``; a missing or
    # ``None`` text then surfaces as TypeError from ``json.loads`` and is
    # handled together with malformed JSON.
    raw_text = getattr(completion, "text", None)
    try:
        data = json.loads(raw_text)
    except (ValueError, TypeError) as exc:
        _logger.warning(
            "directive_compile.parse: bad JSON doc=%s (%s) text=%r",
            document_id,
            exc,
            str(raw_text or "")[:200],
        )
        return []

    # Valid JSON that is not an object (e.g. a bare array) has no
    # ``directives`` key to read; treat it like an unparseable reply.
    if not isinstance(data, dict):
        _logger.warning(
            "directive_compile.parse: expected JSON object doc=%s, got %s",
            document_id,
            type(data).__name__,
        )
        return []

    items = data.get("directives") or []
    if not isinstance(items, list):
        return []

    now = datetime.now(tz=timezone.utc)
    saved: list[str] = []
    seen_ids: set[str] = set()

    # M40 — index source facts so MM construction can attach per-source
    # evidence timestamps in conversation time (mirrors preference_compile).
    # The prompt exposes ``fact.id`` to the model and the returned ids are
    # coerced to ``str`` below, so the index is keyed by the string form of
    # ``id`` as well as ``fact_id``; otherwise every lookup would miss and
    # silently fall back to ``now``.
    fact_by_id: dict[str, object] = {}
    for fact in pref_facts:
        for attr in ("fact_id", "id"):
            key = getattr(fact, attr, None)
            if key is not None:
                fact_by_id.setdefault(str(key), fact)

    def _ts_for_fact_id(fid: str) -> datetime:
        """Return the conversation-time timestamp for a source fact id."""
        source = fact_by_id.get(fid)
        if source is None:
            # Unknown / hallucinated id: fall back to compile time.
            return now
        return (
            getattr(source, "mentioned_at", None)
            or getattr(source, "occurred_start", None)
            or now
        )

    # A negative cap would slice from the wrong end; clamp it to "none".
    limit = max(0, max_directives)
    for raw in items[:limit]:
        if not isinstance(raw, dict):
            continue
        title = str(raw.get("title", "")).strip()
        content = str(raw.get("content", "")).strip()
        if not title or not content:
            continue
        source_fact_ids = raw.get("source_fact_ids") or []
        if not isinstance(source_fact_ids, list):
            source_fact_ids = []
        source_fact_ids = [str(s) for s in source_fact_ids if isinstance(s, (str, int))]

        # Two titles that slugify to the same value would overwrite each
        # other on upsert; keep the first and drop later duplicates.
        slug = _slugify(title)
        model_id = f"dir:{document_id[:8]}:{slug}"
        if model_id in seen_ids:
            continue
        seen_ids.add(model_id)

        source_timestamps = [_ts_for_fact_id(sid) for sid in source_fact_ids]

        mm = MentalModel(
            model_id=model_id,
            bank_id=bank_id,
            title=title,
            content=content,
            scope=scope,
            source_ids=source_fact_ids,
            revision=1,
            refreshed_at=now,
            kind="directive",
            source_timestamps=source_timestamps,
        )
        # One failed save must not discard the remaining directives.
        try:
            await mental_model_store.upsert(mm, bank_id)
            saved.append(model_id)
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "directive_compile.save: doc=%s id=%s failed (%s)",
                document_id,
                model_id,
                exc,
            )

    _logger.info(
        "directive_compile: doc=%s produced %d directives from %d preference facts",
        document_id,
        len(saved),
        len(pref_facts),
    )
    return saved