Coverage for astrocyte/pipeline/compile_trigger.py: 91%

104 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M8 W4: Threshold trigger + async compile queue. 

2 

3CompileQueue monitors per-bank memory growth and staleness, enqueueing 

4background compile jobs when either threshold is exceeded — without ever 

5blocking the retain path. 

6 

7Design reference: docs/_design/llm-wiki-compile.md §3.1, §8. 

8""" 

9 

10from __future__ import annotations 

11 

12import asyncio 

13import logging 

14from dataclasses import dataclass 

15from datetime import UTC, datetime 

16from typing import TYPE_CHECKING 

17 

18if TYPE_CHECKING: 

19 from astrocyte.pipeline.compile import CompileEngine 

20 

# Module-level logger shared by the trigger state and the compile queue.
_logger = logging.getLogger("astrocyte.compile_trigger")

# Sentinel pushed into the queue to signal the worker to stop. The worker
# recognises it by identity (``bank_id is _STOP``), so it must never be a
# value that could be used as a real bank id.
_STOP: str | None = None

25 

26 

27# --------------------------------------------------------------------------- 

28# Configuration 

29# --------------------------------------------------------------------------- 

30 

31 

@dataclass
class CompileTriggerConfig:
    """Tunable thresholds deciding when a bank is due for an automatic compile.

    A compile job is enqueued as soon as **one** of the two rules holds:

    * **Size** — ``memories_since_last_compile >= size_threshold``.
    * **Staleness** — the previous compile is at least ``staleness_days`` old
      *and* ``memories_since_last_compile >= staleness_min_memories``.

    Setting ``size_threshold=0`` makes every retain trigger a compile, which
    is handy in tests or in high-frequency, low-cost scenarios.
    """

    size_threshold: int = 50
    """Number of new memories since the last compile that triggers on size alone."""

    staleness_days: float = 7.0
    """Age of the last compile, in days, at which the staleness rule applies."""

    staleness_min_memories: int = 10
    """Fewest new memories that must exist before the staleness rule may fire."""

54 

55 

56# --------------------------------------------------------------------------- 

57# Per-bank state 

58# --------------------------------------------------------------------------- 

59 

60 

61@dataclass 

62class BankCompileState: 

63 """Runtime state for one bank's compile trigger.""" 

64 

65 bank_id: str 

66 memories_since_last_compile: int = 0 

67 last_compile_at: datetime | None = None 

68 

69 def should_trigger(self, config: CompileTriggerConfig) -> bool: 

70 """Return True if either trigger condition is met.""" 

71 n = self.memories_since_last_compile 

72 

73 # Size trigger: enough new memories regardless of time 

74 if n >= config.size_threshold: 

75 return True 

76 

77 # Staleness trigger: old enough AND at least staleness_min_memories 

78 if self.last_compile_at is not None and n >= config.staleness_min_memories: 

79 age_days = (datetime.now(UTC) - self.last_compile_at).total_seconds() / 86400.0 

80 if age_days >= config.staleness_days: 

81 return True 

82 

83 return False 

84 

85 

86# --------------------------------------------------------------------------- 

87# Compile queue 

88# --------------------------------------------------------------------------- 

89 

90 

