Coverage for astrocyte/pipeline/preference_compile.py: 87%
82 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""M14.6: consolidated user-preference compile.
3Distils raw preference-type facts into structured consolidated
4preferences, stored as :class:`MentalModel` rows with
5``kind="preference"``. Matches Hindsight's mental-models pattern for
6consolidated knowledge (their ``mental_models.subtype`` column).
8Why a separate compile pass rather than enriching raw extraction:
9- Hindsight tried the dedicated ``opinion`` fact_type and **removed
10 it** (migration g2h3i4j5k6l7 — April 2026). Their evidence: schema
11 rigidity at ingest didn't pan out; preferences are better handled
12 at the compiled-knowledge layer downstream of fact extraction.
13- Raw preference facts capture individual statements but lose the
14 conditional context (when/why/under-what-conditions) the LME
15 single-session-preference category probes. A consolidation pass
16 with an LLM-judged synthesis can preserve that context.
18Algorithm (one LLM call per document):
201. Pull all PageIndexFacts with ``fact_type='preference'`` for the doc.
212. If fewer than 2 preference facts, skip (no consolidation signal).
223. Send to LLM with a prompt asking for up to N consolidated
23 preferences, each with title + content (the content preserves
24 qualifier / condition / sentiment inline) + the source fact ids.
254. Save each as a :class:`MentalModel` with ``kind='preference'``,
26 ``scope=f'document:{document_id}'``. Idempotent — skip if the
27 document already has preference models (M14.6 v1; M14.7 may add
28 incremental refresh).
30The bench's existing :meth:`AstrocyteClient.get_user_profile` lists
31mental models by ``scope`` and serialises them into the answerer's
32``## User Profile`` block — preference-kind models surface there
33automatically. No retrieval-path change needed for v1.
35Cost: ~1 LLM call per document (gpt-4o-mini), ~$0.01-0.02. Marginal.
37See:
38- ``docs/_design/m13-m14-roadmap.md`` §4 (M14.6 placement)
39- ``astrocyte.pipeline.mental_model_compile`` — sibling that produces
40 ``kind='general'`` profile statements
41"""
43from __future__ import annotations
45import json
46import logging
47import re
48from datetime import datetime, timezone
49from typing import TYPE_CHECKING
51from astrocyte.types import MentalModel, Message
53if TYPE_CHECKING:
54 from astrocyte.provider import LLMProvider, MentalModelStore
55 from astrocyte.types import PageIndexFact
57_logger = logging.getLogger("astrocyte.pipeline.preference_compile")
# Prompt template for the single consolidation LLM call made by
# ``compile_preferences_for_document``.
# - Rendered with ``str.format`` using exactly two fields: ``max_prefs`` (the
#   "up to N" cap quoted to the model) and ``prefs_block`` (the raw
#   preference lines, one per fact).
# - ``{{`` / ``}}`` are ``str.format`` escapes; they render as literal braces
#   in the example JSON the model is told to return for the empty case.
# - A trailing backslash inside the literal joins that source line to the
#   next one, so wrapped sentences reach the model as a single line.
# - The model is asked for a JSON object with one key, ``preferences``, whose
#   entries carry ``title``, ``content`` and ``source_fact_ids``.
60_COMPILE_PROMPT = """\
61You are consolidating a user's STATED PREFERENCES from a conversation \
62transcript. You will be given the raw preference statements extracted \
63from the conversation. Your job: produce up to {max_prefs} consolidated \
64preferences that capture WHAT the user prefers, plus the CONTEXT \
65(when, why, under what conditions, sentiment strength).
67Output a JSON object with one key, ``preferences``, containing an array \
68of preference objects. Each preference has:
69- "title": 3-7 word noun phrase naming the preference dimension \
70 (e.g. "Breakfast preference", "Coffee in morning", "Reading material")
71- "content": ONE declarative sentence stating the user's preference \
72 with FULL CONTEXT inline. Capture: WHAT they prefer (the object), \
73 WHEN/WHERE (qualifier), WHY (if stated), and STRENGTH (strong / mild \
74 / context-dependent). Examples:
75 - "User prefers oatmeal with berries for breakfast on busy mornings, \
76 citing time efficiency."
77 - "User strongly prefers Dr. Patel's clinic over their previous \
78 clinic because the staff was more attentive."
79 - "User prefers Sony cameras for travel photography over Nikon, \
80 mainly for sensor performance in low light."
81- "source_fact_ids": array of strings — the raw fact_id values from \
82 the input that contributed to this consolidated preference.
84Rules:
85- DO NOT invent preferences not stated in the input. If the user only \
86 said "I like X", that's a preference; if they merely mentioned X, \
87 that's NOT a preference.
88- Merge multiple raw mentions of the SAME preference into one \
89 consolidated row. Cite all contributing fact_ids.
90- If two raw preferences CONTRADICT (e.g. "I used to prefer X, now I \
91 prefer Y"), produce ONE row reflecting the LATEST state, with \
92 "(previously X)" inline.
93- Skip vague / fleeting statements ("kind of liked it"). Only \
94 consolidate STABLE preferences.
95- If the input has fewer than 2 distinct preferences, return \
96 ``{{"preferences": []}}``.
98Input preferences (raw extractions):
99{prefs_block}
101OUTPUT MUST BE VALID JSON. No prose around it.
102"""
def _slugify(text: str) -> str:
    """Turn a short preference title into a lowercase, hyphen-separated slug.

    Steps, in order: lowercase and trim the title, drop every character that
    is not ``a-z``, ``0-9``, whitespace or ``-``, collapse each whitespace
    run into a single ``-``, and keep at most the first 60 characters.

    Returns:
        The slug, or the placeholder ``"pref"`` when nothing survives the
        filtering (empty or fully non-ASCII/punctuation titles).

    Note:
        Per the original author's note this mirrors the slug shape used by
        ``section_compile``; that module is not visible here, so treat the
        parity as an assumption to confirm.
    """
    lowered = text.lower().strip()
    # Filtering happens after the trim, so a title like "a !" keeps the inner
    # space and ends up with a trailing hyphen ("a-").
    filtered = re.sub(r"[^a-z0-9\s-]", "", lowered)
    hyphenated = re.sub(r"\s+", "-", filtered)
    truncated = hyphenated[:60]
    if not truncated:
        return "pref"
    return truncated
def _format_pref_for_prompt(fact: PageIndexFact) -> str:
    """Render one preference fact as a single line of the consolidation prompt.

    The line has the shape ``[id=<fact.id>] <fact.text>``. When
    ``fact.entities`` is non-empty, its first six entries are added inside
    the brackets as ``entities=a,b,...`` (comma-joined, no spaces).

    Args:
        fact: object exposing ``id``, ``entities`` and ``text``.

    Returns:
        The formatted prompt line.
    """
    header = f"id={fact.id}"
    if fact.entities:
        # Cap the entity list so one heavily-tagged fact cannot bloat the prompt.
        entity_csv = ",".join(fact.entities[:6])
        header = f"{header}, entities={entity_csv}"
    return f"[{header}] {fact.text}"
async def compile_preferences_for_document(
    *,
    mental_model_store: MentalModelStore,
    bank_id: str,
    document_id: str,
    facts: list[PageIndexFact],
    provider: LLMProvider,
    model: str | None = None,
    max_preferences: int = 12,
) -> list[str]:
    """Consolidate preference-type facts → preference-kind MentalModels.

    Best-effort: LLM failures, unparseable or wrongly-shaped LLM output, and
    per-row store failures are logged and skipped rather than raised.

    Args:
        mental_model_store: where to persist the consolidated models.
        bank_id: tenant scope.
        document_id: scope for the resulting models
            (``scope='document:<id>'``).
        facts: ALL facts extracted for this document — the function
            filters internally for ``fact_type=='preference'``. Passed
            from the caller's retain path to avoid an extra round trip
            to the store.
        provider: LLM provider for the consolidation call.
        model: LLM model name, forwarded as-is to ``provider.complete``
            (``None`` leaves the choice to the provider).
        max_preferences: cap on consolidated rows requested from the LLM
            and persisted. Values below zero are treated as zero.

    Returns:
        List of ``model_id`` values:

        * the ids of the rows persisted by this call, or
        * the ids of the already-stored preference models when the
          idempotency check short-circuits, or
        * an empty list when there were fewer than two preference facts,
          the LLM call failed, or the LLM produced no usable output.

    Idempotent: if preference-kind models already exist for this
    ``(bank_id, document_id)`` scope, the compile is skipped and no LLM
    call is made.
    """
    pref_facts = [f for f in facts if f.fact_type == "preference"]
    if len(pref_facts) < 2:
        _logger.debug(
            "preference_compile: doc=%s has %d preference facts (<2), skip",
            document_id,
            len(pref_facts),
        )
        return []

    scope = f"document:{document_id}"

    # Idempotency check — skip if we already have preference models for this scope.
    try:
        existing = await mental_model_store.list(
            bank_id,
            scope=scope,
            kind="preference",
        )
    except TypeError:
        # Older MentalModelStore SPI without `kind` kwarg — fall back to
        # listing all and filtering client-side. Lets this module work
        # against not-yet-migrated stores (some tests use older fakes).
        all_for_scope = await mental_model_store.list(bank_id, scope=scope)
        existing = [m for m in all_for_scope if m.kind == "preference"]
    if existing:
        _logger.debug(
            "preference_compile: doc=%s already has %d preference models — skip",
            document_id,
            len(existing),
        )
        return [m.model_id for m in existing]

    prefs_block = "\n".join(_format_pref_for_prompt(f) for f in pref_facts)
    msg = _COMPILE_PROMPT.format(
        max_prefs=max_preferences,
        prefs_block=prefs_block,
    )

    try:
        # max_tokens=800 (M14.6) truncated JSON output mid-string on ~90% of
        # LME documents (M14.7-b1.1 diagnostic 2026-05-13). At
        # max_preferences=12, each preference body runs ~150-200 tokens of
        # structured JSON, so 800 was undersized by ~2.5×. The truncation
        # caused entire documents to land with 0 preference-models, which
        # silently neutered the B-1 anchor pool for downstream questions.
        # Bumped to 3000 with margin for ``source_fact_ids`` arrays.
        completion = await provider.complete(
            [Message(role="user", content=msg)],
            model=model,
            max_tokens=3000,
            temperature=0.0,
            response_format={"type": "json_object"},
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "preference_compile.llm: call failed for doc=%s (%s)",
            document_id,
            exc,
        )
        return []

    # ``getattr`` covers a completion object without ``.text``; a ``None`` or
    # otherwise non-string payload then surfaces as TypeError from json.loads,
    # which is handled together with malformed JSON below.
    raw_text = getattr(completion, "text", None)
    try:
        data = json.loads(raw_text)
    except (ValueError, TypeError) as exc:
        # json.JSONDecodeError is a ValueError subclass.
        preview = (
            raw_text[:200]
            if isinstance(raw_text, (str, bytes, bytearray))
            else raw_text
        )
        _logger.warning(
            "preference_compile.parse: bad JSON for doc=%s (%s) text=%r",
            document_id,
            exc,
            preview,
        )
        return []

    # Valid JSON is not necessarily an object: a top-level array or scalar
    # has no ``.get`` and must be treated as "no usable output".
    if not isinstance(data, dict):
        _logger.warning(
            "preference_compile.parse: expected a JSON object for doc=%s, got %s",
            document_id,
            type(data).__name__,
        )
        return []

    items = data.get("preferences") or []
    if not isinstance(items, list):
        return []

    now = datetime.now(tz=timezone.utc)
    saved: list[str] = []
    seen_ids: set[str] = set()

    # M40 — index source facts by id so we can pull per-source evidence
    # timestamps when building each preference's MentalModel.
    # Keys are normalised to ``str`` because the ids coming back from the LLM
    # are stringified before lookup. Both ``fact_id`` and ``id`` are indexed:
    # the prompt shows the model ``fact.id``, so ``id`` is written last and
    # wins if the two attributes ever disagree across facts.
    fact_by_id: dict[str, PageIndexFact] = {}
    for attr in ("fact_id", "id"):
        for fact in pref_facts:
            key = getattr(fact, attr, None)
            if key is not None:
                fact_by_id[str(key)] = fact

    def _ts_for_fact_id(fid: str) -> datetime:
        # Preference order: mentioned_at (when the user said it) >
        # occurred_start (when the event occurred) > now (last resort — only
        # for facts that carry no temporal info at all, or ids the LLM
        # returned that match no input fact).
        fact = fact_by_id.get(fid)
        if fact is None:
            return now
        return (
            getattr(fact, "mentioned_at", None)
            or getattr(fact, "occurred_start", None)
            or now
        )

    def _clean(value: object) -> str:
        # JSON ``null`` must read as "missing", not as the literal text "None".
        return "" if value is None else str(value).strip()

    # Clamp the cap: a negative value would otherwise slice from the end.
    for raw in items[: max(max_preferences, 0)]:
        if not isinstance(raw, dict):
            continue
        title = _clean(raw.get("title"))
        content = _clean(raw.get("content"))
        if not title or not content:
            continue

        raw_ids = raw.get("source_fact_ids")
        if not isinstance(raw_ids, list):
            raw_ids = []
        # Keep only id-like entries; ``bool`` is an ``int`` subclass, so JSON
        # true/false is excluded explicitly.
        source_fact_ids = [
            str(s)
            for s in raw_ids
            if isinstance(s, (str, int)) and not isinstance(s, bool)
        ]

        # Stable model_id: pref:<doc_short>:<title-slug>
        # NOTE(review): only the first 8 characters of document_id are used,
        # so two documents sharing that prefix and a title slug produce the
        # same model_id — confirm the store keys upserts by scope as well.
        model_id = f"pref:{document_id[:8]}:{_slugify(title)}"
        if model_id in seen_ids:
            # Two LLM rows slugged to the same id; first one wins.
            continue
        seen_ids.add(model_id)

        # M40 — build positionally-aligned timestamp list (one per source_id).
        source_timestamps = [_ts_for_fact_id(sid) for sid in source_fact_ids]

        mm = MentalModel(
            model_id=model_id,
            bank_id=bank_id,
            title=title,
            content=content,
            scope=scope,
            source_ids=source_fact_ids,
            revision=1,
            refreshed_at=now,
            kind="preference",
            source_timestamps=source_timestamps,
        )
        try:
            await mental_model_store.upsert(mm, bank_id)
        except Exception as exc:  # noqa: BLE001
            # One failed row must not discard the rest of the batch.
            _logger.warning(
                "preference_compile.save: doc=%s id=%s failed (%s)",
                document_id,
                model_id,
                exc,
            )
        else:
            saved.append(model_id)

    _logger.info(
        "preference_compile: doc=%s consolidated %d preferences from %d raw facts",
        document_id,
        len(saved),
        len(pref_facts),
    )
    return saved