Coverage for astrocyte/integrations/managed_agents.py: 56%
98 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Claude Managed Agents integration — session-aware memory for cloud-hosted agents.
3Managed Agents (https://claude.com/blog/claude-managed-agents) handles
4orchestration, sandboxing, and tool execution. Astrocyte handles what the
5agent remembers, for how long, and under what compliance rules.
7This module provides helpers for the two integration patterns:
91. **Single-agent with session-scoped memory**:
10 Memory banks are tied to SDK session IDs so each session gets isolated memory.
122. **Multi-agent coordination**:
13 A coordinator agent shares a bank namespace with sub-agents.
14 Each sub-agent writes to its own bank; all agents can read from
15 the shared coordinator bank. MIP routes cross-agent writes.
17Usage — single agent:
18 from astrocyte import Astrocyte
19 from astrocyte.integrations.managed_agents import create_memory_server
21 brain = Astrocyte.from_config("astrocyte.yaml")
22 server = create_memory_server(brain, session_id="sess-abc123")
24 options = ClaudeAgentOptions(
25 mcp_servers={"memory": server},
26 allowed_tools=["mcp__memory__*"],
27 )
29Usage — multi-agent:
30 from astrocyte.integrations.managed_agents import (
31 create_coordinator_server,
32 create_subagent_definition,
33 )
35 coord_server = create_coordinator_server(brain, session_id="sess-abc")
36 researcher = create_subagent_definition(
37 brain, role="researcher", session_id="sess-abc",
38 description="Research agent that explores files and web sources.",
39 prompt="You are a research specialist...",
40 tools=["Read", "Grep", "Glob", "WebSearch", "WebFetch"],
41 )
43 options = ClaudeAgentOptions(
44 mcp_servers={"memory": coord_server},
45 allowed_tools=["mcp__memory__*", "Agent"],
46 agents={"researcher": researcher},
47 )
48"""
50from __future__ import annotations
52import json
53from typing import TYPE_CHECKING, Any
55from astrocyte.types import AstrocyteContext
57if TYPE_CHECKING:
58 from astrocyte._astrocyte import Astrocyte
61# ---------------------------------------------------------------------------
62# Bank naming conventions
63# ---------------------------------------------------------------------------
def session_bank_id(session_id: str) -> str:
    """Return the memory bank identifier used for a whole SDK session.

    The identifier is the literal prefix ``session-`` followed by
    ``session_id``.
    """
    bank = f"session-{session_id}"
    return bank
def agent_bank_id(session_id: str, role: str) -> str:
    """Return the bank identifier for one agent role inside a session.

    The result has the shape ``session-{session_id}:agent-{role}``.
    """
    session_part = f"session-{session_id}"
    agent_part = f"agent-{role}"
    return f"{session_part}:{agent_part}"
def coordinator_bank_id(session_id: str) -> str:
    """Return the identifier of the shared coordinator bank for a session.

    The result has the shape ``session-{session_id}:coordinator``.
    """
    session_part = f"session-{session_id}"
    return f"{session_part}:coordinator"
81# ---------------------------------------------------------------------------
# Shared helpers — tag parsing and SDK response formatting
83# ---------------------------------------------------------------------------
86def _parse_tags(raw: Any) -> list[str] | None:
87 """Parse comma-separated tag string into a list."""
88 if isinstance(raw, str) and raw:
89 return [t.strip() for t in raw.split(",")]
90 return None
93def _format_sdk_response(data: Any) -> dict[str, Any]:
94 """Wrap data in SDK MCP response format."""
95 text = data if isinstance(data, str) else json.dumps(data)
96 return {"content": [{"type": "text", "text": text}]}
99# ---------------------------------------------------------------------------
100# Handler logic — testable without SDK dependency
101# ---------------------------------------------------------------------------
async def _handle_retain(
    brain: Astrocyte,
    bank_id: str,
    args: dict[str, Any],
    *,
    context: AstrocyteContext | None = None,
) -> dict[str, Any]:
    """Handle a retain tool call by storing ``args["content"]`` in ``bank_id``.

    The optional ``args["tags"]`` value is run through ``_parse_tags``
    before being handed to ``brain.retain``.

    Returns:
        An SDK-formatted response whose JSON text holds the ``stored`` and
        ``memory_id`` fields of the retain result.

    Raises:
        KeyError: If ``args`` has no ``"content"`` entry.
    """
    tags = _parse_tags(args.get("tags"))
    content = args["content"]
    outcome = await brain.retain(content, bank_id=bank_id, tags=tags, context=context)
    summary = {"stored": outcome.stored, "memory_id": outcome.memory_id}
    return _format_sdk_response(summary)
async def _handle_coordinator_recall(
    brain: Astrocyte,
    session_id: str,
    coord_bank: str,
    args: dict[str, Any],
    *,
    context: AstrocyteContext | None = None,
) -> dict[str, Any]:
    """Handle a coordinator recall across the shared bank and chosen agent banks.

    ``args["include_agents"]`` may hold comma-separated role names; each
    non-blank role adds that role's per-agent bank (for ``session_id``) to
    the banks searched after ``coord_bank``. ``args["max_results"]``
    defaults to 10 when absent.

    Returns:
        An SDK-formatted response whose JSON text holds ``hits`` (text,
        score rounded to 4 places, and bank_id per hit) and ``total``.

    Raises:
        KeyError: If ``args`` has no ``"query"`` entry.
    """
    query = args["query"]
    limit = args.get("max_results", 10)
    roles_csv = args.get("include_agents", "")

    search_banks = [coord_bank]
    if roles_csv:
        role_names = (piece.strip() for piece in roles_csv.split(","))
        search_banks.extend(agent_bank_id(session_id, name) for name in role_names if name)

    found = await brain.recall(
        query,
        banks=search_banks,
        strategy="parallel",
        max_results=limit,
        context=context,
    )

    hit_rows = []
    for hit in found.hits:
        hit_rows.append({"text": hit.text, "score": round(hit.score, 4), "bank_id": hit.bank_id})
    return _format_sdk_response({"hits": hit_rows, "total": found.total_available})
async def _handle_coordinator_reflect(
    brain: Astrocyte,
    session_id: str,
    coord_bank: str,
    args: dict[str, Any],
    *,
    context: AstrocyteContext | None = None,
) -> dict[str, Any]:
    """Handle a coordinator reflect over the shared bank and chosen agent banks.

    ``args["include_agents"]`` may hold comma-separated role names; each
    non-blank role adds that role's per-agent bank (for ``session_id``) to
    the banks passed to ``brain.reflect`` after ``coord_bank``.

    Returns:
        An SDK-formatted response wrapping the reflect result's ``answer``.

    Raises:
        KeyError: If ``args`` has no ``"query"`` entry.
    """
    query = args["query"]
    roles_csv = args.get("include_agents", "")

    source_banks = [coord_bank]
    if roles_csv:
        role_names = (piece.strip() for piece in roles_csv.split(","))
        source_banks.extend(agent_bank_id(session_id, name) for name in role_names if name)

    reflection = await brain.reflect(
        query,
        banks=source_banks,
        strategy="parallel",
        context=context,
    )
    return _format_sdk_response(reflection.answer)
async def _handle_subagent_recall(
    brain: Astrocyte,
    own_bank: str,
    coord_bank: str,
    args: dict[str, Any],
    *,
    context: AstrocyteContext | None = None,
) -> dict[str, Any]:
    """Handle a sub-agent recall over its own bank and the coordinator bank.

    ``args["max_results"]`` defaults to 10 when absent.

    Returns:
        An SDK-formatted response whose JSON text holds ``hits`` (text,
        score rounded to 4 places, and bank_id per hit) and ``total``.

    Raises:
        KeyError: If ``args`` has no ``"query"`` entry.
    """
    query = args["query"]
    limit = args.get("max_results", 10)
    search_banks = [own_bank, coord_bank]

    found = await brain.recall(
        query,
        banks=search_banks,
        strategy="parallel",
        max_results=limit,
        context=context,
    )

    hit_rows = []
    for hit in found.hits:
        hit_rows.append({"text": hit.text, "score": round(hit.score, 4), "bank_id": hit.bank_id})
    return _format_sdk_response({"hits": hit_rows, "total": found.total_available})
194# ---------------------------------------------------------------------------
195# Single-agent: session-scoped memory server
196# ---------------------------------------------------------------------------
def create_memory_server(
    brain: Astrocyte,
    *,
    session_id: str,
    server_name: str = "astrocyte_memory",
    include_reflect: bool = True,
    include_forget: bool = False,
    context: AstrocyteContext | None = None,
) -> Any:
    """Build an in-process MCP memory server bound to one SDK session.

    The bank used by the server is derived from ``session_id`` via
    ``session_bank_id``; every other option is forwarded unchanged to
    ``astrocyte_claude_agent_server``.

    Returns:
        Whatever ``astrocyte_claude_agent_server`` returns, intended for
        ``ClaudeAgentOptions.mcp_servers``.
    """
    # Imported lazily so this module can be imported without the SDK integration.
    from astrocyte.integrations.claude_agent_sdk import astrocyte_claude_agent_server

    server_options = {
        "bank_id": session_bank_id(session_id),
        "server_name": server_name,
        "include_reflect": include_reflect,
        "include_forget": include_forget,
        "context": context,
    }
    return astrocyte_claude_agent_server(brain, **server_options)
229# ---------------------------------------------------------------------------
230# Multi-agent: coordinator + sub-agent memory
231# ---------------------------------------------------------------------------
def create_coordinator_server(
    brain: Astrocyte,
    *,
    session_id: str,
    server_name: str = "astrocyte_memory",
    include_reflect: bool = True,
    context: AstrocyteContext | None = None,
) -> Any:
    """Build the MCP server used by the coordinator agent of a session.

    The server registers ``memory_retain`` (writes to the coordinator
    bank) and ``memory_recall`` (searches the coordinator bank plus any
    sub-agent banks named in ``include_agents``). When ``include_reflect``
    is true it also registers ``memory_reflect``.

    Requires ``claude_agent_sdk`` to be installed.
    """
    from claude_agent_sdk import create_sdk_mcp_server, tool

    shared_bank = coordinator_bank_id(session_id)

    retain_description = (
        "Store a finding, decision, or coordination note into shared memory. All sub-agents can read this."
    )
    recall_description = (
        "Search shared memory and optionally sub-agent memory banks. "
        "Pass agent roles as comma-separated 'include_agents' to also search their banks."
    )

    @tool("memory_retain", retain_description, {"content": str, "tags": str})
    async def memory_retain(args: dict[str, Any]) -> dict[str, Any]:
        return await _handle_retain(brain, shared_bank, args, context=context)

    @tool(
        "memory_recall",
        recall_description,
        {"query": str, "max_results": int, "include_agents": str},
    )
    async def memory_recall(args: dict[str, Any]) -> dict[str, Any]:
        return await _handle_coordinator_recall(brain, session_id, shared_bank, args, context=context)

    registered = [memory_retain, memory_recall]

    if include_reflect:
        reflect_description = "Synthesize a comprehensive answer from shared memory and sub-agent banks."

        @tool("memory_reflect", reflect_description, {"query": str, "include_agents": str})
        async def memory_reflect(args: dict[str, Any]) -> dict[str, Any]:
            return await _handle_coordinator_reflect(brain, session_id, shared_bank, args, context=context)

        registered.append(memory_reflect)

    return create_sdk_mcp_server(name=server_name, version="1.0.0", tools=registered)
def create_subagent_memory_server(
    brain: Astrocyte,
    *,
    session_id: str,
    role: str,
    server_name: str | None = None,
    context: AstrocyteContext | None = None,
) -> Any:
    """Build the MCP server backing one sub-agent's private memory.

    ``memory_retain`` writes to the role's own bank
    (``session-{session_id}:agent-{role}``); ``memory_recall`` searches
    that bank together with the session's coordinator bank. When
    ``server_name`` is falsy the server is named ``astrocyte_memory_{role}``.

    Requires ``claude_agent_sdk`` to be installed.
    """
    from claude_agent_sdk import create_sdk_mcp_server, tool

    private_bank = agent_bank_id(session_id, role)
    shared_bank = coordinator_bank_id(session_id)

    if server_name:
        resolved_name = server_name
    else:
        resolved_name = f"astrocyte_memory_{role}"

    retain_description = f"Store a finding into the {role} agent's private memory bank."
    recall_description = f"Search the {role} agent's memory and the shared coordinator memory."

    @tool("memory_retain", retain_description, {"content": str, "tags": str})
    async def memory_retain(args: dict[str, Any]) -> dict[str, Any]:
        return await _handle_retain(brain, private_bank, args, context=context)

    @tool("memory_recall", recall_description, {"query": str, "max_results": int})
    async def memory_recall(args: dict[str, Any]) -> dict[str, Any]:
        return await _handle_subagent_recall(brain, private_bank, shared_bank, args, context=context)

    registered = [memory_retain, memory_recall]
    return create_sdk_mcp_server(name=resolved_name, version="1.0.0", tools=registered)
def create_subagent_definition(
    brain: Astrocyte,
    *,
    role: str,
    session_id: str,
    description: str,
    prompt: str,
    tools: list[str] | None = None,
    model: str | None = None,
    context: AstrocyteContext | None = None,
) -> dict[str, Any]:
    """Assemble an agent-definition dict for a sub-agent with private memory.

    A memory server is created for ``role`` through
    ``create_subagent_memory_server`` and attached under ``"mcpServers"``.
    The ``"tools"`` entry is a copy of ``tools`` (or an empty list) with
    the wildcard ``mcp__astrocyte_memory_{role}__*`` appended. ``"model"``
    is only present when ``model`` is truthy.

    Returns:
        A dict intended for ``ClaudeAgentOptions.agents``.
    """
    role_server = create_subagent_memory_server(
        brain,
        session_id=session_id,
        role=role,
        context=context,
    )
    server_name = f"astrocyte_memory_{role}"
    memory_tool_pattern = f"mcp__{server_name}__*"

    # Copy the caller's list so appending the memory pattern never mutates it.
    granted_tools = [*(tools or []), memory_tool_pattern]

    definition: dict[str, Any] = {
        "description": description,
        "prompt": prompt,
        "mcpServers": [role_server],
        "tools": granted_tools,
    }
    if model:
        definition["model"] = model
    return definition