Coverage for astrocyte/pipeline/section_compile.py: 41%
182 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""M10.1 — section-aware consolidation engine.
3Hindsight-aligned: aggregation happens at retain time so query-time
4agents can find pre-aggregated observations via wiki recall, instead
5of trying to count distinct items across many section excerpts (which
6the LLM is unreliable at).
8Algorithm per document:
101. Load sections with ``summary_embedding`` populated.
112. DBSCAN-cluster by cosine similarity (eps=0.55, min_samples=2 by
12 default) — same approach as the M8 ``CompileEngine`` over raw
13 memory_units, just at section grain.
143. For each cluster, single LLM call synthesizes a short observation:
15 a 5-10 word title and a 1-3 sentence aggregated claim that cites
16 specific dates / counts / entities. This is the primitive that
17 answers "how many doctors did I visit?" without making the agent
18 count across sessions at query time.
194. Save each observation as a :class:`~astrocyte.types.WikiPage`
20 (kind="topic"), with provenance back to the source sections via
21 ``astrocyte_pi_wiki_provenance`` (migration 015) and an embedding
22 on ``astrocyte_wiki_pages.current_embedding`` (migration 018).
24Differs from M8's :class:`~astrocyte.pipeline.compile.CompileEngine`:
25M8 reads from ``VectorStore`` (memory_units); this module reads from
26``PageIndexStore`` (sections). Both produce ``WikiPage`` rows; both
27use the same ``_dbscan`` clustering. The split exists because the
28PageIndex POC bypasses ``VectorStore`` entirely.
30See:
31- ``docs/_design/recall.md`` §12 (PR2.6 close-out, M10 plan)
32- ``adr/adr-006-three-layer-recall-stack.md`` — Layer 1 wiki tier
33- ``hindsight/hindsight-api-slim/hindsight_api/engine/consolidation/consolidator.py``
34 — Hindsight's analogous flow over raw memories
35"""
37from __future__ import annotations
39import json
40import logging
41import re
42from datetime import datetime, timezone
43from typing import TYPE_CHECKING
45from astrocyte.pipeline.compile import _dbscan # reuse pure-Python DBSCAN
46from astrocyte.types import Message, WikiPage
48if TYPE_CHECKING:
49 from astrocyte.provider import LLMProvider, PageIndexStore
50 from astrocyte.types import PageIndexSection
_logger = logging.getLogger("astrocyte.pipeline.section_compile")


# ── Synthesis prompt ────────────────────────────────────────────────


# Prompt for the one-shot per-cluster synthesis call made by
# ``_synthesize_observation``. It is filled via
# ``_OBSERVATION_PROMPT.format(sections=...)``, which is why the literal
# braces in the JSON examples are doubled (``{{`` / ``}}``). The reply is
# parsed as a JSON object with ``title`` and ``content`` keys.
_OBSERVATION_PROMPT = """\
You are an observation synthesizer. You will be given several sections \
from a conversation that all discuss the same topic, presented in \
CHRONOLOGICAL ORDER (oldest first). Your job: write ONE concise \
observation that aggregates the user's experience or stable facts \
across these sections, reflecting the LATEST stable state.

Pay attention to LATER sections that may supersede earlier ones. If \
the user changes their mind, switches providers, or adds new items \
over time, the observation should reflect the LATEST state. Capture \
the chronological progression when it matters (e.g. "previously did \
X, now does Y") rather than averaging across time.

Output a JSON object with EXACTLY two fields:
- "title": a 5-10 word summary that uses CATEGORY nouns the reader \
might query (e.g. "Doctors visited", "Model kits worked on", \
"Camping trips taken"). Title should mention the category (doctor / \
kit / trip / restaurant / etc.) so the wiki search can match it.
- "content": 1-3 sentences. MANDATORY: lead with an explicit COUNT \
followed by a colon-separated enumeration. The reader will use your \
content as the answer to questions like "how many X?" / "total Y?" \
/ "list of Z?". DO NOT generalize or theme — enumerate the actual \
distinct items. When listing items added over time, the order is \
chronological (earliest first).

REQUIRED CONTENT SHAPE:
"User <verb>ed N distinct <category>s: <item1> (<context>), \
<item2> (<context>), ..., <itemN> (<context>)."

