Coverage for astrocyte/pipeline/lint.py: 99%

107 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M8 W6: Wiki page lint pass. 

2 

3LintEngine runs periodic checks over compiled wiki pages, detecting: 

4 

5- **Stale**: one or more ``source_ids`` no longer exist in the VectorStore 

6 (the underlying raw memory was forgotten). Action: flag for recompile. 

7 

8- **Orphan**: *all* ``source_ids`` are gone — the page has no evidence left. 

9 Action: candidate for archival per memory-lifecycle.md §2. 

10 

11- **Contradiction**: two pages in the same bank make conflicting claims. 

12 Detected via lightweight LLM call (opt-in; requires an LLMProvider). 

13 Action: flag both pages; surface in audit log. 

14 

15The lint pass is additive — it only annotates pages with issues; it never 

16deletes or modifies page content. Stale pages are marked in their metadata 

17and queued for the next compile cycle. All issues are returned in 

18``LintResult`` for the caller to act on. 

19 

20Design reference: docs/_design/llm-wiki-compile.md §7. 

21""" 

22 

23from __future__ import annotations 

24 

25import time 

26from dataclasses import dataclass 

27from typing import TYPE_CHECKING, Literal 

28 

29if TYPE_CHECKING: 

30 from astrocyte.provider import LLMProvider, VectorStore, WikiStore 

31 from astrocyte.types import WikiPage 

32 

33# --------------------------------------------------------------------------- 

34# Data model 

35# --------------------------------------------------------------------------- 

36 

# Closed set of finding categories a lint pass can report.
LintIssueKind = Literal["stale", "orphan", "contradiction"]


@dataclass
class LintIssue:
    """One finding reported against a single wiki page.

    Fields:
        kind: Category of the finding (see ``LintIssueKind``).
        page_id: Identifier of the page the finding is about.
        bank_id: Bank the page belongs to.
        detail: Human-readable description of the finding.
        action: Recommended follow-up: ``"recompile"``, ``"archive"`` or
            ``"review"``.
        peer_page_id: For ``contradiction`` findings, the other page
            involved (if known); ``None`` otherwise.
    """

    kind: LintIssueKind
    page_id: str
    bank_id: str
    detail: str
    action: Literal["recompile", "archive", "review"]

    # Only meaningful for ``contradiction`` findings.
    peer_page_id: str | None = None

52 

53 

@dataclass
class LintResult:
    """Summary of one lint pass over a single bank.

    Fields:
        bank_id: Bank that was linted.
        pages_checked: Number of wiki pages examined.
        stale_count: Pages with at least one missing source but at least
            one remaining.
        orphan_count: Pages with no remaining sources.
        contradiction_count: Count reported by contradiction detection.
        issues: Every individual finding produced by the pass.
        elapsed_ms: Wall-clock duration of the pass, in milliseconds.
        error: Error text when the pass failed, else ``None``.
    """

    bank_id: str
    pages_checked: int
    stale_count: int
    orphan_count: int
    # NOTE(review): originally described as "page-pairs flagged", but
    # LintEngine.run stores len() of the contradiction issue list, and two
    # issues are emitted per conflicting pair — confirm which is intended.
    contradiction_count: int
    issues: list[LintIssue]
    elapsed_ms: int
    error: str | None = None

66 

67 

68# --------------------------------------------------------------------------- 

69# LintEngine 

70# --------------------------------------------------------------------------- 

71 

# System prompt for the pairwise contradiction check. The model is asked for a
# single-line verdict that starts with either "CONTRADICTION:" or "OK".
_CONTRADICTION_SYSTEM_PROMPT = (
    "You are a fact-checker. Given two wiki page excerpts, determine whether they "
    "make contradictory factual claims.\n"
    "\n"
    "Respond with exactly one of:\n"
    '- "CONTRADICTION: <brief explanation>" — if they conflict\n'
    '- "OK" — if they do not conflict or the comparison is inconclusive\n'
    "\n"
    "Output only the single-line verdict. No preamble.\n"
)

82 

83 

class LintEngine:
    """Runs lint checks over compiled wiki pages in a bank.

    Usage::

        engine = LintEngine(vector_store, wiki_store)
        result = await engine.run("user-alice")
        for issue in result.issues:
            print(issue.kind, issue.page_id, issue.action)

    Contradiction detection is opt-in (requires an LLM provider and makes
    one LLM call per page-pair)::

        engine = LintEngine(vector_store, wiki_store, llm_provider,
                            detect_contradictions=True)
    """

    def __init__(
        self,
        vector_store: VectorStore,
        wiki_store: WikiStore,
        llm_provider: LLMProvider | None = None,
        *,
        detect_contradictions: bool = False,
        contradiction_model: str | None = None,
        max_contradiction_pairs: int = 50,
    ) -> None:
        """Store the collaborators and contradiction-detection settings.

        Args:
            vector_store: Store queried (via ``list_vectors``) for live
                raw-memory IDs.
            wiki_store: Store queried (via ``list_pages``) for wiki pages.
            llm_provider: Provider used for contradiction checks, if any.
            detect_contradictions: Request contradiction detection. Has no
                effect when ``llm_provider`` is ``None``.
            contradiction_model: Model name forwarded to the LLM call.
            max_contradiction_pairs: Upper bound on page pairs sent to the
                LLM in one run.
        """
        self._vs = vector_store
        self._ws = wiki_store
        self._llm = llm_provider
        # Detection silently stays off without a provider to call.
        self._detect_contradictions = detect_contradictions and llm_provider is not None
        self._contradiction_model = contradiction_model
        self._max_pairs = max_contradiction_pairs

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    async def run(self, bank_id: str) -> LintResult:
        """Run all lint checks over the wiki pages in *bank_id*.

        Args:
            bank_id: The bank to lint.

        Returns:
            :class:`LintResult` with issue lists and counts.
            ``contradiction_count`` is the number of contradiction issues
            (two are emitted per conflicting pair). On unexpected error,
            ``error`` is set and ``issues``, the counters and
            ``pages_checked`` all reflect the progress made before the
            failure.
        """
        start = time.monotonic()
        issues: list[LintIssue] = []
        pages_checked = 0
        stale_count = 0
        orphan_count = 0
        contradiction_count = 0
        error: str | None = None

        try:
            pages = await self._ws.list_pages(bank_id)

            # Build the set of live raw-memory IDs once (scan the VectorStore
            # once per lint run rather than once per page).
            live_ids = await self._fetch_live_ids(bank_id)

            # ── Staleness / orphan checks ─────────────────────────────────
            for page in pages:
                issue = self._check_stale_or_orphan(page, bank_id, live_ids)
                # Count only after the check succeeds so that a mid-run
                # failure reports how many pages were really examined.
                pages_checked += 1
                if issue is None:
                    continue
                issues.append(issue)
                if issue.kind == "orphan":
                    orphan_count += 1
                else:
                    stale_count += 1

            # ── Contradiction detection (opt-in) ──────────────────────────
            if self._detect_contradictions:
                contradiction_issues = await self._detect_page_contradictions(pages, bank_id)
                issues.extend(contradiction_issues)
                contradiction_count = len(contradiction_issues)

        except Exception as exc:
            # Report the failure alongside whatever was gathered so far.
            error = str(exc)

        return LintResult(
            bank_id=bank_id,
            pages_checked=pages_checked,
            stale_count=stale_count,
            orphan_count=orphan_count,
            contradiction_count=contradiction_count,
            issues=issues,
            elapsed_ms=int((time.monotonic() - start) * 1000),
            error=error,
        )

    # ------------------------------------------------------------------
    # Staleness / orphan
    # ------------------------------------------------------------------

    def _check_stale_or_orphan(
        self,
        page: WikiPage,
        bank_id: str,
        live_ids: set[str],
    ) -> LintIssue | None:
        """Return a stale or orphan issue if any source_ids are missing, else None.

        Args:
            page: Page whose ``source_ids`` are compared against *live_ids*.
            bank_id: Bank recorded on the returned issue.
            live_ids: IDs of raw memories that still exist.

        Returns:
            ``None`` when the page records no sources or all of them are
            live; an ``orphan`` issue when every source is missing; a
            ``stale`` issue when only some are.
        """
        source_ids = page.source_ids
        if not source_ids:
            # No source_ids recorded — cannot determine staleness; skip
            return None

        missing = [sid for sid in source_ids if sid not in live_ids]
        if not missing:
            return None  # All sources still live

        total = len(source_ids)
        if len(missing) == total:
            # Every source is gone — orphan
            return LintIssue(
                kind="orphan",
                page_id=page.page_id,
                bank_id=bank_id,
                detail=(
                    f"All {total} source memories have been deleted "
                    f"or forgotten. Page has no remaining evidence."
                ),
                action="archive",
            )

        # Partial loss — stale. List at most five missing IDs in the detail.
        return LintIssue(
            kind="stale",
            page_id=page.page_id,
            bank_id=bank_id,
            detail=(
                f"{len(missing)}/{total} source memories missing: "
                + ", ".join(missing[:5])
                + ("…" if len(missing) > 5 else "")
            ),
            action="recompile",
        )

    # ------------------------------------------------------------------
    # Contradiction detection
    # ------------------------------------------------------------------

    async def _detect_page_contradictions(
        self,
        pages: list[WikiPage],
        bank_id: str,
    ) -> list[LintIssue]:
        """Check page pairs for contradictory claims via LLM.

        Visits unordered page pairs in list order and stops once
        ``max_contradiction_pairs`` pairs have been attempted, to bound LLM
        cost. A pair whose LLM call fails still counts against that budget
        and is skipped.

        Returns:
            Two ``LintIssue`` entries per contradicting pair — one for each
            page, each naming the other via ``peer_page_id``.
        """
        from itertools import combinations

        from astrocyte.types import Message

        issues: list[LintIssue] = []

        # combinations() yields (i, j) pairs with i < j in the same order as
        # a nested upper-triangular index loop.
        for pairs_checked, (pa, pb) in enumerate(combinations(pages, 2)):
            if pairs_checked >= self._max_pairs:
                break

            # Only the title and the first 400 characters of each page are sent.
            excerpt_a = f"## {pa.title}\n{pa.content[:400]}"
            excerpt_b = f"## {pb.title}\n{pb.content[:400]}"
            user_prompt = f"Page A:\n{excerpt_a}\n\nPage B:\n{excerpt_b}"

            try:
                completion = await self._llm.complete(  # type: ignore[union-attr]
                    [
                        Message(role="system", content=_CONTRADICTION_SYSTEM_PROMPT),
                        Message(role="user", content=user_prompt),
                    ],
                    model=self._contradiction_model,
                    max_tokens=80,
                )
                verdict = completion.text.strip()
            except Exception:
                continue  # LLM failure is non-fatal; skip pair

            if not verdict.upper().startswith("CONTRADICTION"):
                continue

            # Drop the "CONTRADICTION:" prefix when a colon is present;
            # otherwise keep the whole verdict as the explanation.
            if ":" in verdict:
                explanation = verdict[len("CONTRADICTION:") :].strip()
            else:
                explanation = verdict

            issues.append(
                LintIssue(
                    kind="contradiction",
                    page_id=pa.page_id,
                    bank_id=bank_id,
                    detail=f"Contradiction with {pb.page_id!r}: {explanation}",
                    action="review",
                    peer_page_id=pb.page_id,
                )
            )
            # Also flag the peer page
            issues.append(
                LintIssue(
                    kind="contradiction",
                    page_id=pb.page_id,
                    bank_id=bank_id,
                    detail=f"Contradiction with {pa.page_id!r}: {explanation}",
                    action="review",
                    peer_page_id=pa.page_id,
                )
            )

        return issues

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    async def _fetch_live_ids(self, bank_id: str) -> set[str]:
        """Return the set of all raw-memory IDs currently in the VectorStore.

        Pages through ``list_vectors`` 200 items at a time and stops at the
        first empty or short batch. Items whose ``memory_layer`` is
        ``"compiled"`` or whose ``fact_type`` is ``"wiki"`` are left out.
        """
        live: set[str] = set()
        offset = 0
        batch = 200
        while True:
            chunk = await self._vs.list_vectors(bank_id, offset=offset, limit=batch)
            if not chunk:
                break
            # Only count raw memories — exclude compiled wiki pages
            live.update(
                item.id
                for item in chunk
                if item.memory_layer != "compiled" and item.fact_type != "wiki"
            )
            if len(chunk) < batch:
                break  # Short batch: nothing further to fetch
            offset += batch
        return live