Coverage for astrocyte/integrations/livekit.py: 100%

37 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""LiveKit Agents integration — Astrocyte memory for real-time voice/video agents. 

2 

3Usage: 

4 from astrocyte import Astrocyte 

5 from astrocyte.integrations.livekit import AstrocyteLiveKitMemory 

6 

7 brain = Astrocyte.from_config("astrocyte.yaml") 

8 memory = AstrocyteLiveKitMemory(brain, bank_id="session-123") 

9 

10 # In a LiveKit agent session handler 

11 async def on_session(ctx: AgentContext): 

12 # Recall relevant context for the conversation 

13 context = await memory.get_session_context("user preferences") 

14 

15 # After the conversation, retain key information 

16 await memory.retain_from_session( 

17 "User mentioned they prefer morning appointments", 

18 session_id=ctx.session_id, 

19 ) 

20 

21LiveKit Agents are real-time voice/video agents. Memory integration is 

22different from batch agents — it's about: 

231. Loading context at session start (pre-fetch relevant memories) 

242. Retaining key information after sessions (post-session summarization) 

253. Mid-session recall for long-running conversations 

26""" 

27 

28from __future__ import annotations 

29 

30from typing import TYPE_CHECKING, Any 

31 

32if TYPE_CHECKING: 

33 from astrocyte._astrocyte import Astrocyte 

34 

35from astrocyte.types import AstrocyteContext 

36 

37 

class AstrocyteLiveKitMemory:
    """Astrocyte memory adapter for LiveKit real-time agents.

    Designed for the real-time voice/video agent lifecycle:
    - Session start: load relevant memories as context
    - Mid-session: recall for dynamic context enrichment
    - Session end: retain key takeaways

    Args:
        brain: Astrocyte instance whose ``recall``, ``retain`` and ``reflect``
            coroutines are awaited by this adapter.
        bank_id: Shared bank used whenever no per-session bank applies.
        context: Optional context forwarded unchanged on every brain call.
        session_bank_prefix: When non-empty, calls that pass a ``session_id``
            target the bank ``f"{session_bank_prefix}{session_id}"`` instead
            of ``bank_id``.
        max_context_items: Default result cap for ``get_session_context``.
    """

    def __init__(
        self,
        brain: Astrocyte,
        bank_id: str,
        *,
        context: AstrocyteContext | None = None,
        session_bank_prefix: str | None = None,
        max_context_items: int = 10,
    ) -> None:
        self.brain = brain
        self.bank_id = bank_id
        self._context = context
        self._session_prefix = session_bank_prefix
        self.max_context_items = max_context_items

    def _session_bank(self, session_id: str | None) -> str:
        """Return the per-session bank name, or the shared bank.

        A per-session bank is used only when both ``session_id`` and the
        configured prefix are truthy; otherwise ``self.bank_id`` is returned.
        """
        if session_id and self._session_prefix:
            return f"{self._session_prefix}{session_id}"
        return self.bank_id

    async def get_session_context(
        self,
        query: str,
        *,
        session_id: str | None = None,
        max_results: int | None = None,
    ) -> str:
        """Load relevant memories as context for a session start.

        Args:
            query: Recall query passed to the brain.
            session_id: Optional session used to pick a per-session bank.
            max_results: Result cap; a falsy value (``None`` or ``0``) falls
                back to ``self.max_context_items``.

        Returns:
            One ``"- <text>"`` line per hit, joined with newlines (suitable
            for injection into a system prompt), or ``""`` when nothing
            was recalled.
        """
        bank = self._session_bank(session_id)
        result = await self.brain.recall(
            query,
            bank_id=bank,
            max_results=max_results or self.max_context_items,
            context=self._context,
        )
        if not result.hits:
            return ""
        return "\n".join(f"- {h.text}" for h in result.hits)

    async def recall_mid_session(
        self,
        query: str,
        *,
        session_id: str | None = None,
        max_results: int = 3,
    ) -> list[dict[str, Any]]:
        """Quick recall during an active session for context enrichment.

        Returns:
            One dict per hit with the keys ``"text"``, ``"score"`` and
            ``"memory_id"``, for programmatic use.
        """
        bank = self._session_bank(session_id)
        result = await self.brain.recall(
            query,
            bank_id=bank,
            max_results=max_results,
            context=self._context,
        )
        return [
            {"text": h.text, "score": h.score, "memory_id": h.memory_id}
            for h in result.hits
        ]

    async def retain_from_session(
        self,
        content: str,
        *,
        session_id: str | None = None,
        tags: list[str] | None = None,
    ) -> str | None:
        """Retain key information from a session.

        The memory is stored with ``{"source": "livekit"}`` metadata (plus
        ``"session_id"`` when one is given) and is always tagged
        ``"livekit"``.

        Args:
            content: Text to retain.
            session_id: Optional session used to pick a per-session bank and
                recorded in the metadata.
            tags: Extra tags; the caller's list is copied, never mutated.

        Returns:
            The new ``memory_id`` when the brain reports the memory as
            stored, otherwise ``None``.
        """
        bank = self._session_bank(session_id)
        meta: dict[str, Any] = {"source": "livekit"}
        if session_id:
            meta["session_id"] = session_id

        # Copy so the caller's list is untouched, and add the integration
        # marker only when it is missing to avoid storing a duplicate tag.
        all_tags = list(tags or [])
        if "livekit" not in all_tags:
            all_tags.append("livekit")

        result = await self.brain.retain(
            content,
            bank_id=bank,
            tags=all_tags,
            metadata=meta,
            context=self._context,
        )
        return result.memory_id if result.stored else None

    async def summarize_session(
        self,
        query: str = "Summarize the key points from this session",
        *,
        session_id: str | None = None,
    ) -> str:
        """Use reflect to synthesize session memories into a summary.

        Returns:
            The ``answer`` attribute of the brain's reflect result.
        """
        bank = self._session_bank(session_id)
        result = await self.brain.reflect(query, bank_id=bank, context=self._context)
        return result.answer

133 return result.answer