class CompileQueue:
    """Async background queue for compile jobs.

    Usage::

        engine = CompileEngine(vs, llm, ws)
        queue = CompileQueue(engine, CompileTriggerConfig(size_threshold=10))
        await queue.start()

        # In retain path (notify_retain is a plain, non-blocking call):
        queue.notify_retain("user-alice")

        # Graceful shutdown:
        await queue.stop()

    Call :meth:`drain` in tests to wait for all pending jobs to finish before
    making assertions::

        queue.notify_retain("bank")
        await queue.drain()
        # CompileEngine.run("bank") has now completed
    """

    def __init__(
        self,
        engine: CompileEngine,
        config: CompileTriggerConfig | None = None,
        *,
        max_queue_size: int = 100,
    ) -> None:
        """Create a queue that runs compiles on *engine*.

        Args:
            engine: Object whose awaitable ``run(bank_id)`` performs a compile.
            config: Trigger thresholds; defaults to ``CompileTriggerConfig()``.
            max_queue_size: Capacity of the underlying ``asyncio.Queue``.
        """
        self._engine = engine
        self._config = config or CompileTriggerConfig()
        self._queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=max_queue_size)
        self._pending: set[str] = set()  # banks queued but not yet running
        self._state: dict[str, BankCompileState] = {}
        self._task: asyncio.Task[None] | None = None
        self._started = False

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def notify_retain(self, bank_id: str) -> None:
        """Increment the memory counter for *bank_id* and enqueue if threshold met.

        Non-blocking: this is a plain synchronous method that never awaits, so
        it can be called from the hot retain path. A bank that is already
        waiting in the queue is not enqueued again, so the queue never fills
        with the same bank repeated N times. If the queue is full the job is
        dropped with a warning; the counter keeps its value, so a later retain
        can trigger again.

        NOTE(review): ``asyncio.Queue`` is not thread-safe, so this presumably
        must be called from the event-loop thread — confirm against callers.
        """
        # Look up first so no throwaway state object is built on every retain.
        state = self._state.get(bank_id)
        if state is None:
            state = self._state[bank_id] = BankCompileState(bank_id=bank_id)
        state.memories_since_last_compile += 1

        if not state.should_trigger(self._config):
            return

        if bank_id in self._pending:
            # Already queued — the queued compile will cover these memories.
            return

        try:
            self._queue.put_nowait(bank_id)
        except asyncio.QueueFull:
            _logger.warning(
                "Compile queue full (%d slots); skipping bank %r. "
                "Consider increasing max_queue_size or lowering thresholds.",
                self._queue.maxsize,
                bank_id,
            )
            return

        self._pending.add(bank_id)
        _logger.debug("Compile job enqueued for bank %r", bank_id)

    async def start(self) -> None:
        """Start the background worker task. Idempotent."""
        if self._started:
            return
        self._started = True
        self._task = asyncio.create_task(self._worker(), name="astrocyte.compile_worker")
        _logger.debug("Compile worker started")

    async def stop(self, *, timeout: float = 5.0) -> None:
        """Signal the worker to stop and wait up to *timeout* seconds.

        The stop sentinel is queued behind any jobs already waiting, so those
        jobs run first. If the queue is full the sentinel cannot be queued and
        the worker task is cancelled instead. No-op when the worker is not
        running.
        """
        if not self._started or self._task is None:
            return
        self._started = False
        task = self._task
        try:
            self._queue.put_nowait(_STOP)
        except asyncio.QueueFull:
            # No room for the sentinel: fall back to cancelling the worker.
            task.cancel()
        try:
            await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            _logger.warning("Compile worker did not stop within %.1fs; cancelling.", timeout)
            task.cancel()
        except asyncio.CancelledError:
            # Reached when the worker was cancelled rather than timing out;
            # report that accurately instead of as a timeout.
            _logger.warning("Compile worker was cancelled before it could finish queued jobs.")
        self._task = None
        _logger.debug("Compile worker stopped")

    async def drain(self, *, timeout: float = 5.0) -> None:
        """Wait for all queued and in-flight jobs to finish.

        Raises :exc:`asyncio.TimeoutError` if jobs do not complete within
        *timeout* seconds. Safe to call when the worker is not started:
        ``Queue.join()`` returns immediately when no job is unfinished.
        """
        # Always defer to join(): the worker removes a bank from ``_pending``
        # before awaiting the compile, so "queue empty and nothing pending"
        # does not imply the compile has finished. join() tracks unfinished
        # jobs until task_done() is called.
        await asyncio.wait_for(self._queue.join(), timeout=timeout)

    @property
    def pending_banks(self) -> set[str]:
        """Banks currently in the queue awaiting compilation (snapshot)."""
        return set(self._pending)

    @property
    def state(self) -> dict[str, BankCompileState]:
        """Per-bank compile state, keyed by bank_id (shallow-copied dict)."""
        return dict(self._state)

    # ------------------------------------------------------------------
    # Background worker
    # ------------------------------------------------------------------

    async def _worker(self) -> None:
        """Process compile jobs from the queue until a stop sentinel is received."""
        _logger.debug("Compile worker loop started")
        while True:
            bank_id = await self._queue.get()
            try:
                if bank_id is _STOP:
                    _logger.debug("Compile worker received stop signal")
                    break

                # Un-mark before running so retains arriving during the compile
                # can enqueue a follow-up job.
                self._pending.discard(bank_id)
                _logger.info("Running background compile for bank %r", bank_id)
                try:
                    result = await self._engine.run(bank_id)
                    _logger.info(
                        "Background compile done for bank %r: %d created, %d updated, %d noise, %d tokens, %dms",
                        bank_id,
                        result.pages_created,
                        result.pages_updated,
                        result.noise_memories,
                        result.tokens_used,
                        result.elapsed_ms,
                    )
                    if result.error:
                        _logger.error(
                            "Background compile for bank %r reported error: %s",
                            bank_id,
                            result.error,
                        )
                    # Reset counter; record compile time
                    state = self._state.setdefault(bank_id, BankCompileState(bank_id=bank_id))
                    state.memories_since_last_compile = 0
                    state.last_compile_at = datetime.now(UTC)

                except Exception:
                    # Keep the worker alive; the counter is left untouched so a
                    # later retain can trigger a retry.
                    _logger.exception(
                        "Background compile for bank %r raised unexpectedly; skipping.",
                        bank_id,
                    )
            finally:
                # Runs for the sentinel too, so join() accounting stays balanced.
                self._queue.task_done()