Examples of good observations:
{{"title": "Doctors visited", "content": "User visited 3 distinct \
doctors: a primary care physician (Dr. Patel, May), an ENT specialist \
(Dr. Lee, June), and a dermatologist (referral, July)."}}

{{"title": "Model kits worked on", "content": "User worked on 5 model \
kits: a Revell F-15 Eagle, a B-29 bomber, a 1/72 Spitfire, a Tiger I \
tank, and a Sherman tank."}}

{{"title": "Camping trips taken", "content": "User went on 4 camping \
trips totaling 8 days: Yosemite (3 days, April), Lake Tahoe (2 days, \
June), Olympic NP (2 days, August), Grand Canyon (1 day, October)."}}

{{"title": "Korean restaurants visited", "content": "User visited 3 \
distinct Korean restaurants: Yeon Su (Chelsea), Cote (Flatiron), and \
Hanjip (LA, during travel)."}}

ANTI-PATTERNS to avoid:
- "User is managing health" (no count, no enumeration) — WRONG
- "User explored various trips" (vague, no specifics) — WRONG
- "User had multiple doctor visits" (says "multiple" without counting) — WRONG

When the cluster has only 1 distinct fact, still lead with "1": \
"User has 1 ongoing project: <description>."

OUTPUT MUST BE VALID JSON. No prose around it.

Sections in this cluster:
{sections}
"""
119# ── Cluster helpers ─────────────────────────────────────────────────
def _slugify(s: str) -> str:
    """Convert a title to a URL-safe slug, e.g. ``'Doctors visited'`` -> ``'doctors-visited'``.

    - Accented Latin letters are folded to their ASCII base letter
      (``'Café'`` -> ``'cafe'``) rather than dropped.
    - Any other non ``[a-z0-9]`` character is removed.
    - Runs of whitespace and hyphens collapse to one ``-``.
    - Leading and trailing ``-`` are trimmed, including after the
      60-character cap.

    Returns ``"obs"`` when nothing slug-worthy remains.
    """
    import unicodedata  # local import: only this helper needs it

    # NFKD splits "é" into "e" + combining accent; the ASCII encode then
    # drops the accent but keeps the base letter.
    folded = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    slug = folded.lower().strip()
    slug = re.sub(r"[^a-z0-9\s-]", "", slug)
    slug = re.sub(r"[\s-]+", "-", slug).strip("-")
    return slug[:60].rstrip("-") or "obs"
def _format_section_for_prompt(section: PageIndexSection) -> str:
    """Render one section as a two-part prompt snippet.

    The first line is a ``[line=N, date=YYYY-MM-DD]`` header, with the
    date shown as ``no-date`` when ``session_date`` is unset. It is
    followed by the whitespace-stripped summary, falling back to the
    title, then to an empty string.
    """
    when = section.session_date
    if when is None:
        date_label = "no-date"
    else:
        date_label = when.strftime("%Y-%m-%d")
    body = section.summary or section.title or ""
    return f"[line={section.line_num}, date={date_label}]\n{body.strip()}"
async def _synthesize_observation(
    *,
    provider: LLMProvider,
    model: str | None,
    sections: list[PageIndexSection],
) -> tuple[str, str] | None:
    """Synthesize one observation for a cluster with a single LLM call.

    M12.6: sections are sorted by ``session_date``, with ``line_num`` as
    the tie-break and undated sections first, so the synthesis prompt
    sees them in chronological order. The "latest state wins" guidance
    in the prompt requires this ordering to be meaningful.

    Args:
        provider: LLM provider; ``provider.complete`` is called once.
        model: Model name forwarded to the provider.
        sections: The sections of one cluster.

    Returns:
        ``(title, content)`` on success. ``None`` when:

        - ``sections`` is empty;
        - the LLM call fails;
        - the reply is not a JSON object;
        - either field is missing, ``null`` or blank.
    """
    if not sections:
        return None

    def _chronological(s: PageIndexSection) -> tuple[datetime, int]:
        # Undated sections sort first. Naive datetimes are treated as UTC
        # so they compare against the aware fallback instead of raising
        # TypeError (naive vs aware comparison).
        when = s.session_date
        if when is None:
            when = datetime.min.replace(tzinfo=timezone.utc)
        elif when.tzinfo is None:
            when = when.replace(tzinfo=timezone.utc)
        return when, s.line_num

    ordered = sorted(sections, key=_chronological)
    rendered = "\n\n".join(_format_section_for_prompt(s) for s in ordered)
    msg = _OBSERVATION_PROMPT.format(sections=rendered)
    try:
        completion = await provider.complete(
            [Message(role="user", content=msg)],
            model=model,
            max_tokens=300,
            temperature=0.0,
            response_format={"type": "json_object"},
        )
    except Exception as exc:  # noqa: BLE001 — best-effort: the cluster is skipped
        _logger.warning(
            "section_compile.synthesize: LLM call failed (%s)",
            exc,
        )
        return None
    # Guard a missing completion text: json.loads(None) raises TypeError,
    # which the JSONDecodeError handler below would not catch.
    text = completion.text or ""
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        _logger.warning(
            "section_compile.synthesize: JSON parse failed (%s) text=%r",
            exc,
            text[:200],
        )
        return None
    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a list or string): ``.get``
        # would raise AttributeError.
        _logger.warning(
            "section_compile.synthesize: expected a JSON object, got %s text=%r",
            type(data).__name__,
            text[:200],
        )
        return None
    # ``or ""`` guards JSON null: str(None) is the truthy string "None".
    title = str(data.get("title") or "").strip()
    content = str(data.get("content") or "").strip()
    if not title or not content:
        return None
    return title, content
190# ── Revision pass (Karpathy incremental update, M12.6) ─────────────
# Prompt for the per-page revision check made by ``_revise_observation``.
# It is filled via ``.format(title=..., revision=..., content=...,
# sections=...)``, so the literal braces of the JSON schema are doubled.
# The reply is parsed for ``verdict`` / ``revised_title`` /
# ``revised_content``; only a ``REVISE`` verdict leads to a new revision.
_REVISION_PROMPT = """\
You are revising a knowledge-base entry to reflect the LATEST stable \
state across its source sections.

