Coverage for astrocyte/pipeline/wiki_incremental.py: 89%

89 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M14.2: Karpathy update-affected-wikis. 

2 

3When a new document is retained, identify existing wiki pages whose 

4provenance entities overlap with the new document's entities, then run 

5an LLM update pass per affected wiki with the new content folded in. 

6Models Karpathy's LLM-wiki spec ("update-affected-wikis" op) and 

7Hindsight's observation-consolidation freshness pattern — both share 

8the shape: detect what changed, ripple to dependent compiled artefacts. 

9 

10Why this is the next M14 lever after M14.6 revert: 

11 

12The post-M14.7-revert decision was: stop auto-injecting preferences 

13into the answerer's context (Hindsight design); double down on the 

14consolidation pipeline so retrieved candidates ARE fresh. M14.2 

15attacks that directly: 

16 

171. **knowledge-update** category on LME (current 5/5 at top_20 — already 

18 strong, but only because wikis happen to recompile from scratch each 

19 bench run). Real users add documents incrementally; stale wikis 

20 would otherwise hurt knowledge-update over time. 

212. **multi-session** on LME — questions that span sessions need 

22 cross-document wiki updates to surface coherent state. Currently 

23 wikis are document-local; M14.2 enables document-spanning updates. 

243. **open-domain** + **temporal** on LoCoMo — both benefit from 

