Coverage for astrocyte/compose/conversation_document.py: 0%
37 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
"""ConversationDocumentStore — many-to-many link between conversations and documents.

Composition-layer SPI. Lives here (not in either engine) because it
spans both the Conversation Engine and the Document Engine without
belonging to either.

This is what resolves the Path 3 (DE + CE → ME) composition scenario:
  - At ingest time: call attach(conversation_id, document_id) after
    uploading a document to a session.
  - At recall time: call documents_for_conversation(conversation_id)
    to get the doc_ids to pass to DocumentNavigator.search().

The Postgres implementation lives in adapters-storage-py/astrocyte-postgres/
and backs this SPI with migration 031 (astrocyte_conversation_documents).
"""
17from __future__ import annotations
19from abc import ABC, abstractmethod
20from dataclasses import dataclass
21from datetime import datetime, timezone
@dataclass
class ConversationDocumentLink:
    """A single conversation ↔ document association record.

    Each instance pairs exactly one conversation with exactly one
    document and records when (and optionally by whom) the pairing
    was made.
    """

    # Identifier of the conversation side of the link.
    conversation_id: str
    # Identifier of the document side of the link.
    document_id: str
    # Moment the link was created or last re-attached.
    attached_at: datetime
    # Optional actor/speaker id; None when no actor was recorded.
    attached_by: str | None = None
class ConversationDocumentStore(ABC):
    """Abstract SPI for the many-to-many conversation ↔ document relation.

    Cascade semantics (the Postgres implementation enforces these with
    foreign keys):

      detach(conversation_id, document_id)
          Drops only the structural link. The document stays in the
          bank and remains searchable via memory.recall(); it leaves
          the bank only through an explicit
          DocumentStore.delete_document().

      delete conversation
          CASCADE removes every document association of that
          conversation. The documents themselves are unaffected.

      delete document
          CASCADE removes the document's associations from all
          conversations. The conversations themselves are unaffected.
    """

    @abstractmethod
    async def attach(
        self,
        conversation_id: str,
        document_id: str,
        *,
        attached_by: str | None = None,
    ) -> ConversationDocumentLink:
        """Link ``document_id`` to ``conversation_id`` and return the link.

        The operation is idempotent: attaching a document that is
        already attached refreshes ``attached_at`` and returns the link.

        Args:
            conversation_id: Conversation receiving the document.
            document_id: Document being attached.
            attached_by: Optional actor/speaker id (keyword-only).
        """

    @abstractmethod
    async def detach(self, conversation_id: str, document_id: str) -> None:
        """Drop the structural link between a conversation and a document.

        Does nothing when no such link exists. The document itself is
        NOT deleted.
        """

    @abstractmethod
    async def documents_for_conversation(self, conversation_id: str) -> list[str]:
        """Return every document_id attached to the conversation.

        Results are ordered by ``attached_at`` ascending.
        """

    @abstractmethod
    async def conversations_for_document(self, document_id: str) -> list[str]:
        """Return every conversation_id referencing the document, newest first."""
80# ── In-memory implementation (tests, embedded use) ────────────────────────────
class InMemoryConversationDocumentStore(ConversationDocumentStore):
    """Pure-Python implementation backed by a dict. Not thread-safe.

    Links are stored under ``(conversation_id, document_id)`` keys.
    ``attach`` keeps the dict in attach order (a re-attached link moves
    to the end), so listings stay deterministic even when several links
    carry the same ``attached_at`` value, e.g. on clocks with coarse
    resolution.
    """

    def __init__(self) -> None:
        # (conversation_id, document_id) → ConversationDocumentLink,
        # kept in attach order (oldest first).
        self._links: dict[tuple[str, str], ConversationDocumentLink] = {}

    async def attach(
        self,
        conversation_id: str,
        document_id: str,
        *,
        attached_by: str | None = None,
    ) -> ConversationDocumentLink:
        """Create or refresh the link and return it.

        Re-attaching replaces the stored link with a new one: its
        ``attached_at`` is the current UTC time and its ``attached_by``
        is whatever this call passed (``None`` if omitted).

        Args:
            conversation_id: Conversation receiving the document.
            document_id: Document being attached.
            attached_by: Optional actor/speaker id (keyword-only).

        Returns:
            The freshly stored ConversationDocumentLink.
        """
        key = (conversation_id, document_id)
        link = ConversationDocumentLink(
            conversation_id=conversation_id,
            document_id=document_id,
            attached_at=datetime.now(timezone.utc),
            attached_by=attached_by,
        )
        # Assigning to an existing key would keep its old position in the
        # dict; remove it first so the refreshed link becomes the newest
        # entry and timestamp ties resolve in true attach order.
        self._links.pop(key, None)
        self._links[key] = link
        return link

    async def detach(self, conversation_id: str, document_id: str) -> None:
        """Remove the link if present; no-op otherwise."""
        self._links.pop((conversation_id, document_id), None)

    async def documents_for_conversation(self, conversation_id: str) -> list[str]:
        """Return attached document_ids, ordered by ``attached_at`` ascending.

        Links with equal timestamps are returned in attach order.
        """
        links = [
            link for (cid, _), link in self._links.items()
            if cid == conversation_id
        ]
        # list.sort is stable, so ties keep the dict's attach order.
        links.sort(key=lambda link: link.attached_at)
        return [link.document_id for link in links]

    async def conversations_for_document(self, document_id: str) -> list[str]:
        """Return referencing conversation_ids, newest first.

        Links with equal timestamps are returned most recently attached
        first.
        """
        links = [
            link for (_, did), link in self._links.items()
            if did == document_id
        ]
        # sort(reverse=True) would leave tied links oldest-first; sorting
        # ascending and then reversing flips the ties as well.
        links.sort(key=lambda link: link.attached_at)
        links.reverse()
        return [link.conversation_id for link in links]