Coverage for astrocyte/task_backend.py: 92%
73 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
"""Pluggable backend for background task execution.

Adopted from Hindsight (``hindsight_api/engine/task_backend.py``) but
scoped down to what Astrocyte uses today. Hindsight's full implementation
includes a Postgres-LISTEN/NOTIFY broker for distributed worker pools;
we don't have that architecture yet (planned: daemon/worker split, future
sprint). For now we ship two backends:

  - ``SyncTaskBackend``: executes tasks immediately, inline. For tests
    and CLI/embedded use.
  - ``AsyncFireAndTrackBackend``: ``asyncio.create_task`` per submission,
    tracks running tasks, awaits them on ``shutdown()``. Matches the
    current ``orchestrator._run_consolidation()`` and
    ``compile_trigger._worker()`` patterns but gives them a uniform
    interface and a shutdown-coordinated drain.

Both implement the same ``TaskBackend`` interface so a future
``PostgresBrokerTaskBackend`` can drop in without changing callers.

Usage pattern:

    backend = AsyncFireAndTrackBackend()
    backend.set_executor(run_task)  # async fn (task_dict) -> None
    await backend.initialize()
    ...
    await backend.submit({"type": "consolidation", "bank_id": "..."})
    ...
    await backend.shutdown()  # drains in-flight tasks

Why an executor callback + dict, not direct coroutine submission: it lets
the same task representation move across process boundaries later (the
"broker" backend would persist the dict and a worker would dequeue it).
"""
35from __future__ import annotations
37import asyncio
38import logging
39from abc import ABC, abstractmethod
40from typing import Any, Awaitable, Callable
# Callback contract for ``TaskBackend.set_executor``: an async callable that
# receives the task dict and returns nothing.
Executor = Callable[[dict[str, Any]], Awaitable[None]]

# Module logger; every backend diagnostic in this file is emitted through it.
logger = logging.getLogger(__name__)
# ─── abstract base ────────────────────────────────────────────────────


class TaskBackend(ABC):
    """Abstract base for task execution backends.

    Tasks are pure dicts so backends can serialize/persist them. The
    executor (callback) routes each dict to the appropriate handler.

    Concrete backends implement ``initialize`` / ``submit`` / ``shutdown``
    and run tasks through ``_run``, which handles the "no executor" and
    "executor raised" cases uniformly.
    """

    def __init__(self) -> None:
        # Callback installed via set_executor(); while None, _run() drops
        # every task with a warning.
        self._executor: Executor | None = None
        # Lifecycle flag; subclasses set it in initialize()/shutdown().
        self._initialized = False

    def set_executor(self, executor: Executor) -> None:
        """Register the callback that runs each submitted task.

        Replaces any previously registered executor.
        """
        self._executor = executor

    @abstractmethod
    async def initialize(self) -> None:
        """Bring the backend up (e.g. open DB connection, start poller)."""

    @abstractmethod
    async def submit(self, task: dict[str, Any]) -> None:
        """Submit a task for execution.

        Backends may execute immediately (sync), schedule (async fire),
        or persist (broker). All return promptly; the caller never
        awaits the actual task completion via this method.
        """

    @abstractmethod
    async def shutdown(self) -> None:
        """Drain in-flight tasks and release resources."""

    async def _run(self, task: dict[str, Any]) -> None:
        """Execute ``task`` via the registered executor.

        Best-effort boundary: a missing executor drops the task with a
        warning, and any ``Exception`` raised by the executor is logged
        (including its traceback) and swallowed. ``asyncio.CancelledError``
        derives from ``BaseException`` and is therefore not caught, so task
        cancellation still propagates.
        """
        if self._executor is None:
            task_type = task.get("type", "<no type>")
            logger.warning("task_backend: no executor for task type=%s; dropping", task_type)
            return
        try:
            await self._executor(task)
        except Exception as exc:  # noqa: BLE001 - deliberate best-effort boundary
            # exc_info keeps the traceback: str(exc) alone is often empty
            # (e.g. KeyError(), TimeoutError()), which hid the failure cause.
            logger.warning(
                "task_backend: executor failed for type=%s: %s",
                task.get("type", "<no type>"),
                exc,
                exc_info=True,
            )
# ─── synchronous backend (tests / CLI) ────────────────────────────────


class SyncTaskBackend(TaskBackend):
    """Runs every submitted task to completion before ``submit`` returns.

    Intended for tests and CLI / embedded use, where deterministic inline
    execution matters more than concurrency.
    """

    async def initialize(self) -> None:
        """Mark the backend ready; there is nothing else to set up."""
        self._initialized = True

    async def submit(self, task: dict[str, Any]) -> None:
        """Execute ``task`` inline through ``_run`` (failures are logged, not raised)."""
        return await self._run(task)

    async def shutdown(self) -> None:
        """Mark the backend stopped; work never outlives ``submit`` here."""
        self._initialized = False
