Coverage for astrocyte/pipeline/compile.py: 92%

179 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M8: LLM wiki compile — CompileEngine. 

2 

3CompileEngine synthesises WikiPage entries from raw memories. Supported triggers: 

4 

5- **Explicit**: ``brain.compile(scope, bank_id)`` — caller provides scope string. 

6 All memories tagged with that scope are compiled into one wiki page. 

7 

8- **Automatic**: ``brain.compile(bank_id)`` — scope discovered from the memory 

9 bank itself via two passes: 

10 1. Tagged memories → group by tag (each tag = one compile scope). 

11 2. Untagged memories → DBSCAN clustering on stored embedding vectors; 

12 each cluster is labelled with a lightweight LLM call. 

13 

14The engine is async and non-blocking. Raw memories are never modified — 

15WikiPages are additive artefacts with provenance back to ``source_ids``. 

16 

17Vector embeddings of compiled pages are stored in the VectorStore with 

18``memory_layer="compiled"`` and ``fact_type="wiki"``, so the standard 

19recall pipeline can search them via ``search_similar`` without changes. 

20 

21Design reference: docs/_design/llm-wiki-compile.md §3, §3.1, §3.2. 

22""" 

23 

from __future__ import annotations

import logging
import math
import time
import uuid
from datetime import UTC, datetime
from typing import TYPE_CHECKING

from astrocyte.types import (
    CompileResult,
    CompileScope,
    Message,
    VectorItem,
    WikiPage,
    WikiPageKind,
)

if TYPE_CHECKING:
    from astrocyte.provider import LLMProvider, VectorStore, WikiStore

43 

44# --------------------------------------------------------------------------- 

45# LLM prompts 

46# --------------------------------------------------------------------------- 

47 

# System prompt for CompileEngine._synthesise: asks the model to turn one
# scope's memories into a single markdown wiki page. A trailing backslash
# inside the literal joins that line with the next one (no newline emitted);
# the leading `"""\` likewise suppresses the initial newline.
_COMPILE_SYSTEM_PROMPT = """\
You are a knowledge compiler. Given a collection of memories about a topic, \
synthesise a clear, structured wiki page in markdown.

Rules:
- Start with a ## Title (the topic name)
- Organise content into logical ### sections
- Preserve factual accuracy — do not invent or extrapolate beyond the memories given
- Note any contradictions or uncertainties explicitly (e.g. "Conflicting information: …")
- Keep cross-references minimal but useful (e.g. "See also: deployment-pipeline")
- Output only the markdown content, no preamble or meta-commentary
"""

# System prompt for CompileEngine._label_cluster: asks for a short hyphenated
# slug naming the topic shared by a DBSCAN cluster's sample texts. The reply
# becomes the cluster's compile scope (and therefore part of its page_id).
# NOTE(review): the examples are shown in double quotes, so models may echo
# quotes around the slug — the caller should normalise the reply.
_CLUSTER_LABEL_SYSTEM_PROMPT = """\
You are a topic classifier. Given a set of short text fragments, identify the \
single most specific topic they share in common.

Output only a short slug: 2–4 words, lowercase, hyphenated. No explanation.
Examples: "incident-response", "alice-background", "deployment-pipeline"
"""

68 

69# --------------------------------------------------------------------------- 

70# Pure-Python DBSCAN (cosine distance, no external deps) 

71# --------------------------------------------------------------------------- 

72 

73 

74def _cosine_sim(a: list[float], b: list[float]) -> float: 

75 dot = sum(x * y for x, y in zip(a, b)) 

76 na = math.sqrt(sum(x * x for x in a)) 

77 nb = math.sqrt(sum(x * x for x in b)) 

78 if na == 0 or nb == 0: 

79 return 0.0 

80 return dot / (na * nb) 

81 

82 

83def _dbscan( 

84 items: list[tuple[str, list[float]]], 

85 eps: float = 0.25, 

86 min_samples: int = 3, 

87) -> dict[str, int]: 

88 """DBSCAN over cosine distance. No external dependencies. 

89 

90 Args: 

91 items: List of (id, embedding_vector) pairs. 

92 eps: Maximum cosine distance for two points to be neighbours. 

93 cosine_distance = 1 - cosine_similarity. 

94 min_samples: Minimum neighbourhood size for a core point. 

95 

96 Returns: 

97 Mapping of item_id → cluster_id. cluster_id == -1 means noise. 

98 """ 

99 n = len(items) 

100 if n == 0: 

101 return {} 

102 

103 ids = [item[0] for item in items] 

104 vectors = [item[1] for item in items] 

105 

106 # Pre-compute neighbourhoods: points within eps cosine distance 

107 neighbours: list[list[int]] = [] 

108 for i in range(n): 

109 nbrs = [j for j in range(n) if i != j and (1.0 - _cosine_sim(vectors[i], vectors[j])) <= eps] 

110 neighbours.append(nbrs) 

111 

112 labels = [-1] * n 

113 visited = [False] * n 

114 cluster_id = 0 

115 

116 for i in range(n): 

117 if visited[i]: 

118 continue 

119 visited[i] = True 

120 

121 if len(neighbours[i]) < min_samples: 

122 labels[i] = -1 # noise — may be absorbed later by a cluster 

123 continue 

124 

125 # Expand cluster from core point i 

126 labels[i] = cluster_id 

127 seeds: set[int] = set(neighbours[i]) 

128 

129 while seeds: 

130 j = seeds.pop() 

131 if not visited[j]: 

132 visited[j] = True 

133 if len(neighbours[j]) >= min_samples: 

134 seeds.update(neighbours[j]) 

135 if labels[j] == -1: 

136 labels[j] = cluster_id # border point absorbed into cluster 

137 

138 cluster_id += 1 

139 

140 return {ids[i]: labels[i] for i in range(n)} 

141 

142 

143# --------------------------------------------------------------------------- 

144# CompileEngine 

145# --------------------------------------------------------------------------- 

146 

147 

148class CompileEngine: 

149 """Compiles raw memories into WikiPage entries (M8 LLM wiki compile). 

150 

151 Usage:: 

152 

153 engine = CompileEngine(vector_store, llm_provider, wiki_store) 

154 

155 # Explicit compile — caller provides scope 

156 result = await engine.run("eng-team", scope="incident-response") 

157 

158 # Automatic compile — scope discovered from tags + DBSCAN 

159 result = await engine.run("eng-team") 

160 """ 

161 

162 def __init__( 

163 self, 

164 vector_store: VectorStore, 

165 llm_provider: LLMProvider, 

166 wiki_store: WikiStore, 

167 *, 

168 dbscan_eps: float = 0.25, 

169 dbscan_min_samples: int = 3, 

170 compile_model: str | None = None, 

171 max_memories_per_scope: int = 100, 

172 ) -> None: 

173 self._vs = vector_store 

174 self._llm = llm_provider 

175 self._ws = wiki_store 

176 self._dbscan_eps = dbscan_eps 

177 self._dbscan_min_samples = dbscan_min_samples 

178 self._compile_model = compile_model 

179 self._max_per_scope = max_memories_per_scope 

180 

181 # ------------------------------------------------------------------ 

182 # Public entry point 

183 # ------------------------------------------------------------------ 

184 

185 async def run( 

186 self, 

187 bank_id: str, 

188 scope: str | None = None, 

189 ) -> CompileResult: 

190 """Compile memories into WikiPages for ``bank_id``. 

191 

192 Args: 

193 bank_id: Bank to compile. 

194 scope: If provided, compile only memories tagged with this scope. 

195 If ``None``, trigger full scope discovery (tags + DBSCAN). 

196 

197 Returns: 

198 :class:`~astrocyte.types.CompileResult` with pages created/updated, 

199 noise memory count, and token usage. On error, ``error`` is set and 

200 counts reflect partial progress. 

201 """ 

202 start = time.monotonic() 

203 tokens_used = 0 

204 pages_created = 0 

205 pages_updated = 0 

206 noise_memories = 0 

207 scopes_compiled: list[str] = [] 

208 

209 try: 

210 # Fetch all raw memories once — reused across scope resolution and synthesis 

211 all_items = await self._fetch_raw_memories(bank_id) 

212 

213 if scope is not None: 

214 compile_scopes = self._resolve_explicit(all_items, scope) 

215 else: 

216 compile_scopes, noise_memories = await self._discover_scopes(all_items) 

217 

218 item_map = {item.id: item for item in all_items} 

219 

220 for cs in compile_scopes: 

221 relevant = [item_map[mid] for mid in cs.memory_ids[: self._max_per_scope] if mid in item_map] 

222 page, tokens = await self._synthesise(cs, relevant, bank_id) 

223 tokens_used += tokens 

224 

225 existing = await self._ws.get_page(page.page_id, bank_id) 

226 await self._ws.upsert_page(page, bank_id) 

227 

228 # Embed the compiled page and store in VectorStore for recall tiering. 

229 # Stored with memory_layer="compiled" and fact_type="wiki" so the 

230 # standard search_similar path can filter for wiki pages. 

231 try: 

232 vecs = await self._llm.embed([f"{page.title}\n\n{page.content[:1000]}"]) 

233 if vecs: 

234 await self._vs.store_vectors( 

235 [ 

236 VectorItem( 

237 id=page.page_id, 

238 bank_id=bank_id, 

239 vector=vecs[0], 

240 text=f"[WIKI:{page.kind}] {page.title}\n\n{page.content[:500]}", 

241 tags=page.tags, 

242 memory_layer="compiled", 

243 fact_type="wiki", 

244 # Store source_ids so the recall wiki-tier can surface 

245 # provenance citations without a WikiStore lookup. 

246 metadata={"_wiki_source_ids": ",".join(page.source_ids)}, 

247 ) 

248 ] 

249 ) 

250 except Exception: 

251 pass # embedding failure is non-fatal; page is still stored 

252 

253 if existing is None: 

254 pages_created += 1 

255 else: 

256 pages_updated += 1 

257 scopes_compiled.append(cs.scope) 

258 

259 except Exception as exc: 

260 elapsed_ms = int((time.monotonic() - start) * 1000) 

261 return CompileResult( 

262 bank_id=bank_id, 

263 scopes_compiled=scopes_compiled, 

264 pages_created=pages_created, 

265 pages_updated=pages_updated, 

266 noise_memories=noise_memories, 

267 tokens_used=tokens_used, 

268 elapsed_ms=elapsed_ms, 

269 error=str(exc), 

270 ) 

271 

272 elapsed_ms = int((time.monotonic() - start) * 1000) 

273 return CompileResult( 

274 bank_id=bank_id, 

275 scopes_compiled=scopes_compiled, 

276 pages_created=pages_created, 

277 pages_updated=pages_updated, 

278 noise_memories=noise_memories, 

279 tokens_used=tokens_used, 

280 elapsed_ms=elapsed_ms, 

281 ) 

282 

283 # ------------------------------------------------------------------ 

284 # Scope resolution 

285 # ------------------------------------------------------------------ 

286 

287 def _resolve_explicit( 

288 self, 

289 all_items: list[VectorItem], 

290 scope: str, 

291 ) -> list[CompileScope]: 

292 """Resolve an explicit caller-provided scope. 

293 

294 Selects memories whose tags include ``scope``. If no tagged memories 

295 match, falls back to all memories in the bank (caller is compiling 

296 an untagged bank with a named scope). 

297 """ 

298 matching_ids = [item.id for item in all_items if item.tags and scope in item.tags] 

299 if not matching_ids: 

300 matching_ids = [item.id for item in all_items] 

301 return [CompileScope(scope=scope, source="explicit", memory_ids=matching_ids)] 

302 

303 async def _discover_scopes( 

304 self, 

305 all_items: list[VectorItem], 

306 ) -> tuple[list[CompileScope], int]: 

307 """Discover compile scopes from memory content. 

308 

309 Pass 1 — Tagged memories: group by tag. Each unique tag with at least 

310 ``dbscan_min_samples`` memories becomes a compile scope. 

311 

312 Pass 2 — Untagged memories: DBSCAN on stored embedding vectors. 

313 Each cluster is labelled with a lightweight LLM call. Noise points 

314 (memories DBSCAN could not assign to any cluster) are counted and 

315 held for the next compile cycle. 

316 

317 Returns: 

318 (list of CompileScope, noise_count) 

319 """ 

320 if not all_items: 

321 return [], 0 

322 

323 # Pass 1: tag grouping 

324 tag_groups: dict[str, list[str]] = {} 

325 untagged: list[VectorItem] = [] 

326 

327 for item in all_items: 

328 if item.tags: 

329 for tag in item.tags: 

330 tag_groups.setdefault(tag, []).append(item.id) 

331 else: 

332 untagged.append(item) 

333 

334 scopes: list[CompileScope] = [ 

335 CompileScope(scope=tag, source="tag", memory_ids=ids) 

336 for tag, ids in tag_groups.items() 

337 if len(ids) >= self._dbscan_min_samples 

338 ] 

339 

340 # Pass 2: DBSCAN on untagged memories 

341 noise_count = 0 

342 if untagged: 

343 vector_pairs = [ 

344 (item.id, item.vector) 

345 for item in untagged 

346 if item.vector # guard against items with no vector 

347 ] 

348 if vector_pairs: 

349 labels = _dbscan( 

350 vector_pairs, 

351 eps=self._dbscan_eps, 

352 min_samples=self._dbscan_min_samples, 

353 ) 

354 

355 cluster_groups: dict[int, list[str]] = {} 

356 for item_id, cluster_label in labels.items(): 

357 if cluster_label == -1: 

358 noise_count += 1 

359 else: 

360 cluster_groups.setdefault(cluster_label, []).append(item_id) 

361 

362 untagged_map = {item.id: item for item in untagged} 

363 for cluster_label, memory_ids in cluster_groups.items(): 

364 # Label via LLM using up to 5 representative memories 

365 samples = [untagged_map[mid] for mid in memory_ids[:5] if mid in untagged_map] 

366 label = await self._label_cluster(samples) 

367 scopes.append( 

368 CompileScope( 

369 scope=label, 

370 source="cluster", 

371 memory_ids=memory_ids, 

372 ) 

373 ) 

374 

375 return scopes, noise_count 

376 

377 # ------------------------------------------------------------------ 

378 # Synthesis 

379 # ------------------------------------------------------------------ 

380 

381 async def _synthesise( 

382 self, 

383 cs: CompileScope, 

384 relevant: list[VectorItem], 

385 bank_id: str, 

386 ) -> tuple[WikiPage, int]: 

387 """Synthesise a WikiPage from the memories in a CompileScope.""" 

388 tokens_used = 0 

389 

390 if not relevant: 

391 page = _empty_page(cs, bank_id) 

392 return page, 0 

393 

394 memory_texts = "\n".join(f"[Memory {i + 1}]: {item.text[:400]}" for i, item in enumerate(relevant)) 

395 user_prompt = f"Topic: {cs.scope}\n\nMemories:\n{memory_texts}" 

396 

397 messages = [ 

398 Message(role="system", content=_COMPILE_SYSTEM_PROMPT), 

399 Message(role="user", content=user_prompt), 

400 ] 

401 

402 completion = await self._llm.complete( 

403 messages, 

404 model=self._compile_model, 

405 max_tokens=2048, 

406 ) 

407 

408 if completion.usage: 

409 tokens_used = completion.usage.input_tokens + completion.usage.output_tokens 

410 

411 # Collect tags from contributing memories (deduplicated, ordered) 

412 seen_tags: set[str] = set() 

413 page_tags: list[str] = [] 

414 for item in relevant: 

415 for tag in item.tags or []: 

416 if tag not in seen_tags: 

417 seen_tags.add(tag) 

418 page_tags.append(tag) 

419 

420 kind = _infer_kind(cs.scope) 

421 page_id = f"{kind}:{cs.scope.replace(' ', '-')}" 

422 

423 page = WikiPage( 

424 page_id=page_id, 

425 bank_id=bank_id, 

426 kind=kind, 

427 title=cs.scope.replace("-", " ").title(), 

428 content=completion.text, 

429 scope=cs.scope, 

430 source_ids=[item.id for item in relevant], 

431 cross_links=[], # extracted in a future lint pass 

432 revision=1, 

433 revised_at=datetime.now(UTC), 

434 tags=page_tags or None, 

435 ) 

436 return page, tokens_used 

437 

438 async def _label_cluster(self, items: list[VectorItem]) -> str: 

439 """Generate a topic slug for a DBSCAN cluster via a lightweight LLM call.""" 

440 snippets = "\n".join(f"- {item.text[:150]}" for item in items) 

441 messages = [ 

442 Message(role="system", content=_CLUSTER_LABEL_SYSTEM_PROMPT), 

443 Message(role="user", content=f"Texts:\n{snippets}"), 

444 ] 

445 try: 

446 completion = await self._llm.complete(messages, max_tokens=20) 

447 label = completion.text.strip().lower().replace(" ", "-")[:50] 

448 return label or "general" 

449 except Exception: 

450 return f"cluster-{uuid.uuid4().hex[:6]}" 

451 

452 # ------------------------------------------------------------------ 

453 # Storage helpers 

454 # ------------------------------------------------------------------ 

455 

456 async def _fetch_raw_memories(self, bank_id: str) -> list[VectorItem]: 

457 """Fetch all raw (non-compiled) memories from the VectorStore.""" 

458 all_items: list[VectorItem] = [] 

459 offset = 0 

460 batch = 100 

461 while True: 

462 chunk = await self._vs.list_vectors(bank_id, offset=offset, limit=batch) 

463 if not chunk: 

464 break 

465 # Exclude previously compiled pages (stored with memory_layer="compiled") 

466 raw = [item for item in chunk if item.memory_layer != "compiled" and item.fact_type != "wiki"] 

467 all_items.extend(raw) 

468 if len(chunk) < batch: 

469 break 

470 offset += batch 

471 return all_items 

472 

473 

474# --------------------------------------------------------------------------- 

475# Helpers 

476# --------------------------------------------------------------------------- 

477 

478 

479def _infer_kind(scope: str) -> WikiPageKind: 

480 """Infer WikiPage kind from scope string prefix conventions.""" 

481 for prefix in ("entity:", "person:", "org:", "location:"): 

482 if scope.startswith(prefix): 

483 return "entity" 

484 if scope.startswith("concept:"): 

485 return "concept" 

486 return "topic" 

487 

488 

def _empty_page(cs: CompileScope, bank_id: str) -> WikiPage:
    """Build a placeholder WikiPage for a compile scope with no memories.

    The page_id, kind and title are derived from ``cs.scope`` exactly as for
    a synthesised page. The body is a heading plus a "no memories" notice,
    with no sources, no cross-links and revision 1.
    """
    scope = cs.scope
    kind = _infer_kind(scope)
    heading = scope.replace("-", " ").title()
    slug = scope.replace(" ", "-")
    body = f"## {heading}\n\nNo memories found for this scope."
    return WikiPage(
        page_id=f"{kind}:{slug}",
        bank_id=bank_id,
        kind=kind,
        title=heading,
        content=body,
        scope=scope,
        source_ids=[],
        cross_links=[],
        revision=1,
        revised_at=datetime.now(UTC),
    )