Coverage for astrocyte/ingest/fastapi_app.py: 93%
30 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Optional ASGI app for webhook ingest (requires ``astrocyte[gateway]``).
3Uses **Starlette** (pulled in by FastAPI) so the route receives a real
4:class:`starlette.requests.Request` (raw body + headers for HMAC). Uvicorn runs this
5the same way as a FastAPI app. Full OpenAPI + JWT gateway is roadmap M6.
7Use :func:`create_ingest_webhook_app` behind uvicorn or any ASGI server.
8"""
10from __future__ import annotations
12from typing import TYPE_CHECKING, Any
14if TYPE_CHECKING:
15 from starlette.applications import Starlette
17__all__ = ["create_ingest_webhook_app"]
20def create_ingest_webhook_app(
21 astrocyte: Any,
22 sources: dict[str, Any],
23) -> "Starlette":
24 """Return a Starlette ASGI app with ``POST /v1/ingest/webhook/{source_id}``.
26 ``sources`` maps source id → :class:`~astrocyte.config.SourceConfig` (same as ``astrocyte.yml`` ``sources:``).
27 ``astrocyte`` must expose ``async def retain(...)`` like :class:`~astrocyte._astrocyte.Astrocyte`.
28 """
29 from starlette.applications import Starlette
30 from starlette.requests import Request
31 from starlette.responses import JSONResponse
32 from starlette.routing import Route
34 from astrocyte.config import SourceConfig
35 from astrocyte.ingest.webhook import handle_webhook_ingest
37 async def ingest_webhook(request: Request) -> JSONResponse:
38 source_id = request.path_params["source_id"]
39 cfg = sources.get(source_id)
40 if cfg is None:
41 return JSONResponse({"ok": False, "error": "unknown source"}, status_code=404)
42 if not isinstance(cfg, SourceConfig):
43 return JSONResponse({"ok": False, "error": "invalid source config"}, status_code=500)
45 raw = await request.body()
46 headers = {k: v for k, v in request.headers.items()}
48 result = await handle_webhook_ingest(
49 source_id=source_id,
50 source_config=cfg,
51 raw_body=raw,
52 headers=headers,
53 retain=astrocyte.retain,
54 )
56 payload: dict[str, Any] = {"ok": result.ok, "error": result.error}
57 if result.retain_result is not None:
58 rr = result.retain_result
59 payload["stored"] = rr.stored
60 payload["memory_id"] = rr.memory_id
61 payload["deduplicated"] = getattr(rr, "deduplicated", False)
62 if rr.error:
63 payload["retain_error"] = rr.error
64 return JSONResponse(payload, status_code=result.http_status)
66 return Starlette(
67 routes=[
68 Route(
69 "/v1/ingest/webhook/{source_id}",
70 endpoint=ingest_webhook,
71 methods=["POST"],
72 ),
73 ],
74 )