Coverage for astrocyte/ingest/fastapi_app.py: 93%

30 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Optional ASGI app for webhook ingest (requires ``astrocyte[gateway]``). 

2 

3Uses **Starlette** (pulled in by FastAPI) so the route receives a real 

4:class:`starlette.requests.Request` (raw body + headers for HMAC). Uvicorn runs this 

5the same way as a FastAPI app. Full OpenAPI + JWT gateway is roadmap M6. 

6 

7Use :func:`create_ingest_webhook_app` behind uvicorn or any ASGI server. 

8""" 

9 

10from __future__ import annotations 

11 

12from typing import TYPE_CHECKING, Any 

13 

14if TYPE_CHECKING: 

15 from starlette.applications import Starlette 

16 

17__all__ = ["create_ingest_webhook_app"] 

18 

19 

def create_ingest_webhook_app(
    astrocyte: Any,
    sources: dict[str, Any],
) -> "Starlette":
    """Build a Starlette ASGI app serving ``POST /v1/ingest/webhook/{source_id}``.

    ``sources`` maps a source id to its :class:`~astrocyte.config.SourceConfig`
    (the same mapping as the ``sources:`` section of ``astrocyte.yml``).
    ``astrocyte`` must expose ``async def retain(...)`` like
    :class:`~astrocyte._astrocyte.Astrocyte`; its ``retain`` attribute is passed
    to ``handle_webhook_ingest`` for each request that reaches it.

    Responses are JSON objects that always carry ``ok`` and ``error``:

    * an id missing from ``sources`` (or mapped to ``None``) yields 404;
    * an entry that is not a ``SourceConfig`` yields 500;
    * otherwise the status code is the ``http_status`` reported by
      ``handle_webhook_ingest``, and when a retain result is present the body
      also carries ``stored``, ``memory_id``, ``deduplicated`` and, if set,
      ``retain_error``.
    """
    # Function-scope imports: this module only imports Starlette under
    # TYPE_CHECKING, so the dependency is resolved when an app is built.
    from starlette.applications import Starlette
    from starlette.requests import Request
    from starlette.responses import JSONResponse
    from starlette.routing import Route

    from astrocyte.config import SourceConfig
    from astrocyte.ingest.webhook import handle_webhook_ingest

    # NOTE: the endpoint keeps the name ``ingest_webhook`` because Starlette
    # derives the route name from the endpoint function.
    async def ingest_webhook(request: Request) -> JSONResponse:
        source_id = request.path_params["source_id"]
        source_cfg = sources.get(source_id)

        # Guard clauses: reject before the request body is read.
        if source_cfg is None:
            return JSONResponse(
                {"ok": False, "error": "unknown source"},
                status_code=404,
            )
        if not isinstance(source_cfg, SourceConfig):
            return JSONResponse(
                {"ok": False, "error": "invalid source config"},
                status_code=500,
            )

        # The handler receives the raw body bytes and a plain dict of headers.
        # Building the dict from ``items()`` keeps the last value when a header
        # name is repeated.
        outcome = await handle_webhook_ingest(
            source_id=source_id,
            source_config=source_cfg,
            raw_body=await request.body(),
            headers=dict(request.headers.items()),
            retain=astrocyte.retain,
        )

        response_body: dict[str, Any] = {"ok": outcome.ok, "error": outcome.error}
        retained = outcome.retain_result
        if retained is not None:
            response_body.update(
                {
                    "stored": retained.stored,
                    "memory_id": retained.memory_id,
                    # Falls back to False when the result has no such attribute.
                    "deduplicated": getattr(retained, "deduplicated", False),
                }
            )
            if retained.error:
                response_body["retain_error"] = retained.error
        return JSONResponse(response_body, status_code=outcome.http_status)

    webhook_route = Route(
        "/v1/ingest/webhook/{source_id}",
        endpoint=ingest_webhook,
        methods=["POST"],
    )
    return Starlette(routes=[webhook_route])