Coverage for astrocyte/ingest/supervisor.py: 71%
69 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Shared lifecycle for long-running :class:`~astrocyte.ingest.source.IngestSource` instances.
3One **supervisor** per process (or per async runtime) owns ``start`` / ``stop`` and optional
4graceful shutdown — the same pattern for Redis Streams, Kafka, NATS, etc. Each transport
5implements :class:`~astrocyte.ingest.source.IngestSource` (background read loop inside
6``start()``); this module does **not** duplicate transport logic.
8Standalone HTTP stacks (e.g. FastAPI gateway) should drive an :class:`IngestSupervisor` from
9app lifespan. A **worker-only** process can use the same supervisor and optionally
10:func:`install_shutdown_signals` so SIGTERM/SIGINT trigger :meth:`IngestSupervisor.stop`.
11"""
13from __future__ import annotations
15import asyncio
16import logging
17import signal
18import sys
19from typing import Any
21from astrocyte.ingest.logutil import log_ingest_event
22from astrocyte.ingest.registry import SourceRegistry
23from astrocyte.types import HealthStatus
# Module-level logger: handed to ``log_ingest_event`` for structured lifecycle events and used
# directly for the stop-timeout warning.
logger = logging.getLogger("astrocyte.ingest.supervisor")
class IngestSupervisor:
    """Orchestrates ``SourceRegistry.start_all`` / ``stop_all`` with optional stop timeout.

    Future hooks (backpressure, concurrency limits) can attach here without changing each
    transport implementation.
    """

    def __init__(
        self,
        registry: SourceRegistry,
        *,
        stop_timeout_s: float | None = 30.0,
    ) -> None:
        """Bind the supervisor to *registry*.

        Args:
            registry: Registry whose sources are started and stopped as a group.
            stop_timeout_s: Upper bound, in seconds, on how long :meth:`stop` waits for
                ``registry.stop_all()``. ``None`` waits without a bound.
        """
        self._registry = registry
        self._stop_timeout_s = stop_timeout_s
        self._started = False

    @property
    def registry(self) -> SourceRegistry:
        """The registry this supervisor drives."""
        return self._registry

    async def start(self) -> None:
        """Start every registered source; a no-op when already started.

        The started flag is set only after ``start_all()`` returns, so if ``start_all()``
        raises, the supervisor still counts as not started and a later :meth:`stop` does
        nothing.
        """
        if self._started:
            return
        # Snapshot the sources before starting so the log line describes what was started.
        sources = self._registry.all_sources()
        await self._registry.start_all()
        self._started = True
        log_ingest_event(
            logger,
            "ingest_supervisor_started",
            source_count=len(sources),
            source_ids=[s.source_id for s in sources],
        )

    async def stop(self) -> None:
        """Stop every registered source; a no-op when not started.

        With a configured timeout, ``stop_all()`` is bounded by :func:`asyncio.wait_for`.
        Whatever the outcome, the supervisor is marked as not started and the
        ``ingest_supervisor_stopped`` event is emitted.

        Raises:
            asyncio.TimeoutError: ``stop_all()`` exceeded ``stop_timeout_s``. This is the
                builtin ``TimeoutError`` on Python 3.11+. Any exception raised by
                ``stop_all()`` itself propagates unchanged.
        """
        if not self._started:
            return
        timeout_s = self._stop_timeout_s
        try:
            if timeout_s is None:
                # Unbounded stop: an exception raised here (even a TimeoutError coming from
                # a transport) is not a supervisor timeout, so it is not reported as one.
                await self._registry.stop_all()
            else:
                try:
                    await asyncio.wait_for(self._registry.stop_all(), timeout=timeout_s)
                except asyncio.TimeoutError:
                    # ``asyncio.TimeoutError`` is what ``wait_for`` raises on every supported
                    # Python version; before 3.11 it is distinct from builtin ``TimeoutError``.
                    log_ingest_event(
                        logger,
                        "ingest_supervisor_stop_timeout",
                        timeout_s=timeout_s,
                    )
                    logger.warning("ingest supervisor: stop_all exceeded timeout %.1fs", timeout_s)
                    raise
        finally:
            self._started = False
            log_ingest_event(logger, "ingest_supervisor_stopped")

    async def health_snapshot(self) -> list[dict[str, Any]]:
        """Best-effort health for every registered source (admin / metrics).

        Returns:
            One dict per source with keys ``id``, ``type``, ``healthy`` and ``message``.
            A source whose ``health_check()`` raises is reported with ``healthy=False`` and
            the exception text as ``message`` rather than failing the whole snapshot.
        """
        out: list[dict[str, Any]] = []
        for src in self._registry.all_sources():
            row: dict[str, Any] = {"id": src.source_id, "type": src.source_type}
            try:
                hs = await src.health_check()
                row["healthy"] = hs.healthy
                row["message"] = hs.message
            except Exception as e:  # noqa: BLE001 — aggregate surface
                row["healthy"] = False
                row["message"] = str(e)
            out.append(row)
        return out
def merge_source_health(rows: list[dict[str, Any]]) -> HealthStatus:
    """Collapse :meth:`IngestSupervisor.health_snapshot` rows into a single :class:`HealthStatus`.

    Only rows whose ``"healthy"`` value is exactly ``False`` count as failures; a missing or
    ``None`` value does not. Failing rows are named by their ``"id"`` (``"?"`` when absent).
    """
    failing_ids = [
        str(row.get("id", "?"))
        for row in rows
        if row.get("healthy") is False
    ]
    if not failing_ids:
        return HealthStatus(healthy=True, message="all ingest sources healthy")
    joined = ", ".join(failing_ids)
    return HealthStatus(healthy=False, message=f"unhealthy sources: {joined}")
def install_shutdown_signals(supervisor: IngestSupervisor) -> None:
    """Register SIGINT/SIGTERM to schedule :meth:`IngestSupervisor.stop` (Unix + running loop).

    Skip when embedding under uvicorn (it already handles process signals); use for **ingest-only**
    workers.

    Must be called while an event loop is running (``asyncio.get_running_loop`` raises
    ``RuntimeError`` otherwise). Does nothing on Windows, and stops registering as soon as
    the loop reports that signal handlers are not implemented.

    Args:
        supervisor: Supervisor whose ``stop()`` coroutine is scheduled on each signal.
    """
    if sys.platform == "win32":
        return
    loop = asyncio.get_running_loop()

    # The event loop only holds weak references to tasks; keep strong ones here so a stop
    # in flight cannot be garbage-collected before it finishes.
    pending: set[asyncio.Task[None]] = set()

    def _on_stop_done(task: asyncio.Task[None]) -> None:
        pending.discard(task)
        if task.cancelled():
            return
        exc = task.exception()  # retrieving it avoids "Task exception was never retrieved"
        if exc is not None:
            logger.error("ingest supervisor: stop after signal failed: %r", exc)

    def _schedule_stop() -> None:
        # Fire-and-forget from the caller's view; process exit may follow.
        task = loop.create_task(supervisor.stop())
        pending.add(task)
        task.add_done_callback(_on_stop_done)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, _schedule_stop)
        except NotImplementedError:
            return