Coverage for astrocyte/pipeline/preference_compile.py: 87%

82 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M14.6: consolidated user-preference compile. 

2 

3Distils raw preference-type facts into structured consolidated 

4preferences, stored as :class:`MentalModel` rows with 

5``kind="preference"``. Matches Hindsight's mental-models pattern for 

6consolidated knowledge (their ``mental_models.subtype`` column). 

7 

8Why a separate compile pass rather than enriching raw extraction: 

9- Hindsight tried the dedicated ``opinion`` fact_type and **removed 

10 it** (migration g2h3i4j5k6l7 — April 2026). Their evidence: schema 

11 rigidity at ingest didn't pan out; preferences are better handled 

12 at the compiled-knowledge layer downstream of fact extraction. 

13- Raw preference facts capture individual statements but lose the 

14 conditional context (when/why/under-what-conditions) the LME 

15 single-session-preference category probes. A consolidation pass 

16 with an LLM-judged synthesis can preserve that context. 

17 

18Algorithm (one LLM call per document): 

19 

201. Pull all PageIndexFacts with ``fact_type='preference'`` for the doc. 

212. If fewer than 2 preference facts, skip (no consolidation signal). 

223. Send to LLM with a prompt asking for up to N consolidated 

23 preferences, each with title + content (the content preserves 

24 qualifier / condition / sentiment inline) + the source fact ids. 

254. Save each as a :class:`MentalModel` with ``kind='preference'``, 

26 ``scope=f'document:{document_id}'``. Idempotent — skip if the 

27 document already has preference models (M14.6 v1; M14.7 may add 

28 incremental refresh). 

29 

30The bench's existing :meth:`AstrocyteClient.get_user_profile` lists 

31mental models by ``scope`` and serialises them into the answerer's 

32``## User Profile`` block — preference-kind models surface there 

33automatically. No retrieval-path change needed for v1. 

34 

35Cost: ~1 LLM call per document (gpt-4o-mini), ~$0.01-0.02. Marginal. 

36 

37See: 

38- ``docs/_design/m13-m14-roadmap.md`` §4 (M14.6 placement) 

39- ``astrocyte.pipeline.mental_model_compile`` — sibling that produces 

40 ``kind='general'`` profile statements 

41""" 

42 

43from __future__ import annotations 

44 

45import json 

46import logging 

47import re 

48from datetime import datetime, timezone 

49from typing import TYPE_CHECKING 

50 

51from astrocyte.types import MentalModel, Message 

52 

53if TYPE_CHECKING: 

54 from astrocyte.provider import LLMProvider, MentalModelStore 

55 from astrocyte.types import PageIndexFact 

56 

57_logger = logging.getLogger("astrocyte.pipeline.preference_compile") 

58 

59 

60_COMPILE_PROMPT = """\ 

61You are consolidating a user's STATED PREFERENCES from a conversation \ 

62transcript. You will be given the raw preference statements extracted \ 

63from the conversation. Your job: produce up to {max_prefs} consolidated \ 

64preferences that capture WHAT the user prefers, plus the CONTEXT \ 

65(when, why, under what conditions, sentiment strength). 

66 

67Output a JSON object with one key, ``preferences``, containing an array \ 

68of preference objects. Each preference has: 

69- "title": 3-7 word noun phrase naming the preference dimension \ 

70 (e.g. "Breakfast preference", "Coffee in morning", "Reading material") 

71- "content": ONE declarative sentence stating the user's preference \ 

72 with FULL CONTEXT inline. Capture: WHAT they prefer (the object), \ 

73 WHEN/WHERE (qualifier), WHY (if stated), and STRENGTH (strong / mild \ 

74 / context-dependent). Examples: 

75 - "User prefers oatmeal with berries for breakfast on busy mornings, \ 

76 citing time efficiency." 

77 - "User strongly prefers Dr. Patel's clinic over their previous \ 

78 clinic because the staff was more attentive." 

79 - "User prefers Sony cameras for travel photography over Nikon, \ 

80 mainly for sensor performance in low light." 

81- "source_fact_ids": array of strings — the raw fact_id values from \ 

82 the input that contributed to this consolidated preference. 

83 

84Rules: 

85- DO NOT invent preferences not stated in the input. If the user only \ 

86 said "I like X", that's a preference; if they merely mentioned X, \ 

87 that's NOT a preference. 

