Coverage for astrocyte/pipeline/directive_compile.py: 20%
89 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""M17 follow-up — directive-style preference compile.
3Distils raw preference-type facts into a small set of imperative
4directives stored as :class:`MentalModel` rows with ``kind="directive"``.
5Hindsight-parity: their ``mental_models.subtype='directive'`` rows are
6user-curated hard rules that the answerer treats as authoritative;
7auto-extracted preferences (subtype='preference') are advisory and
8rarely surfaced verbatim in the system prompt.
10Why a separate "directive" tier rather than re-enabling the M14.6
11preference surface that M14.7 reverted:
13- M14.6 surfaced every consolidated preference (typically 8-12 rows
14 per document) on EVERY question. That bloated the system prompt and
15 caused cross-category regression on LME (single-session-user dropped
16 5/5 → 1/5 as the answerer was overwhelmed by personalization context
17 on non-preference questions).
18- Hindsight's fix is to keep the consolidation pass (subtype='preference',
19 advisory) AND add a tight "directive" tier capped at 3-5 rows of the
20 most-confident, most-stable preferences. Only the directive tier
21 surfaces verbatim. Result: the answerer gets sparse, high-confidence
22 hard rules, and the verbose preference tier stays in the store for
23 on-demand recall.
25Algorithm (one LLM call per document):
271. Pull all ``fact_type='preference'`` PageIndexFacts for the document.
282. If fewer than 2 preference facts, skip (no consolidation signal —
29 single-mention preferences are too noisy to promote to directives).
303. Send to an LLM with a prompt asking for **up to 5** imperative
31 directives, each with title + content phrased as a short rule the
32 answerer can follow verbatim. The prompt explicitly rejects vague
33 or single-mention preferences and asks the model to abstain when no
34 stable signal exists.
354. Save each as a :class:`MentalModel` with ``kind='directive'``,
36 ``scope=f'document:{document_id}'``. Idempotent — skip when the
37 document already has directive models.
39The bench's :meth:`AstrocyteClient.get_user_profile` surfaces both
40``kind='general'`` and ``kind='directive'`` models in the
41``## User Profile`` block. ``kind='preference'`` rows remain in the
42store but are NOT surfaced in the profile (M14.7 revert preserved).
44Cost: 1 LLM call per document (gpt-4o-mini), ~$0.01. Marginal.
46See:
47- docs/_design/m17-pageindex-ingestion.md (Conversation-Engine bench)
48- ``astrocyte.pipeline.preference_compile`` — sibling that produces the
49 larger ``kind='preference'`` pool this directive pass distils from
50"""
52from __future__ import annotations
54import json
55import logging
56import re
57from datetime import datetime, timezone
58from typing import TYPE_CHECKING
60from astrocyte.types import MentalModel, Message
62if TYPE_CHECKING:
63 from astrocyte.provider import LLMProvider, MentalModelStore
64 from astrocyte.types import PageIndexFact
66_logger = logging.getLogger("astrocyte.pipeline.directive_compile")
69_COMPILE_PROMPT = """\
70You are extracting a SMALL SET of HARD-RULE DIRECTIVES from a user's \
71stated preferences in a conversation transcript. The directives will be \
72injected verbatim into an answering assistant's system prompt — treat \
73them as authoritative rules the assistant must honour when answering \
74follow-up questions.
76Output a JSON object with one key, ``directives``, containing an array \
77of AT MOST 5 directive objects. Each directive has:
78- "title": 2-5 word noun phrase naming the directive (e.g. \
79"Breakfast preference", "Dance studio")
80- "content": ONE imperative sentence the assistant can follow. \
81Use the form "Prefer X over Y because Z." or "Avoid X; the user dislikes \
82it." or "Recommend X for context Z." Always include the specific entity \
83the user named. Keep under 25 words.
84- "source_fact_ids": array of strings — the contributing raw fact_ids.
86STRICT RULES:
87- Emit AT MOST 5 directives. Quality over quantity. Sparse is better \
88than verbose: surface only the most CONFIDENT, STABLE preferences.
89- {min_facts_rule}
90- Phrase content imperatively (do/don't, prefer/avoid). The assistant \
91will read it as a rule.
92- DO NOT include fleeting / weak statements ("kind of liked it"). \
93Only stable preferences with explicit positive/negative sentiment.
94- {empty_input_rule}
96Input preferences (raw extractions):
97{prefs_block}
99OUTPUT MUST BE VALID JSON. No prose around it.
100"""
# Rule snippets substituted into ``_COMPILE_PROMPT`` through its
# ``{min_facts_rule}`` and ``{empty_input_rule}`` placeholders.
#
# These strings are passed as *arguments* to ``str.format``; substituted
# values are inserted verbatim and are never re-parsed as templates.  Braces
# in them must therefore NOT be doubled, otherwise the model is literally
# shown ``{{"directives": []}}`` as the expected empty response.

# Multi-session documents: require a repeat signal before promoting.
_MIN_FACTS_RULE_MULTI = (
    "A directive must be backed by AT LEAST 2 distinct raw preference "
    "facts in the input. Drop single-mention preferences entirely."
)
# Single-session documents: a repeat signal is impossible, so one clear
# mention is enough.
_MIN_FACTS_RULE_SINGLE = (
    "This document has only ONE session — single-mention preferences "
    "are admissible because no repeat signal is possible. Emit a "
    "directive for any clearly-stated preference even if it appears "
    "only once, as long as it carries explicit positive/negative "
    'sentiment (e.g. "no screens after 9:30pm", "avoid Y", '
    '"prefer X"). Skip only vague / fleeting statements.'
)
# Abstention instruction paired with ``_MIN_FACTS_RULE_MULTI``.
_EMPTY_RULE_MULTI = (
    "If the input has fewer than 2 distinct stable preferences, "
    'return ``{"directives": []}``. Returning zero is correct '
    "when the signal is absent."
)
# Abstention instruction paired with ``_MIN_FACTS_RULE_SINGLE``.
_EMPTY_RULE_SINGLE = (
    "If the input has no clearly-stated preferences, return "
    '``{"directives": []}``. Returning zero is correct when the '
    "signal is absent."
)


# Default cap on directives persisted per document.
_MAX_DIRECTIVES = 5
# Minimum number of preference facts required before compiling (the compile
# function lowers this to 1 for single-session documents).
_MIN_FACTS_TO_COMPILE = 2
def _slugify(text: str) -> str:
    """Turn a directive title into a short identifier fragment.

    The text is lower-cased and stripped, every character other than
    ``a-z``, ``0-9``, whitespace and ``-`` is removed, each whitespace run
    becomes a single ``-``, and the result is cut to 60 characters.  When
    nothing survives, the placeholder ``"dir"`` is returned.
    """
    lowered = text.lower().strip()
    kept = re.sub(r"[^a-z0-9\s-]", "", lowered)
    hyphenated = re.sub(r"\s+", "-", kept)
    truncated = hyphenated[:60]
    if not truncated:
        return "dir"
    return truncated
def _format_pref_for_prompt(fact: PageIndexFact) -> str:
    """Render one preference fact as a single prompt line.

    The line has the shape ``[id=<id>, entities=<e1,e2,...>] <text>``.  The
    ``entities=`` part is left out when the fact has no entities, and at
    most the first six entities are listed.
    """
    header = f"id={fact.id}"
    entities = fact.entities
    if entities:
        header = f"{header}, entities={','.join(entities[:6])}"
    return f"[{header}] {fact.text}"
def _index_facts_by_id(facts: list[PageIndexFact]) -> dict[str, PageIndexFact]:
    """Index facts under every identifier the LLM may echo back.

    The prompt line for a fact advertises ``fact.id``, so that attribute is
    indexed; ``fact_id`` is indexed too when a fact carries it.  Keys are
    stringified because the ids returned by the LLM are coerced to ``str``
    before lookup.
    """
    index: dict[str, PageIndexFact] = {}
    for fact in facts:
        for attr in ("id", "fact_id"):
            key = getattr(fact, attr, None)
            if key is not None:
                index[str(key)] = fact
    return index


async def compile_directives_for_document(
    *,
    mental_model_store: MentalModelStore,
    bank_id: str,
    document_id: str,
    facts: list[PageIndexFact],
    provider: LLMProvider,
    model: str | None = None,
    max_directives: int = _MAX_DIRECTIVES,
    n_sessions: int | None = None,
) -> list[str]:
    """Consolidate preference facts into at most ``max_directives`` directive
    mental models.

    Idempotent: when directive models already exist for the document scope,
    no LLM call is made and their ids are returned.

    ``n_sessions`` is the document's session count (caller-supplied so
    we don't need a store round-trip). When ``n_sessions <= 1`` the
    minimum-facts threshold drops to 1: in a single-session document, the
    ≥2-distinct-facts heuristic silently drops every single-mention
    preference (e.g. "no screens after 9:30pm" said once), which is the
    only signal we have for that document.

    Args:
        mental_model_store: Store used to look up existing directives
            (``list``) and to persist new ones (``upsert``).
        bank_id: Bank the directive rows are listed from and written to.
        document_id: Document whose facts are compiled; rows are scoped as
            ``document:{document_id}``.
        facts: Facts for the document; only those with
            ``fact_type == "preference"`` are used.
        provider: LLM provider whose ``complete`` coroutine is awaited once.
        model: Optional model name forwarded to ``provider.complete``.
        max_directives: Upper bound on the number of LLM-proposed items
            considered; negative values are treated as 0.
        n_sessions: Session count of the document, or ``None`` if unknown.

    Returns:
        The ``model_id`` values of rows persisted by this call, or of the
        already-present directive rows on a repeat call.  An empty list is
        returned when there are too few preference facts (checked before
        the idempotency lookup), when the LLM call fails, or when its
        output cannot be parsed into a ``{"directives": [...]}`` object.
    """
    pref_facts = [f for f in facts if f.fact_type == "preference"]
    # Fix 2 (conv-run-4): single-session docs lower the threshold to 1
    # so single-mention preferences make it to a directive instead of
    # being dropped silently. The ≥2 threshold is a noise-reduction
    # heuristic for multi-session corpora — irrelevant when only one
    # session's worth of preferences exists for this document.
    is_single_session = n_sessions is not None and n_sessions <= 1
    min_threshold = 1 if is_single_session else _MIN_FACTS_TO_COMPILE
    if len(pref_facts) < min_threshold:
        _logger.debug(
            "directive_compile: doc=%s has %d preference facts (<%d, single_session=%s), skip",
            document_id,
            len(pref_facts),
            min_threshold,
            is_single_session,
        )
        return []

    scope = f"document:{document_id}"

    # Idempotency — skip if directives already exist for this document scope.
    try:
        existing = await mental_model_store.list(
            bank_id,
            scope=scope,
            kind="directive",
        )
    except TypeError:
        # The ``kind=`` keyword was rejected: list the whole scope and
        # filter on the ``kind`` attribute client-side instead.
        all_for_scope = await mental_model_store.list(
            bank_id,
            scope=scope,
        )
        existing = [m for m in all_for_scope if getattr(m, "kind", None) == "directive"]
    if existing:
        return [m.model_id for m in existing]

    prefs_block = "\n".join(_format_pref_for_prompt(f) for f in pref_facts)
    msg = _COMPILE_PROMPT.format(
        prefs_block=prefs_block,
        min_facts_rule=(_MIN_FACTS_RULE_SINGLE if is_single_session else _MIN_FACTS_RULE_MULTI),
        empty_input_rule=(_EMPTY_RULE_SINGLE if is_single_session else _EMPTY_RULE_MULTI),
    )

    # Best-effort: a provider failure is logged and yields no directives
    # rather than propagating to the caller.
    try:
        completion = await provider.complete(
            [Message(role="user", content=msg)],
            model=model,
            max_tokens=1200,
            temperature=0.0,
            response_format={"type": "json_object"},
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "directive_compile.llm: call failed doc=%s (%s)",
            document_id,
            exc,
        )
        return []

    # A missing or ``None`` ``text`` makes ``json.loads`` raise TypeError;
    # malformed JSON raises a ValueError subclass.  Both are treated as
    # "bad JSON" instead of crashing.
    raw_text = getattr(completion, "text", None)
    try:
        data = json.loads(raw_text)
    except (ValueError, TypeError) as exc:
        preview = raw_text[:200] if isinstance(raw_text, str) else repr(raw_text)[:200]
        _logger.warning(
            "directive_compile.parse: bad JSON doc=%s (%s) text=%r",
            document_id,
            exc,
            preview,
        )
        return []

    # Valid JSON that is not an object (e.g. a bare array) has no ``.get``.
    if not isinstance(data, dict):
        _logger.warning(
            "directive_compile.parse: expected JSON object doc=%s, got %s",
            document_id,
            type(data).__name__,
        )
        return []

    items = data.get("directives") or []
    if not isinstance(items, list):
        return []

    now = datetime.now(tz=timezone.utc)
    saved: list[str] = []
    seen_ids: set[str] = set()

    # M40 — index source facts so MM construction can attach per-source
    # evidence timestamps in conversation time (mirrors preference_compile).
    fact_by_id = _index_facts_by_id(pref_facts)

    def _ts_for_fact_id(fid: str) -> datetime:
        # Unknown ids and facts without a timestamp fall back to ``now``.
        fact = fact_by_id.get(fid)
        if fact is None:
            return now
        return (
            getattr(fact, "mentioned_at", None)
            or getattr(fact, "occurred_start", None)
            or now
        )

    # Clamp so a negative cap cannot turn into a "drop from the end" slice.
    for raw in items[: max(0, max_directives)]:
        if not isinstance(raw, dict):
            continue
        title = str(raw.get("title", "")).strip()
        content = str(raw.get("content", "")).strip()
        if not title or not content:
            continue
        source_fact_ids = raw.get("source_fact_ids") or []
        if not isinstance(source_fact_ids, list):
            source_fact_ids = []
        source_fact_ids = [str(s) for s in source_fact_ids if isinstance(s, (str, int))]
        slug = _slugify(title)
        model_id = f"dir:{document_id[:8]}:{slug}"
        # Two titles that slugify identically would overwrite each other;
        # keep only the first.
        if model_id in seen_ids:
            continue
        seen_ids.add(model_id)

        # Parallel to ``source_fact_ids``: one timestamp per source id.
        source_timestamps = [_ts_for_fact_id(sid) for sid in source_fact_ids]

        mm = MentalModel(
            model_id=model_id,
            bank_id=bank_id,
            title=title,
            content=content,
            scope=scope,
            source_ids=source_fact_ids,
            revision=1,
            refreshed_at=now,
            kind="directive",
            source_timestamps=source_timestamps,
        )
        # Best-effort per row: one failed upsert does not abort the rest.
        try:
            await mental_model_store.upsert(mm, bank_id)
            saved.append(model_id)
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "directive_compile.save: doc=%s id=%s failed (%s)",
                document_id,
                model_id,
                exc,
            )

    _logger.info(
        "directive_compile: doc=%s produced %d directives from %d preference facts",
        document_id,
        len(saved),
        len(pref_facts),
    )
    return saved