Coverage for astrocyte/ingest/webhook.py: 93%

59 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Webhook ingest: verify HMAC, parse JSON, call retain (M4). 

2 

3HTTP adapters (FastAPI, Starlette, standalone gateway) should forward raw body bytes 

4and headers to :func:`handle_webhook_ingest`. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10from collections.abc import Awaitable, Callable, Mapping 

11from dataclasses import dataclass 

12 

13from astrocyte.config import SourceConfig 

14from astrocyte.errors import IngestError 

15from astrocyte.ingest.bank_resolve import resolve_ingest_bank_id 

16from astrocyte.ingest.hmac_auth import verify_hmac_sha256 

17from astrocyte.ingest.payload import parse_ingest_json_object 

18from astrocyte.types import Metadata, RetainResult 

19 

# Type of the ``retain`` argument accepted by :func:`handle_webhook_ingest`.
# It is awaited there and its result is stored as ``retain_result``. The
# argument list is left open (``...``) here.
RetainCallable = Callable[..., Awaitable[RetainResult]]

21 

22 

23def _header_ci(headers: Mapping[str, str], name: str) -> str | None: 

24 want = name.lower() 

25 for k, v in headers.items(): 

26 if k.lower() == want: 

27 return v 

28 return None 

29 

30 

def _parse_json_body(raw: bytes) -> tuple[str, str | None, str, Metadata | None]:
    """Decode a raw webhook body as a UTF-8 JSON object and extract ingest fields.

    Args:
        raw: The unmodified HTTP request body bytes.

    Returns:
        The result of :func:`parse_ingest_json_object` for the decoded object.
        Its annotated shape is ``(text, principal, content_type, metadata)``.

    Raises:
        IngestError: If the body is not valid UTF-8, is not valid JSON, is
            nested too deeply to decode, or does not decode to a JSON object.
    """
    try:
        data = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        raise IngestError(f"invalid JSON body: {e}") from e
    except RecursionError as e:
        # The body is untrusted. Deeply nested input (e.g. b"[" * 100_000)
        # exhausts the decoder's recursion limit. Report it as a bad body
        # rather than letting RecursionError escape the handler.
        raise IngestError("invalid JSON body: nesting too deep") from e
    if not isinstance(data, dict):
        raise IngestError("JSON body must be an object")
    return parse_ingest_json_object(data)

39 

40 

@dataclass(frozen=True, eq=True)
class WebhookIngestResult:
    """Immutable outcome of one webhook ingest attempt.

    Produced by :func:`handle_webhook_ingest`. ``http_status`` is intended as
    the status code the HTTP adapter replies with.
    """

    # Whether the request was accepted. In this module, True only after retain ran.
    ok: bool
    # HTTP status code chosen for the outcome (e.g. 200, 400, 401, 500, 501).
    http_status: int
    # Failure message when ok is False; None otherwise.
    error: str | None = None
    # Value returned by the awaited retain callable on success; None otherwise.
    retain_result: RetainResult | None = None

47 

48 

def _check_webhook_auth(
    auth: Mapping[str, object],
    raw_body: bytes,
    headers: Mapping[str, str],
) -> WebhookIngestResult | None:
    """Enforce a source's ``auth`` settings for one request.

    Returns ``None`` when the request may proceed, otherwise the failed
    :class:`WebhookIngestResult` to send back.
    """
    # str() keeps a non-string ``type`` (e.g. a YAML boolean) from raising
    # AttributeError on .strip(). It yields a clean "unsupported" result
    # instead. Falsy values still collapse to "" (no auth), as before.
    auth_type = str(auth.get("type") or "").strip().lower()

    if auth_type in ("", "none"):
        return None
    if auth_type != "hmac":
        return WebhookIngestResult(ok=False, http_status=501, error=f"unsupported auth type: {auth_type!r}")

    secret = auth.get("secret")
    if not secret or not isinstance(secret, str):
        # Server-side misconfiguration, hence 500 rather than 401.
        return WebhookIngestResult(ok=False, http_status=500, error="hmac secret not configured")
    header_name = str(auth.get("header") or "X-Astrocyte-Signature")
    sig = _header_ci(headers, header_name)
    if not sig:
        return WebhookIngestResult(ok=False, http_status=401, error="missing signature header")
    # The signature is checked against the exact raw bytes, before any parsing.
    if not verify_hmac_sha256(secret, raw_body, sig):
        return WebhookIngestResult(ok=False, http_status=401, error="invalid signature")
    return None


async def handle_webhook_ingest(
    *,
    source_id: str,
    source_config: SourceConfig,
    raw_body: bytes,
    headers: Mapping[str, str],
    retain: RetainCallable,
    principal: str | None = None,
) -> WebhookIngestResult:
    """Authenticate, parse, and retain one webhook delivery.

    Steps:
        1. Enforce ``source_config.auth``. This is HMAC-SHA256 over
           ``raw_body`` when ``auth.type`` is ``hmac``; an empty or ``none``
           type skips the check.
        2. Decode the body as a JSON object.
        3. Resolve the target bank.
        4. ``await retain(...)``.

    Args:
        source_id: Source identifier; forwarded to ``retain`` as ``source=``.
        source_config: Source settings. ``auth`` and ``extraction_profile``
            are read here, and the config is passed to the bank resolver.
        raw_body: Exact request body bytes; the HMAC is computed over these.
        headers: Request headers; the signature header is matched
            case-insensitively.
        retain: Awaited as ``retain(text, bank_id, metadata=...,
            content_type=..., extraction_profile=..., source=...)``.
        principal: Caller-supplied principal. When truthy it takes precedence
            over a principal parsed from the body.

    Returns:
        A :class:`WebhookIngestResult` with one of these statuses:
            200: success; the retain result is attached.
            401: signature header missing or invalid.
            500: HMAC is configured without a string secret.
            501: unsupported auth type.
            400: body parsing or bank resolution failed.

    Exceptions raised by ``retain`` are not caught and propagate to the caller.
    """
    denied = _check_webhook_auth(source_config.auth or {}, raw_body, headers)
    if denied is not None:
        return denied

    try:
        text, json_principal, content_type, metadata = _parse_json_body(raw_body)
    except IngestError as e:
        return WebhookIngestResult(ok=False, http_status=400, error=str(e))

    # NOTE(review): with auth type "" / "none" the body is unauthenticated, so
    # a sender can choose json_principal and thereby influence bank routing
    # whenever the adapter passes no principal. Confirm this is intended.
    eff_principal = principal or json_principal
    try:
        bank_id = resolve_ingest_bank_id(source_config, principal=eff_principal)
    except IngestError as e:
        return WebhookIngestResult(ok=False, http_status=400, error=str(e))

    result = await retain(
        text,
        bank_id,
        metadata=metadata,
        content_type=content_type,
        extraction_profile=source_config.extraction_profile,
        source=source_id,
    )
    return WebhookIngestResult(ok=True, http_status=200, retain_result=result)