Coverage for astrocyte/conversations/storage.py: 100%
25 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""ConversationStore SPI — persist conversations + their turns.
3Abstract base + in-memory impl for tests / embedded use. Postgres impl
4lives in ``adapters-storage-py/astrocyte-postgres/`` so the
5Conversation Engine doesn't depend on Postgres directly.
7SPI shape (narrow, mirroring DocumentStore):
8 - ``save_conversation(c)`` — upsert conversation + all turns
9 - ``get_conversation(id)`` — fetch with all turns
10 - ``list_conversations(limit)`` — paginate; control plane / debugging
11 - ``delete_conversation(id)`` — drop conversation AND its turns
12"""
14from __future__ import annotations
16from abc import ABC, abstractmethod
18from astrocyte.conversations.types import Conversation
class ConversationNotFoundError(Exception):
    """Raised when a requested ``conversation_id`` doesn't exist.

    Nothing in this module raises it: the store methods defined here
    report a miss by returning ``None`` (lookup) or doing nothing
    (delete). It is exported for callers that need a missing
    conversation to be an error.
    """
class ConversationStore(ABC):
    """Persistence SPI for conversations + ordered turns.

    All four operations are abstract coroutines, so a backend has to
    implement every one of them before it can be instantiated.
    """

    @abstractmethod
    async def save_conversation(self, conversation: Conversation) -> None:
        """Upsert a conversation. Replaces any prior turns (full re-save).

        Idempotent — calling twice with the same ``conversation.id``
        replaces rather than duplicates. The full turn list is
        re-written on each call. Incremental append is not part of this
        SPI; an ``append_turns`` method is a possible Phase 3+ addition
        if it turns out to be needed.

        Args:
            conversation: The conversation (with its turns) to persist.
        """

    @abstractmethod
    async def get_conversation(self, conversation_id: str) -> Conversation | None:
        """Fetch a conversation with all its turns in order.

        Args:
            conversation_id: Identifier of the conversation to load.

        Returns:
            The conversation, or ``None`` if not found.
        """

    @abstractmethod
    async def list_conversations(self, *, limit: int = 100) -> list[Conversation]:
        """List conversations newest-first, with their turns.

        For Phase 2.5 this loads turns eagerly. If conversations grow
        large in production, an ``include_turns=False`` variant is a
        natural follow-on.

        Args:
            limit: Maximum number of conversations to return
                (keyword-only).
        """

    @abstractmethod
    async def delete_conversation(self, conversation_id: str) -> None:
        """Delete a conversation and all its turns. No-op if not found.

        Args:
            conversation_id: Identifier of the conversation to remove.
        """
56# ─── in-memory impl ────────────────────────────────────────────────────
class InMemoryConversationStore(ConversationStore):
    """Pure-Python ConversationStore backed by a dict. Not thread-safe.

    For unit tests, CLI/embedded use, smoke tests of the Conversation
    Engine before wiring to Postgres.

    Conversations are copied both when saved and when read back, so
    neither the object a caller handed in nor one it was handed out
    aliases the stored state. The copies are shallow: the turn list and
    the metadata dict are new containers, but the turn objects and
    metadata values inside them are shared.
    """

    def __init__(self) -> None:
        # Keyed by conversation id; values are store-owned copies.
        self._convs: dict[str, Conversation] = {}

    @staticmethod
    def _snapshot(conversation: Conversation) -> Conversation:
        """Return a detached copy of *conversation*'s public fields."""
        return Conversation(
            id=conversation.id,
            turns=list(conversation.turns),  # shallow copy
            source_uri=conversation.source_uri,
            title=conversation.title,
            created_at=conversation.created_at,
            metadata=dict(conversation.metadata),
        )

    async def save_conversation(self, conversation: Conversation) -> None:
        """Store a copy of *conversation*, replacing any entry with the same id."""
        # Copy so later mutation of the caller's Conversation object
        # doesn't silently change stored state.
        self._convs[conversation.id] = self._snapshot(conversation)

    async def get_conversation(self, conversation_id: str) -> Conversation | None:
        """Return a copy of the stored conversation, or ``None`` if not found."""
        stored = self._convs.get(conversation_id)
        if stored is None:
            return None
        # Copy on the way out too, so edits to the returned object
        # never reach the store without an explicit save.
        return self._snapshot(stored)

    async def list_conversations(self, *, limit: int = 100) -> list[Conversation]:
        """Return copies of up to *limit* conversations, newest ``created_at`` first.

        A zero or negative *limit* yields an empty list.
        """
        newest_first = sorted(
            self._convs.values(),
            key=lambda c: c.created_at,
            reverse=True,
        )
        # Clamp: a negative slice bound would drop items from the tail
        # instead of limiting the result.
        return [self._snapshot(c) for c in newest_first[: max(limit, 0)]]

    async def delete_conversation(self, conversation_id: str) -> None:
        """Remove the conversation if present; do nothing otherwise."""
        self._convs.pop(conversation_id, None)