Coverage for astrocyte/pipeline/section_compile.py: 41%

182 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M10.1 — section-aware consolidation engine. 

2 

3Hindsight-aligned: aggregation happens at retain time so query-time 

4agents can find pre-aggregated observations via wiki recall, instead 

5of trying to count distinct items across many section excerpts (which 

6the LLM is unreliable at). 

7 

8Algorithm per document: 

9 

101. Load sections with ``summary_embedding`` populated. 

112. DBSCAN-cluster by cosine similarity (eps=0.55, min_samples=2 by

12 default) — same approach as the M8 ``CompileEngine`` over raw 

13 memory_units, just at section grain. 

143. For each cluster, single LLM call synthesizes a short observation: 

15 a 5-10 word title and a 1-3 sentence aggregated claim that cites 

16 specific dates / counts / entities. This is the primitive that 

17 answers "how many doctors did I visit?" without making the agent 

18 count across sessions at query time. 

194. Save each observation as a :class:`~astrocyte.types.WikiPage` 

20 (kind="topic"), with provenance back to the source sections via 

21 ``astrocyte_pi_wiki_provenance`` (migration 015) and an embedding 

22 on ``astrocyte_wiki_pages.current_embedding`` (migration 018). 

23 

24Differs from M8's :class:`~astrocyte.pipeline.compile.CompileEngine`: 

25M8 reads from ``VectorStore`` (memory_units); this module reads from 

26``PageIndexStore`` (sections). Both produce ``WikiPage`` rows; both 

27use the same ``_dbscan`` clustering. The split exists because the 

28PageIndex POC bypasses ``VectorStore`` entirely. 

29 

30See: 

31- ``docs/_design/recall.md`` §12 (PR2.6 close-out, M10 plan) 

32- ``adr/adr-006-three-layer-recall-stack.md`` — Layer 1 wiki tier 

33- ``hindsight/hindsight-api-slim/hindsight_api/engine/consolidation/consolidator.py`` 

34 — Hindsight's analogous flow over raw memories 

35""" 

36 

37from __future__ import annotations 

38 

39import json 

40import logging 

41import re 

42from datetime import datetime, timezone 

43from typing import TYPE_CHECKING 

44 

45from astrocyte.pipeline.compile import _dbscan # reuse pure-Python DBSCAN 

46from astrocyte.types import Message, WikiPage 

47 

48if TYPE_CHECKING: 

49 from astrocyte.provider import LLMProvider, PageIndexStore 

50 from astrocyte.types import PageIndexSection 

51 

52_logger = logging.getLogger("astrocyte.pipeline.section_compile") 

53 

54 

55# ── Synthesis prompt ──────────────────────────────────────────────── 

56 

57 

58_OBSERVATION_PROMPT = """\ 

59You are an observation synthesizer. You will be given several sections \ 

60from a conversation that all discuss the same topic, presented in \ 

61CHRONOLOGICAL ORDER (oldest first). Your job: write ONE concise \ 

62observation that aggregates the user's experience or stable facts \ 

63across these sections, reflecting the LATEST stable state. 

64 

65Pay attention to LATER sections that may supersede earlier ones. If \ 

66the user changes their mind, switches providers, or adds new items \ 

67over time, the observation should reflect the LATEST state. Capture \ 

