Coverage for astrocyte/compose/conversation_document.py: 0%

37 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""ConversationDocumentStore — many-to-many link between conversations and documents. 

2 

3Composition-layer SPI. Lives here (not in either engine) because it 

4spans both the Conversation Engine and the Document Engine without 

5belonging to either. 

6 

7This is what resolves the Path 3 (DE + CE → ME) composition scenario: 

8 - At ingest time: call attach(conversation_id, document_id) after 

9 uploading a document to a session. 

10 - At recall time: call documents_for_conversation(conversation_id) 

11 to get the doc_ids to pass to DocumentNavigator.search(). 

12 

13The Postgres implementation lives in adapters-storage-py/astrocyte-postgres/ 

14and backs this SPI with migration 031 (astrocyte_conversation_documents). 

15""" 

16 

17from __future__ import annotations 

18 

19from abc import ABC, abstractmethod 

20from dataclasses import dataclass 

21from datetime import datetime, timezone 

22 

23 

@dataclass
class ConversationDocumentLink:
    """A single conversation ↔ document association record.

    Each instance corresponds to one row of the association table: it names
    the conversation, the document, when the two were linked, and (optionally)
    who linked them.
    """

    # Identifier of the conversation side of the link.
    conversation_id: str
    # Identifier of the document side of the link.
    document_id: str
    # Moment the document was attached to the conversation.
    attached_at: datetime
    # Optional actor/speaker id that performed the attach; None when unknown.
    attached_by: str | None = None

32 

33 

class ConversationDocumentStore(ABC):
    """Abstract SPI for the many-to-many conversation ↔ document relation.

    Cascade semantics (enforced by the Postgres implementation via FK):

    * ``detach(conversation_id, document_id)`` removes only the structural
      link. The document stays in the bank and remains searchable via
      ``memory.recall()``; only an explicit
      ``DocumentStore.delete_document()`` removes it from the bank.
    * Deleting a conversation CASCADE-removes all of its document
      associations; the documents themselves are unaffected.
    * Deleting a document CASCADE-removes its associations from every
      conversation; the conversations themselves are unaffected.
    """

    @abstractmethod
    async def attach(
        self,
        conversation_id: str,
        document_id: str,
        *,
        attached_by: str | None = None,
    ) -> ConversationDocumentLink:
        """Link ``document_id`` to ``conversation_id`` and return the link.

        The operation is idempotent: attaching a document that is already
        attached refreshes ``attached_at`` and returns the link.

        Args:
            conversation_id: Conversation to attach the document to.
            document_id: Document being attached.
            attached_by: Optional actor/speaker id recorded on the link.

        Returns:
            The ``ConversationDocumentLink`` describing the association.
        """

    @abstractmethod
    async def detach(self, conversation_id: str, document_id: str) -> None:
        """Drop the structural link between a conversation and a document.

        Does nothing when no such link exists, and never deletes the
        document itself.
        """

    @abstractmethod
    async def documents_for_conversation(self, conversation_id: str) -> list[str]:
        """Return every document_id attached to the conversation.

        Results are ordered by ``attached_at`` ascending (oldest first).
        """

    @abstractmethod
    async def conversations_for_document(self, document_id: str) -> list[str]:
        """Return every conversation_id referencing the document, newest first."""

78 

79 

80# ── In-memory implementation (tests, embedded use) ──────────────────────────── 

81 

82 

class InMemoryConversationDocumentStore(ConversationDocumentStore):
    """Pure-Python ``ConversationDocumentStore`` backed by a dict.

    Intended for tests and embedded use. Not thread-safe.

    All links live in one dict keyed by ``(conversation_id, document_id)``.
    The dict's insertion order mirrors attach order and is used as the
    tie-breaker when two links carry equal ``attached_at`` timestamps, so the
    documented orderings hold even if the clock returns the same value for
    consecutive attaches.
    """

    def __init__(self) -> None:
        # (conversation_id, document_id) → ConversationDocumentLink
        self._links: dict[tuple[str, str], ConversationDocumentLink] = {}

    async def attach(
        self,
        conversation_id: str,
        document_id: str,
        *,
        attached_by: str | None = None,
    ) -> ConversationDocumentLink:
        """Create or refresh the link between a conversation and a document.

        A new ``ConversationDocumentLink`` stamped with the current UTC time
        is stored under the pair's key, replacing any previous link for the
        same pair (including its ``attached_by`` value).

        Args:
            conversation_id: Conversation to attach the document to.
            document_id: Document being attached.
            attached_by: Optional actor/speaker id recorded on the link.

        Returns:
            The newly stored link.
        """
        key = (conversation_id, document_id)
        link = ConversationDocumentLink(
            conversation_id=conversation_id,
            document_id=document_id,
            attached_at=datetime.now(timezone.utc),
            attached_by=attached_by,
        )
        # Assigning to an existing key keeps its old position in the dict.
        # Remove it first so a re-attached link moves to the end and the
        # insertion order keeps reflecting "most recently attached last".
        self._links.pop(key, None)
        self._links[key] = link
        return link

    async def detach(self, conversation_id: str, document_id: str) -> None:
        """Remove the link for the pair; a missing link is silently ignored."""
        self._links.pop((conversation_id, document_id), None)

    async def documents_for_conversation(self, conversation_id: str) -> list[str]:
        """Return the conversation's document_ids ordered by attached_at asc.

        Links with equal timestamps keep their attach order because
        ``list.sort`` is stable.
        """
        links = [
            link for (cid, _), link in self._links.items()
            if cid == conversation_id
        ]
        links.sort(key=lambda link: link.attached_at)
        return [link.document_id for link in links]

    async def conversations_for_document(self, document_id: str) -> list[str]:
        """Return conversation_ids referencing the document, newest first.

        Links with equal timestamps are returned most recently attached
        first.
        """
        links = [
            link for (_, did), link in self._links.items()
            if did == document_id
        ]
        # sort(reverse=True) is stable: equal keys keep their incoming order.
        # Feed it newest-attached-first so ties also come out newest first
        # instead of in insertion (oldest-first) order.
        links.reverse()
        links.sort(key=lambda link: link.attached_at, reverse=True)
        return [link.conversation_id for link in links]