Coverage for astrocyte/_ingest_spi.py: 100%

16 statements  

coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

"""Shared SPI types for engine→Memory Engine composition.

Each media-specific engine (Document, Conversation, future Image/Audio)
has its own ingestor that walks a media representation and calls a
Memory Engine retain function with per-chunk text + opaque metadata.
This module defines the shared signature + result shape so ingestors
across engines have a consistent contract.

The Memory Engine itself doesn't import from here; it just accepts
``retain(content, metadata, ...)`` calls. This file is the
producer-side contract; consumers (e.g. ``AstrocyteClient.retain_text``)
only need to match the ``MemoryRetainFn`` signature.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Awaitable, Protocol


class MemoryRetainFn(Protocol):
    """The minimal Memory Engine retain signature ingestors call.

    Any callable matching this shape can be passed to an ingestor.
    Production: bound method on ``AstrocyteClient``. Tests: in-memory
    list-appending closure.
    """

    def __call__(
        self,
        *,
        bank_id: str,
        content: str,
        metadata: dict[str, Any],
    ) -> Awaitable[None]:
        """Retain one segment into the named bank.

        Returns an awaitable that resolves when the segment is
        persisted. Implementations: ``AstrocyteClient.retain_text``
        in production, list-appending closure in tests.
        """


@dataclass
class IngestResult:
    """Summary returned by every ingestor after walking a source.

    ``segments_emitted`` is the count of distinct retain calls made
    (nodes for documents, chunks for conversations). ``source_id`` is
    the upstream entity ID (document.id or conversation.id) so callers
    can correlate ingest outcomes back to the source.

    ``failures`` lists any segments whose retain call raised. The
    ingestor swallows per-segment failures (one bad chunk shouldn't
    kill the whole ingest) and surfaces them here.
    """

    bank_id: str
    source_kind: str  # "astrocyte.documents" | "astrocyte.conversations" | ...
    source_id: str
    segments_emitted: int
    failures: list[dict[str, Any]] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def ok(self) -> bool:
        """True when no per-segment retain call failed."""
        return not self.failures
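
The docstrings above describe a contract (a keyword-only async retain callable, per-segment failures swallowed and reported in the result) but the module contains no ingestor. The following is a minimal sketch of how a caller might use that contract, not the project's actual ingestor. `ingest_chunks`, `make_recording_retain`, the `"example.chunks"` source kind, and the shape of each failure dict are hypothetical names invented for illustration. The two types are restated in abbreviated form so the block runs on its own.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Protocol


# Abbreviated copies of the module's types so this sketch is standalone.
class MemoryRetainFn(Protocol):
    def __call__(
        self, *, bank_id: str, content: str, metadata: dict[str, Any]
    ) -> Awaitable[None]: ...


@dataclass
class IngestResult:
    bank_id: str
    source_kind: str
    source_id: str
    segments_emitted: int
    failures: list[dict[str, Any]] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def ok(self) -> bool:
        return not self.failures


async def ingest_chunks(
    retain: MemoryRetainFn, *, bank_id: str, source_id: str, chunks: list[str]
) -> IngestResult:
    """Hypothetical ingestor: one retain call per chunk, failures collected."""
    result = IngestResult(
        bank_id=bank_id,
        source_kind="example.chunks",  # made-up kind for this sketch
        source_id=source_id,
        segments_emitted=0,
    )
    for index, text in enumerate(chunks):
        try:
            await retain(bank_id=bank_id, content=text, metadata={"index": index})
            # Assumption: only successful calls are counted as emitted.
            result.segments_emitted += 1
        except Exception as exc:
            # Swallow the per-segment failure and report it in the result.
            # The dict keys here are an assumption, not a documented schema.
            result.failures.append({"index": index, "error": repr(exc)})
    return result


def make_recording_retain(store: list[dict[str, Any]]) -> MemoryRetainFn:
    """Test double: a list-appending closure that rejects empty content."""

    async def retain(*, bank_id: str, content: str, metadata: dict[str, Any]) -> None:
        if not content:
            raise ValueError("empty segment")
        store.append({"bank_id": bank_id, "content": content, "metadata": metadata})

    return retain


store: list[dict[str, Any]] = []
result = asyncio.run(
    ingest_chunks(
        make_recording_retain(store),
        bank_id="bank-1",
        source_id="doc-1",
        chunks=["alpha", "", "gamma"],
    )
)
print(result.ok, result.segments_emitted, len(result.failures))
```

Because the retain callable is typed as a `Protocol`, the closure needs no base class or registration. Any async callable accepting the three keyword arguments satisfies it structurally, which is what lets tests swap in an in-memory list for the production client.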