25 freshness signal in the wiki content (e.g. "user now lives in 

26 Berlin (previously Paris)" inline in the relevant wiki). 

27 

28Algorithm: 

29 

30:: 

31 

32 on_document_retained(doc): 

33 affected = find_affected_wikis(doc.entities) # entity overlap 

34 for wiki in affected: 

35 ctx = (wiki.content, new_doc.relevant_sections) 

36 out = llm.update_wiki(ctx) # one call per wiki 

37 if out.verdict == "UPDATE": 

38 save_revision(wiki, out.new_content) 

39 

40The "find_affected_wikis" step is a cheap SQL join via 

41``astrocyte_pi_wiki_provenance`` × ``astrocyte_pi_section_entities`` 

42(both already populated by M9 + M12.5). 

43 

44Cost: ~$0.05 per source (one LLM call per affected wiki, typically 

452-5 wikis per source). At LoCoMo 200 docs that's ~$1-2 per bench run. 

46Marginal vs the answerer's per-question cost. 

47 

48Design references: 

49- ``docs/_design/llm-wiki-compile.md`` (Karpathy spec) 

50- ``docs/_design/m13-m14-roadmap.md`` §4 (M14.2 row) 

51- Hindsight: ``hindsight_api/engine/consolidation/consolidator.py`` 

52 (similar shape: identify novel facts → update or create observations) 

53 

54Status: SKELETON — interface + entity-overlap query + LLM prompt 

55designed; ``update_affected_wikis_for_document`` is the entry point. 

56The orchestration wiring into retain (FSM state ``UPDATE_AFFECTED_WIKIS`` 

57in the M14.0 scaffold, or direct call from ``bench_pageindex_locomo``) 

58is the next implementation step. 

59""" 

60 

61from __future__ import annotations 

62 

63import json 

64import logging 

65from dataclasses import dataclass, field 

66from datetime import datetime, timezone 

67from typing import TYPE_CHECKING 

68 

69from astrocyte.types import Message 

70 

71if TYPE_CHECKING: 

72 from astrocyte.provider import LLMProvider, PageIndexStore 

73 from astrocyte.types import WikiPage 

74 

75_logger = logging.getLogger("astrocyte.pipeline.wiki_incremental") 

76 

77 

78# --------------------------------------------------------------------------- 

79# Tunables 

80# --------------------------------------------------------------------------- 

81 

82#: Minimum entities shared between new doc and wiki provenance before we 

83#: consider the wiki "affected". 1 is the loosest threshold; raising to 2 

84#: cuts spurious cross-doc updates but misses true single-entity-overlap 

85#: updates. Calibrated against LoCoMo's entity density (~5-15 unique 

86#: entities per session) — 1 is right for the bench. 

87MIN_ENTITY_OVERLAP: int = 1 

88 

89#: Cap on wikis updated per retain. If a single source touches >N wikis 

90#: we update only the top-N by overlap count — keeps cost bounded. 

91MAX_AFFECTED_WIKIS_PER_SOURCE: int = 8 

92 

93#: max_tokens for the update LLM call. Sized for a 1-3 paragraph wiki 

94#: with revision notes; ample margin for context preservation. 

95UPDATE_LLM_MAX_TOKENS: int = 1500 

96 

97 

98# --------------------------------------------------------------------------- 

99# Result types 

100# --------------------------------------------------------------------------- 

101 

102 

103@dataclass 

104class WikiUpdateResult: 

105 """One wiki's update outcome from an incremental update pass.""" 

106 

107 page_id: str 

108 verdict: str # 'UPDATED' | 'NO_CHANGE' | 'FAILED' 

109 new_revision: int | None = None 

110 detail: str = "" 

111 

112 

113@dataclass 

114class IncrementalUpdateReport: 

115 """Per-document aggregate of wiki update outcomes.""" 

116 

117 document_id: str 

118 affected_count: int # how many wikis the entity-overlap query found 

119 updated: list[WikiUpdateResult] = field(default_factory=list) 

120 skipped: list[WikiUpdateResult] = field(default_factory=list) 

121 failed: list[WikiUpdateResult] = field(default_factory=list) 

122 

123 

124# --------------------------------------------------------------------------- 

125# LLM prompt 

126# --------------------------------------------------------------------------- 

127 

128 

129_UPDATE_PROMPT = """\ 

130You are maintaining a knowledge wiki. A new conversation has been retained \ 

131that overlaps with this existing wiki entry. Decide whether the new \ 

132content REQUIRES the wiki to be updated, or whether the existing wiki \ 

133already captures the new information (no change needed). 

134 

135Existing wiki (title: "{title}", revision {revision}): 

136=== BEGIN WIKI === 

137{wiki_content} 

138=== END WIKI === 

139 

140New content from the retained conversation (chronologically AFTER the \ 

141wiki's existing provenance): 

142=== BEGIN NEW CONTENT === 

143{new_content} 

144=== END NEW CONTENT === 

145 

146Rules: 

147- The wiki tracks LATEST state. If the new content updates a fact \ 

148 the wiki currently states (e.g. user moved cities, changed jobs, \ 

149 finished a project they were planning), emit UPDATE with the \ 

150 revised content reflecting the new state. Preserve the previous \ 

151 state inline as parenthetical when historically relevant (e.g. \ 

152 "(previously lived in Paris)"). 

153- If the new content adds NEW information not covered by the wiki, \ 

154 emit UPDATE with the addition integrated naturally. 

155- If the new content merely re-states what the wiki already says, \ 

156 emit NO_CHANGE. 

157- If the new content is unrelated to the wiki's topic despite the \ 

158 entity overlap (e.g. shared person name in different contexts), \ 

159 emit NO_CHANGE. 

160- Preserve the wiki's voice and structure. Output the FULL revised \ 

161 content, not a diff. 

162 

163Output a JSON object: 

164{{ 

165 "verdict": "UPDATE" or "NO_CHANGE", 

166 "revised_content": "<full revised wiki content; only when UPDATE>", 

167 "revised_title": "<optional new title; only when UPDATE and title needs change>" 

168}} 

169 

170OUTPUT MUST BE VALID JSON. No prose around it. 

171""" 

172 

173 

174# --------------------------------------------------------------------------- 

175# Public entry point 

176# --------------------------------------------------------------------------- 

177 

178 

179async def update_affected_wikis_for_document( # noqa: PLR0913 

180 *, 

181 page_index_store: PageIndexStore, 

182 provider: LLMProvider, 

183 bank_id: str, 

184 document_id: str, 

185 new_entities: list[str], 

186 new_content_excerpts: dict[str, str], 

187 model: str | None = None, 

188 min_overlap: int = MIN_ENTITY_OVERLAP, 

189 max_updates: int = MAX_AFFECTED_WIKIS_PER_SOURCE, 

190) -> IncrementalUpdateReport: 

191 """Run Karpathy update-affected-wikis for one retained source. 

192 

193 Args: 

194 page_index_store: source of the entity-overlap query AND wiki 

195 persistence (``save_wiki_page`` for revised wikis). 

196 provider: LLM provider for the update calls. 

197 bank_id: tenant scope. 

198 document_id: the newly-retained document whose entities trigger 

199 the update sweep. 

200 new_entities: distinct entity names extracted from the new 

201 document (already deduped by caller). 

202 new_content_excerpts: ``entity_name → relevant excerpt`` from 

203 the new document; passed to the update LLM as evidence. 

204 model: LLM model name (defaults to provider's default). 

205 min_overlap: minimum shared entities to flag a wiki as affected. 

206 max_updates: cap on update calls (top-N by overlap count). 

207 

208 Returns: 

209 :class:`IncrementalUpdateReport` with per-wiki outcomes. 

210 

211 Idempotent across re-runs: the second invocation finds the wiki's 

212 revision already reflects the new state and emits NO_CHANGE. 

213 """ 

214 report = IncrementalUpdateReport( 

215 document_id=document_id, 

216 affected_count=0, 

217 ) 

218 

219 if not new_entities: 

220 _logger.debug( 

221 "wiki_incremental: doc=%s has no entities — skip", 

222 document_id, 

223 ) 

224 return report 

225 

226 # ── Find affected wikis via entity overlap (SPI returns full WikiPage rows) ── 

227 try: 

228 affected = await page_index_store.list_wikis_affected_by_entities( 

229 bank_id, 

230 list(new_entities), 

231 min_overlap=min_overlap, 

232 limit=max_updates, 

233 ) 

234 except Exception as exc: # noqa: BLE001 

235 _logger.warning( 

236 "wiki_incremental.spi: list_wikis_affected_by_entities failed doc=%s (%s)", 

237 document_id, 

238 exc, 

239 ) 

240 return report 

241 

242 report.affected_count = len(affected) 

243 if not affected: 

244 _logger.debug( 

245 "wiki_incremental: doc=%s no wikis affected by entities=%s", 

246 document_id, 

247 new_entities[:5], 

248 ) 

249 return report 

250 

251 # ── Per-wiki update calls ──────────────────────────────────────── 

252 for wiki, _overlap_count, shared_entities in affected: 

253 result = await _update_one_wiki( 

254 wiki=wiki, 

255 page_index_store=page_index_store, 

256 provider=provider, 

257 shared_entities=shared_entities, 

258 new_content_excerpts=new_content_excerpts, 

259 model=model, 

260 ) 

261 if result.verdict == "UPDATED": 

262 report.updated.append(result) 

263 elif result.verdict == "NO_CHANGE": 

264 report.skipped.append(result) 

265 else: 

266 report.failed.append(result) 

267 

268 _logger.info( 

269 "wiki_incremental: doc=%s affected=%d updated=%d skipped=%d failed=%d", 

270 document_id, 

271 report.affected_count, 

272 len(report.updated), 

273 len(report.skipped), 

274 len(report.failed), 

275 ) 

276 return report 

277 

278 

279# --------------------------------------------------------------------------- 

280# Internals 

281# --------------------------------------------------------------------------- 

282 

283 

284async def _update_one_wiki( # noqa: PLR0913 

285 *, 

286 wiki: WikiPage, 

287 page_index_store: PageIndexStore, 

288 provider: LLMProvider, 

289 shared_entities: list[str], 

290 new_content_excerpts: dict[str, str], 

291 model: str | None, 

292) -> WikiUpdateResult: 

293 """Run one update prompt and persist the result. 

294 

295 Composes ``new_content`` as the union of excerpts keyed by 

296 entities the wiki and new doc share, so the LLM only sees evidence 

297 relevant to this wiki's topic. 

298 """ 

299 new_content_parts = [ 

300 f"[{e}] {new_content_excerpts.get(e, '').strip()}" 

301 for e in shared_entities 

302 if new_content_excerpts.get(e, "").strip() 

303 ] 

304 if not new_content_parts: 

305 return WikiUpdateResult( 

306 page_id=wiki.page_id, 

307 verdict="NO_CHANGE", 

308 detail="no excerpt content for shared entities", 

309 ) 

310 new_content = "\n\n".join(new_content_parts) 

311 

312 msg = _UPDATE_PROMPT.format( 

313 title=wiki.title, 

314 revision=wiki.revision, 

315 wiki_content=wiki.content, 

316 new_content=new_content, 

317 ) 

318 try: 

319 completion = await provider.complete( 

320 [Message(role="user", content=msg)], 

321 model=model, 

322 max_tokens=UPDATE_LLM_MAX_TOKENS, 

323 temperature=0.0, 

324 response_format={"type": "json_object"}, 

325 ) 

326 except Exception as exc: # noqa: BLE001 

327 _logger.warning( 

328 "wiki_incremental.llm: call failed for page=%s (%s)", 

329 wiki.page_id, 

330 exc, 

331 ) 

332 return WikiUpdateResult( 

333 page_id=wiki.page_id, 

334 verdict="FAILED", 

335 detail=str(exc), 

336 ) 

337 

338 try: 

339 data = json.loads(completion.text) 

340 except (json.JSONDecodeError, AttributeError) as exc: 

341 _logger.warning( 

342 "wiki_incremental.parse: bad JSON for page=%s (%s)", 

343 wiki.page_id, 

344 exc, 

345 ) 

346 return WikiUpdateResult( 

347 page_id=wiki.page_id, 

348 verdict="FAILED", 

349 detail=f"bad JSON: {exc}", 

350 ) 

351 

352 verdict = str(data.get("verdict", "")).upper() 

353 if verdict == "NO_CHANGE": 

354 return WikiUpdateResult(page_id=wiki.page_id, verdict="NO_CHANGE") 

355 

356 if verdict != "UPDATE": 

357 return WikiUpdateResult( 

358 page_id=wiki.page_id, 

359 verdict="FAILED", 

360 detail=f"unrecognised verdict: {verdict!r}", 

361 ) 

362 

363 revised_content = (data.get("revised_content") or "").strip() 

364 if not revised_content: 

365 return WikiUpdateResult( 

366 page_id=wiki.page_id, 

367 verdict="FAILED", 

368 detail="UPDATE verdict with empty revised_content", 

369 ) 

370 revised_title = (data.get("revised_title") or wiki.title).strip() 

371 

372 # Save as a new revision via ``PageIndexStore.save_wiki_page`` — its 

373 # upsert semantics auto-bump revision and archive the prior version. 

374 # The wiki's existing provenance is preserved: we re-parse it from 

375 # ``source_ids`` (M12.6 format: ``"<doc_id>:<line_num>"``) if present; 

376 # otherwise we pass an empty list (provenance row already exists in 

377 # ``astrocyte_pi_wiki_provenance`` from the initial compile and is 

378 # untouched by content-only revisions). 

379 from dataclasses import replace # noqa: PLC0415 

380 

381 new_wiki = replace( 

382 wiki, 

383 title=revised_title, 

384 content=revised_content, 

385 revision=wiki.revision + 1, 

386 revised_at=datetime.now(tz=timezone.utc), 

387 ) 

388 provenance: list[tuple[str, int]] = [] 

389 for sid in wiki.source_ids or []: 

390 if ":" in sid: 

391 doc_id, _, line_str = sid.rpartition(":") 

392 try: 

393 provenance.append((doc_id, int(line_str))) 

394 except ValueError: 

395 continue 

396 try: 

397 await page_index_store.save_wiki_page( 

398 page=new_wiki, 

399 embedding=None, 

400 provenance=provenance, 

401 ) 

402 except Exception as exc: # noqa: BLE001 

403 _logger.warning( 

404 "wiki_incremental.save: page=%s revision=%d failed (%s)", 

405 wiki.page_id, 

406 new_wiki.revision, 

407 exc, 

408 ) 

409 return WikiUpdateResult( 

410 page_id=wiki.page_id, 

411 verdict="FAILED", 

412 detail=f"save failed: {exc}", 

413 ) 

414 

415 return WikiUpdateResult( 

416 page_id=wiki.page_id, 

417 verdict="UPDATED", 

418 new_revision=new_wiki.revision, 

419 )