Coverage for astrocyte/pipeline/compile.py: 92%
179 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
"""M8: LLM wiki compile — CompileEngine.

CompileEngine synthesises WikiPage entries from raw memories. Supported triggers:

- **Explicit**: ``brain.compile(scope, bank_id)`` — the caller provides the scope
  string. All memories tagged with that scope are compiled into one wiki page
  (if no memory carries that tag, every memory in the bank is used instead).

- **Automatic**: ``brain.compile(bank_id)`` — scopes are discovered from the
  memory bank itself via two passes:
  1. Tagged memories → group by tag (each tag with at least
     ``dbscan_min_samples`` memories = one compile scope).
  2. Untagged memories → DBSCAN clustering on stored embedding vectors;
     each cluster is labelled with a lightweight LLM call.

The engine's public API is async, but note that DBSCAN clustering is pure
Python and runs synchronously on the event loop. Raw memories are never
modified — WikiPages are additive artefacts with provenance back to
``source_ids``.

Vector embeddings of compiled pages are stored in the VectorStore with
``memory_layer="compiled"`` and ``fact_type="wiki"``, so the standard
recall pipeline can search them via ``search_similar`` without changes.

Design reference: docs/_design/llm-wiki-compile.md §3, §3.1, §3.2.
"""
from __future__ import annotations

import logging
import math
import re
import time
import uuid
from datetime import UTC, datetime
from typing import TYPE_CHECKING

from astrocyte.types import (
    CompileResult,
    CompileScope,
    Message,
    VectorItem,
    WikiPage,
    WikiPageKind,
)

if TYPE_CHECKING:
    from astrocyte.provider import LLMProvider, VectorStore, WikiStore
# ---------------------------------------------------------------------------
# LLM prompts
# ---------------------------------------------------------------------------

# System prompt for wiki-page synthesis (sent by CompileEngine._synthesise).
_COMPILE_SYSTEM_PROMPT = (
    "You are a knowledge compiler. Given a collection of memories about a topic, "
    "synthesise a clear, structured wiki page in markdown.\n"
    "\n"
    "Rules:\n"
    "- Start with a ## Title (the topic name)\n"
    "- Organise content into logical ### sections\n"
    "- Preserve factual accuracy — do not invent or extrapolate beyond the memories given\n"
    '- Note any contradictions or uncertainties explicitly (e.g. "Conflicting information: …")\n'
    '- Keep cross-references minimal but useful (e.g. "See also: deployment-pipeline")\n'
    "- Output only the markdown content, no preamble or meta-commentary\n"
)

# System prompt for naming a DBSCAN cluster; the reply is asked to be a bare slug.
_CLUSTER_LABEL_SYSTEM_PROMPT = (
    "You are a topic classifier. Given a set of short text fragments, identify the "
    "single most specific topic they share in common.\n"
    "\n"
    "Output only a short slug: 2–4 words, lowercase, hyphenated. No explanation.\n"
    'Examples: "incident-response", "alice-background", "deployment-pipeline"\n'
)


# ---------------------------------------------------------------------------
# Pure-Python DBSCAN (cosine distance, no external deps)
# ---------------------------------------------------------------------------


def _cosine_sim(a: list[float], b: list[float]) -> float:
    """Return the cosine similarity between vectors *a* and *b*.

    The dot product is taken over the common prefix of the two vectors
    (``zip`` truncation), while each norm uses the full vector. If either
    vector has zero norm the similarity is defined as ``0.0``.
    """
    dot_product = sum(p * q for p, q in zip(a, b))
    norm_a = math.sqrt(sum(v * v for v in a))
    norm_b = math.sqrt(sum(v * v for v in b))
    if not (norm_a and norm_b):
        return 0.0
    return dot_product / (norm_a * norm_b)