Current entry (titled "{title}", revision {revision}):
{content}

Source sections in CHRONOLOGICAL ORDER (oldest first):
{sections}

Examine the sections in time order. If a LATER section supersedes, \
contradicts, or extends an earlier claim, the entry should reflect \
the latest state. If the entry already does, no revision is needed.

Common revision triggers:
- Entry says "User visited 3 doctors" but a later section adds a 4th → bump count
- Entry says "User prefers brand X" but later sections say switched to Y → "User now prefers Y (previously X)"
- Entry uses past tense for an ongoing thing → use ongoing tense
- Entry omits items that appear in later sections → add them
- Entry includes superseded items as if current → mark as previous

If revising: the new content follows the SAME shape as initial compile \
(lead with explicit COUNT and colon-separated enumeration; cite dates / \
entities when present).

Respond as JSON only:
{{
  "verdict": "OK" or "REVISE",
  "revised_title": "<5-10 words; only when REVISE>",
  "revised_content": "<1-3 sentences; only when REVISE>"
}}
"""
async def _revise_observation(
    *,
    provider: LLMProvider,
    model: str | None,
    page: WikiPage,
    sections: list[PageIndexSection],
) -> tuple[str, str] | None:
    """Ask the LLM whether ``page`` still reflects its source sections.

    Sections must already be sorted chronologically by the caller; they
    are rendered in the order given. Resilient to LLM / parse failures.

    Returns:
        ``(new_title, new_content)`` when the model answers ``REVISE``
        with a non-blank title and content that differ from the page's
        current ones. ``None`` otherwise:

        - no sections, or empty page content;
        - the LLM call fails;
        - the reply is unparseable or not a JSON object;
        - the verdict is anything other than ``REVISE``;
        - either revised field is missing or blank;
        - the revised title and content are unchanged.
    """
    # ``or ""`` tolerates a page whose content is None.
    if not sections or not (page.content or "").strip():
        return None
    rendered = "\n\n".join(_format_section_for_prompt(s) for s in sections)
    msg = _REVISION_PROMPT.format(
        title=page.title,
        revision=page.revision,
        content=page.content,
        sections=rendered,
    )
    try:
        completion = await provider.complete(
            [Message(role="user", content=msg)],
            model=model,
            max_tokens=400,
            temperature=0.0,
            response_format={"type": "json_object"},
        )
    except Exception as exc:  # noqa: BLE001 — best-effort: page left as-is
        _logger.warning(
            "section_compile.revise: LLM call failed for page=%s (%s)",
            page.page_id,
            exc,
        )
        return None
    text = completion.text or ""
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        _logger.warning(
            "section_compile.revise: JSON parse failed for page=%s (%s) text=%r",
            page.page_id,
            exc,
            text[:200],
        )
        return None
    if not isinstance(data, dict):
        _logger.warning(
            "section_compile.revise: expected a JSON object for page=%s, got %s",
            page.page_id,
            type(data).__name__,
        )
        return None
    # ``or ""`` guards JSON null: str(None) is the truthy string "None".
    verdict = str(data.get("verdict") or "").strip().upper()
    if verdict != "REVISE":
        return None
    new_title = str(data.get("revised_title") or "").strip()
    new_content = str(data.get("revised_content") or "").strip()
    if not new_title or not new_content:
        return None
    if new_title == page.title and new_content == page.content:
        return None
    return new_title, new_content
def _section_sort_key(section: PageIndexSection) -> tuple[datetime, int]:
    """Chronological sort key: ``session_date`` first, ``line_num`` as tie-break.

    Undated sections sort first. Naive datetimes are treated as UTC so
    they compare against the aware fallback instead of raising TypeError.
    """
    when = section.session_date
    if when is None:
        return datetime.min.replace(tzinfo=timezone.utc), section.line_num
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    return when, section.line_num


def _provenance_line_nums(source_ids: list[str] | None) -> list[int]:
    """Parse ``"docId:lineNum"`` source ids into unique line numbers, in first-seen order.

    Ids without a ``:`` or with a non-integer line part are skipped.
    """
    seen: set[int] = set()
    line_nums: list[int] = []
    for src in source_ids or []:
        _, sep, line_str = src.rpartition(":")
        if not sep:
            continue
        try:
            line_num = int(line_str)
        except ValueError:
            continue
        if line_num not in seen:
            seen.add(line_num)
            line_nums.append(line_num)
    return line_nums


async def revise_wikis_for_document(
    *,
    store: PageIndexStore,
    bank_id: str,
    document_id: str,
    provider: LLMProvider,
    model: str | None = None,
    embedding_model: str | None = None,
) -> int:
    """Run a Karpathy-style revision pass over this document's wiki pages.

    Each existing page is re-checked against its provenance sections, in
    chronological order, and any revision is persisted as a new revision
    (``revision + 1``) with a fresh title + content embedding.

    Returns: the number of pages actually revised and saved.

    Best-effort:

    - Store, LLM, embedding and save failures are logged and skipped.
    - A failure while revising one page does not abort the remaining
      pages.
    - When no page needs revision, nothing is persisted; it still costs
      one LLM call per page whose provenance resolves.
    - Repeated calls on already-revised pages should converge to no-op
      as the wiki stabilises.
    """
    try:
        pages = await store.list_wiki_pages_for_doc(bank_id, document_id)
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "revise_wikis: list_wiki_pages_for_doc failed for doc=%s (%s)",
            document_id,
            exc,
        )
        return 0
    if not pages:
        return 0

    try:
        all_sections = await store.load_sections_with_embeddings(bank_id, document_id)
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "revise_wikis: load_sections failed for doc=%s (%s)",
            document_id,
            exc,
        )
        return 0
    section_by_line = {s.line_num: s for s in all_sections}

    revised_count = 0
    for page in pages:
        # Resolve provenance source_ids back to this document's sections.
        prov_sections = [
            section_by_line[ln]
            for ln in _provenance_line_nums(page.source_ids)
            if ln in section_by_line
        ]
        if not prov_sections:
            continue
        prov_sections.sort(key=_section_sort_key)

        # Isolate per-page failures so one bad page cannot abort the pass.
        try:
            result = await _revise_observation(
                provider=provider,
                model=model,
                page=page,
                sections=prov_sections,
            )
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "revise_wikis: revision failed for page=%s (%s)",
                page.page_id,
                exc,
            )
            continue
        if result is None:
            continue
        new_title, new_content = result

        # Save as a new revision. ``save_wiki_page`` upserts the page row
        # and inserts a new revision row keyed by (page_uuid, revision_number).
        revised_page = WikiPage(
            page_id=page.page_id,
            bank_id=page.bank_id,
            kind=page.kind,
            title=new_title,
            content=new_content,
            scope=page.scope,
            source_ids=list(page.source_ids or []),
            cross_links=list(page.cross_links or []),
            revision=page.revision + 1,
            revised_at=datetime.now(tz=timezone.utc),
            tags=page.tags,
            metadata=page.metadata,
        )
        try:
            new_embedding = (
                await provider.embed(
                    [f"{new_title}\n\n{new_content}"],
                    model=embedding_model,
                )
            )[0]
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "revise_wikis: embed failed for page=%s (%s)",
                page.page_id,
                exc,
            )
            # The revision is still saved, just without an embedding.
            new_embedding = None

        provenance_pairs = [(s.document_id, s.line_num) for s in prov_sections]
        try:
            await store.save_wiki_page(
                page=revised_page,
                embedding=new_embedding,
                provenance=provenance_pairs,
            )
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "revise_wikis.save failed for page=%s (%s)",
                page.page_id,
                exc,
            )
            continue
        revised_count += 1

    if revised_count:
        _logger.info(
            "revise_wikis: doc=%s revised %d/%d pages",
            document_id,
            revised_count,
            len(pages),
        )
    return revised_count
417# ── Public entry point ─────────────────────────────────────────────
def _unique_page_id(base: str, taken: set[str]) -> str:
    """Return ``base``, or the first free ``base-N`` (N >= 2) if ``base`` is in ``taken``.

    The chosen id is added to ``taken``.
    """
    candidate = base
    suffix = 2
    while candidate in taken:
        candidate = f"{base}-{suffix}"
        suffix += 1
    taken.add(candidate)
    return candidate


def _cluster_sections(
    sections: list[PageIndexSection],
    *,
    eps: float,
    min_samples: int,
) -> dict[int, list[PageIndexSection]]:
    """DBSCAN the sections by ``summary_embedding``; returns cluster id -> sections.

    Noise points (label -1) are dropped.
    """
    # DBSCAN over (line_num_str, embedding) pairs.
    items = [(str(s.line_num), s.summary_embedding) for s in sections]
    labels = _dbscan(items, eps=eps, min_samples=min_samples)
    section_by_line = {s.line_num: s for s in sections}
    by_cluster: dict[int, list[PageIndexSection]] = {}
    for line_str, cluster_id in labels.items():
        if cluster_id == -1:
            continue
        by_cluster.setdefault(cluster_id, []).append(section_by_line[int(line_str)])
    return by_cluster


async def compile_sections_for_document(
    *,
    store: PageIndexStore,
    bank_id: str,
    document_id: str,
    provider: LLMProvider,
    model: str | None = None,
    embedding_model: str | None = None,
    eps: float = 0.55,
    min_samples: int = 2,
    max_pages_per_doc: int = 10,
) -> list[str]:
    """Cluster sections in this document, synthesize an observation per
    cluster, and persist each one as a :class:`WikiPage` row.

    Returns the list of newly created ``page_id``s. When the document
    already has wiki pages (idempotency check), no new pages are
    compiled: only the revision pass runs against the existing pages, and
    ``[]`` is returned.

    ``eps`` / ``min_samples`` tune DBSCAN. ``eps`` is passed straight to
    ``_dbscan`` as its neighbourhood radius. The defaults (0.55 / 2)
    target LME-shaped chat conversations. ``min_samples`` is lower than
    M8's default (2 vs 3) because section-grain clusters tend to be
    smaller than memory-unit clusters.

    ``max_pages_per_doc`` caps cost: at 5-7 LLM calls per doc × 50 LME
    docs we'd burn ~$2-3 per gate. The cap keeps it bounded if a
    pathological doc clusters into many small groups. The largest
    clusters are kept.

    Page ids are de-duplicated within the document. If two clusters
    synthesize titles with the same slug, the later page gets a ``-2``,
    ``-3``, ... suffix instead of overwriting the earlier one through the
    store's upsert.
    """
    # Idempotency: skip if pages already exist for this doc.
    try:
        existing = await store.count_wiki_pages_for_doc(bank_id, document_id)
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "section_compile.count failed for doc=%s (%s)",
            document_id,
            exc,
        )
        existing = 0
    if existing > 0:
        _logger.debug(
            "section_compile: doc=%s already has %d pages — skip initial compile, run revision pass only",
            document_id,
            existing,
        )
        # M12.6: even when initial compile is skipped, run the Karpathy
        # revision pass against the existing pages. Lets a code change
        # (e.g. new prompt) take effect against cached banks without a
        # full re-extraction.
        try:
            await revise_wikis_for_document(
                store=store,
                bank_id=bank_id,
                document_id=document_id,
                provider=provider,
                model=model,
                embedding_model=embedding_model,
            )
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "revise_wikis: cache-hit revision pass failed for doc=%s (%s)",
                document_id,
                exc,
            )
        return []

    # Load sections with embeddings populated.
    sections = await store.load_sections_with_embeddings(bank_id, document_id)
    if not sections:
        return []
    sections_with_emb = [s for s in sections if s.summary_embedding]
    if len(sections_with_emb) < min_samples:
        # Too few embedded sections to cluster meaningfully.
        return []

    by_cluster = _cluster_sections(sections_with_emb, eps=eps, min_samples=min_samples)
    if not by_cluster:
        _logger.debug(
            "section_compile: doc=%s yielded no clusters (eps=%s min=%s)",
            document_id,
            eps,
            min_samples,
        )
        return []

    # Cap cluster count to control cost. Prefer larger clusters (more
    # signal per LLM call).
    clusters = sorted(by_cluster.values(), key=len, reverse=True)[:max_pages_per_doc]

    now = datetime.now(tz=timezone.utc)
    pending_pages: list[WikiPage] = []
    pending_provenance: list[list[tuple[str, int]]] = []
    pending_summaries: list[str] = []  # used for the embedding pass
    taken_ids: set[str] = set()

    for cluster in clusters:
        synth = await _synthesize_observation(
            provider=provider,
            model=model,
            sections=cluster,
        )
        if synth is None:
            continue
        title, content = synth
        # NOTE(review): only the first 8 chars of document_id go into the
        # id, so documents sharing that prefix and a title slug could still
        # collide. De-duplication here is per-document only — confirm how
        # the store scopes page_id.
        page_id = _unique_page_id(f"obs:{document_id[:8]}:{_slugify(title)}", taken_ids)
        page = WikiPage(
            page_id=page_id,
            bank_id=bank_id,
            kind="topic",
            title=title,
            content=content,
            scope=f"document:{document_id}",
            source_ids=[f"{s.document_id}:{s.line_num}" for s in cluster],
            cross_links=[],
            revision=1,
            revised_at=now,
            tags=None,
            metadata=None,
        )
        pending_pages.append(page)
        pending_provenance.append([(s.document_id, s.line_num) for s in cluster])
        # Embed the title + content together so the wiki search
        # strategy matches both the topic label and the aggregated
        # facts. Mirrors how Hindsight indexes observations.
        pending_summaries.append(f"{title}\n\n{content}")

    if not pending_pages:
        return []

    # Single batched embedding call for all observations in this doc.
    try:
        embeddings = list(
            await provider.embed(
                pending_summaries,
                model=embedding_model,
            )
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "section_compile: embedding batch failed for doc=%s (%s)",
            document_id,
            exc,
        )
        embeddings = [None] * len(pending_pages)
    if len(embeddings) != len(pending_pages):
        # The vectors are positional, so a short or long batch cannot be
        # aligned with the pages. Save every page without an embedding
        # rather than letting zip() silently drop pages.
        _logger.warning(
            "section_compile: embedding batch returned %d vectors for %d pages (doc=%s)",
            len(embeddings),
            len(pending_pages),
            document_id,
        )
        embeddings = [None] * len(pending_pages)

    created_page_ids: list[str] = []
    for page, embedding, provenance_pairs in zip(
        pending_pages,
        embeddings,
        pending_provenance,
    ):
        try:
            await store.save_wiki_page(
                page=page,
                embedding=embedding,
                provenance=provenance_pairs,
            )
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "section_compile.save_wiki_page failed page_id=%s (%s)",
                page.page_id,
                exc,
            )
            continue
        created_page_ids.append(page.page_id)

    _logger.info(
        "section_compile: doc=%s created %d pages from %d clusters",
        document_id,
        len(created_page_ids),
        len(clusters),
    )

    # M12.6: Karpathy-style revision pass over the freshly compiled
    # pages. Catches "latest state" patterns that single-shot synthesis
    # missed (e.g. counts that grow over time, providers that change).
    try:
        await revise_wikis_for_document(
            store=store,
            bank_id=bank_id,
            document_id=document_id,
            provider=provider,
            model=model,
            embedding_model=embedding_model,
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "revise_wikis: post-initial revision pass failed for doc=%s (%s)",
            document_id,
            exc,
        )

    return created_page_ids