Coverage for astrocyte/ingest/webhook.py: 93%
59 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Webhook ingest: verify HMAC, parse JSON, call retain (M4).
3HTTP adapters (FastAPI, Starlette, standalone gateway) should forward raw body bytes
4and headers to :func:`handle_webhook_ingest`.
5"""
7from __future__ import annotations
9import json
10from collections.abc import Awaitable, Callable, Mapping
11from dataclasses import dataclass
13from astrocyte.config import SourceConfig
14from astrocyte.errors import IngestError
15from astrocyte.ingest.bank_resolve import resolve_ingest_bank_id
16from astrocyte.ingest.hmac_auth import verify_hmac_sha256
17from astrocyte.ingest.payload import parse_ingest_json_object
18from astrocyte.types import Metadata, RetainResult
# Async callable that stores ingested text and resolves to a ``RetainResult``.
# ``handle_webhook_ingest`` invokes it as
# ``retain(text, bank_id, metadata=..., content_type=..., extraction_profile=..., source=...)``.
RetainCallable = Callable[..., Awaitable[RetainResult]]
23def _header_ci(headers: Mapping[str, str], name: str) -> str | None:
24 want = name.lower()
25 for k, v in headers.items():
26 if k.lower() == want:
27 return v
28 return None
def _parse_json_body(raw: bytes) -> tuple[str, str | None, str, Metadata | None]:
    """Decode ``raw`` as UTF-8 JSON and pass the resulting object to the payload parser.

    Returns:
        Whatever :func:`parse_ingest_json_object` returns for the decoded dict
        (the caller unpacks it as text, principal, content type, metadata).

    Raises:
        IngestError: if ``raw`` is not valid UTF-8, is not valid JSON, is nested
            too deeply for the decoder, or its top-level value is not an object.
            Anything raised by :func:`parse_ingest_json_object` propagates unchanged.
    """
    try:
        data = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        raise IngestError(f"invalid JSON body: {e}") from e
    except RecursionError as e:
        # Pathologically nested input (e.g. thousands of "[") exhausts the
        # decoder's recursion budget. Treat it as a malformed request body
        # rather than letting it escape as an unhandled server error.
        raise IngestError("invalid JSON body: nesting too deep") from e
    if not isinstance(data, dict):
        raise IngestError("JSON body must be an object")
    return parse_ingest_json_object(data)
@dataclass(frozen=True)
class WebhookIngestResult:
    """Immutable outcome of one webhook ingest attempt, for the HTTP adapter to render."""

    # True only when the payload was accepted and ``retain`` completed.
    ok: bool
    # HTTP status code the adapter should respond with.
    http_status: int
    # Failure description; left as ``None`` on success.
    error: str | None = None
    # Value returned by the ``retain`` callable; left as ``None`` on failure.
    retain_result: RetainResult | None = None
async def handle_webhook_ingest(
    *,
    source_id: str,
    source_config: SourceConfig,
    raw_body: bytes,
    headers: Mapping[str, str],
    retain: RetainCallable,
    principal: str | None = None,
) -> WebhookIngestResult:
    """Authenticate a webhook request, parse its JSON body and ``retain`` the content.

    Steps, in order:

    1. Read ``source_config.auth["type"]`` (trimmed, lower-cased). ``"hmac"``
       requires a string ``secret`` and a signature header (name taken from
       ``auth["header"]``, default ``X-Astrocyte-Signature``) that verifies
       against ``raw_body``. An empty value or ``"none"`` skips authentication.
    2. Parse ``raw_body`` and resolve the target bank, preferring the explicit
       ``principal`` argument over the principal carried in the JSON body.
    3. Await ``retain`` with the parsed text, bank id, metadata, content type,
       the source's extraction profile and ``source_id``.

    Returns:
        A :class:`WebhookIngestResult` with one of these statuses:
        200 on success; 400 when parsing or bank resolution raises
        ``IngestError``; 401 for a missing or invalid signature; 500 when the
        HMAC secret is not configured; 501 for any other auth type.

    Exceptions raised by ``retain`` itself are not caught here.
    """

    def rejected(status: int, reason: str) -> WebhookIngestResult:
        # Single place where failure results are built.
        return WebhookIngestResult(ok=False, http_status=status, error=reason)

    auth = source_config.auth or {}
    auth_type = (auth.get("type") or "").strip().lower()

    if auth_type not in ("", "none", "hmac"):
        return rejected(501, f"unsupported auth type: {auth_type!r}")

    if auth_type == "hmac":
        secret = auth.get("secret")
        if not (secret and isinstance(secret, str)):
            return rejected(500, "hmac secret not configured")
        signature = _header_ci(headers, str(auth.get("header") or "X-Astrocyte-Signature"))
        if not signature:
            return rejected(401, "missing signature header")
        if not verify_hmac_sha256(secret, raw_body, signature):
            return rejected(401, "invalid signature")

    # Both steps fail with IngestError and map to the same 400 response.
    try:
        text, json_principal, content_type, metadata = _parse_json_body(raw_body)
        bank_id = resolve_ingest_bank_id(
            source_config,
            principal=principal or json_principal,
        )
    except IngestError as exc:
        return rejected(400, str(exc))

    retain_result = await retain(
        text,
        bank_id,
        metadata=metadata,
        content_type=content_type,
        extraction_profile=source_config.extraction_profile,
        source=source_id,
    )
    return WebhookIngestResult(ok=True, http_status=200, retain_result=retain_result)