def _dbscan(
    items: list[tuple[str, list[float]]],
    eps: float = 0.25,
    min_samples: int = 3,
) -> dict[str, int]:
    """DBSCAN over cosine distance. No external dependencies.

    Args:
        items: List of ``(id, embedding_vector)`` pairs.
        eps: Maximum cosine distance for two points to be neighbours, where
            ``cosine_distance = 1 - cosine_similarity``. A zero-norm vector has
            similarity ``0.0`` (distance ``1.0``) to every other vector.
        min_samples: Minimum number of *other* points within ``eps`` for a
            point to be a core point. The point itself is not counted, so this
            is one stricter than scikit-learn's ``min_samples``.

    Returns:
        Mapping of item_id → cluster_id. Cluster ids are 0, 1, 2, … in order of
        discovery; ``-1`` means noise. If ``items`` repeats an id, the label of
        its last occurrence wins.

    Time is O(n² · d) and the neighbour lists can take O(n²) memory, so this is
    intended for modest numbers of points.
    """
    n = len(items)
    if n == 0:
        return {}

    ids = [item_id for item_id, _ in items]
    vectors = [vector for _, vector in items]
    # Each norm is computed once, not once per pair as a per-pair helper would.
    norms = [math.sqrt(sum(x * x for x in vector)) for vector in vectors]

    # Cosine distance is symmetric: evaluate each unordered pair once and record
    # it in both neighbour lists. Lists stay in ascending index order, and a
    # point is never its own neighbour.
    neighbours: list[list[int]] = [[] for _ in range(n)]
    for i in range(n):
        vec_i = vectors[i]
        norm_i = norms[i]
        for j in range(i + 1, n):
            norm_j = norms[j]
            if norm_i == 0 or norm_j == 0:
                sim = 0.0
            else:
                sim = sum(x * y for x, y in zip(vec_i, vectors[j])) / (norm_i * norm_j)
            if (1.0 - sim) <= eps:
                neighbours[i].append(j)
                neighbours[j].append(i)

    labels = [-1] * n
    visited = [False] * n
    cluster_id = 0

    for i in range(n):
        if visited[i]:
            continue
        visited[i] = True

        if len(neighbours[i]) < min_samples:
            # Noise for now — may be absorbed later as a border point of a cluster.
            continue

        # Expand a new cluster from core point i.
        labels[i] = cluster_id
        seeds: set[int] = set(neighbours[i])
        while seeds:
            j = seeds.pop()
            if not visited[j]:
                visited[j] = True
                if len(neighbours[j]) >= min_samples:
                    # j is itself core: its neighbours are density-reachable too.
                    seeds.update(neighbours[j])
            if labels[j] == -1:
                # Border point (or earlier noise) absorbed into this cluster;
                # points already owned by an earlier cluster keep their label.
                labels[j] = cluster_id
        cluster_id += 1

    return dict(zip(ids, labels))