# ─── async fire-and-track backend (current "background work" pattern) ─


class AsyncFireAndTrackBackend(TaskBackend):
    """Spawns ``asyncio.create_task`` per submit and tracks the resulting tasks.

    Closes a gap in the raw ``asyncio.create_task`` pattern: nothing
    awaits the spawned task, so failures are silently swallowed and the
    event loop may shut down with in-flight work. This backend tracks
    every submitted task; on ``shutdown()`` it waits for them with a
    bounded timeout so the shutdown is observable, and cancels whatever
    is still running once that timeout expires.

    Failures inside individual tasks are logged via ``_run`` but never
    raise to the spawner — matches the existing fire-and-forget pattern.
    """

    def __init__(self, *, shutdown_timeout_seconds: float | None = 30.0) -> None:
        """Create the backend.

        Args:
            shutdown_timeout_seconds: Total time ``shutdown()`` waits for
                in-flight tasks before cancelling the rest. ``None`` waits
                indefinitely.
        """
        super().__init__()
        # Strong references to spawned tasks: the event loop only keeps weak
        # references, so an untracked task could be garbage-collected mid-run.
        self._inflight: set[asyncio.Task[None]] = set()
        self._shutdown_timeout = shutdown_timeout_seconds

    async def initialize(self) -> None:
        """Mark the backend ready to accept submissions."""
        self._initialized = True

    async def submit(self, task: dict[str, Any]) -> None:
        """Schedule ``task`` on the running loop and return without awaiting it."""
        if not self._initialized:
            logger.warning("task_backend: submit before initialize for type=%s", task.get("type", "<no type>"))
        coro = self._run(task)
        try:
            t = asyncio.create_task(coro, name=f"astrocyte.task.{task.get('type', 'unknown')}")
        except RuntimeError:
            # No running loop (rare); fall through to sync execution.
            # Close the unscheduled coroutine first so it does not emit a
            # "coroutine ... was never awaited" warning.
            coro.close()
            logger.debug("task_backend: no event loop; running inline")
            await self._run(task)
            return
        self._inflight.add(t)
        # Drop the reference once finished so the set only holds live work.
        t.add_done_callback(self._inflight.discard)

    async def shutdown(self) -> None:
        """Drain in-flight tasks, cancelling any still running at the deadline.

        Waits up to ``shutdown_timeout_seconds`` in total. Tasks submitted by
        draining tasks while the drain is in progress are waited for as well.
        Tasks still unfinished at the deadline are logged, cancelled, and
        awaited so their cancellation handling completes before returning.
        """
        self._initialized = False
        if not self._inflight:
            return
        pending = set(self._inflight)
        logger.info("task_backend: draining %d in-flight tasks", len(pending))
        loop = asyncio.get_running_loop()
        timeout = self._shutdown_timeout
        deadline = None if timeout is None else loop.time() + timeout
        while pending:
            if deadline is None:
                remaining = None
            else:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
            # asyncio.wait, unlike wait_for, does not cancel on timeout, so
            # whatever is left in ``pending`` really is still running.
            _, pending = await asyncio.wait(pending, timeout=remaining)
            # Pick up tasks that the drained ones submitted in the meantime.
            pending |= {t for t in self._inflight if not t.done()}
        if not pending:
            return
        logger.warning(
            "task_backend: shutdown timed out after %.1fs; %d tasks still running",
            self._shutdown_timeout,
            len(pending),
        )
        for t in pending:
            t.cancel()
        # Let cancelled tasks run their cleanup; return_exceptions stops one
        # task's CancelledError from aborting the wait on the others.
        await asyncio.gather(*pending, return_exceptions=True)

    @property
    def inflight_count(self) -> int:
        """Number of tracked tasks whose done-callback has not yet run."""
        return len(self._inflight)
# ─── default singleton ────────────────────────────────────────────────

# Process-wide backend handed out by get_default_task_backend(); stays None
# until first requested or installed via set_default_task_backend().
_DEFAULT_BACKEND: TaskBackend | None = None


def get_default_task_backend() -> TaskBackend:
    """Return the process-wide default task backend.

    When nothing has been installed yet, a ``SyncTaskBackend`` is created
    on first use so embedded / test contexts work without explicit setup.
    Production setup should call
    ``set_default_task_backend(AsyncFireAndTrackBackend())`` or a future
    broker backend at startup. Repeated calls return the same instance
    until it is replaced.
    """
    global _DEFAULT_BACKEND
    backend = _DEFAULT_BACKEND
    if backend is None:
        backend = _DEFAULT_BACKEND = SyncTaskBackend()
    return backend
def set_default_task_backend(backend: TaskBackend) -> None:
    """Install ``backend`` as the process-wide default.

    Later ``get_default_task_backend()`` calls return this instance. The
    previously installed backend is only replaced, not shut down, by this
    call.
    """
    global _DEFAULT_BACKEND
    _DEFAULT_BACKEND = backend