Coverage for astrocyte/task_backend.py: 92%

73 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Pluggable backend for background task execution. 

2 

3Adopted from Hindsight (``hindsight_api/engine/task_backend.py``) but 

4scoped down to what Astrocyte uses today. Hindsight's full implementation 

5includes a Postgres-LISTEN/NOTIFY broker for distributed worker pools; 

6we don't have that architecture yet (planned: daemon/worker split, future 

7sprint). For now we ship two backends: 

8 

9 - ``SyncTaskBackend``: executes tasks immediately, inline. For tests 

10 and CLI/embedded use. 

11 - ``AsyncFireAndTrackBackend``: ``asyncio.create_task`` per submission, 

12 tracks running tasks, awaits them on ``shutdown()``. Matches the 

13 current ``orchestrator._run_consolidation()`` and 

14 ``compile_trigger._worker()`` patterns but gives them a uniform 

15 interface and a shutdown-coordinated drain. 

16 

17Both implement the same ``TaskBackend`` interface so a future 

18``PostgresBrokerTaskBackend`` can drop in without changing callers. 

19 

20Usage pattern: 

21 

22 backend = AsyncFireAndTrackBackend() 

23 backend.set_executor(run_task) # async fn (task_dict) -> None 

24 await backend.initialize() 

25 ... 

26 await backend.submit({"type": "consolidation", "bank_id": "..."}) 

27 ... 

28 await backend.shutdown() # drains in-flight tasks 

29 

30Why an executor callback + dict, not direct coroutine submission: it lets 

31the same task representation move across process boundaries later (the 

32"broker" backend would persist the dict and a worker would dequeue it). 

