Coverage for astrocyte/ingest/source.py: 85%
40 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""IngestSource protocol and webhook implementation (M4)."""
3from __future__ import annotations
5from typing import Protocol, runtime_checkable
7from astrocyte.config import SourceConfig
8from astrocyte.types import HealthStatus
11@runtime_checkable
12class IngestSource(Protocol):
13 """Inbound data source (webhook, stream, poll — M4+)."""
15 @property
16 def source_id(self) -> str:
17 pass
19 @property
20 def source_type(self) -> str:
21 pass
23 async def start(self) -> None:
24 pass
26 async def stop(self) -> None:
27 pass
29 async def health_check(self) -> HealthStatus:
30 pass
33class WebhookIngestSource:
34 """Webhook source handle: lifecycle for registry / health; HTTP binding is adapter-level."""
36 def __init__(self, source_id: str, config: SourceConfig) -> None:
37 self._source_id = source_id
38 self._config = config
39 self._running = False
41 @property
42 def source_id(self) -> str:
43 return self._source_id
45 @property
46 def source_type(self) -> str:
47 return "webhook"
49 @property
50 def config(self) -> SourceConfig:
51 return self._config
53 async def start(self) -> None:
54 self._running = True
56 async def stop(self) -> None:
57 self._running = False
59 async def health_check(self) -> HealthStatus:
60 if self._running:
61 return HealthStatus(healthy=True, message="webhook source started")
62 return HealthStatus(healthy=False, message="webhook source stopped")