Coverage for astrocyte/integrations/claude_managed_agents.py: 41%

116 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Claude Managed Agents integration — Astrocyte memory as custom tools. 

2 

3Claude Managed Agents (https://platform.claude.com/docs/en/managed-agents/overview) 

4is Anthropic's cloud-hosted agent platform. It uses the standard ``anthropic`` 

5SDK with the ``managed-agents-2026-04-01`` beta header. 

6 

7This module provides helpers for wiring Astrocyte memory operations as 

8custom tools on a Managed Agent. Custom tools are defined at agent creation 

9and executed by YOUR application via the SSE event loop — the agent emits 

10``agent.custom_tool_use`` events, you call Astrocyte, and send back 

11``user.custom_tool_result`` events. 

12 

13Integration pattern: 

14 1. Call ``memory_tool_definitions()`` to get tool dicts for agent creation. 

15 2. Create the agent with these tools via ``client.beta.agents.create(tools=...)``. 

16 3. In your event loop, call ``handle_memory_tool()`` when you see an 

17 ``agent.custom_tool_use`` event for a memory tool. 

18 4. Send the result back as a ``user.custom_tool_result`` event. 

19 

20Usage — agent creation: 

21 from anthropic import Anthropic 

22 from astrocyte import Astrocyte 

23 from astrocyte.integrations.claude_managed_agents import ( 

24 memory_tool_definitions, 

25 handle_memory_tool, 

26 is_memory_tool, 

27 run_session_with_memory, 

28 ) 

29 

30 brain = Astrocyte.from_config("astrocyte.yaml") 

31 client = Anthropic() 

32 

33 agent = client.beta.agents.create( 

34 name="Assistant with memory", 

35 model="claude-sonnet-4-6", 

36 system="You are a helpful assistant with long-term memory.", 

37 tools=[ 

38 {"type": "agent_toolset_20260401"}, 

39 *memory_tool_definitions(), 

40 ], 

41 ) 

42 

43Usage — event loop (manual; run inside an ``async def`` because ``handle_memory_tool`` is awaited):

44 with client.beta.sessions.events.stream(session.id) as stream: 

45 events_by_id = {} 

46 for event in stream: 

47 if event.type == "agent.custom_tool_use": 

48 events_by_id[event.id] = event 

49 

50 elif event.type == "session.status_idle" and (stop := event.stop_reason): 

51 if stop.type == "requires_action": 

52 for event_id in stop.event_ids: 

53 tool_event = events_by_id[event_id] 

54 if is_memory_tool(tool_event.name): 

55 result = await handle_memory_tool( 

56 brain, tool_event.name, tool_event.input, 

57 bank_id="user-123", 

58 ) 

59 client.beta.sessions.events.send( 

60 session.id, 

61 events=[{ 

62 "type": "user.custom_tool_result", 

63 "custom_tool_use_id": event_id, 

64 "content": [{"type": "text", "text": result}], 

65 }], 

66 ) 

67 elif stop.type == "end_turn": 

68 break 

69 

70Usage — high-level helper: 

71 result = await run_session_with_memory( 

72 client, brain, 

73 session_id=session.id, 

74 prompt="What do you remember about my preferences?", 

75 bank_id="user-123", 

76 ) 

77 print(result) 

78 

79 # When deleting the session, send user.interrupt first (see delete_managed_session). 

80 

81``memory_reflect`` uses :meth:`astrocyte._astrocyte.Astrocyte.reflect`; when YAML sets 

82``recall_authority.enabled`` and ``apply_to_reflect: true``, synthesis includes the 

83``<authority_context>`` block automatically (same behavior as direct ``brain.reflect``). 

84""" 

85 

86from __future__ import annotations 

87 

88import json 

89import logging 

90from typing import TYPE_CHECKING, Any 

91 

92from astrocyte.types import AstrocyteContext 

93 

94if TYPE_CHECKING: 

95 from astrocyte._astrocyte import Astrocyte 

96 

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Tool names — canonical set
# ---------------------------------------------------------------------------

MEMORY_RETAIN = "memory_retain"
MEMORY_RECALL = "memory_recall"
MEMORY_REFLECT = "memory_reflect"
MEMORY_FORGET = "memory_forget"

# Immutable on purpose: this set defines what ``is_memory_tool`` accepts, so
# nothing that imports it should be able to add or remove names at runtime.
_ALL_MEMORY_TOOLS = frozenset({MEMORY_RETAIN, MEMORY_RECALL, MEMORY_REFLECT, MEMORY_FORGET})


def is_memory_tool(name: str) -> bool:
    """Check whether a tool name is an Astrocyte memory tool.

    Args:
        name: The tool name from an ``agent.custom_tool_use`` event.

    Returns:
        True if ``name`` is one of ``memory_retain``, ``memory_recall``,
        ``memory_reflect`` or ``memory_forget`` (exact, case-sensitive match).
    """
    return name in _ALL_MEMORY_TOOLS

114 

115 

116# --------------------------------------------------------------------------- 

117# Tool definitions for agent creation 

118# --------------------------------------------------------------------------- 

119 

120 

def memory_tool_definitions(
    *,
    include_reflect: bool = True,
    include_forget: bool = False,
) -> list[dict[str, Any]]:
    """Build the custom-tool specs to pass to ``client.beta.agents.create(tools=...)``.

    Every entry is a dict of the form
    ``{"type": "custom", "name": ..., "description": ..., "input_schema": {...}}``,
    which is the custom-tool shape the Managed Agents API expects.
    ``memory_retain`` and ``memory_recall`` are always included; the two flags
    control the remaining tools. New dicts are created on every call.

    Args:
        include_reflect: Add the ``memory_reflect`` tool (on by default).
        include_forget: Add the ``memory_forget`` tool (off by default).

    Returns:
        Tool definitions in the order retain, recall, [reflect], [forget],
        ready to be unpacked into the agent's ``tools`` list.
    """

    def _custom_tool(
        name: str,
        description: str,
        properties: dict[str, Any],
        required: list[str],
    ) -> dict[str, Any]:
        # One custom tool spec with an object-typed input schema.
        return {
            "type": "custom",
            "name": name,
            "description": description,
            "input_schema": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        }

    retain_tool = _custom_tool(
        MEMORY_RETAIN,
        "Store content into long-term memory for future recall. "
        "Use this to remember facts, preferences, decisions, or any information "
        "that should persist across conversations. "
        "Optionally pass comma-separated tags for categorization.",
        {
            "content": {
                "type": "string",
                "description": "The text content to store in memory.",
            },
            "tags": {
                "type": "string",
                "description": "Optional comma-separated tags (e.g. 'preference,important').",
            },
        },
        ["content"],
    )

    recall_tool = _custom_tool(
        MEMORY_RECALL,
        "Search long-term memory for information relevant to a query. Returns scored hits ranked by relevance.",
        {
            "query": {
                "type": "string",
                "description": "The search query to find relevant memories.",
            },
            "max_results": {
                "type": "integer",
                "description": "Maximum number of results to return (default 5).",
            },
        },
        ["query"],
    )

    definitions = [retain_tool, recall_tool]

    if include_reflect:
        definitions.append(
            _custom_tool(
                MEMORY_REFLECT,
                "Synthesize a comprehensive answer from long-term memory. "
                "Use this instead of recall when you need a narrative answer "
                "rather than raw search hits.",
                {
                    "query": {
                        "type": "string",
                        "description": "The question to answer from memory.",
                    },
                },
                ["query"],
            )
        )

    if include_forget:
        definitions.append(
            _custom_tool(
                MEMORY_FORGET,
                "Remove specific memories by their IDs. Pass a comma-separated list of memory IDs to delete.",
                {
                    "memory_ids": {
                        "type": "string",
                        "description": "Comma-separated memory IDs to delete.",
                    },
                },
                ["memory_ids"],
            )
        )

    return definitions

236 

237 

238# --------------------------------------------------------------------------- 

239# Tag parsing helper 

240# --------------------------------------------------------------------------- 

241 

242 

243def _parse_tags(raw: Any) -> list[str] | None: 

244 """Parse comma-separated tag string into a list.""" 

245 if isinstance(raw, str) and raw.strip(): 

246 return [t.strip() for t in raw.split(",") if t.strip()] 

247 return None 

248 

249 

250# --------------------------------------------------------------------------- 

251# Tool handler — called from your event loop 

252# --------------------------------------------------------------------------- 

253 

254 

def _coerce_max_results(raw: Any, default: int = 5) -> int:
    """Return ``raw`` as a positive int, or ``default`` when it is missing or invalid."""
    # The schema declares an integer, but the value comes from the model and may
    # be null, a numeric string, or non-positive; normalize before the backend.
    if raw is None or isinstance(raw, bool):
        return default
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return default
    return value if value > 0 else default


def _parse_memory_ids(raw: Any) -> list[str]:
    """Return the non-blank memory IDs from a comma-separated string or a list of IDs."""
    if raw is None:
        return []
    if isinstance(raw, (list, tuple)):
        pieces = [str(item) for item in raw if item is not None]
    else:
        pieces = str(raw).split(",")
    return [piece.strip() for piece in pieces if piece.strip()]


async def handle_memory_tool(
    brain: Astrocyte,
    tool_name: str,
    tool_input: dict[str, Any],
    *,
    bank_id: str,
    context: AstrocyteContext | None = None,
) -> str:
    """Execute an Astrocyte memory tool and return the result as a string.

    Call this from your SSE event loop when you receive an
    ``agent.custom_tool_use`` event for a memory tool.

    Model-supplied inputs are normalized before they reach ``brain``:

    - ``max_results`` falls back to 5 when it is missing, null, non-numeric
      or not positive.
    - ``memory_ids`` may be a comma-separated string or a list. An empty ID
      list returns ``{"deleted_count": 0}`` without calling ``brain.forget``.

    Args:
        brain: The Astrocyte instance.
        tool_name: The tool name from the event (e.g. ``"memory_retain"``).
        tool_input: The tool input dict from the event.
        bank_id: The memory bank to operate on.
        context: Optional context forwarded unchanged to the ``brain`` call.

    Returns:
        A JSON string (or the plain-text answer for ``memory_reflect``) to send
        back as ``user.custom_tool_result`` content.

    Raises:
        ValueError: If ``tool_name`` is not a recognized memory tool.
        KeyError: If a required input (``content``, ``query`` or
            ``memory_ids``) is missing from ``tool_input``.
    """
    if tool_name == MEMORY_RETAIN:
        tag_list = _parse_tags(tool_input.get("tags"))
        result = await brain.retain(tool_input["content"], bank_id=bank_id, tags=tag_list, context=context)
        return json.dumps(
            {
                "stored": result.stored,
                "memory_id": result.memory_id,
            }
        )

    if tool_name == MEMORY_RECALL:
        max_results = _coerce_max_results(tool_input.get("max_results"))
        result = await brain.recall(tool_input["query"], bank_id=bank_id, max_results=max_results, context=context)
        hits = [{"text": h.text, "score": round(h.score, 4)} for h in result.hits]
        return json.dumps({"hits": hits, "total": result.total_available})

    if tool_name == MEMORY_REFLECT:
        result = await brain.reflect(tool_input["query"], bank_id=bank_id, context=context)
        return result.answer

    if tool_name == MEMORY_FORGET:
        memory_ids = _parse_memory_ids(tool_input["memory_ids"])
        if not memory_ids:
            # Nothing to delete. Skip the backend call: how ``brain.forget``
            # treats ``memory_ids=[]`` is not visible here, and an empty
            # model-supplied ID string should never widen the deletion.
            return json.dumps({"deleted_count": 0})
        result = await brain.forget(bank_id, memory_ids=memory_ids, context=context)
        return json.dumps({"deleted_count": result.deleted_count})

    raise ValueError(f"Unknown memory tool: {tool_name}")

309 

310 

311# --------------------------------------------------------------------------- 

312# High-level session runner 

313# --------------------------------------------------------------------------- 

314 

315 

316def _format_managed_session_error(event: Any) -> str: 

317 """Best-effort message for ``session.error`` events (SDK shapes vary).""" 

318 err = getattr(event, "error", None) 

319 if err is None: 

320 return repr(event) 

321 if isinstance(err, str): 

322 return err 

323 model_dump = getattr(err, "model_dump", None) 

324 if callable(model_dump): 

325 payload = None 

326 try: 

327 payload = model_dump(mode="json") 

328 except Exception: 

329 try: 

330 payload = model_dump() 

331 except Exception: 

332 logger.debug("managed session error: model_dump failed", exc_info=True) 

333 if payload is not None: 

334 try: 

335 return json.dumps(payload, default=str) 

336 except Exception: 

337 logger.debug("managed session error: json.dumps(payload) failed", exc_info=True) 

338 parts: list[str] = [] 

339 for key in ("type", "message", "code", "detail"): 

340 val = getattr(err, key, None) 

341 if val is not None: 

342 parts.append(f"{key}={val!r}") 

343 return ", ".join(parts) if parts else repr(err) 

344 

345 

async def run_session_with_memory(
    client: Any,
    brain: Astrocyte,
    *,
    session_id: str,
    prompt: str,
    bank_id: str,
    context: AstrocyteContext | None = None,
    non_memory_tool_handler: Any | None = None,
    timeout_seconds: float = 120,
) -> str:
    """Run a Managed Agents session, handling Astrocyte memory tool calls.

    Opens an SSE stream, sends the prompt, and processes events until the
    agent goes idle with ``end_turn`` or the stream ends. Memory tool calls are
    handled automatically. Non-memory custom tool calls are delegated to
    ``non_memory_tool_handler`` if provided; otherwise they are answered with a
    JSON error payload.

    The client is synchronous, so every blocking SDK call (reading the next
    stream event, sending events) runs in the event loop's default executor.
    A plain ``for event in stream`` would block the event loop, and then
    ``asyncio.wait_for`` could never enforce ``timeout_seconds`` while the
    stream is stalled.

    Args:
        client: An ``anthropic.Anthropic`` client instance.
        brain: The Astrocyte instance.
        session_id: The session ID to interact with.
        prompt: The user message to send.
        bank_id: The memory bank for all memory operations.
        context: Optional context forwarded to every memory tool call.
        non_memory_tool_handler: Optional async callable
            ``(name: str, input: dict) -> str`` for non-memory custom tools.
        timeout_seconds: Maximum time to wait for completion.

    Returns:
        The concatenated agent message text from the session.

    Raises:
        RuntimeError: On a ``session.error`` or ``session.status_terminated`` event.
        asyncio.TimeoutError: If the session does not finish within ``timeout_seconds``.
    """
    import asyncio
    import functools

    agent_text_parts: list[str] = []
    events_by_id: dict[str, Any] = {}
    events_api = client.beta.sessions.events
    end_of_stream = object()  # sentinel returned by next() once the stream is exhausted

    async def _call_blocking(func: Any, *args: Any, **kwargs: Any) -> Any:
        # Run a synchronous SDK call off the event loop so it stays cancellable.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

    async def _send(event: dict[str, Any]) -> None:
        await _call_blocking(events_api.send, session_id, events=[event])

    async def _run() -> str:
        with events_api.stream(session_id) as stream:
            # Send the user message after the stream opens, as the original flow
            # does (presumably so no response events are missed).
            await _send(
                {
                    "type": "user.message",
                    "content": [{"type": "text", "text": prompt}],
                }
            )

            event_iter = iter(stream)
            while True:
                # On timeout the awaiting coroutine is cancelled. The worker
                # thread may stay blocked until the stream yields or is closed
                # by the ``with`` exit.
                event = await _call_blocking(next, event_iter, end_of_stream)
                if event is end_of_stream:
                    break

                if event.type == "agent.message":
                    agent_text_parts.extend(block.text for block in event.content if hasattr(block, "text"))

                elif event.type == "agent.custom_tool_use":
                    events_by_id[event.id] = event

                elif event.type == "session.error":
                    raise RuntimeError(f"Managed Agents session error: {_format_managed_session_error(event)}")

                elif event.type == "session.status_idle":
                    stop = getattr(event, "stop_reason", None)
                    if stop is None:
                        continue

                    if stop.type == "requires_action":
                        for event_id in stop.event_ids:
                            tool_event = events_by_id.get(event_id)
                            if tool_event is None:
                                continue

                            if is_memory_tool(tool_event.name):
                                result_text = await handle_memory_tool(
                                    brain,
                                    tool_event.name,
                                    tool_event.input,
                                    bank_id=bank_id,
                                    context=context,
                                )
                            elif non_memory_tool_handler is not None:
                                result_text = await non_memory_tool_handler(tool_event.name, tool_event.input)
                            else:
                                result_text = json.dumps({"error": f"No handler for tool: {tool_event.name}"})

                            await _send(
                                {
                                    "type": "user.custom_tool_result",
                                    "custom_tool_use_id": event_id,
                                    "content": [{"type": "text", "text": result_text}],
                                }
                            )

                    elif stop.type == "end_turn":
                        break

                    else:
                        # An idle session with a stop reason we don't handle
                        # presumably emits nothing further until new user
                        # input, so waiting would only run out the timeout.
                        logger.warning(
                            "Managed Agents session %s went idle with unhandled stop reason %r; returning collected text",
                            session_id,
                            stop.type,
                        )
                        break

                elif event.type == "session.status_terminated":
                    error_msg = getattr(event, "error", None)
                    raise RuntimeError(f"Session terminated: {error_msg}")

        return "".join(agent_text_parts)

    return await asyncio.wait_for(_run(), timeout=timeout_seconds)

452 

453 

def delete_managed_session(client: Any, session_id: str) -> None:
    """Interrupt (best effort), then delete a Managed Agents session.

    Deleting a session the API still considers **running** (for example right
    after the per-turn SSE stream closes) fails with HTTP 400, and the error
    suggests sending ``user.interrupt`` or waiting for completion. So a
    ``user.interrupt`` event is sent first. Any failure of that send is logged
    at debug level and ignored (e.g. the session was already idle). The
    ``DELETE`` is then issued unconditionally, and its errors propagate.
    """
    interrupt_event = {"type": "user.interrupt"}
    try:
        client.beta.sessions.events.send(session_id, events=[interrupt_event])
    except Exception:
        logger.debug("user.interrupt before session delete failed (ignored)", exc_info=True)

    client.beta.sessions.delete(session_id)