Coverage for astrocyte/pipeline/compile_trigger.py: 91%
104 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""M8 W4: Threshold trigger + async compile queue.
3CompileQueue monitors per-bank memory growth and staleness, enqueueing
4background compile jobs when either threshold is exceeded — without ever
5blocking the retain path.
7Design reference: docs/_design/llm-wiki-compile.md §3.1, §8.
8"""
10from __future__ import annotations
12import asyncio
13import logging
14from dataclasses import dataclass
15from datetime import UTC, datetime
16from typing import TYPE_CHECKING
18if TYPE_CHECKING:
19 from astrocyte.pipeline.compile import CompileEngine
21_logger = logging.getLogger("astrocyte.compile_trigger")
23# Sentinel pushed into the queue to signal the worker to stop.
24_STOP: str | None = None
27# ---------------------------------------------------------------------------
28# Configuration
29# ---------------------------------------------------------------------------
@dataclass
class CompileTriggerConfig:
    """Thresholds controlling when a bank is automatically scheduled for compile.

    A compile job is enqueued as soon as *either* of these holds:

    - **Size**: ``memories_since_last_compile >= size_threshold``
    - **Staleness**: the last compile is older than ``staleness_days`` AND
      ``memories_since_last_compile >= staleness_min_memories``

    Setting ``size_threshold=0`` makes every retain trigger a compile, which
    is handy in tests or in high-frequency, low-cost scenarios.

    Attributes:
        size_threshold: New memories since the last compile required to
            trigger on size alone.
        staleness_days: Days since the last compile required for the
            staleness trigger.
        staleness_min_memories: Minimum number of new memories for the
            staleness condition to fire.
    """

    # Size trigger: fires on memory count alone.
    size_threshold: int = 50

    # Staleness trigger: minimum age (in days) of the previous compile ...
    staleness_days: float = 7.0

    # ... combined with at least this many new memories.
    staleness_min_memories: int = 10
56# ---------------------------------------------------------------------------
57# Per-bank state
58# ---------------------------------------------------------------------------
61@dataclass
62class BankCompileState:
63 """Runtime state for one bank's compile trigger."""
65 bank_id: str
66 memories_since_last_compile: int = 0
67 last_compile_at: datetime | None = None
69 def should_trigger(self, config: CompileTriggerConfig) -> bool:
70 """Return True if either trigger condition is met."""
71 n = self.memories_since_last_compile
73 # Size trigger: enough new memories regardless of time
74 if n >= config.size_threshold:
75 return True
77 # Staleness trigger: old enough AND at least staleness_min_memories
78 if self.last_compile_at is not None and n >= config.staleness_min_memories:
79 age_days = (datetime.now(UTC) - self.last_compile_at).total_seconds() / 86400.0
80 if age_days >= config.staleness_days:
81 return True
83 return False
86# ---------------------------------------------------------------------------
87# Compile queue
88# ---------------------------------------------------------------------------
class CompileQueue:
    """Async background queue for compile jobs.

    Usage::

        engine = CompileEngine(vs, llm, ws)
        queue = CompileQueue(engine, CompileTriggerConfig(size_threshold=10))
        await queue.start()

        # In retain path (sync-safe — notify_retain is non-blocking):
        queue.notify_retain("user-alice")

        # Graceful shutdown:
        await queue.stop()

    Call :meth:`drain` in tests to wait for all pending jobs to finish before
    making assertions::

        queue.notify_retain("bank")
        await queue.drain()
        # CompileEngine.run("bank") has now completed
    """

    def __init__(
        self,
        engine: CompileEngine,
        config: CompileTriggerConfig | None = None,
        *,
        max_queue_size: int = 100,
    ) -> None:
        """Create a queue bound to *engine*.

        Args:
            engine: Object whose awaitable ``run(bank_id)`` performs a compile.
            config: Trigger thresholds; a default ``CompileTriggerConfig`` is
                used when omitted.
            max_queue_size: Capacity of the underlying ``asyncio.Queue``.
        """
        self._engine = engine
        self._config = config or CompileTriggerConfig()
        self._queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=max_queue_size)
        self._pending: set[str] = set()  # banks queued but not yet running
        self._state: dict[str, BankCompileState] = {}
        self._task: asyncio.Task[None] | None = None
        self._started = False

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def notify_retain(self, bank_id: str) -> None:
        """Increment the memory counter for *bank_id* and enqueue if threshold met.

        Non-blocking and sync-safe — designed to be called from the hot retain
        path without holding any lock or awaiting anything. A bank that is
        already waiting in the queue is not enqueued a second time, so the
        queue never fills with the same bank repeated N times. When the queue
        is full the job is dropped with a warning; the counter keeps its
        value, so a later retain will try again.
        """
        # Allocate the state object only on first sight of a bank instead of
        # building a throwaway default on every call in the hot path.
        state = self._state.get(bank_id)
        if state is None:
            state = self._state[bank_id] = BankCompileState(bank_id=bank_id)
        state.memories_since_last_compile += 1

        if not state.should_trigger(self._config):
            return

        if bank_id in self._pending:
            # Already queued — the queued compile will cover these memories.
            return

        try:
            self._queue.put_nowait(bank_id)
        except asyncio.QueueFull:
            _logger.warning(
                "Compile queue full (%d slots); skipping bank %r. "
                "Consider increasing max_queue_size or lowering thresholds.",
                self._queue.maxsize,
                bank_id,
            )
            return
        self._pending.add(bank_id)
        _logger.debug("Compile job enqueued for bank %r", bank_id)

    async def start(self) -> None:
        """Start the background worker task. Idempotent."""
        if self._started:
            return
        self._started = True
        self._task = asyncio.create_task(self._worker(), name="astrocyte.compile_worker")
        _logger.debug("Compile worker started")

    async def stop(self, *, timeout: float = 5.0) -> None:
        """Signal the worker to stop and wait up to *timeout* seconds.

        The stop sentinel is queued behind any jobs already waiting, so those
        jobs run first. If the queue has no room for the sentinel, or the
        worker does not finish within *timeout*, the worker task is cancelled.
        Does nothing when the worker is not running.

        Raises:
            asyncio.CancelledError: If the task calling ``stop()`` is itself
                cancelled while waiting.
        """
        if not self._started or self._task is None:
            return
        self._started = False
        task = self._task
        try:
            self._queue.put_nowait(_STOP)
        except asyncio.QueueFull:
            # No slot for the sentinel: cancellation is the only way to stop.
            task.cancel()
        try:
            await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            _logger.warning("Compile worker did not stop within %.1fs; cancelling.", timeout)
            task.cancel()
        except asyncio.CancelledError:
            # Two distinct situations end up here: we cancelled the worker
            # ourselves (expected, swallow), or our own caller is being
            # cancelled (must propagate so the cancellation is not lost).
            current = asyncio.current_task()
            if current is not None and current.cancelling():
                raise
            _logger.debug("Compile worker cancelled before draining the queue")
        finally:
            self._task = None
        _logger.debug("Compile worker stopped")

    async def drain(self, *, timeout: float = 5.0) -> None:
        """Wait for all queued and in-flight jobs to finish.

        Raises :exc:`asyncio.TimeoutError` if jobs do not complete within
        *timeout* seconds. Safe to call when the worker is not started
        (returns promptly if nothing has been enqueued).
        """
        # Always defer to Queue.join(): it tracks items that were fetched but
        # not yet marked done, so a compile the worker is currently running is
        # waited for too. An "empty queue" shortcut would miss that job.
        await asyncio.wait_for(self._queue.join(), timeout=timeout)

    @property
    def pending_banks(self) -> set[str]:
        """Banks currently in the queue awaiting compilation (snapshot)."""
        return set(self._pending)

    @property
    def state(self) -> dict[str, BankCompileState]:
        """Per-bank compile state, keyed by bank_id.

        The returned dict is a copy, but its values are the live state objects.
        """
        return dict(self._state)

    # ------------------------------------------------------------------
    # Background worker
    # ------------------------------------------------------------------

    async def _worker(self) -> None:
        """Process compile jobs from the queue until a stop sentinel is received."""
        _logger.debug("Compile worker loop started")
        while True:
            bank_id = await self._queue.get()
            try:
                if bank_id is _STOP:
                    _logger.debug("Compile worker received stop signal")
                    break

                # From here on the bank counts as running, not queued, so a
                # retain arriving mid-compile may enqueue a follow-up job.
                self._pending.discard(bank_id)
                await self._run_job(bank_id)
            finally:
                # Every get() — sentinel included — must be balanced so that
                # Queue.join() (used by drain) can complete.
                self._queue.task_done()

    async def _run_job(self, bank_id: str) -> None:
        """Run one compile for *bank_id* and update its trigger state.

        Exceptions raised by the engine are logged and suppressed so a single
        failing bank cannot kill the worker loop.
        """
        # Snapshot the counter before the run. Retains that arrive while the
        # engine is awaiting are kept on the counter afterwards, because it
        # cannot be told from here whether this run picked them up.
        known = self._state.get(bank_id)
        covered = known.memories_since_last_compile if known is not None else 0

        _logger.info("Running background compile for bank %r", bank_id)
        try:
            result = await self._engine.run(bank_id)
            _logger.info(
                "Background compile done for bank %r: %d created, %d updated, %d noise, %d tokens, %dms",
                bank_id,
                result.pages_created,
                result.pages_updated,
                result.noise_memories,
                result.tokens_used,
                result.elapsed_ms,
            )
            if result.error:
                _logger.error(
                    "Background compile for bank %r reported error: %s",
                    bank_id,
                    result.error,
                )
            # Deduct what this run accounted for; record compile time.
            state = self._state.setdefault(bank_id, BankCompileState(bank_id=bank_id))
            state.memories_since_last_compile = max(
                0, state.memories_since_last_compile - covered
            )
            state.last_compile_at = datetime.now(UTC)
        except Exception:
            _logger.exception(
                "Background compile for bank %r raised unexpectedly; skipping.",
                bank_id,
            )