Coverage for astrocyte/ingest/runtime.py: 100%
11 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Attach :class:`~astrocyte._astrocyte.Astrocyte` to ingest :func:`~astrocyte.ingest.webhook.RetainCallable` for stream sources."""
3from __future__ import annotations
5from typing import TYPE_CHECKING
7from astrocyte.types import AstrocyteContext, RetainResult
9if TYPE_CHECKING:
10 from astrocyte._astrocyte import Astrocyte
12from astrocyte.ingest.webhook import RetainCallable
def retain_callable_for_astrocyte(astrocyte: Astrocyte) -> RetainCallable:
    """Wrap *astrocyte* in an async ``retain`` callable for ingest sources.

    The returned coroutine function is meant to be handed to
    :class:`SourceRegistry.from_sources_config` through its ``retain=``
    argument. Each call is forwarded to ``astrocyte.retain``.

    Args:
        astrocyte: Object whose awaitable ``retain`` method performs the
            actual retention.

    Returns:
        An async callable taking ``text`` and ``bank_id`` positionally plus
        keyword-only options, and returning whatever ``astrocyte.retain``
        returns.
    """

    async def retain(
        text: str,
        bank_id: str,
        *,
        metadata: dict[str, str | int | float | bool | None] | None = None,
        content_type: str = "text",
        extraction_profile: str | None = None,
        source: str | None = None,
        principal: str | None = None,
        context: AstrocyteContext | None = None,
    ) -> RetainResult:
        """Forward one retain request to the wrapped ``astrocyte``.

        ``principal`` is never forwarded on its own; it is only used to
        build a context when the caller did not supply one.
        """
        if context is not None:
            # An explicit context always wins over ``principal``.
            effective_context = context
        elif principal:
            # No context supplied: derive one from a non-empty principal.
            effective_context = AstrocyteContext(principal=principal)
        else:
            effective_context = None

        forwarded_options = {
            "metadata": metadata,
            "content_type": content_type,
            "extraction_profile": extraction_profile,
            "source": source,
            "context": effective_context,
        }
        result = await astrocyte.retain(text, bank_id, **forwarded_options)
        return result

    return retain