Coverage for astrocyte/integrations/claude_managed_agents.py: 41%
116 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Claude Managed Agents integration — Astrocyte memory as custom tools.
3Claude Managed Agents (https://platform.claude.com/docs/en/managed-agents/overview)
4is Anthropic's cloud-hosted agent platform. It uses the standard ``anthropic``
5SDK with the ``managed-agents-2026-04-01`` beta header.
7This module provides helpers for wiring Astrocyte memory operations as
8custom tools on a Managed Agent. Custom tools are defined at agent creation
9and executed by YOUR application via the SSE event loop — the agent emits
10``agent.custom_tool_use`` events, you call Astrocyte, and send back
11``user.custom_tool_result`` events.
13Integration pattern:
14 1. Call ``memory_tool_definitions()`` to get tool dicts for agent creation.
15 2. Create the agent with these tools via ``client.beta.agents.create(tools=...)``.
16 3. In your event loop, call ``handle_memory_tool()`` when you see an
17 ``agent.custom_tool_use`` event for a memory tool.
18 4. Send the result back as a ``user.custom_tool_result`` event.
20Usage — agent creation:
21 from anthropic import Anthropic
22 from astrocyte import Astrocyte
23 from astrocyte.integrations.claude_managed_agents import (
24 memory_tool_definitions,
25 handle_memory_tool,
26 is_memory_tool,
27 run_session_with_memory,
28 )
30 brain = Astrocyte.from_config("astrocyte.yaml")
31 client = Anthropic()
33 agent = client.beta.agents.create(
34 name="Assistant with memory",
35 model="claude-sonnet-4-6",
36 system="You are a helpful assistant with long-term memory.",
37 tools=[
38 {"type": "agent_toolset_20260401"},
39 *memory_tool_definitions(),
40 ],
41 )
43Usage — event loop (manual):
44 with client.beta.sessions.events.stream(session.id) as stream:
45 events_by_id = {}
46 for event in stream:
47 if event.type == "agent.custom_tool_use":
48 events_by_id[event.id] = event
50 elif event.type == "session.status_idle" and (stop := event.stop_reason):
51 if stop.type == "requires_action":
52 for event_id in stop.event_ids:
53 tool_event = events_by_id[event_id]
54 if is_memory_tool(tool_event.name):
55 result = await handle_memory_tool(
56 brain, tool_event.name, tool_event.input,
57 bank_id="user-123",
58 )
59 client.beta.sessions.events.send(
60 session.id,
61 events=[{
62 "type": "user.custom_tool_result",
63 "custom_tool_use_id": event_id,
64 "content": [{"type": "text", "text": result}],
65 }],
66 )
67 elif stop.type == "end_turn":
68 break
70Usage — high-level helper:
71 result = await run_session_with_memory(
72 client, brain,
73 session_id=session.id,
74 prompt="What do you remember about my preferences?",
75 bank_id="user-123",
76 )
77 print(result)
79 # When deleting the session, send user.interrupt first (see delete_managed_session).
81``memory_reflect`` uses :meth:`astrocyte._astrocyte.Astrocyte.reflect`; when YAML sets
82``recall_authority.enabled`` and ``apply_to_reflect: true``, synthesis includes the
83``<authority_context>`` block automatically (same behavior as direct ``brain.reflect``).
84"""
86from __future__ import annotations
88import json
89import logging
90from typing import TYPE_CHECKING, Any
92from astrocyte.types import AstrocyteContext
94if TYPE_CHECKING:
95 from astrocyte._astrocyte import Astrocyte
97logger = logging.getLogger(__name__)
99# ---------------------------------------------------------------------------
100# Tool names — canonical set
101# ---------------------------------------------------------------------------
# Canonical tool names. The same strings are used as the ``name`` of each
# custom tool definition and as the dispatch keys in ``handle_memory_tool``.
MEMORY_RETAIN = "memory_retain"
MEMORY_RECALL = "memory_recall"
MEMORY_REFLECT = "memory_reflect"
MEMORY_FORGET = "memory_forget"

# Membership set consulted by ``is_memory_tool``.
_ALL_MEMORY_TOOLS = {
    MEMORY_RETAIN,
    MEMORY_RECALL,
    MEMORY_REFLECT,
    MEMORY_FORGET,
}
def is_memory_tool(name: str) -> bool:
    """Return ``True`` when *name* is one of the Astrocyte memory tool names.

    Membership is tested against the module-level ``_ALL_MEMORY_TOOLS`` set.
    """
    recognised = name in _ALL_MEMORY_TOOLS
    return recognised
116# ---------------------------------------------------------------------------
117# Tool definitions for agent creation
118# ---------------------------------------------------------------------------
def memory_tool_definitions(
    *,
    include_reflect: bool = True,
    include_forget: bool = False,
) -> list[dict[str, Any]]:
    """Build the custom tool definitions for ``client.beta.agents.create(tools=...)``.

    Every definition is a dict with ``type: "custom"``, a ``name``, a
    ``description`` and an ``input_schema``. The retain and recall tools are
    always present; reflect and forget are opt-in/opt-out via the flags.

    Args:
        include_reflect: Include the ``memory_reflect`` tool (default True).
        include_forget: Include the ``memory_forget`` tool (default False).

    Returns:
        A new list of tool definition dicts, in the order retain, recall,
        reflect (if enabled), forget (if enabled).
    """

    def _custom_tool(
        name: str,
        description: str,
        properties: dict[str, Any],
        required: list[str],
    ) -> dict[str, Any]:
        # All memory tools share this envelope; only the four arguments vary.
        return {
            "type": "custom",
            "name": name,
            "description": description,
            "input_schema": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        }

    definitions: list[dict[str, Any]] = [
        _custom_tool(
            MEMORY_RETAIN,
            (
                "Store content into long-term memory for future recall. "
                "Use this to remember facts, preferences, decisions, or any information "
                "that should persist across conversations. "
                "Optionally pass comma-separated tags for categorization."
            ),
            {
                "content": {
                    "type": "string",
                    "description": "The text content to store in memory.",
                },
                "tags": {
                    "type": "string",
                    "description": "Optional comma-separated tags (e.g. 'preference,important').",
                },
            },
            ["content"],
        ),
        _custom_tool(
            MEMORY_RECALL,
            "Search long-term memory for information relevant to a query. Returns scored hits ranked by relevance.",
            {
                "query": {
                    "type": "string",
                    "description": "The search query to find relevant memories.",
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of results to return (default 5).",
                },
            },
            ["query"],
        ),
    ]

    if include_reflect:
        definitions.append(
            _custom_tool(
                MEMORY_REFLECT,
                (
                    "Synthesize a comprehensive answer from long-term memory. "
                    "Use this instead of recall when you need a narrative answer "
                    "rather than raw search hits."
                ),
                {
                    "query": {
                        "type": "string",
                        "description": "The question to answer from memory.",
                    },
                },
                ["query"],
            )
        )

    if include_forget:
        definitions.append(
            _custom_tool(
                MEMORY_FORGET,
                "Remove specific memories by their IDs. Pass a comma-separated list of memory IDs to delete.",
                {
                    "memory_ids": {
                        "type": "string",
                        "description": "Comma-separated memory IDs to delete.",
                    },
                },
                ["memory_ids"],
            )
        )

    return definitions
238# ---------------------------------------------------------------------------
239# Tag parsing helper
240# ---------------------------------------------------------------------------
def _parse_tags(raw: Any) -> list[str] | None:
    """Normalize a tool-supplied ``tags`` value into a list of tag strings.

    Args:
        raw: Normally a comma-separated string (the shape the tool schema asks
            for). A list or tuple of strings is also accepted so tags sent as
            a JSON array are not silently discarded. Any other value is
            treated as "no tags".

    Returns:
        The stripped, non-empty tags in input order, or ``None`` when there is
        no usable tag at all (including inputs such as ``""`` or ``","``).
    """
    if isinstance(raw, str):
        candidates = raw.split(",")
    elif isinstance(raw, (list, tuple)):
        # Non-string members are ignored rather than coerced.
        candidates = [item for item in raw if isinstance(item, str)]
    else:
        return None

    tags = [tag.strip() for tag in candidates if tag.strip()]
    # Collapse "nothing left after stripping" to None so callers see a single
    # representation of "no tags" instead of sometimes [] and sometimes None.
    return tags or None
250# ---------------------------------------------------------------------------
251# Tool handler — called from your event loop
252# ---------------------------------------------------------------------------
async def handle_memory_tool(
    brain: Astrocyte,
    tool_name: str,
    tool_input: dict[str, Any],
    *,
    bank_id: str,
    context: AstrocyteContext | None = None,
) -> str:
    """Execute an Astrocyte memory tool and return the result as a string.

    Call this from your SSE event loop when you receive an
    ``agent.custom_tool_use`` event for a memory tool.

    Args:
        brain: The Astrocyte instance.
        tool_name: The tool name from the event (e.g. ``"memory_retain"``).
        tool_input: The tool input dict from the event.
        bank_id: The memory bank to operate on.
        context: Optional ``AstrocyteContext`` forwarded unchanged to every
            ``brain`` call.

    Returns:
        The text to send back as ``user.custom_tool_result`` content:

        * retain  — JSON ``{"stored": ..., "memory_id": ...}``
        * recall  — JSON ``{"hits": [{"text", "score"}, ...], "total": ...}``
        * reflect — the plain-text answer
        * forget  — JSON ``{"deleted_count": ...}``

    Raises:
        ValueError: If ``tool_name`` is not a recognized memory tool.
        KeyError: If a required input key (``content``, ``query``,
            ``memory_ids``) is missing from ``tool_input``.
    """
    if tool_name == MEMORY_RETAIN:
        tag_list = _parse_tags(tool_input.get("tags"))
        result = await brain.retain(tool_input["content"], bank_id=bank_id, tags=tag_list, context=context)
        return json.dumps(
            {
                "stored": result.stored,
                "memory_id": result.memory_id,
            }
        )

    if tool_name == MEMORY_RECALL:
        # An explicit ``null`` must fall back to the advertised default of 5,
        # exactly like an absent key.
        max_results = tool_input.get("max_results")
        if max_results is None:
            max_results = 5
        result = await brain.recall(tool_input["query"], bank_id=bank_id, max_results=max_results, context=context)
        hits = [{"text": h.text, "score": round(h.score, 4)} for h in result.hits]
        return json.dumps({"hits": hits, "total": result.total_available})

    if tool_name == MEMORY_REFLECT:
        result = await brain.reflect(tool_input["query"], bank_id=bank_id, context=context)
        return result.answer

    if tool_name == MEMORY_FORGET:
        ids_raw = tool_input["memory_ids"]
        # The schema asks for a comma-separated string, but the input is
        # model-generated: tolerate a JSON array (or a single scalar ID).
        if isinstance(ids_raw, str):
            candidates = ids_raw.split(",")
        elif isinstance(ids_raw, (list, tuple)):
            candidates = list(ids_raw)
        else:
            candidates = [ids_raw]
        memory_ids = [str(mid).strip() for mid in candidates if mid is not None and str(mid).strip()]
        if not memory_ids:
            # This tool is strictly ID-scoped. How ``brain.forget`` treats an
            # empty selector is not visible from here, so never forward one to
            # a destructive call; report that nothing was deleted instead.
            return json.dumps({"deleted_count": 0})
        result = await brain.forget(bank_id, memory_ids=memory_ids, context=context)
        return json.dumps({"deleted_count": result.deleted_count})

    raise ValueError(f"Unknown memory tool: {tool_name}")
311# ---------------------------------------------------------------------------
312# High-level session runner
313# ---------------------------------------------------------------------------
def _format_managed_session_error(event: Any) -> str:
    """Render a ``session.error`` event as a readable string (best effort).

    Resolution order:

    1. No ``error`` attribute (or it is ``None``): ``repr(event)``.
    2. ``error`` is already a string: returned unchanged.
    3. ``error`` exposes a callable ``model_dump``: its payload as JSON,
       trying ``model_dump(mode="json")`` first and plain ``model_dump()``
       second. Failures are logged at debug level and fall through.
    4. Otherwise: ``key=value`` pairs for whichever of ``type``, ``message``,
       ``code``, ``detail`` are set, or ``repr(error)`` when none are.
    """
    error = getattr(event, "error", None)
    if error is None:
        return repr(event)
    if isinstance(error, str):
        return error

    dump = getattr(error, "model_dump", None)
    if callable(dump):
        data = None
        try:
            data = dump(mode="json")
        except Exception:
            # The object may not accept ``mode``; retry without arguments.
            try:
                data = dump()
            except Exception:
                logger.debug("managed session error: model_dump failed", exc_info=True)
        if data is not None:
            try:
                return json.dumps(data, default=str)
            except Exception:
                logger.debug("managed session error: json.dumps(payload) failed", exc_info=True)

    # Last resort: pick out the commonly present attributes one by one.
    candidates = ((attr, getattr(error, attr, None)) for attr in ("type", "message", "code", "detail"))
    described = [f"{attr}={value!r}" for attr, value in candidates if value is not None]
    if not described:
        return repr(error)
    return ", ".join(described)
async def run_session_with_memory(
    client: Any,
    brain: Astrocyte,
    *,
    session_id: str,
    prompt: str,
    bank_id: str,
    context: AstrocyteContext | None = None,
    non_memory_tool_handler: Any | None = None,
    timeout_seconds: float = 120,
) -> str:
    """Run a Managed Agents session, handling Astrocyte memory tool calls.

    Opens an SSE stream, sends the prompt, and processes events until
    the agent finishes (``end_turn``). Memory tool calls are handled
    automatically; non-memory custom tool calls are delegated to
    ``non_memory_tool_handler`` if provided, otherwise a JSON error result is
    sent back for them.

    Note:
        ``client.beta.sessions.events.stream`` / ``send`` are called
        synchronously (no ``await``), so with a synchronous client this
        coroutine occupies the event loop while it waits for the next event.

    Args:
        client: An ``anthropic.Anthropic`` client instance.
        brain: The Astrocyte instance.
        session_id: The session ID to interact with.
        prompt: The user message to send.
        bank_id: The memory bank for all memory operations.
        context: Optional ``AstrocyteContext`` forwarded to every memory
            tool call.
        non_memory_tool_handler: Optional async callable
            ``(name: str, input: dict) -> str`` for non-memory custom tools.
        timeout_seconds: Maximum time to wait for completion. ``None``
            disables the limit.

    Returns:
        The concatenated agent message text from the session.

    Raises:
        RuntimeError: On a ``session.error`` or ``session.status_terminated``
            event.
        asyncio.TimeoutError: If the session does not finish within
            ``timeout_seconds``.
    """
    import asyncio

    agent_text_parts: list[str] = []
    events_by_id: dict[str, Any] = {}

    async def _run() -> str:
        # ``asyncio.wait_for`` can only interrupt this coroutine at an
        # ``await``. The stream loop below is synchronous, so the deadline is
        # also checked cooperatively each time an event arrives.
        clock = asyncio.get_running_loop().time
        deadline = None if timeout_seconds is None else clock() + timeout_seconds

        with client.beta.sessions.events.stream(session_id) as stream:
            # Send the user message after the stream opens
            client.beta.sessions.events.send(
                session_id,
                events=[
                    {
                        "type": "user.message",
                        "content": [{"type": "text", "text": prompt}],
                    },
                ],
            )

            for event in stream:
                if deadline is not None and clock() > deadline:
                    raise asyncio.TimeoutError(
                        f"Managed Agents session {session_id} did not finish within {timeout_seconds}s"
                    )

                if event.type == "agent.message":
                    for block in event.content:
                        # Only text-bearing blocks contribute to the result.
                        if hasattr(block, "text"):
                            agent_text_parts.append(block.text)

                elif event.type == "agent.custom_tool_use":
                    # Remember the call; it is executed once the session goes
                    # idle with ``requires_action`` and lists this event id.
                    events_by_id[event.id] = event

                elif event.type == "session.error":
                    raise RuntimeError(f"Managed Agents session error: {_format_managed_session_error(event)}")

                elif event.type == "session.status_idle":
                    stop = getattr(event, "stop_reason", None)
                    if stop is None:
                        continue

                    if stop.type == "requires_action":
                        for event_id in stop.event_ids:
                            tool_event = events_by_id.get(event_id)
                            if tool_event is None:
                                # No result can be produced for a call we
                                # never saw; leave a trace instead of
                                # skipping silently.
                                logger.warning(
                                    "requires_action references unknown tool event %s; no result sent",
                                    event_id,
                                )
                                continue

                            if is_memory_tool(tool_event.name):
                                result_text = await handle_memory_tool(
                                    brain,
                                    tool_event.name,
                                    tool_event.input,
                                    bank_id=bank_id,
                                    context=context,
                                )
                            elif non_memory_tool_handler is not None:
                                result_text = await non_memory_tool_handler(tool_event.name, tool_event.input)
                            else:
                                result_text = json.dumps({"error": f"No handler for tool: {tool_event.name}"})

                            client.beta.sessions.events.send(
                                session_id,
                                events=[
                                    {
                                        "type": "user.custom_tool_result",
                                        "custom_tool_use_id": event_id,
                                        "content": [{"type": "text", "text": result_text}],
                                    },
                                ],
                            )

                    elif stop.type == "end_turn":
                        break

                elif event.type == "session.status_terminated":
                    error_msg = getattr(event, "error", None)
                    raise RuntimeError(f"Session terminated: {error_msg}")

        return "".join(agent_text_parts)

    return await asyncio.wait_for(_run(), timeout=timeout_seconds)
def delete_managed_session(client: Any, session_id: str) -> None:
    """Interrupt, then delete, a Managed Agents session.

    The API returns 400 if the session is still considered **running** (for
    example immediately after closing the per-turn SSE stream); the error
    asks for a ``user.interrupt`` event or to wait for completion. So a
    ``user.interrupt`` is sent first on a best-effort basis: any failure of
    that send is logged at debug level and ignored. The session is then
    deleted, and errors from the delete itself propagate to the caller.
    """
    interrupt_event = {"type": "user.interrupt"}
    try:
        send_events = client.beta.sessions.events.send
        send_events(session_id, events=[interrupt_event])
    except Exception:
        logger.debug("user.interrupt before session delete failed (ignored)", exc_info=True)

    client.beta.sessions.delete(session_id)