Coverage for astrocyte/ingest/registry.py: 72%
57 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Registry of ingest sources loaded from config (M4)."""
3from __future__ import annotations
5from astrocyte._discovery import discover_entry_points, resolve_provider
6from astrocyte.config import SourceConfig
7from astrocyte.errors import ConfigError
8from astrocyte.ingest.source import IngestSource, WebhookIngestSource
9from astrocyte.ingest.webhook import RetainCallable
class SourceRegistry:
    """Keeps :class:`IngestSource` instances keyed by ``sources`` config id.

    Sources are held in registration order. Registering a source whose
    ``source_id`` is already present replaces the earlier entry.
    """

    def __init__(self) -> None:
        self._by_id: dict[str, IngestSource] = {}

    def register(self, source: IngestSource) -> None:
        """Store *source* under its ``source_id``, replacing any existing entry."""
        self._by_id[source.source_id] = source

    def get(self, source_id: str) -> IngestSource | None:
        """Return the source registered as *source_id*, or ``None`` if absent."""
        return self._by_id.get(source_id)

    def all_sources(self) -> list[IngestSource]:
        """Return a new list of every registered source, in registration order."""
        return list(self._by_id.values())

    async def start_all(self) -> None:
        """Await ``start()`` on every registered source, one after another.

        The first exception raised by a ``start()`` call propagates and the
        remaining sources are not started.
        """
        # Iterate a snapshot: each ``await`` yields to the event loop, and a
        # ``register()`` call made meanwhile would otherwise break iteration.
        for source in list(self._by_id.values()):
            await source.start()

    async def stop_all(self) -> None:
        """Await ``stop()`` on every registered source, one after another.

        Every source gets a stop attempt even when an earlier one fails, so a
        single broken source cannot leave the others running.

        Raises:
            Exception: the first error raised by a ``stop()`` call, re-raised
                once all sources have been attempted. Later errors are
                dropped.
        """
        first_error: Exception | None = None
        # Snapshot for the same reason as in ``start_all``.
        for source in list(self._by_id.values()):
            try:
                await source.stop()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    @staticmethod
    def _build_driver_source(
        sid,
        cfg,
        retain,
        *,
        kind: str,
        default_driver: str,
        group: str,
        install_hint: str,
    ):
        """Resolve the driver class for a poll/stream source and instantiate it.

        Args:
            sid: Key of the entry in the ``sources`` config (used in messages).
            cfg: The entry's config; ``cfg.driver`` names the driver.
            retain: Callable handed to the driver; must not be ``None``.
            kind: Word used in error messages (``"poll"`` or ``"stream"``).
            default_driver: Driver name used when ``cfg.driver`` is empty.
            group: Provider group passed to ``resolve_provider``.
            install_hint: Sentence appended to the "not installed" message.

        Raises:
            ConfigError: if *retain* is ``None``, or the driver cannot be
                resolved or loaded.
        """
        driver = (cfg.driver or default_driver).strip().lower()
        if retain is None:
            raise ConfigError(
                f"sources.{sid}: type {kind} requires retain=... "
                "(use astrocyte.ingest.runtime.retain_callable_for_astrocyte(astrocyte))"
            )
        try:
            source_cls = resolve_provider(driver, group)
        except LookupError as e:
            # List what is discoverable so the message is actionable.
            avail = sorted(discover_entry_points(group).keys())
            hint = ", ".join(avail) if avail else "none"
            raise ConfigError(
                f"sources.{sid}: {kind} driver {driver!r} is not installed or unknown. "
                f"Installed drivers: {hint}. "
                f"{install_hint}"
            ) from e
        except Exception as e:
            raise ConfigError(
                f"sources.{sid}: failed to load {kind} driver {driver!r}: {e}"
            ) from e
        return source_cls(str(sid), cfg, retain=retain)

    @classmethod
    def from_sources_config(
        cls,
        sources: dict[str, SourceConfig] | None,
        *,
        retain: RetainCallable | None = None,
    ) -> SourceRegistry:
        """Build a registry from the ``sources`` mapping of the config.

        ``cfg.type`` is matched case-insensitively after stripping whitespace:

        * ``webhook`` -> :class:`WebhookIngestSource`.
        * ``poll`` / ``api_poll`` -> driver resolved from ``ingest_poll_drivers``.
        * ``stream`` -> driver resolved from ``ingest_stream_drivers``;
          the driver name defaults to ``redis``.

        Args:
            sources: Mapping of source id to config; ``None`` or empty yields
                an empty registry.
            retain: Callable passed to poll and stream drivers. Required as
                soon as any poll or stream source is configured.

        Raises:
            ConfigError: if a poll/stream source is configured without
                *retain*, or its driver cannot be resolved or loaded.
        """
        reg = cls()
        if not sources:
            return reg
        for sid, cfg in sources.items():
            st = (cfg.type or "").strip().lower()
            if st == "webhook":
                reg.register(WebhookIngestSource(str(sid), cfg))
            elif st in ("poll", "api_poll"):
                reg.register(
                    cls._build_driver_source(
                        sid,
                        cfg,
                        retain,
                        kind="poll",
                        default_driver="",
                        group="ingest_poll_drivers",
                        install_hint=(
                            "For GitHub, install e.g. pip install astrocyte-ingestion-github "
                            "or pip install 'astrocyte[poll]'."
                        ),
                    )
                )
            elif st == "stream":
                reg.register(
                    cls._build_driver_source(
                        sid,
                        cfg,
                        retain,
                        kind="stream",
                        default_driver="redis",
                        group="ingest_stream_drivers",
                        install_hint="For kafka + redis, install e.g. pip install 'astrocyte[stream]'.",
                    )
                )
            # NOTE(review): any other type is skipped without an error;
            # presumably config validation rejects it earlier - confirm.
        return reg