Coverage for astrocyte/ingest/supervisor.py: 71%

69 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Shared lifecycle for long-running :class:`~astrocyte.ingest.source.IngestSource` instances. 

2 

3One **supervisor** per process (or per async runtime) owns ``start`` / ``stop`` and optional 

4graceful shutdown — the same pattern for Redis Streams, Kafka, NATS, etc. Each transport 

5implements :class:`~astrocyte.ingest.source.IngestSource` (background read loop inside 

6``start()``); this module does **not** duplicate transport logic. 

7 

8Standalone HTTP stacks (e.g. FastAPI gateway) should drive an :class:`IngestSupervisor` from 

9app lifespan. A **worker-only** process can use the same supervisor and optionally 

10:func:`install_shutdown_signals` so SIGTERM/SIGINT trigger :meth:`IngestSupervisor.stop`. 

11""" 

12 

13from __future__ import annotations 

14 

15import asyncio 

16import logging 

17import signal 

18import sys 

19from typing import Any 

20 

21from astrocyte.ingest.logutil import log_ingest_event 

22from astrocyte.ingest.registry import SourceRegistry 

23from astrocyte.types import HealthStatus 

24 

25logger = logging.getLogger("astrocyte.ingest.supervisor") 

26 

27 

class IngestSupervisor:
    """Start and stop every source in a ``SourceRegistry`` as one unit.

    Wraps ``SourceRegistry.start_all`` / ``stop_all`` and can bound the stop phase
    with a timeout. Cross-cutting hooks (backpressure, concurrency limits) can be
    added here later without touching individual transport implementations.
    """

    def __init__(
        self,
        registry: SourceRegistry,
        *,
        stop_timeout_s: float | None = 30.0,
    ) -> None:
        """Bind the supervisor to *registry*.

        ``stop_timeout_s`` caps how long :meth:`stop` waits for ``stop_all``;
        ``None`` means wait without a limit.
        """
        self._started = False
        self._stop_timeout_s = stop_timeout_s
        self._registry = registry

    @property
    def registry(self) -> SourceRegistry:
        """The registry this supervisor was constructed with."""
        return self._registry

    async def start(self) -> None:
        """Start all registered sources; does nothing if already started."""
        if not self._started:
            registered = self._registry.all_sources()
            await self._registry.start_all()
            # Only flip the flag once start_all has returned without raising.
            self._started = True
            log_ingest_event(
                logger,
                "ingest_supervisor_started",
                source_count=len(registered),
                source_ids=[source.source_id for source in registered],
            )

    async def stop(self) -> None:
        """Stop all registered sources; does nothing if not started.

        When a stop timeout is configured and ``stop_all`` exceeds it, the timeout
        is logged and ``TimeoutError`` is re-raised. In every case the supervisor
        is marked as not started and the "stopped" event is emitted.
        """
        if not self._started:
            return
        try:
            if self._stop_timeout_s is None:
                await self._registry.stop_all()
            else:
                await asyncio.wait_for(self._registry.stop_all(), timeout=self._stop_timeout_s)
        except TimeoutError:
            log_ingest_event(
                logger,
                "ingest_supervisor_stop_timeout",
                timeout_s=self._stop_timeout_s,
            )
            logger.warning("ingest supervisor: stop_all exceeded timeout %.1fs", self._stop_timeout_s)
            raise
        finally:
            # Runs on success, timeout, and any other failure alike.
            self._started = False
            log_ingest_event(logger, "ingest_supervisor_stopped")

    async def health_snapshot(self) -> list[dict[str, Any]]:
        """Return one ``{"id", "type", "healthy", "message"}`` row per registered source.

        Best-effort: a source whose ``health_check`` raises is reported as
        unhealthy with the exception text as its message instead of aborting
        the whole snapshot.
        """
        snapshot: list[dict[str, Any]] = []
        for source in self._registry.all_sources():
            # Identity fields are read outside the try so errors there still propagate.
            source_id, source_type = source.source_id, source.source_type
            try:
                status = await source.health_check()
                healthy, message = status.healthy, status.message
            except Exception as exc:  # noqa: BLE001 — aggregate surface
                healthy, message = False, str(exc)
            snapshot.append(
                {
                    "id": source_id,
                    "type": source_type,
                    "healthy": healthy,
                    "message": message,
                }
            )
        return snapshot

96 

97 

def merge_source_health(rows: list[dict[str, Any]]) -> HealthStatus:
    """Collapse :meth:`IngestSupervisor.health_snapshot` rows into one :class:`HealthStatus`.

    Only rows whose ``"healthy"`` value is exactly ``False`` count as unhealthy;
    a row without an ``"id"`` key is listed as ``?``.
    """
    failing_ids = [str(row.get("id", "?")) for row in rows if row.get("healthy") is False]
    if not failing_ids:
        return HealthStatus(healthy=True, message="all ingest sources healthy")
    joined = ", ".join(failing_ids)
    return HealthStatus(healthy=False, message=f"unhealthy sources: {joined}")

105 

106 

def install_shutdown_signals(supervisor: IngestSupervisor) -> None:
    """Register SIGINT/SIGTERM to schedule :meth:`IngestSupervisor.stop` (Unix + running loop).

    Must be called while an event loop is running (``asyncio.get_running_loop`` is
    used). Returns without doing anything on Windows, and stops registering as
    soon as the loop raises ``NotImplementedError`` from ``add_signal_handler``.

    Skip when embedding under uvicorn (it already handles process signals); use for
    **ingest-only** workers.
    """
    if sys.platform == "win32":
        return
    loop = asyncio.get_running_loop()

    # The event loop keeps only weak references to tasks. Hold a strong reference
    # until each stop task finishes so it cannot be garbage-collected mid-shutdown.
    pending_stops: set[asyncio.Task[None]] = set()

    def _schedule_stop() -> None:
        task = asyncio.create_task(supervisor.stop())
        pending_stops.add(task)
        task.add_done_callback(pending_stops.discard)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, _schedule_stop)
        except NotImplementedError:
            # This loop implementation has no signal support; nothing more to do.
            return