Coverage for astrocyte/conversations/storage.py: 100%

25 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""ConversationStore SPI — persist conversations + their turns. 

2 

3Abstract base + in-memory impl for tests / embedded use. Postgres impl 

4lives in ``adapters-storage-py/astrocyte-postgres/`` so the 

5Conversation Engine doesn't depend on Postgres directly. 

6 

7SPI shape (narrow, mirroring DocumentStore): 

8 - ``save_conversation(c)`` — upsert conversation + all turns 

9 - ``get_conversation(id)`` — fetch with all turns 

10 - ``list_conversations(limit)`` — paginate; control plane / debugging 

11 - ``delete_conversation(id)`` — drop conversation AND its turns 

12""" 

13 

14from __future__ import annotations 

15 

16from abc import ABC, abstractmethod 

17 

18from astrocyte.conversations.types import Conversation 

19 

20 

class ConversationNotFoundError(Exception):
    """Signal that no conversation exists for the requested ``conversation_id``."""

23 

24 

class ConversationStore(ABC):
    """Abstract storage contract for conversations and their ordered turns.

    Concrete backends implement the four coroutines below; this class
    carries no state or behavior of its own.
    """

    @abstractmethod
    async def save_conversation(self, conversation: Conversation) -> None:
        """Insert or replace ``conversation``, keyed by ``conversation.id``.

        Every call is a full re-save: the complete turn list is written
        again and any turns stored earlier are replaced. Saving the same
        ``conversation.id`` twice therefore replaces the record instead
        of duplicating it (idempotent).

        Incremental appends are not part of this SPI; an ``append_turns``
        method may be added later (Phase 3+) if that need arises.
        """

    @abstractmethod
    async def get_conversation(self, conversation_id: str) -> Conversation | None:
        """Return the conversation with its turns in order, or ``None`` if absent."""

    @abstractmethod
    async def list_conversations(self, *, limit: int = 100) -> list[Conversation]:
        """Return up to ``limit`` conversations, newest first, turns included.

        Turns are loaded eagerly in Phase 2.5. Should conversations get
        large in production, an ``include_turns=False`` variant is the
        natural next step.
        """

    @abstractmethod
    async def delete_conversation(self, conversation_id: str) -> None:
        """Remove the conversation together with its turns; no-op if absent."""

54 

55 

56# ─── in-memory impl ──────────────────────────────────────────────────── 

57 

58 

class InMemoryConversationStore(ConversationStore):
    """Pure-Python ConversationStore backed by a dict. Not thread-safe.

    For unit tests, CLI/embedded use, and smoke tests of the Conversation
    Engine before wiring to Postgres.

    Conversations are copied both when they are saved and when they are
    read back, so neither the caller's original object nor an object
    returned by this store aliases the stored state. The copy is shallow:
    the ``turns`` list and ``metadata`` dict are new containers, but the
    turn objects and metadata values inside them are shared.
    """

    def __init__(self) -> None:
        # conversation id -> private snapshot owned by the store
        self._convs: dict[str, Conversation] = {}

    @staticmethod
    def _snapshot(conversation: Conversation) -> Conversation:
        """Return a shallow copy of ``conversation`` with fresh containers."""
        return Conversation(
            id=conversation.id,
            turns=list(conversation.turns),  # new list, same turn objects
            source_uri=conversation.source_uri,
            title=conversation.title,
            created_at=conversation.created_at,
            metadata=dict(conversation.metadata),
        )

    async def save_conversation(self, conversation: Conversation) -> None:
        """Upsert ``conversation`` under ``conversation.id`` (full replace).

        A snapshot is stored so later mutation of the caller's object
        does not silently change stored state.
        """
        self._convs[conversation.id] = self._snapshot(conversation)

    async def get_conversation(self, conversation_id: str) -> Conversation | None:
        """Return a copy of the stored conversation, or ``None`` if not found.

        A copy is returned (not the stored object) for the same reason
        ``save_conversation`` copies on the way in: mutating the result
        must not alter what the store holds.
        """
        stored = self._convs.get(conversation_id)
        if stored is None:
            return None
        return self._snapshot(stored)

    async def list_conversations(self, *, limit: int = 100) -> list[Conversation]:
        """Return copies of up to ``limit`` conversations, newest first.

        Ordering is by ``created_at`` descending. A ``limit`` of zero or
        less yields an empty list; without this guard a negative value
        would slice from the end and drop the oldest entries instead.
        """
        if limit <= 0:
            return []
        newest_first = sorted(
            self._convs.values(), key=lambda c: c.created_at, reverse=True
        )
        return [self._snapshot(conv) for conv in newest_first[:limit]]

    async def delete_conversation(self, conversation_id: str) -> None:
        """Remove the conversation if present; unknown ids are a no-op."""
        self._convs.pop(conversation_id, None)

89 self._convs.pop(conversation_id, None)