Coverage for astrocyte/ingest/registry.py: 72%

57 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Registry of ingest sources loaded from config (M4).""" 

2 

3from __future__ import annotations 

4 

5from astrocyte._discovery import discover_entry_points, resolve_provider 

6from astrocyte.config import SourceConfig 

7from astrocyte.errors import ConfigError 

8from astrocyte.ingest.source import IngestSource, WebhookIngestSource 

9from astrocyte.ingest.webhook import RetainCallable 

10 

11 

class SourceRegistry:
    """Keeps :class:`IngestSource` instances keyed by ``sources`` config id.

    Sources are stored in registration order. ``start_all`` / ``stop_all``
    drive the ``start()`` / ``stop()`` coroutines of every registered source,
    and :meth:`from_sources_config` builds a registry from the ``sources``
    config mapping.
    """

    def __init__(self) -> None:
        self._by_id: dict[str, IngestSource] = {}

    def register(self, source: IngestSource) -> None:
        """Store *source* under its ``source_id``, replacing any entry with the same id."""
        self._by_id[source.source_id] = source

    def get(self, source_id: str) -> IngestSource | None:
        """Return the source registered as *source_id*, or ``None`` if there is none."""
        return self._by_id.get(source_id)

    def all_sources(self) -> list[IngestSource]:
        """Return a new list of all registered sources, in registration order."""
        return list(self._by_id.values())

    async def start_all(self) -> None:
        """Await ``start()`` on every registered source, in registration order.

        The first exception raised by a ``start()`` call propagates immediately;
        sources after the failing one are not started.
        """
        # Iterate a snapshot: each await yields to the event loop, and a
        # register() call made meanwhile would otherwise raise
        # "dictionary changed size during iteration". Sources registered
        # while this loop is running are not started by it.
        for source in list(self._by_id.values()):
            await source.start()

    async def stop_all(self) -> None:
        """Await ``stop()`` on every registered source, in registration order.

        Every source is given a chance to stop even when an earlier one fails.

        Raises:
            Exception: the first exception raised by any ``stop()`` call,
                re-raised after all sources have been attempted.
        """
        first_error: Exception | None = None
        # Snapshot for the same reason as in start_all().
        for source in list(self._by_id.values()):
            try:
                await source.stop()
            except Exception as exc:
                # Keep going so one failing source does not leave the others
                # running; only the first failure is reported to the caller.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    @staticmethod
    def _build_driver_source(
        sid: str,
        cfg: SourceConfig,
        retain: RetainCallable | None,
        *,
        kind: str,
        group: str,
        default_driver: str,
        install_hint: str,
    ) -> IngestSource:
        """Resolve the driver class for a poll/stream source and instantiate it.

        Args:
            sid: Key of the source in the ``sources`` config (used in messages).
            cfg: Config of that source; ``cfg.driver`` selects the driver.
            retain: Callable passed to the driver as ``retain=``; required.
            kind: ``"poll"`` or ``"stream"``, used in error messages.
            group: Provider group passed to ``resolve_provider`` and
                ``discover_entry_points``.
            default_driver: Driver name used when ``cfg.driver`` is empty.
            install_hint: Trailing sentence of the "not installed" message.

        Raises:
            ConfigError: if *retain* is ``None``, if the driver lookup raises
                ``LookupError``, or if loading it fails with any other exception.
        """
        driver = (cfg.driver or default_driver).strip().lower()
        if retain is None:
            raise ConfigError(
                f"sources.{sid}: type {kind} requires retain=... "
                "(use astrocyte.ingest.runtime.retain_callable_for_astrocyte(astrocyte))"
            )
        try:
            source_cls = resolve_provider(driver, group)
        except LookupError as e:
            # List what is discoverable so the user can spot a typo or a
            # missing extra.
            avail = sorted(discover_entry_points(group))
            hint = ", ".join(avail) if avail else "none"
            raise ConfigError(
                f"sources.{sid}: {kind} driver {driver!r} is not installed or unknown. "
                f"Installed drivers: {hint}. "
                f"{install_hint}"
            ) from e
        except Exception as e:
            raise ConfigError(f"sources.{sid}: failed to load {kind} driver {driver!r}: {e}") from e
        # Instantiation stays outside the try: constructor errors propagate
        # unchanged rather than being reported as driver-loading failures.
        return source_cls(str(sid), cfg, retain=retain)

    @classmethod
    def from_sources_config(
        cls,
        sources: dict[str, SourceConfig] | None,
        *,
        retain: RetainCallable | None = None,
    ) -> SourceRegistry:
        """Build a registry from the ``sources`` config mapping.

        ``cfg.type`` is compared after stripping and lower-casing:

        * ``webhook`` registers a :class:`WebhookIngestSource`.
        * ``poll`` / ``api_poll`` register the class resolved from the
          ``ingest_poll_drivers`` group for ``cfg.driver``.
        * ``stream`` registers the class resolved from the
          ``ingest_stream_drivers`` group for ``cfg.driver`` (``redis`` when
          no driver is set).

        Entries with any other type are skipped by this method.

        Args:
            sources: Mapping of source id to config; ``None`` or empty yields
                an empty registry.
            retain: Callable handed to poll/stream drivers; must be provided
                when any poll or stream source is configured.

        Raises:
            ConfigError: if a poll/stream source is configured without
                *retain*, or its driver cannot be resolved or loaded.
        """
        reg = cls()
        if not sources:
            return reg
        for sid, cfg in sources.items():
            st = (cfg.type or "").strip().lower()
            if st == "webhook":
                reg.register(WebhookIngestSource(str(sid), cfg))
            elif st in ("poll", "api_poll"):
                reg.register(
                    cls._build_driver_source(
                        sid,
                        cfg,
                        retain,
                        kind="poll",
                        group="ingest_poll_drivers",
                        default_driver="",
                        install_hint=(
                            "For GitHub, install e.g. pip install astrocyte-ingestion-github "
                            "or pip install 'astrocyte[poll]'."
                        ),
                    )
                )
            elif st == "stream":
                reg.register(
                    cls._build_driver_source(
                        sid,
                        cfg,
                        retain,
                        kind="stream",
                        group="ingest_stream_drivers",
                        default_driver="redis",
                        install_hint="For kafka + redis, install e.g. pip install 'astrocyte[stream]'.",
                    )
                )
        return reg