Coverage for astrocyte/ingest/source.py: 85%

40 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""IngestSource protocol and webhook implementation (M4).""" 

2 

3from __future__ import annotations 

4 

5from typing import Protocol, runtime_checkable 

6 

7from astrocyte.config import SourceConfig 

8from astrocyte.types import HealthStatus 

9 

10 

11@runtime_checkable 

12class IngestSource(Protocol): 

13 """Inbound data source (webhook, stream, poll — M4+).""" 

14 

15 @property 

16 def source_id(self) -> str: 

17 pass 

18 

19 @property 

20 def source_type(self) -> str: 

21 pass 

22 

23 async def start(self) -> None: 

24 pass 

25 

26 async def stop(self) -> None: 

27 pass 

28 

29 async def health_check(self) -> HealthStatus: 

30 pass 

31 

32 

33class WebhookIngestSource: 

34 """Webhook source handle: lifecycle for registry / health; HTTP binding is adapter-level.""" 

35 

36 def __init__(self, source_id: str, config: SourceConfig) -> None: 

37 self._source_id = source_id 

38 self._config = config 

39 self._running = False 

40 

41 @property 

42 def source_id(self) -> str: 

43 return self._source_id 

44 

45 @property 

46 def source_type(self) -> str: 

47 return "webhook" 

48 

49 @property 

50 def config(self) -> SourceConfig: 

51 return self._config 

52 

53 async def start(self) -> None: 

54 self._running = True 

55 

56 async def stop(self) -> None: 

57 self._running = False 

58 

59 async def health_check(self) -> HealthStatus: 

60 if self._running: 

61 return HealthStatus(healthy=True, message="webhook source started") 

62 return HealthStatus(healthy=False, message="webhook source stopped")