Coverage for astrocyte/integrations/managed_agents.py: 56%

98 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Claude Managed Agents integration — session-aware memory for cloud-hosted agents. 

2 

3Managed Agents (https://claude.com/blog/claude-managed-agents) handles 

4orchestration, sandboxing, and tool execution. Astrocyte handles what the 

5agent remembers, for how long, and under what compliance rules. 

6 

7This module provides helpers for the two integration patterns: 

8 

91. **Single-agent with session-scoped memory**: 

10 Memory banks are tied to SDK session IDs so each session gets isolated memory. 

11 

122. **Multi-agent coordination**: 

13 A coordinator agent shares a bank namespace with sub-agents. 

14 Each sub-agent writes to its own bank; all agents can read from 

15 the shared coordinator bank. MIP routes cross-agent writes. 

16 

17Usage — single agent: 

18 from astrocyte import Astrocyte 

19 from astrocyte.integrations.managed_agents import create_memory_server 

20 

21 brain = Astrocyte.from_config("astrocyte.yaml") 

22 server = create_memory_server(brain, session_id="sess-abc123") 

23 

24 options = ClaudeAgentOptions( 

25 mcp_servers={"memory": server}, 

26 allowed_tools=["mcp__memory__*"], 

27 ) 

28 

29Usage — multi-agent: 

30 from astrocyte.integrations.managed_agents import ( 

31 create_coordinator_server, 

32 create_subagent_definition, 

33 ) 

34 

35 coord_server = create_coordinator_server(brain, session_id="sess-abc") 

36 researcher = create_subagent_definition( 

37 brain, role="researcher", session_id="sess-abc", 

38 description="Research agent that explores files and web sources.", 

39 prompt="You are a research specialist...", 

40 tools=["Read", "Grep", "Glob", "WebSearch", "WebFetch"], 

41 ) 

42 

43 options = ClaudeAgentOptions( 

44 mcp_servers={"memory": coord_server}, 

45 allowed_tools=["mcp__memory__*", "Agent"], 

46 agents={"researcher": researcher}, 

47 ) 

48""" 

49 

50from __future__ import annotations 

51 

52import json 

53from typing import TYPE_CHECKING, Any 

54 

55from astrocyte.types import AstrocyteContext 

56 

57if TYPE_CHECKING: 

58 from astrocyte._astrocyte import Astrocyte 

59 

60 

61# --------------------------------------------------------------------------- 

62# Bank naming conventions 

63# --------------------------------------------------------------------------- 

64 

65 

66def session_bank_id(session_id: str) -> str: 

67 """Derive a bank_id from an SDK session ID.""" 

68 return f"session-{session_id}" 

69 

70 

71def agent_bank_id(session_id: str, role: str) -> str: 

72 """Derive a per-agent bank_id within a session.""" 

73 return f"session-{session_id}:agent-{role}" 

74 

75 

76def coordinator_bank_id(session_id: str) -> str: 

77 """The shared coordinator bank for a multi-agent session.""" 

78 return f"session-{session_id}:coordinator" 

79 

80 

81# --------------------------------------------------------------------------- 

82# Shared helpers — parse tags 

83# --------------------------------------------------------------------------- 

84 

85 

86def _parse_tags(raw: Any) -> list[str] | None: 

87 """Parse comma-separated tag string into a list.""" 

88 if isinstance(raw, str) and raw: 

89 return [t.strip() for t in raw.split(",")] 

90 return None 

91 

92 

93def _format_sdk_response(data: Any) -> dict[str, Any]: 

94 """Wrap data in SDK MCP response format.""" 

95 text = data if isinstance(data, str) else json.dumps(data) 

96 return {"content": [{"type": "text", "text": text}]} 

97 

98 

99# --------------------------------------------------------------------------- 

100# Handler logic — testable without SDK dependency 

101# --------------------------------------------------------------------------- 

102 

103 

104async def _handle_retain( 

105 brain: Astrocyte, 

106 bank_id: str, 

107 args: dict[str, Any], 

108 *, 

109 context: AstrocyteContext | None = None, 

110) -> dict[str, Any]: 

111 """Retain handler: store content into a specific bank.""" 

112 tag_list = _parse_tags(args.get("tags")) 

113 result = await brain.retain(args["content"], bank_id=bank_id, tags=tag_list, context=context) 

114 return _format_sdk_response({"stored": result.stored, "memory_id": result.memory_id}) 

115 

116 

117async def _handle_coordinator_recall( 

118 brain: Astrocyte, 

119 session_id: str, 

120 coord_bank: str, 

121 args: dict[str, Any], 

122 *, 

123 context: AstrocyteContext | None = None, 

124) -> dict[str, Any]: 

125 """Coordinator recall: search coordinator bank + optional sub-agent banks.""" 

126 query_text = args["query"] 

127 max_results = args.get("max_results", 10) 

128 include_agents = args.get("include_agents", "") 

129 

130 banks = [coord_bank] 

131 if include_agents: 

132 for role in include_agents.split(","): 

133 role = role.strip() 

134 if role: 

135 banks.append(agent_bank_id(session_id, role)) 

136 

137 result = await brain.recall( 

138 query_text, 

139 banks=banks, 

140 strategy="parallel", 

141 max_results=max_results, 

142 context=context, 

143 ) 

144 hits = [{"text": h.text, "score": round(h.score, 4), "bank_id": h.bank_id} for h in result.hits] 

145 return _format_sdk_response({"hits": hits, "total": result.total_available}) 

146 

147 

148async def _handle_coordinator_reflect( 

149 brain: Astrocyte, 

150 session_id: str, 

151 coord_bank: str, 

152 args: dict[str, Any], 

153 *, 

154 context: AstrocyteContext | None = None, 

155) -> dict[str, Any]: 

156 """Coordinator reflect: synthesize from coordinator + optional sub-agent banks.""" 

157 query_text = args["query"] 

158 include_agents = args.get("include_agents", "") 

159 

160 banks = [coord_bank] 

161 if include_agents: 

162 for role in include_agents.split(","): 

163 role = role.strip() 

164 if role: 

165 banks.append(agent_bank_id(session_id, role)) 

166 

167 result = await brain.reflect(query_text, banks=banks, strategy="parallel", context=context) 

168 return _format_sdk_response(result.answer) 

169 

170 

171async def _handle_subagent_recall( 

172 brain: Astrocyte, 

173 own_bank: str, 

174 coord_bank: str, 

175 args: dict[str, Any], 

176 *, 

177 context: AstrocyteContext | None = None, 

178) -> dict[str, Any]: 

179 """Sub-agent recall: search own bank + coordinator bank.""" 

180 query_text = args["query"] 

181 max_results = args.get("max_results", 10) 

182 

183 result = await brain.recall( 

184 query_text, 

185 banks=[own_bank, coord_bank], 

186 strategy="parallel", 

187 max_results=max_results, 

188 context=context, 

189 ) 

190 hits = [{"text": h.text, "score": round(h.score, 4), "bank_id": h.bank_id} for h in result.hits] 

191 return _format_sdk_response({"hits": hits, "total": result.total_available}) 

192 

193 

194# --------------------------------------------------------------------------- 

195# Single-agent: session-scoped memory server 

196# --------------------------------------------------------------------------- 

197 

198 

199def create_memory_server( 

200 brain: Astrocyte, 

201 *, 

202 session_id: str, 

203 server_name: str = "astrocyte_memory", 

204 include_reflect: bool = True, 

205 include_forget: bool = False, 

206 context: AstrocyteContext | None = None, 

207) -> Any: 

208 """Create an in-process MCP server with session-scoped memory. 

209 

210 The bank_id is derived from the SDK session_id, so each session 

211 gets its own isolated memory namespace. 

212 

213 Requires ``claude_agent_sdk`` to be installed. 

214 Returns an MCP server for ``ClaudeAgentOptions.mcp_servers``. 

215 """ 

216 from astrocyte.integrations.claude_agent_sdk import astrocyte_claude_agent_server 

217 

218 bank = session_bank_id(session_id) 

219 return astrocyte_claude_agent_server( 

220 brain, 

221 bank_id=bank, 

222 server_name=server_name, 

223 include_reflect=include_reflect, 

224 include_forget=include_forget, 

225 context=context, 

226 ) 

227 

228 

229# --------------------------------------------------------------------------- 

230# Multi-agent: coordinator + sub-agent memory 

231# --------------------------------------------------------------------------- 

232 

233 

234def create_coordinator_server( 

235 brain: Astrocyte, 

236 *, 

237 session_id: str, 

238 server_name: str = "astrocyte_memory", 

239 include_reflect: bool = True, 

240 context: AstrocyteContext | None = None, 

241) -> Any: 

242 """Create an MCP server for the coordinator agent. 

243 

244 The coordinator writes to and reads from the shared coordinator bank. 

245 It can also read from any sub-agent bank via multi-bank recall. 

246 

247 Requires ``claude_agent_sdk`` to be installed. 

248 """ 

249 from claude_agent_sdk import create_sdk_mcp_server, tool 

250 

251 coord_bank = coordinator_bank_id(session_id) 

252 

253 sdk_tools = [] 

254 

255 @tool( 

256 "memory_retain", 

257 "Store a finding, decision, or coordination note into shared memory. All sub-agents can read this.", 

258 {"content": str, "tags": str}, 

259 ) 

260 async def memory_retain(args: dict[str, Any]) -> dict[str, Any]: 

261 return await _handle_retain(brain, coord_bank, args, context=context) 

262 

263 sdk_tools.append(memory_retain) 

264 

265 @tool( 

266 "memory_recall", 

267 "Search shared memory and optionally sub-agent memory banks. " 

268 "Pass agent roles as comma-separated 'include_agents' to also search their banks.", 

269 {"query": str, "max_results": int, "include_agents": str}, 

270 ) 

271 async def memory_recall(args: dict[str, Any]) -> dict[str, Any]: 

272 return await _handle_coordinator_recall(brain, session_id, coord_bank, args, context=context) 

273 

274 sdk_tools.append(memory_recall) 

275 

276 if include_reflect: 

277 

278 @tool( 

279 "memory_reflect", 

280 "Synthesize a comprehensive answer from shared memory and sub-agent banks.", 

281 {"query": str, "include_agents": str}, 

282 ) 

283 async def memory_reflect(args: dict[str, Any]) -> dict[str, Any]: 

284 return await _handle_coordinator_reflect(brain, session_id, coord_bank, args, context=context) 

285 

286 sdk_tools.append(memory_reflect) 

287 

288 return create_sdk_mcp_server( 

289 name=server_name, 

290 version="1.0.0", 

291 tools=sdk_tools, 

292 ) 

293 

294 

295def create_subagent_memory_server( 

296 brain: Astrocyte, 

297 *, 

298 session_id: str, 

299 role: str, 

300 server_name: str | None = None, 

301 context: AstrocyteContext | None = None, 

302) -> Any: 

303 """Create an MCP server for a sub-agent's private memory bank. 

304 

305 The sub-agent writes to its own bank (``session-{session_id}:agent-{role}``) 

306 and can read from both its own bank and the shared coordinator bank. 

307 

308 Requires ``claude_agent_sdk`` to be installed. 

309 """ 

310 from claude_agent_sdk import create_sdk_mcp_server, tool 

311 

312 own_bank = agent_bank_id(session_id, role) 

313 coord_bank = coordinator_bank_id(session_id) 

314 name = server_name or f"astrocyte_memory_{role}" 

315 

316 sdk_tools = [] 

317 

318 @tool( 

319 "memory_retain", 

320 f"Store a finding into the {role} agent's private memory bank.", 

321 {"content": str, "tags": str}, 

322 ) 

323 async def memory_retain(args: dict[str, Any]) -> dict[str, Any]: 

324 return await _handle_retain(brain, own_bank, args, context=context) 

325 

326 sdk_tools.append(memory_retain) 

327 

328 @tool( 

329 "memory_recall", 

330 f"Search the {role} agent's memory and the shared coordinator memory.", 

331 {"query": str, "max_results": int}, 

332 ) 

333 async def memory_recall(args: dict[str, Any]) -> dict[str, Any]: 

334 return await _handle_subagent_recall(brain, own_bank, coord_bank, args, context=context) 

335 

336 sdk_tools.append(memory_recall) 

337 

338 return create_sdk_mcp_server( 

339 name=name, 

340 version="1.0.0", 

341 tools=sdk_tools, 

342 ) 

343 

344 

345def create_subagent_definition( 

346 brain: Astrocyte, 

347 *, 

348 role: str, 

349 session_id: str, 

350 description: str, 

351 prompt: str, 

352 tools: list[str] | None = None, 

353 model: str | None = None, 

354 context: AstrocyteContext | None = None, 

355) -> dict[str, Any]: 

356 """Create an AgentDefinition dict for a sub-agent with its own memory bank. 

357 

358 Returns a dict compatible with ``ClaudeAgentOptions.agents``. 

359 The sub-agent gets an MCP server that writes to its own bank 

360 and reads from both its bank and the coordinator bank. 

361 """ 

362 memory_server = create_subagent_memory_server(brain, session_id=session_id, role=role, context=context) 

363 server_name = f"astrocyte_memory_{role}" 

364 

365 definition: dict[str, Any] = { 

366 "description": description, 

367 "prompt": prompt, 

368 "mcpServers": [memory_server], 

369 } 

370 

371 agent_tools = list(tools or []) 

372 agent_tools.append(f"mcp__{server_name}__*") 

373 definition["tools"] = agent_tools 

374 

375 if model: 

376 definition["model"] = model 

377 

378 return definition