143# ---------------------------------------------------------------------------
144# CompileEngine
145# ---------------------------------------------------------------------------
148class CompileEngine:
149 """Compiles raw memories into WikiPage entries (M8 LLM wiki compile).
151 Usage::
153 engine = CompileEngine(vector_store, llm_provider, wiki_store)
155 # Explicit compile — caller provides scope
156 result = await engine.run("eng-team", scope="incident-response")
158 # Automatic compile — scope discovered from tags + DBSCAN
159 result = await engine.run("eng-team")
160 """
162 def __init__(
163 self,
164 vector_store: VectorStore,
165 llm_provider: LLMProvider,
166 wiki_store: WikiStore,
167 *,
168 dbscan_eps: float = 0.25,
169 dbscan_min_samples: int = 3,
170 compile_model: str | None = None,
171 max_memories_per_scope: int = 100,
172 ) -> None:
173 self._vs = vector_store
174 self._llm = llm_provider
175 self._ws = wiki_store
176 self._dbscan_eps = dbscan_eps
177 self._dbscan_min_samples = dbscan_min_samples
178 self._compile_model = compile_model
179 self._max_per_scope = max_memories_per_scope
181 # ------------------------------------------------------------------
182 # Public entry point
183 # ------------------------------------------------------------------
185 async def run(
186 self,
187 bank_id: str,
188 scope: str | None = None,
189 ) -> CompileResult:
190 """Compile memories into WikiPages for ``bank_id``.
192 Args:
193 bank_id: Bank to compile.
194 scope: If provided, compile only memories tagged with this scope.
195 If ``None``, trigger full scope discovery (tags + DBSCAN).
197 Returns:
198 :class:`~astrocyte.types.CompileResult` with pages created/updated,
199 noise memory count, and token usage. On error, ``error`` is set and
200 counts reflect partial progress.
201 """
202 start = time.monotonic()
203 tokens_used = 0
204 pages_created = 0
205 pages_updated = 0
206 noise_memories = 0
207 scopes_compiled: list[str] = []
209 try:
210 # Fetch all raw memories once — reused across scope resolution and synthesis
211 all_items = await self._fetch_raw_memories(bank_id)
213 if scope is not None:
214 compile_scopes = self._resolve_explicit(all_items, scope)
215 else:
216 compile_scopes, noise_memories = await self._discover_scopes(all_items)
218 item_map = {item.id: item for item in all_items}
220 for cs in compile_scopes:
221 relevant = [item_map[mid] for mid in cs.memory_ids[: self._max_per_scope] if mid in item_map]
222 page, tokens = await self._synthesise(cs, relevant, bank_id)
223 tokens_used += tokens
225 existing = await self._ws.get_page(page.page_id, bank_id)
226 await self._ws.upsert_page(page, bank_id)
228 # Embed the compiled page and store in VectorStore for recall tiering.
229 # Stored with memory_layer="compiled" and fact_type="wiki" so the
230 # standard search_similar path can filter for wiki pages.
231 try:
232 vecs = await self._llm.embed([f"{page.title}\n\n{page.content[:1000]}"])
233 if vecs:
234 await self._vs.store_vectors(
235 [
236 VectorItem(
237 id=page.page_id,
238 bank_id=bank_id,
239 vector=vecs[0],
240 text=f"[WIKI:{page.kind}] {page.title}\n\n{page.content[:500]}",
241 tags=page.tags,
242 memory_layer="compiled",
243 fact_type="wiki",
244 # Store source_ids so the recall wiki-tier can surface
245 # provenance citations without a WikiStore lookup.
246 metadata={"_wiki_source_ids": ",".join(page.source_ids)},
247 )
248 ]
249 )
250 except Exception:
251 pass # embedding failure is non-fatal; page is still stored
253 if existing is None:
254 pages_created += 1
255 else:
256 pages_updated += 1
257 scopes_compiled.append(cs.scope)
259 except Exception as exc:
260 elapsed_ms = int((time.monotonic() - start) * 1000)
261 return CompileResult(
262 bank_id=bank_id,
263 scopes_compiled=scopes_compiled,
264 pages_created=pages_created,
265 pages_updated=pages_updated,
266 noise_memories=noise_memories,
267 tokens_used=tokens_used,
268 elapsed_ms=elapsed_ms,
269 error=str(exc),
270 )
272 elapsed_ms = int((time.monotonic() - start) * 1000)
273 return CompileResult(
274 bank_id=bank_id,
275 scopes_compiled=scopes_compiled,
276 pages_created=pages_created,
277 pages_updated=pages_updated,
278 noise_memories=noise_memories,
279 tokens_used=tokens_used,
280 elapsed_ms=elapsed_ms,
281 )
283 # ------------------------------------------------------------------
284 # Scope resolution
285 # ------------------------------------------------------------------
287 def _resolve_explicit(
288 self,
289 all_items: list[VectorItem],
290 scope: str,
291 ) -> list[CompileScope]:
292 """Resolve an explicit caller-provided scope.
294 Selects memories whose tags include ``scope``. If no tagged memories
295 match, falls back to all memories in the bank (caller is compiling
296 an untagged bank with a named scope).
297 """
298 matching_ids = [item.id for item in all_items if item.tags and scope in item.tags]
299 if not matching_ids:
300 matching_ids = [item.id for item in all_items]
301 return [CompileScope(scope=scope, source="explicit", memory_ids=matching_ids)]
303 async def _discover_scopes(
304 self,
305 all_items: list[VectorItem],
306 ) -> tuple[list[CompileScope], int]:
307 """Discover compile scopes from memory content.
309 Pass 1 — Tagged memories: group by tag. Each unique tag with at least
310 ``dbscan_min_samples`` memories becomes a compile scope.
312 Pass 2 — Untagged memories: DBSCAN on stored embedding vectors.
313 Each cluster is labelled with a lightweight LLM call. Noise points
314 (memories DBSCAN could not assign to any cluster) are counted and
315 held for the next compile cycle.
317 Returns:
318 (list of CompileScope, noise_count)
319 """
320 if not all_items:
321 return [], 0
323 # Pass 1: tag grouping
324 tag_groups: dict[str, list[str]] = {}
325 untagged: list[VectorItem] = []
327 for item in all_items:
328 if item.tags:
329 for tag in item.tags:
330 tag_groups.setdefault(tag, []).append(item.id)
331 else:
332 untagged.append(item)
334 scopes: list[CompileScope] = [
335 CompileScope(scope=tag, source="tag", memory_ids=ids)
336 for tag, ids in tag_groups.items()
337 if len(ids) >= self._dbscan_min_samples
338 ]
340 # Pass 2: DBSCAN on untagged memories
341 noise_count = 0
342 if untagged:
343 vector_pairs = [
344 (item.id, item.vector)
345 for item in untagged
346 if item.vector # guard against items with no vector
347 ]
348 if vector_pairs:
349 labels = _dbscan(
350 vector_pairs,
351 eps=self._dbscan_eps,
352 min_samples=self._dbscan_min_samples,
353 )
355 cluster_groups: dict[int, list[str]] = {}
356 for item_id, cluster_label in labels.items():
357 if cluster_label == -1:
358 noise_count += 1
359 else:
360 cluster_groups.setdefault(cluster_label, []).append(item_id)
362 untagged_map = {item.id: item for item in untagged}
363 for cluster_label, memory_ids in cluster_groups.items():
364 # Label via LLM using up to 5 representative memories
365 samples = [untagged_map[mid] for mid in memory_ids[:5] if mid in untagged_map]
366 label = await self._label_cluster(samples)
367 scopes.append(
368 CompileScope(
369 scope=label,
370 source="cluster",
371 memory_ids=memory_ids,
372 )
373 )
375 return scopes, noise_count
377 # ------------------------------------------------------------------
378 # Synthesis
379 # ------------------------------------------------------------------
381 async def _synthesise(
382 self,
383 cs: CompileScope,
384 relevant: list[VectorItem],
385 bank_id: str,
386 ) -> tuple[WikiPage, int]:
387 """Synthesise a WikiPage from the memories in a CompileScope."""
388 tokens_used = 0
390 if not relevant:
391 page = _empty_page(cs, bank_id)
392 return page, 0
394 memory_texts = "\n".join(f"[Memory {i + 1}]: {item.text[:400]}" for i, item in enumerate(relevant))
395 user_prompt = f"Topic: {cs.scope}\n\nMemories:\n{memory_texts}"
397 messages = [
398 Message(role="system", content=_COMPILE_SYSTEM_PROMPT),
399 Message(role="user", content=user_prompt),
400 ]
402 completion = await self._llm.complete(
403 messages,
404 model=self._compile_model,
405 max_tokens=2048,
406 )
408 if completion.usage:
409 tokens_used = completion.usage.input_tokens + completion.usage.output_tokens
411 # Collect tags from contributing memories (deduplicated, ordered)
412 seen_tags: set[str] = set()
413 page_tags: list[str] = []
414 for item in relevant:
415 for tag in item.tags or []:
416 if tag not in seen_tags:
417 seen_tags.add(tag)
418 page_tags.append(tag)
420 kind = _infer_kind(cs.scope)
421 page_id = f"{kind}:{cs.scope.replace(' ', '-')}"
423 page = WikiPage(
424 page_id=page_id,
425 bank_id=bank_id,
426 kind=kind,
427 title=cs.scope.replace("-", " ").title(),
428 content=completion.text,
429 scope=cs.scope,
430 source_ids=[item.id for item in relevant],
431 cross_links=[], # extracted in a future lint pass
432 revision=1,
433 revised_at=datetime.now(UTC),
434 tags=page_tags or None,
435 )
436 return page, tokens_used
438 async def _label_cluster(self, items: list[VectorItem]) -> str:
439 """Generate a topic slug for a DBSCAN cluster via a lightweight LLM call."""
440 snippets = "\n".join(f"- {item.text[:150]}" for item in items)
441 messages = [
442 Message(role="system", content=_CLUSTER_LABEL_SYSTEM_PROMPT),
443 Message(role="user", content=f"Texts:\n{snippets}"),
444 ]
445 try:
446 completion = await self._llm.complete(messages, max_tokens=20)
447 label = completion.text.strip().lower().replace(" ", "-")[:50]
448 return label or "general"
449 except Exception:
450 return f"cluster-{uuid.uuid4().hex[:6]}"
452 # ------------------------------------------------------------------
453 # Storage helpers
454 # ------------------------------------------------------------------
456 async def _fetch_raw_memories(self, bank_id: str) -> list[VectorItem]:
457 """Fetch all raw (non-compiled) memories from the VectorStore."""
458 all_items: list[VectorItem] = []
459 offset = 0
460 batch = 100
461 while True:
462 chunk = await self._vs.list_vectors(bank_id, offset=offset, limit=batch)
463 if not chunk:
464 break
465 # Exclude previously compiled pages (stored with memory_layer="compiled")
466 raw = [item for item in chunk if item.memory_layer != "compiled" and item.fact_type != "wiki"]
467 all_items.extend(raw)
468 if len(chunk) < batch:
469 break
470 offset += batch
471 return all_items
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _infer_kind(scope: str) -> WikiPageKind:
    """Map a scope string to a WikiPage kind from its prefix (case-sensitive).

    ``entity:``, ``person:``, ``org:`` and ``location:`` give ``"entity"``;
    ``concept:`` gives ``"concept"``; anything else is a ``"topic"``.
    """
    if scope.startswith(("entity:", "person:", "org:", "location:")):
        return "entity"
    return "concept" if scope.startswith("concept:") else "topic"


def _empty_page(cs: CompileScope, bank_id: str) -> WikiPage:
    """Build a placeholder WikiPage for a scope that has no memories.

    The page id is ``"{kind}:{scope with spaces replaced by hyphens}"`` and the
    title is the scope with hyphens turned into spaces, title-cased. The body is
    a fixed "no memories" notice; the page has no sources or cross-links, is at
    revision 1, and does not set ``tags``.
    """
    scope_name = cs.scope
    kind = _infer_kind(scope_name)
    heading = scope_name.replace("-", " ").title()
    slug = scope_name.replace(" ", "-")
    body = f"## {heading}\n\nNo memories found for this scope."
    return WikiPage(
        page_id=f"{kind}:{slug}",
        bank_id=bank_id,
        kind=kind,
        title=heading,
        content=body,
        scope=scope_name,
        source_ids=[],
        cross_links=[],
        revision=1,
        revised_at=datetime.now(UTC),
    )