88- Merge multiple raw mentions of the SAME preference into one \ 

89 consolidated row. Cite all contributing fact_ids. 

90- If two raw preferences CONTRADICT (e.g. "I used to prefer X, now I \ 

91 prefer Y"), produce ONE row reflecting the LATEST state, with \ 

92 "(previously X)" inline. 

93- Skip vague / fleeting statements ("kind of liked it"). Only \ 

94 consolidate STABLE preferences. 

95- If the input has fewer than 2 distinct preferences, return \ 

96 ``{{"preferences": []}}``. 

97 

98Input preferences (raw extractions): 

99{prefs_block} 

100 

101OUTPUT MUST BE VALID JSON. No prose around it. 

102""" 

103 

104 

105def _slugify(text: str) -> str: 

106 """3-7 word title → URL-safe slug. Same shape as section_compile.""" 

107 s = text.lower().strip() 

108 s = re.sub(r"[^a-z0-9\s-]", "", s) 

109 s = re.sub(r"\s+", "-", s) 

110 return s[:60] or "pref" 

111 

112 

113def _format_pref_for_prompt(fact: PageIndexFact) -> str: 

114 """Render one preference fact for the consolidation prompt.""" 

115 parts = [f"id={fact.id}"] 

116 if fact.entities: 

117 parts.append(f"entities={','.join(fact.entities[:6])}") 

118 return f"[{', '.join(parts)}] {fact.text}" 

119 

120 

121async def compile_preferences_for_document( 

122 *, 

123 mental_model_store: MentalModelStore, 

124 bank_id: str, 

125 document_id: str, 

126 facts: list[PageIndexFact], 

127 provider: LLMProvider, 

128 model: str | None = None, 

129 max_preferences: int = 12, 

130) -> list[str]: 

131 """Consolidate preference-type facts → preference-kind MentalModels. 

132 

133 Args: 

134 mental_model_store: where to persist the consolidated models. 

135 bank_id: tenant scope. 

136 document_id: scope for the resulting models 

137 (``scope='document:<id>'``). 

138 facts: ALL facts extracted for this document — the function 

139 filters internally for ``fact_type=='preference'``. Passed 

140 from the caller's retain path to avoid an extra round trip 

141 to the store. 

142 provider: LLM provider for the consolidation call. 

143 model: LLM model name (defaults to provider's default — 

144 typically gpt-4o-mini for our bench). 

145 max_preferences: cap on consolidated rows produced. Same cap 

146 shape as ``mental_model_compile``. 

147 

148 Returns: 

149 List of ``model_id`` values for the rows actually persisted. 

150 Empty list when there were insufficient preference facts or 

151 the LLM produced no usable output. 

152 

153 Idempotent: if preference-kind models already exist for this 

154 ``(bank_id, document_id)`` scope, skip the compile. Re-running on a 

155 cached bank is therefore a no-op — fresh banks pay the LLM call. 

156 """ 

157 pref_facts = [f for f in facts if f.fact_type == "preference"] 

158 if len(pref_facts) < 2: 

159 _logger.debug( 

160 "preference_compile: doc=%s has %d preference facts (<2), skip", 

161 document_id, 

162 len(pref_facts), 

163 ) 

164 return [] 

165 

166 # Idempotency check — skip if we already have preference models for this scope. 

167 try: 

168 existing = await mental_model_store.list( 

169 bank_id, 

170 scope=f"document:{document_id}", 

171 kind="preference", 

172 ) 

173 except TypeError: 

174 # Older MentalModelStore SPI without `kind` kwarg — fall back to 

175 # listing all and filtering client-side. Lets this module work 

176 # against not-yet-migrated stores (some tests use older fakes). 

177 all_for_scope = await mental_model_store.list( 

178 bank_id, 

179 scope=f"document:{document_id}", 

180 ) 

181 existing = [m for m in all_for_scope if m.kind == "preference"] 

182 if existing: 

183 _logger.debug( 

184 "preference_compile: doc=%s already has %d preference models — skip", 

185 document_id, 

186 len(existing), 

187 ) 

188 return [m.model_id for m in existing] 

189 

190 prefs_block = "\n".join(_format_pref_for_prompt(f) for f in pref_facts) 

191 msg = _COMPILE_PROMPT.format( 

192 max_prefs=max_preferences, 

193 prefs_block=prefs_block, 

194 ) 

195 

196 try: 

197 # max_tokens=800 (M14.6) truncated JSON output mid-string on ~90% of 

198 # LME documents (M14.7-b1.1 diagnostic 2026-05-13). At 

199 # max_preferences=12, each preference body runs ~150-200 tokens of 

200 # structured JSON, so 800 was undersized by ~2.5×. The truncation 

201 # caused entire documents to land with 0 preference-models, which 

202 # silently neutered the B-1 anchor pool for downstream questions. 

203 # Bumped to 3000 with margin for ``source_fact_ids`` arrays. 

204 completion = await provider.complete( 

205 [Message(role="user", content=msg)], 

206 model=model, 

207 max_tokens=3000, 

208 temperature=0.0, 

209 response_format={"type": "json_object"}, 

210 ) 

211 except Exception as exc: # noqa: BLE001 

212 _logger.warning( 

213 "preference_compile.llm: call failed for doc=%s (%s)", 

214 document_id, 

215 exc, 

216 ) 

217 return [] 

218 

219 try: 

220 data = json.loads(completion.text) 

221 except (json.JSONDecodeError, AttributeError) as exc: 

222 _logger.warning( 

223 "preference_compile.parse: bad JSON for doc=%s (%s) text=%r", 

224 document_id, 

225 exc, 

226 getattr(completion, "text", "")[:200], 

227 ) 

228 return [] 

229 

230 items = data.get("preferences") or [] 

231 if not isinstance(items, list): 

232 return [] 

233 

234 now = datetime.now(tz=timezone.utc) 

235 scope = f"document:{document_id}" 

236 saved: list[str] = [] 

237 seen_ids: set[str] = set() 

238 

239 # M40 — index source facts by fact_id so we can pull per-source 

240 # evidence timestamps when building each preference's MentalModel. 

241 # Preference order: mentioned_at (when the user said it) > occurred_start 

242 # (when the event occurred) > now (last resort — only for facts that 

243 # carry no temporal info at all, which would trend NEW from any 

244 # reference_date in the conversation timeline). 

245 _fact_by_id = {getattr(f, "fact_id", None): f for f in pref_facts} 

246 

247 def _ts_for_fact_id(fid: str) -> datetime: 

248 f = _fact_by_id.get(fid) 

249 if f is None: 

250 return now 

251 return ( 

252 getattr(f, "mentioned_at", None) 

253 or getattr(f, "occurred_start", None) 

254 or now 

255 ) 

256 

257 for raw in items[:max_preferences]: 

258 if not isinstance(raw, dict): 

259 continue 

260 title = str(raw.get("title", "")).strip() 

261 content = str(raw.get("content", "")).strip() 

262 if not title or not content: 

263 continue 

264 source_fact_ids = raw.get("source_fact_ids") or [] 

265 if not isinstance(source_fact_ids, list): 

266 source_fact_ids = [] 

267 source_fact_ids = [str(s) for s in source_fact_ids if isinstance(s, (str, int))] 

268 # Stable model_id: pref:<doc_short>:<title-slug> 

269 slug = _slugify(title) 

270 model_id = f"pref:{document_id[:8]}:{slug}" 

271 if model_id in seen_ids: 

272 continue 

273 seen_ids.add(model_id) 

274 

275 # M40 — build positionally-aligned timestamp list (one per source_id). 

276 source_timestamps = [_ts_for_fact_id(sid) for sid in source_fact_ids] 

277 

278 mm = MentalModel( 

279 model_id=model_id, 

280 bank_id=bank_id, 

281 title=title, 

282 content=content, 

283 scope=scope, 

284 source_ids=source_fact_ids, 

285 revision=1, 

286 refreshed_at=now, 

287 kind="preference", 

288 source_timestamps=source_timestamps, 

289 ) 

290 try: 

291 await mental_model_store.upsert(mm, bank_id) 

292 saved.append(model_id) 

293 except Exception as exc: # noqa: BLE001 

294 _logger.warning( 

295 "preference_compile.save: doc=%s id=%s failed (%s)", 

296 document_id, 

297 model_id, 

298 exc, 

299 ) 

300 

301 _logger.info( 

302 "preference_compile: doc=%s consolidated %d preferences from %d raw facts", 

303 document_id, 

304 len(saved), 

305 len(pref_facts), 

306 ) 

307 return saved