Coverage for astrocyte/ingest/runtime.py: 100%

11 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Attach :class:`~astrocyte._astrocyte.Astrocyte` to ingest :func:`~astrocyte.ingest.webhook.RetainCallable` for stream sources.""" 

2 

3from __future__ import annotations 

4 

5from typing import TYPE_CHECKING 

6 

7from astrocyte.types import AstrocyteContext, RetainResult 

8 

9if TYPE_CHECKING: 

10 from astrocyte._astrocyte import Astrocyte 

11 

12from astrocyte.ingest.webhook import RetainCallable 

13 

14 

def retain_callable_for_astrocyte(astrocyte: Astrocyte) -> RetainCallable:
    """Wrap ``astrocyte.retain`` in an async ``retain`` closure.

    The returned callable is meant to be passed as the ``retain=`` argument of
    :class:`SourceRegistry.from_sources_config`.
    """

    async def retain(
        text: str,
        bank_id: str,
        *,
        metadata: dict[str, str | int | float | bool | None] | None = None,
        content_type: str = "text",
        extraction_profile: str | None = None,
        source: str | None = None,
        principal: str | None = None,
        context: AstrocyteContext | None = None,
    ) -> RetainResult:
        """Forward one retain request to the bound ``astrocyte`` instance.

        ``principal`` is not forwarded directly; it is only used to build a
        context when the caller did not supply one.
        """
        # An explicit context always wins. Otherwise a truthy principal is
        # wrapped in a fresh AstrocyteContext; a missing or empty principal
        # leaves the context as None.
        if context is not None or not principal:
            effective_context = context
        else:
            effective_context = AstrocyteContext(principal=principal)

        forwarded = {
            "metadata": metadata,
            "content_type": content_type,
            "extraction_profile": extraction_profile,
            "source": source,
            "context": effective_context,
        }
        return await astrocyte.retain(text, bank_id, **forwarded)

    return retain