68the chronological progression when it matters (e.g. "previously did \ 

69X, now does Y") rather than averaging across time. 

70 

71Output a JSON object with EXACTLY two fields: 

72- "title": a 5-10 word summary that uses CATEGORY nouns the reader \ 

73might query (e.g. "Doctors visited", "Model kits worked on", \ 

74"Camping trips taken"). Title should mention the category (doctor / \ 

75kit / trip / restaurant / etc.) so the wiki search can match it. 

76- "content": 1-3 sentences. MANDATORY: lead with an explicit COUNT \ 

77followed by a colon-separated enumeration. The reader will use your \ 

78content as the answer to questions like "how many X?" / "total Y?" \ 

79/ "list of Z?". DO NOT generalize or theme — enumerate the actual \ 

80distinct items. When listing items added over time, the order is \ 

81chronological (earliest first). 

82 

83REQUIRED CONTENT SHAPE: 

84"User <verb>ed N distinct <category>s: <item1> (<context>), \ 

85<item2> (<context>), ..., <itemN> (<context>)." 

86 

87Examples of good observations: 

88{{"title": "Doctors visited", "content": "User visited 3 distinct \ 

89doctors: a primary care physician (Dr. Patel, May), an ENT specialist \ 

90(Dr. Lee, June), and a dermatologist (referral, July)."}} 

91 

92{{"title": "Model kits worked on", "content": "User worked on 5 model \ 

93kits: a Revell F-15 Eagle, a B-29 bomber, a 1/72 Spitfire, a Tiger I \ 

94tank, and a Sherman tank."}} 

95 

96{{"title": "Camping trips taken", "content": "User went on 4 camping \ 

97trips totaling 8 days: Yosemite (3 days, April), Lake Tahoe (2 days, \ 

98June), Olympic NP (2 days, August), Grand Canyon (1 day, October)."}} 

99 

100{{"title": "Korean restaurants visited", "content": "User visited 3 \ 

101distinct Korean restaurants: Yeon Su (Chelsea), Cote (Flatiron), and \ 

102Hanjip (LA, during travel)."}} 

103 

104ANTI-PATTERNS to avoid: 

105- "User is managing health" (no count, no enumeration) — WRONG 

106- "User explored various trips" (vague, no specifics) — WRONG 

107- "User had multiple doctor visits" (says "multiple" without counting) — WRONG 

108 

109When the cluster has only 1 distinct fact, still lead with "1": \ 

110"User has 1 ongoing project: <description>." 

111 

112OUTPUT MUST BE VALID JSON. No prose around it. 

113 

114Sections in this cluster: 

115{sections} 

116""" 

117 

118 

119# ── Cluster helpers ───────────────────────────────────────────────── 

120 

121 

def _slugify(s: str) -> str:
    """Convert a title to a URL-safe slug.

    e.g. 'Doctors visited' -> 'doctors-visited'.

    The title is lower-cased, every character outside ``[a-z0-9]``,
    whitespace and ``-`` is dropped, and each run of whitespace and/or
    hyphens becomes a single ``-``. The result is truncated to 60
    characters and stripped of leading / trailing hyphens, so neither the
    input nor the truncation can leave a dangling separator. Falls back to
    ``"obs"`` when nothing survives (e.g. a title made only of punctuation
    or non-ASCII characters).
    """
    s = s.lower().strip()
    s = re.sub(r"[^a-z0-9\s-]", "", s)
    # Collapse mixed whitespace/hyphen runs ("a - b" -> "a-b", not "a---b").
    s = re.sub(r"[\s-]+", "-", s)
    # Strip AFTER truncating: the 60-char cut may land right after a hyphen.
    return s[:60].strip("-") or "obs"

128 

129 

def _format_section_for_prompt(section: PageIndexSection) -> str:
    """Render one section as a prompt snippet.

    The first line is a ``[line=<line_num>, date=<YYYY-MM-DD | no-date>]``
    header; the second is the section's summary, falling back to its title
    (and then to an empty string), with surrounding whitespace removed.
    """
    when = section.session_date
    date_label = "no-date" if when is None else when.strftime("%Y-%m-%d")
    text = section.summary or section.title or ""
    text = text.strip()
    return f"[line={section.line_num}, date={date_label}]\n{text}"

134 

135 

async def _synthesize_observation(
    *,
    provider: LLMProvider,
    model: str | None,
    sections: list[PageIndexSection],
) -> tuple[str, str] | None:
    """LLM call — returns ``(title, content)`` or ``None`` on any failure.

    ``None`` is returned when ``sections`` is empty, when the LLM call
    raises, when the completion text is not parseable JSON, when the parsed
    JSON is not an object, or when ``title`` / ``content`` is missing, null
    or blank. Nothing is raised for those cases, so one bad cluster cannot
    abort the caller's loop.

    M12.6: sections are sorted by ``session_date`` (with ``line_num``
    fallback) so the synth prompt sees them in chronological order.
    The Karpathy "latest state wins" guidance in the prompt requires
    this ordering to be meaningful.
    """
    if not sections:
        return None
    # Undated sections sort first (datetime.min), then by position in the doc.
    ordered = sorted(
        sections,
        key=lambda s: (
            s.session_date or datetime.min.replace(tzinfo=timezone.utc),
            s.line_num,
        ),
    )
    rendered = "\n\n".join(_format_section_for_prompt(s) for s in ordered)
    msg = _OBSERVATION_PROMPT.format(sections=rendered)
    try:
        completion = await provider.complete(
            [Message(role="user", content=msg)],
            model=model,
            max_tokens=300,
            temperature=0.0,
            response_format={"type": "json_object"},
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "section_compile.synthesize: LLM call failed (%s)",
            exc,
        )
        return None
    raw = completion.text
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError) as exc:
        # TypeError: json.loads rejects non str/bytes input (e.g. text=None).
        _logger.warning(
            "section_compile.synthesize: JSON parse failed (%s) text=%r",
            exc,
            str(raw)[:200],
        )
        return None
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list / string / null) has no
        # ``.get``; treat it as a parse failure instead of raising.
        _logger.warning(
            "section_compile.synthesize: expected a JSON object, got %s",
            type(data).__name__,
        )
        return None
    # ``or ""`` maps an explicit JSON null to "missing" rather than "None".
    title = str(data.get("title") or "").strip()
    content = str(data.get("content") or "").strip()
    if not title or not content:
        return None
    return title, content

188 

189 

190# ── Revision pass (Karpathy incremental update, M12.6) ───────────── 

191 

192 

193_REVISION_PROMPT = """\ 

194You are revising a knowledge-base entry to reflect the LATEST stable \ 

195state across its source sections. 

196 

197Current entry (titled "{title}", revision {revision}): 

198{content} 

199 

200Source sections in CHRONOLOGICAL ORDER (oldest first): 

201{sections} 

202 

203Examine the sections in time order. If a LATER section supersedes, \ 

204contradicts, or extends an earlier claim, the entry should reflect \ 

205the latest state. If the entry already does, no revision is needed. 

206 

207Common revision triggers: 

208- Entry says "User visited 3 doctors" but a later section adds a 4th → bump count 

209- Entry says "User prefers brand X" but later sections say switched to Y → "User now prefers Y (previously X)" 

210- Entry uses past tense for an ongoing thing → use ongoing tense 

211- Entry omits items that appear in later sections → add them 

212- Entry includes superseded items as if current → mark as previous 

213 

214If revising: the new content follows the SAME shape as initial compile \ 

215(lead with explicit COUNT and colon-separated enumeration; cite dates / \ 

216entities when present). 

217 

218Respond as JSON only: 

219{{ 

220 "verdict": "OK" or "REVISE", 

221 "revised_title": "<5-10 words; only when REVISE>", 

222 "revised_content": "<1-3 sentences; only when REVISE>" 

223}} 

224""" 

225 

226 

async def _revise_observation(
    *,
    provider: LLMProvider,
    model: str | None,
    page: WikiPage,
    sections: list[PageIndexSection],
) -> tuple[str, str] | None:
    """LLM call — returns ``(new_title, new_content)`` if revised, ``None``
    if the page already reflects the latest state.

    Sections must be sorted chronologically by the caller. Resilient
    to LLM / parse failures (returns ``None``): that covers a raising
    provider, unparseable completion text, JSON that is not an object, a
    verdict other than ``"REVISE"``, blank or null revised fields, and a
    "revision" identical to the current title and content.
    """
    # ``or ""`` keeps a page whose content is None from crashing the guard.
    if not sections or not (page.content or "").strip():
        return None
    rendered = "\n\n".join(_format_section_for_prompt(s) for s in sections)
    msg = _REVISION_PROMPT.format(
        title=page.title,
        revision=page.revision,
        content=page.content,
        sections=rendered,
    )
    try:
        completion = await provider.complete(
            [Message(role="user", content=msg)],
            model=model,
            max_tokens=400,
            temperature=0.0,
            response_format={"type": "json_object"},
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "section_compile.revise: LLM call failed for page=%s (%s)",
            page.page_id,
            exc,
        )
        return None
    raw = completion.text
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError) as exc:
        # TypeError: json.loads rejects non str/bytes input (e.g. text=None).
        _logger.warning(
            "section_compile.revise: JSON parse failed for page=%s (%s) text=%r",
            page.page_id,
            exc,
            str(raw)[:200],
        )
        return None
    if not isinstance(data, dict):
        # Valid JSON that is not an object has no ``.get``; treat as failure.
        _logger.warning(
            "section_compile.revise: expected a JSON object for page=%s, got %s",
            page.page_id,
            type(data).__name__,
        )
        return None
    verdict = str(data.get("verdict") or "").strip().upper()
    if verdict != "REVISE":
        return None
    # ``or ""`` maps an explicit JSON null to "missing" rather than "None".
    new_title = str(data.get("revised_title") or "").strip()
    new_content = str(data.get("revised_content") or "").strip()
    if not new_title or not new_content:
        return None
    # A REVISE verdict that changes nothing is not worth a new revision row.
    if new_title == page.title and new_content == page.content:
        return None
    return new_title, new_content

284 

285 

async def revise_wikis_for_document(
    *,
    store: PageIndexStore,
    bank_id: str,
    document_id: str,
    provider: LLMProvider,
    model: str | None = None,
    embedding_model: str | None = None,
) -> int:
    """Karpathy-style revision pass over a document's existing wiki pages.

    Each page is re-checked against its provenance sections (resolved from
    ``source_ids`` of the form ``"docId:lineNum"``) in chronological order;
    a page the LLM asks to revise is re-embedded and saved as
    ``revision + 1``.

    Returns: number of pages actually revised (i.e. successfully saved).

    Store, embed and save failures are logged and skipped rather than
    raised; a failed page listing or section load returns 0. A failed embed
    still saves the revised page, with ``embedding=None``. When no page
    needs revision nothing is written — the only cost is one LLM call per
    page.
    """
    try:
        pages = await store.list_wiki_pages_for_doc(bank_id, document_id)
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "revise_wikis: list_wiki_pages_for_doc failed for doc=%s (%s)",
            document_id,
            exc,
        )
        return 0
    if not pages:
        return 0

    try:
        doc_sections = await store.load_sections_with_embeddings(bank_id, document_id)
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "revise_wikis: load_sections failed for doc=%s (%s)",
            document_id,
            exc,
        )
        return 0
    sections_by_line = {sec.line_num: sec for sec in doc_sections}

    def _chronological(sec):
        # session_date first (undated sections sort earliest), line_num tiebreak.
        return (
            sec.session_date or datetime.min.replace(tzinfo=timezone.utc),
            sec.line_num,
        )

    n_revised = 0
    for page in pages:
        # Map each "docId:lineNum" source id back to a loaded section.
        cited = []
        for source_id in page.source_ids or []:
            if ":" not in source_id:
                continue
            line_text = source_id.rpartition(":")[2]
            try:
                line_no = int(line_text)
            except ValueError:
                continue
            if line_no in sections_by_line:
                cited.append(sections_by_line[line_no])
        if not cited:
            continue
        cited = sorted(cited, key=_chronological)

        outcome = await _revise_observation(
            provider=provider,
            model=model,
            page=page,
            sections=cited,
        )
        if outcome is None:
            continue
        new_title, new_content = outcome

        # Same page identity, bumped revision. ``save_wiki_page`` upserts the
        # page row and inserts a revision row keyed by
        # (page_uuid, revision_number).
        revised_page = WikiPage(
            page_id=page.page_id,
            bank_id=page.bank_id,
            kind=page.kind,
            title=new_title,
            content=new_content,
            scope=page.scope,
            source_ids=list(page.source_ids or []),
            cross_links=list(page.cross_links or []),
            revision=page.revision + 1,
            revised_at=datetime.now(tz=timezone.utc),
            tags=page.tags,
            metadata=page.metadata,
        )

        try:
            vectors = await provider.embed(
                [f"{new_title}\n\n{new_content}"],
                model=embedding_model,
            )
            new_embedding = vectors[0]
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "revise_wikis: embed failed for page=%s (%s)",
                page.page_id,
                exc,
            )
            new_embedding = None

        provenance_pairs = [(sec.document_id, sec.line_num) for sec in cited]
        try:
            await store.save_wiki_page(
                page=revised_page,
                embedding=new_embedding,
                provenance=provenance_pairs,
            )
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "revise_wikis.save failed for page=%s (%s)",
                page.page_id,
                exc,
            )
        else:
            n_revised += 1

    if n_revised:
        _logger.info(
            "revise_wikis: doc=%s revised %d/%d pages",
            document_id,
            n_revised,
            len(pages),
        )
    return n_revised

415 

416 

417# ── Public entry point ───────────────────────────────────────────── 

418 

419 

async def compile_sections_for_document(
    *,
    store: PageIndexStore,
    bank_id: str,
    document_id: str,
    provider: LLMProvider,
    model: str | None = None,
    embedding_model: str | None = None,
    eps: float = 0.55,
    min_samples: int = 2,
    max_pages_per_doc: int = 10,
) -> list[str]:
    """Cluster sections in this document, synthesize an observation per
    cluster, persist as :class:`WikiPage` rows.

    Returns the list of newly-created ``page_id``s (unique within the
    call). When the document already has wiki pages (idempotency check),
    only the revision pass runs and ``[]`` is returned. ``[]`` is also
    returned when there are fewer than ``min_samples`` embedded sections,
    when DBSCAN finds no cluster, or when every synthesis fails.

    ``eps`` / ``min_samples`` are passed straight to ``_dbscan``. The
    defaults (``eps=0.55``, ``min_samples=2``) target LME-shaped chat
    conversations, whose sessions are heterogeneous. ``min_samples`` is
    lower than M8's default (2 vs 3) because section-grain clusters tend
    to be smaller than memory-unit clusters.

    ``max_pages_per_doc`` caps cost: at 5-7 LLM calls per doc × 50 LME
    docs we'd burn ~$2-3 per gate. The cap keeps it bounded if a
    pathological doc clusters into many small groups.

    Failures of the count, embed, save and revision steps are logged and
    tolerated; a failure while loading sections propagates to the caller.
    """
    # Idempotency: skip if pages already exist for this doc. A failed count
    # is treated as "no pages yet" so the compile still gets a chance.
    try:
        existing = await store.count_wiki_pages_for_doc(bank_id, document_id)
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "section_compile.count failed for doc=%s (%s)",
            document_id,
            exc,
        )
        existing = 0
    if existing > 0:
        _logger.debug(
            "section_compile: doc=%s already has %d pages — skip initial compile, run revision pass only",
            document_id,
            existing,
        )
        # M12.6: even when initial compile is skipped, run the Karpathy
        # revision pass against the existing pages. Lets a code change
        # (e.g. new prompt) take effect against cached banks without a
        # full re-extraction.
        try:
            await revise_wikis_for_document(
                store=store,
                bank_id=bank_id,
                document_id=document_id,
                provider=provider,
                model=model,
                embedding_model=embedding_model,
            )
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "revise_wikis: cache-hit revision pass failed for doc=%s (%s)",
                document_id,
                exc,
            )
        return []

    # Load sections with embeddings populated.
    sections = await store.load_sections_with_embeddings(bank_id, document_id)
    if not sections:
        return []
    sections_with_emb = [s for s in sections if s.summary_embedding]
    if len(sections_with_emb) < min_samples:
        # Too few embedded sections to cluster meaningfully.
        return []

    # DBSCAN over (line_num_str, embedding) pairs.
    items = [(str(s.line_num), s.summary_embedding) for s in sections_with_emb]
    labels = _dbscan(items, eps=eps, min_samples=min_samples)
    section_by_line = {s.line_num: s for s in sections_with_emb}
    by_cluster: dict[int, list[PageIndexSection]] = {}
    for line_str, cluster_id in labels.items():
        if cluster_id == -1:  # -1 is skipped as unclustered (noise)
            continue
        by_cluster.setdefault(cluster_id, []).append(
            section_by_line[int(line_str)],
        )

    if not by_cluster:
        _logger.debug(
            "section_compile: doc=%s yielded no clusters (eps=%s min=%s)",
            document_id,
            eps,
            min_samples,
        )
        return []

    # Cap cluster count to control cost. Prefer larger clusters (more
    # signal per LLM call).
    clusters = sorted(by_cluster.values(), key=len, reverse=True)[:max_pages_per_doc]

    now = datetime.now(tz=timezone.utc)
    pending: list[tuple[WikiPage, list[tuple[str, int]]]] = []
    pending_summaries: list[str] = []  # used for embedding pass
    used_page_ids: set[str] = set()

    for cluster in clusters:
        synth = await _synthesize_observation(
            provider=provider,
            model=model,
            sections=cluster,
        )
        if synth is None:
            continue
        title, content = synth
        # Two clusters can synthesize the same title. Without a suffix both
        # would share one page_id, the second save would overwrite the
        # first, and the returned list would contain a duplicate.
        base_id = f"obs:{document_id[:8]}:{_slugify(title)}"
        page_id = base_id
        suffix = 2
        while page_id in used_page_ids:
            page_id = f"{base_id}-{suffix}"
            suffix += 1
        used_page_ids.add(page_id)

        page = WikiPage(
            page_id=page_id,
            bank_id=bank_id,
            kind="topic",
            title=title,
            content=content,
            scope=f"document:{document_id}",
            source_ids=[f"{s.document_id}:{s.line_num}" for s in cluster],
            cross_links=[],
            revision=1,
            revised_at=now,
            tags=None,
            metadata=None,
        )
        pending.append((page, [(s.document_id, s.line_num) for s in cluster]))
        # Embed the title + content together so the wiki search
        # strategy matches both the topic label and the aggregated
        # facts. Mirrors how Hindsight indexes observations.
        pending_summaries.append(f"{title}\n\n{content}")

    if not pending:
        return []

    # Single batched embedding call for all observations in this doc.
    try:
        embeddings = list(
            await provider.embed(
                pending_summaries,
                model=embedding_model,
            )
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "section_compile: embedding batch failed for doc=%s (%s)",
            document_id,
            exc,
        )
        embeddings = [None] * len(pending)
    if len(embeddings) != len(pending):
        # zip() below would silently drop pages (or misalign vectors) on a
        # short batch; fall back to the same "no embedding" path as a
        # failed embed call.
        _logger.warning(
            "section_compile: embedding batch returned %d vectors for %d pages for doc=%s — saving without embeddings",
            len(embeddings),
            len(pending),
            document_id,
        )
        embeddings = [None] * len(pending)

    created_page_ids: list[str] = []
    for (page, provenance_pairs), embedding in zip(pending, embeddings):
        try:
            await store.save_wiki_page(
                page=page,
                embedding=embedding,
                provenance=provenance_pairs,
            )
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "section_compile.save_wiki_page failed page_id=%s (%s)",
                page.page_id,
                exc,
            )
            continue
        created_page_ids.append(page.page_id)

    _logger.info(
        "section_compile: doc=%s created %d pages from %d clusters",
        document_id,
        len(created_page_ids),
        len(clusters),
    )

    # M12.6: Karpathy-style revision pass over the freshly compiled
    # pages. Catches "latest state" patterns that single-shot synthesis
    # missed (e.g. counts that grow over time, providers that change).
    # Idempotent — pages already at latest state are unchanged.
    try:
        await revise_wikis_for_document(
            store=store,
            bank_id=bank_id,
            document_id=document_id,
            provider=provider,
            model=model,
            embedding_model=embedding_model,
        )
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "revise_wikis: post-initial revision pass failed for doc=%s (%s)",
            document_id,
            exc,
        )

    return created_page_ids