33""" 

34 

35from __future__ import annotations 

36 

37import asyncio 

38import logging 

39from abc import ABC, abstractmethod 

40from typing import Any, Awaitable, Callable 

41 

logger = logging.getLogger(__name__)


# Signature of the callback registered via ``TaskBackend.set_executor``:
# an async function that receives one task dict and returns nothing.
Executor = Callable[[dict[str, Any]], Awaitable[None]]

46 

47 

48# ─── abstract base ──────────────────────────────────────────────────── 

49 

50 

class TaskBackend(ABC):
    """Abstract base for task execution backends.

    Tasks are plain dicts so backends can serialize/persist them. The
    executor (callback) routes each dict to the appropriate handler.
    """

    def __init__(self) -> None:
        # Callback registered via ``set_executor``; ``None`` until then.
        self._executor: Executor | None = None
        # Toggled by the concrete backends' ``initialize`` / ``shutdown``.
        self._initialized = False

    def set_executor(self, executor: Executor) -> None:
        """Register the callback that runs each submitted task.

        Replaces any previously registered executor.
        """
        self._executor = executor

    @abstractmethod
    async def initialize(self) -> None:
        """Bring the backend up (e.g. open DB connection, start poller)."""

    @abstractmethod
    async def submit(self, task: dict[str, Any]) -> None:
        """Submit a task for execution.

        Backends may execute immediately (sync), schedule (async fire),
        or persist (broker). An inline backend returns only after the task
        has run; scheduling/persisting backends return promptly without
        waiting for completion. Backends that execute through ``_run`` log
        executor errors instead of raising them to the caller.
        """

    @abstractmethod
    async def shutdown(self) -> None:
        """Drain in-flight tasks and release resources."""

    async def _run(self, task: dict[str, Any]) -> None:
        """Execute *task* via the registered executor.

        If no executor is registered the task is dropped with a warning.
        Exceptions raised by the executor are logged (including the
        traceback) and swallowed, so one failing task cannot break its
        submitter. ``asyncio.CancelledError`` derives from ``BaseException``
        and is therefore not caught here.
        """
        if self._executor is None:
            task_type = task.get("type", "<no type>")
            logger.warning("task_backend: no executor for task type=%s; dropping", task_type)
            return
        try:
            await self._executor(task)
        except Exception as exc:  # noqa: BLE001
            # exc_info=True keeps the traceback; str(exc) alone is often
            # useless for diagnosing background failures.
            logger.warning(
                "task_backend: executor failed for type=%s: %s",
                task.get("type", "<no type>"),
                exc,
                exc_info=True,
            )

97 

98 

99# ─── synchronous backend (tests / CLI) ──────────────────────────────── 

100 

101 

class SyncTaskBackend(TaskBackend):
    """Runs each submitted task inline, before ``submit`` returns.

    Useful for tests and CLI / embedded use.
    """

    async def initialize(self) -> None:
        self._initialized = True

    async def shutdown(self) -> None:
        # Nothing is ever left in flight, so there is nothing to drain.
        self._initialized = False

    async def submit(self, task: dict[str, Any]) -> None:
        # Executor exceptions are logged and swallowed inside ``_run``.
        return await self._run(task)

113 

114 

115# ─── async fire-and-track backend (current "background work" pattern) ─ 

116 

117 

class AsyncFireAndTrackBackend(TaskBackend):
    """Spawns ``asyncio.create_task`` per submit and tracks the resulting tasks.

    Closes a gap in the raw ``asyncio.create_task`` pattern: nothing
    awaits the spawned task, so failures are silently swallowed and the
    event loop may shut down with in-flight work. This backend tracks
    every submitted task. ``shutdown()`` waits for them up to a bounded
    timeout and then cancels whatever is still running, so the shutdown
    is observable and finite.

    Failures inside individual tasks are logged via ``_run`` but never
    raise to the spawner — matches the existing fire-and-forget pattern.

    Args:
        shutdown_timeout_seconds: Upper bound on how long ``shutdown()``
            waits for in-flight tasks before cancelling the remainder.
    """

    def __init__(self, *, shutdown_timeout_seconds: float = 30.0) -> None:
        super().__init__()
        # Tasks spawned by ``submit`` that have not finished yet. Each task
        # removes itself via a done-callback. Holding a reference also keeps
        # it from being garbage-collected mid-flight (see the
        # ``asyncio.create_task`` docs).
        self._inflight: set[asyncio.Task[None]] = set()
        self._shutdown_timeout = shutdown_timeout_seconds

    async def initialize(self) -> None:
        self._initialized = True

    async def submit(self, task: dict[str, Any]) -> None:
        """Schedule *task* on the running loop and return without awaiting it."""
        if not self._initialized:
            logger.warning("task_backend: submit before initialize for type=%s", task.get("type", "<no type>"))
        coro = self._run(task)
        try:
            t = asyncio.create_task(coro, name=f"astrocyte.task.{task.get('type', 'unknown')}")
        except RuntimeError:
            # No running loop (rare); fall through to inline execution.
            # Close the coroutine that was never scheduled so it does not
            # trigger a "coroutine was never awaited" RuntimeWarning.
            coro.close()
            logger.debug("task_backend: no event loop; running inline")
            await self._run(task)
            return
        self._inflight.add(t)
        t.add_done_callback(self._inflight.discard)

    async def shutdown(self) -> None:
        """Wait for in-flight tasks, cancelling any still running at the timeout."""
        self._initialized = False
        if not self._inflight:
            return
        logger.info("task_backend: draining %d in-flight tasks", len(self._inflight))
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._shutdown_timeout
        # Loop because a draining task may submit follow-up work. The single
        # deadline bounds the total time spent here.
        while self._inflight:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            # Unlike ``wait_for(gather(...))``, ``asyncio.wait`` does not
            # cancel anything on timeout. It reports what is still pending.
            _done, not_done = await asyncio.wait(set(self._inflight), timeout=remaining)
            if not_done:
                break
        stragglers = set(self._inflight)
        if not stragglers:
            return
        logger.warning(
            "task_backend: shutdown timed out after %.1fs; cancelling %d tasks still running",
            self._shutdown_timeout,
            len(stragglers),
        )
        for straggler in stragglers:
            straggler.cancel()
        # Wait for the cancellations to land so no task outlives shutdown.
        await asyncio.gather(*stragglers, return_exceptions=True)

    @property
    def inflight_count(self) -> int:
        """Number of submitted tasks that have not finished yet."""
        return len(self._inflight)

173 

174 

175# ─── default singleton ──────────────────────────────────────────────── 

176 

# Process-wide default backend. Stays ``None`` until either
# ``get_default_task_backend`` lazily creates a ``SyncTaskBackend`` or
# ``set_default_task_backend`` installs one explicitly.
_DEFAULT_BACKEND: TaskBackend | None = None

178 

179 

def get_default_task_backend() -> TaskBackend:
    """Return the process-wide default task backend, creating it on first use.

    If nothing has been installed via ``set_default_task_backend``, a
    ``SyncTaskBackend`` is created and cached. Embedded / test contexts
    therefore work without explicit setup. Production startup should
    install ``AsyncFireAndTrackBackend()`` (or a future broker backend)
    instead.
    """
    global _DEFAULT_BACKEND
    current = _DEFAULT_BACKEND
    if current is not None:
        return current
    current = _DEFAULT_BACKEND = SyncTaskBackend()
    return current

192 

193 

def set_default_task_backend(backend: TaskBackend) -> None:
    """Install *backend* as the process-wide default task backend.

    Later ``get_default_task_backend()`` calls return this instance instead
    of lazily creating a ``SyncTaskBackend``.
    """
    global _DEFAULT_BACKEND
    _DEFAULT_BACKEND = backend