Coverage for astrocyte/mcp.py: 51%
329 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Astrocyte MCP server — exposes memory tools via Model Context Protocol.
3Usage:
4 astrocyte-mcp --config astrocyte.yaml
5 astrocyte-mcp --config astrocyte.yaml --transport sse --port 8080
7See docs/_design/mcp-server.md for the full specification.
9Note: Access control in MCP deployments typically uses one principal from
10``mcp.principal`` (defaults to ``agent:mcp``), or pass
11``astrocyte_context=...`` to :func:`create_mcp_server` when your host maps
12callers to identity. Per-caller identity still requires transport-level auth
13at the MCP host.
14"""
16from __future__ import annotations
18import argparse
19import json
20import logging
21from datetime import datetime
22from typing import Any
24from fastmcp import FastMCP
26from astrocyte._astrocyte import Astrocyte
27from astrocyte._mcp_identity import JwtIdentityMiddleware, build_jwt_middleware
28from astrocyte.config import AstrocyteConfig, access_grants_for_astrocyte, load_config
29from astrocyte.types import AstrocyteContext
31logger = logging.getLogger("astrocyte.mcp")
33# ---------------------------------------------------------------------------
34# Server factory
35# ---------------------------------------------------------------------------
38def create_mcp_server(
39 brain: Astrocyte,
40 config: AstrocyteConfig,
41 *,
42 astrocyte_context: AstrocyteContext | None = None,
43 jwt_middleware: JwtIdentityMiddleware | None = None,
44) -> FastMCP:
45 """Create a FastMCP server wired to an Astrocyte instance.
47 Identity resolution order (per call):
49 1. If ``astrocyte_context`` was passed at server-creation time, it is
50 used as-is (legacy embedding pattern). This takes precedence over
51 all header-based resolution so tests and embedded deployments that
52 pre-bind identity are unaffected.
53 2. Else if ``jwt_middleware`` is set OR
54 ``config.identity.jwt_middleware.enabled`` is true, per-call
55 identity is resolved from the inbound request's Authorization
56 header (identity spec §3 Gap 1 wiring). See
57 ``docs/_plugins/jwt-identity-middleware.md`` for operator setup.
58 3. Else the server falls back to the static ``mcp.principal`` label
59 (default ``agent:mcp``) — the pre-middleware behavior.
61 Args:
62 jwt_middleware: Optional pre-built middleware. When None but the
63 config enables JWT middleware, the server constructs one via
64 :func:`build_jwt_middleware`. Pass explicitly to inject a
65 test decoder or bypass PyJWT.
66 """
68 mcp_cfg = config.mcp
69 mcp = FastMCP(
70 name="astrocyte-memory",
71 instructions=(
72 "Astrocyte memory server. Use memory_retain to store information, "
73 "memory_recall to search memories, memory_reflect to synthesize answers, "
74 "and memory_forget to remove memories."
75 ),
76 )
78 # Static context — the fallback for the pre-middleware path (legacy
79 # deployments with neither a pre-bound context nor JWT middleware). When
80 # middleware is enabled, _resolve_context never returns this fallback.
81 static_ctx = astrocyte_context or AstrocyteContext(
82 principal=mcp_cfg.principal or "agent:mcp",
83 )
85 # Build or accept JWT middleware. Explicit > config > none.
86 if jwt_middleware is None and config.identity.jwt_middleware.enabled:
87 jwt_middleware = build_jwt_middleware(config.identity.jwt_middleware)
def _resolve_context() -> AstrocyteContext:
    """Pick the identity that applies to the tool call being served.

    Evaluated on every tool invocation. Precedence:

    1. the context pre-bound at server creation (``astrocyte_context``),
    2. the JWT middleware, fed the current request's ``authorization``
       header (or an empty mapping when headers cannot be read),
    3. the static fallback context.
    """
    if astrocyte_context is None and jwt_middleware is None:
        # Neither a pre-bound identity nor middleware: static principal.
        return static_ctx
    if astrocyte_context is not None:
        # Embedded / test hosts that pre-bind identity always win.
        return astrocyte_context
    try:
        from fastmcp.server.dependencies import get_http_headers

        request_headers = get_http_headers(include={"authorization"})
    except Exception:
        # No readable headers (e.g. no HTTP request in flight); the
        # middleware's own policy decides how to treat their absence.
        request_headers = {}
    return jwt_middleware.resolve(request_headers)
115 # Default bank
116 default_bank = mcp_cfg.default_bank_id
118 _MAX_TAGS = 20
119 _MAX_TAG_LENGTH = 255
def _resolve_bank(bank_id: str | None) -> str:
    """Return the bank to operate on.

    The caller's ``bank_id`` is used when truthy, otherwise the
    configured default bank.

    Raises:
        ValueError: neither a ``bank_id`` nor a default bank is available.
    """
    chosen = bank_id or default_bank
    if not chosen:
        raise ValueError("bank_id is required (no default_bank_id configured)")
    return chosen
def _validate_tags(tags: list[str] | None) -> list[str] | None:
    """Check a caller-supplied tag list and hand it back unchanged.

    An empty or missing list is returned as-is without checks.

    Raises:
        ValueError: more than ``_MAX_TAGS`` tags, or any tag that is not
            a string or is longer than ``_MAX_TAG_LENGTH`` characters.
    """
    if tags:
        if len(tags) > _MAX_TAGS:
            raise ValueError(f"Too many tags ({len(tags)}), max {_MAX_TAGS}")
        # isinstance is tested first so len() is never called on a non-string.
        if any(not isinstance(t, str) or len(t) > _MAX_TAG_LENGTH for t in tags):
            raise ValueError(f"Invalid tag (must be string, max {_MAX_TAG_LENGTH} chars)")
    return tags
138 # ── memory_retain ──────────────────────────────────────────────
@mcp.tool()
async def memory_retain(
    content: str,
    bank_id: str | None = None,
    tags: list[str] | None = None,
    metadata: dict[str, str] | None = None,
    occurred_at: str | None = None,
) -> str:
    """Store content into memory.

    Args:
        content: The text to memorize.
        bank_id: Memory bank to store in. Uses default if omitted.
        tags: Optional tags for filtering.
        metadata: Optional key-value metadata.
        occurred_at: ISO 8601 timestamp of when the event happened.
    """
    # Returns a JSON string: {"stored": true, "memory_id": ...} on success,
    # otherwise {"stored": false, "error": ...}. Unexpected exceptions are
    # logged and reported by exception class name only.
    try:
        bid = _resolve_bank(bank_id)
        tags = _validate_tags(tags)
        kwargs: dict[str, Any] = {}
        if occurred_at:
            # datetime.fromisoformat() only accepts the "Z" UTC designator
            # from Python 3.11 on; rewrite it to the equivalent "+00:00"
            # so common ISO 8601 timestamps parse on older interpreters too.
            iso_text = occurred_at[:-1] + "+00:00" if occurred_at.endswith("Z") else occurred_at
            try:
                kwargs["occurred_at"] = datetime.fromisoformat(iso_text)
            except ValueError:
                return json.dumps({"stored": False, "error": "Invalid occurred_at format; expected ISO 8601"})
        result = await brain.retain(
            content,
            bank_id=bid,
            tags=tags,
            metadata=metadata,
            context=_resolve_context(),
            **kwargs,
        )
        if result.stored:
            return json.dumps({"stored": True, "memory_id": result.memory_id})
        return json.dumps({"stored": False, "error": "Failed to store memory"})
    except Exception as exc:
        logger.exception("memory_retain failed")
        return json.dumps({"stored": False, "error": type(exc).__name__})
181 # ── memory_recall ──────────────────────────────────────────────
@mcp.tool()
async def memory_recall(
    query: str,
    bank_id: str | None = None,
    banks: list[str] | None = None,
    strategy: str | None = None,
    max_results: int = 10,
    tags: list[str] | None = None,
    session_id: str | None = None,
) -> str:
    """Retrieve relevant memories for a query — RAW retrieval, no synthesis.

    Returns a ranked list of memory ``hits``. The caller is expected to
    compose its own answer from these hits (this is the "retrieval
    primitive" — you, the calling agent, are the answerer).

    **Use `memory_recall` when**: you want to read the underlying
    memories and synthesize them yourself, OR you need to show citations
    / quotes verbatim, OR you need fine control over which memories
    end up in your final response.

    **Use `memory_reflect` instead when**: you want a finished answer
    you can pass straight through to the user. Reflect does its own
    synthesis via an agentic loop (multi-step retrieval + reasoning)
    and returns one composed answer with cited sources. Cheaper for
    you than calling `memory_recall` + a follow-up LLM round-trip.

    Args:
        query: Natural language query.
        bank_id: Single memory bank to search. Uses default if omitted.
        banks: Multiple banks to search (overrides bank_id).
        strategy: Multi-bank strategy: "cascade", "parallel", or "first_match".
        max_results: Maximum number of results.
        tags: Optional tag filter.
        session_id: M31 Fix 2 — opaque session identifier. When the
            calling application knows which conversation session the
            query targets, pass it to scope retrieval to that
            session's facts/sections. ``None`` (default) preserves
            cross-session retrieval.
    """
    # Returns a JSON string with "hits", "total_available" and "truncated";
    # on failure the hits list is empty and "error" names the exception class.
    try:
        # Keep the requested size within [1, configured ceiling].
        capped = max(1, min(max_results, mcp_cfg.max_results_limit))
        checked_tags = _validate_tags(tags)

        # Multi-bank search takes the strategy; single-bank search resolves
        # the bank (possibly the default) and sends no strategy.
        if banks:
            target: dict[str, Any] = {"banks": banks, "strategy": strategy}
        else:
            target = {"bank_id": _resolve_bank(bank_id)}

        outcome = await brain.recall(
            query,
            max_results=capped,
            tags=checked_tags,
            context=_resolve_context(),
            session_id=session_id,  # M31 Fix 2
            **target,
        )

        payload = {
            "hits": [
                {
                    "text": hit.text,
                    "score": round(hit.score, 4),
                    "fact_type": hit.fact_type,
                    "bank_id": hit.bank_id,
                    "memory_id": hit.memory_id,
                }
                for hit in outcome.hits
            ],
            "total_available": outcome.total_available,
            "truncated": outcome.truncated,
        }
        return json.dumps(payload)
    except Exception as exc:
        logger.exception("memory_recall failed")
        return json.dumps({"hits": [], "total_available": 0, "error": type(exc).__name__})
269 # ── memory_reflect ─────────────────────────────────────────────
if mcp_cfg.expose_reflect:

    @mcp.tool()
    async def memory_reflect(
        query: str,
        bank_id: str | None = None,
        banks: list[str] | None = None,
        strategy: str | None = None,
        max_tokens: int | None = None,
        include_sources: bool = True,
    ) -> str:
        """Get a finished, cited answer to a question from memory.

        Returns ``{"answer": <synthesized text>, "sources": [...]}``.
        The ``answer`` IS the response — Astrocyte's internal agentic
        loop ran multi-step retrieval + reasoning and composed it.

        **How to use the `answer` field:**
        - Quote it verbatim, OR paraphrase it into your own voice, OR
          return it directly to the user.
        - **Do NOT pass it back through another LLM for re-synthesis.**
          That double-synthesis pattern loses citation fidelity, doubles
          cost, and produces strictly worse answers than either pure
          path. If you want raw memories to compose yourself, use
          `memory_recall` instead.
        - `sources` carries the cited memory chunks for traceability /
          UI rendering — show them as citations next to the answer,
          don't re-feed them into a synth prompt.

        **Use `memory_reflect` when**: the user asked a question and you
        want a complete, citation-grounded answer back. Especially good
        for multi-hop questions ("how did X relate to Y?"), synthesis
        questions ("summarize what we discussed about Z"), and any
        question where the answer requires reasoning across several
        memories.

        **Use `memory_recall` instead when**: you want raw memories
        (e.g. to show as a search result list) or you want to do your
        own synthesis with strong opinions on prompt / format.

        Args:
            query: The question to answer from memory.
            bank_id: Single memory bank. Uses default if omitted.
            banks: Multiple banks (overrides bank_id).
            strategy: Multi-bank strategy: "cascade", "parallel", or "first_match".
            max_tokens: Maximum tokens for the synthesized answer.
            include_sources: Whether to include source memories
                (default True — needed for showing citations).
        """
        # Only registered when mcp_cfg.expose_reflect is truthy. Failures are
        # logged and reported as {"answer": "", "error": <exception class>}.
        try:
            # Multi-bank reflection carries the strategy; single-bank
            # reflection resolves the bank (possibly the default).
            if banks:
                target: dict[str, Any] = {"banks": banks, "strategy": strategy}
            else:
                target = {"bank_id": _resolve_bank(bank_id)}

            outcome = await brain.reflect(
                query,
                max_tokens=max_tokens,
                context=_resolve_context(),
                include_sources=include_sources,
                **target,
            )

            payload: dict[str, Any] = {"answer": outcome.answer}
            if include_sources and outcome.sources:
                payload["sources"] = [
                    {"text": src.text, "score": round(src.score, 4), "bank_id": src.bank_id}
                    for src in outcome.sources
                ]
            return json.dumps(payload)
        except Exception as exc:
            logger.exception("memory_reflect failed")
            return json.dumps({"answer": "", "error": type(exc).__name__})
350 # ── memory_forget ──────────────────────────────────────────────
if mcp_cfg.expose_forget:

    @mcp.tool()
    async def memory_forget(
        bank_id: str | None = None,
        memory_ids: list[str] | None = None,
        tags: list[str] | None = None,
    ) -> str:
        """Remove memories from a bank.

        Args:
            bank_id: Memory bank. Uses default if omitted.
            memory_ids: Specific memory IDs to delete.
            tags: Delete memories matching these tags.
        """
        # Only registered when mcp_cfg.expose_forget is truthy.
        # NOTE(review): when neither memory_ids nor tags is given, both are
        # forwarded as None — confirm what brain.forget does in that case.
        try:
            target_bank = _resolve_bank(bank_id)
            checked_tags = _validate_tags(tags)
            outcome = await brain.forget(
                target_bank,
                memory_ids=memory_ids,
                tags=checked_tags,
                context=_resolve_context(),
            )
            counts = {
                "deleted_count": outcome.deleted_count,
                "archived_count": outcome.archived_count,
            }
            return json.dumps(counts)
        except Exception as exc:
            logger.exception("memory_forget failed")
            return json.dumps({"deleted_count": 0, "error": type(exc).__name__})
386 # ── memory_history ─────────────────────────────────────────────
@mcp.tool()
async def memory_history(
    query: str,
    bank_id: str | None = None,
    as_of: str | None = None,
    max_results: int = 10,
    tags: list[str] | None = None,
) -> str:
    """Reconstruct what the agent knew at a past point in time (M9 time travel).

    Args:
        query: Recall query to run against the historical snapshot.
        bank_id: Bank to query. Uses default if omitted.
        as_of: ISO 8601 UTC datetime — memories retained after this moment
            are hidden (e.g. "2025-01-01T00:00:00Z").
        max_results: Maximum hits to return (default 10).
        tags: Optional tag filter applied on top of the time filter.
    """
    # Returns a JSON string with "hits", "total_available", "truncated",
    # "as_of" and "bank_id"; on failure {"hits": [], "error": ...}.
    try:
        bid = _resolve_bank(bank_id)
        tags = _validate_tags(tags)
        if not as_of:
            return json.dumps({"hits": [], "error": "as_of (ISO 8601) is required"})
        # The documented example ends in "Z", which datetime.fromisoformat()
        # only accepts from Python 3.11 on. Rewrite it to the equivalent
        # "+00:00" offset so the example works on older interpreters too.
        iso_text = as_of[:-1] + "+00:00" if as_of.endswith("Z") else as_of
        try:
            as_of_dt = datetime.fromisoformat(iso_text)
        except ValueError:
            return json.dumps({"hits": [], "error": "Invalid as_of format; expected ISO 8601"})
        result = await brain.history(
            query,
            bid,
            as_of_dt,
            # Keep the requested size within [1, configured ceiling].
            max_results=max(1, min(max_results, mcp_cfg.max_results_limit)),
            tags=tags,
            context=_resolve_context(),
        )
        hits = [
            {
                "text": h.text,
                "score": round(h.score, 4),
                "fact_type": h.fact_type,
                "bank_id": h.bank_id,
                "memory_id": h.memory_id,
            }
            for h in result.hits
        ]
        return json.dumps(
            {
                "hits": hits,
                "total_available": result.total_available,
                "truncated": result.truncated,
                "as_of": result.as_of.isoformat(),
                "bank_id": result.bank_id,
            }
        )
    except Exception as exc:
        logger.exception("memory_history failed")
        return json.dumps({"hits": [], "error": type(exc).__name__})
446 # ── memory_audit ────────────────────────────────────────────────
@mcp.tool()
async def memory_audit(
    scope: str,
    bank_id: str | None = None,
    max_memories: int = 50,
    tags: list[str] | None = None,
) -> str:
    """Identify knowledge gaps for a topic in a memory bank (M10).

    Use this to discover what the agent does not know about a subject
    before relying on recall for important decisions.

    Args:
        scope: Natural-language topic to audit (e.g. "Alice's employment history").
        bank_id: Bank to audit. Uses default if omitted.
        max_memories: Memories to retrieve and scan (default 50).
        tags: Optional tag filter to narrow the retrieved memories.
    """
    # NOTE(review): unlike memory_recall, no ``context=`` is forwarded to
    # brain.audit — confirm whether access control is applied elsewhere.
    try:
        target_bank = _resolve_bank(bank_id)
        checked_tags = _validate_tags(tags)
        # The scan size is kept within 1..200.
        scan_size = max(1, min(max_memories, 200))
        report = await brain.audit(
            scope,
            target_bank,
            max_memories=scan_size,
            tags=checked_tags,
        )
        payload = {
            "scope": report.scope,
            "bank_id": report.bank_id,
            "coverage_score": round(report.coverage_score, 3),
            "memories_scanned": report.memories_scanned,
            "gaps": [
                {"topic": gap.topic, "severity": gap.severity, "reason": gap.reason}
                for gap in report.gaps
            ],
        }
        return json.dumps(payload)
    except Exception as exc:
        logger.exception("memory_audit failed")
        return json.dumps({"gaps": [], "coverage_score": 0.0, "error": type(exc).__name__})
489 # ── memory_compile ─────────────────────────────────────────────
@mcp.tool()
async def memory_compile(
    bank_id: str | None = None,
    scope: str | None = None,
) -> str:
    """Compile raw memories into structured wiki pages (M8).

    Synthesises a wiki page for each detected topic scope using the LLM.
    Call this periodically to distil accumulated memories into a curated
    knowledge base that recall can surface ahead of raw fragments.

    Args:
        bank_id: Bank to compile. Uses default if omitted.
        scope: Compile only memories tagged with this scope string.
            Omit to trigger full scope discovery (tag grouping +
            embedding cluster labelling across the whole bank).
    """
    # Returns the compile counters as JSON; an "error" key is added only
    # when the result carries one. Exceptions are logged and reported by
    # exception class name.
    try:
        target_bank = _resolve_bank(bank_id)
        report = await brain.compile(target_bank, scope=scope)
        payload: dict[str, Any] = {
            "bank_id": report.bank_id,
            "pages_created": report.pages_created,
            "pages_updated": report.pages_updated,
            "scopes_compiled": report.scopes_compiled,
            "noise_memories": report.noise_memories,
            "tokens_used": report.tokens_used,
            "elapsed_ms": report.elapsed_ms,
        }
        if report.error:
            payload["error"] = report.error
        return json.dumps(payload)
    except Exception as exc:
        logger.exception("memory_compile failed")
        return json.dumps({"pages_created": 0, "pages_updated": 0, "error": type(exc).__name__})
527 # ── memory_graph_search ────────────────────────────────────────
@mcp.tool()
async def memory_graph_search(
    query: str,
    bank_id: str | None = None,
    limit: int = 10,
) -> str:
    """Search the knowledge graph for entities matching a name.

    Returns matching entities with their IDs. Use the IDs with
    memory_graph_neighbors to traverse connected memories.

    Args:
        query: Entity name or partial name to search for.
        bank_id: Bank whose graph to search. Uses default if omitted.
        limit: Maximum number of entities to return.
    """
    # Returns {"entities": [...]} as JSON; on failure the list is empty and
    # "error" names the exception class.
    try:
        bid = _resolve_bank(bank_id)
        # Clamp the caller-supplied limit the same way memory_recall and
        # memory_history clamp max_results, so a zero, negative or very
        # large value never reaches the backend.
        bounded_limit = max(1, min(limit, mcp_cfg.max_results_limit))
        entities = await brain.graph_search(query, bid, limit=bounded_limit)
        return json.dumps(
            {
                "entities": [
                    {
                        "id": e.id,
                        "name": e.name,
                        "entity_type": e.entity_type,
                        "aliases": e.aliases or [],
                    }
                    for e in entities
                ]
            }
        )
    except Exception as exc:
        logger.exception("memory_graph_search failed")
        return json.dumps({"entities": [], "error": type(exc).__name__})
565 # ── memory_graph_neighbors ─────────────────────────────────────
@mcp.tool()
async def memory_graph_neighbors(
    entity_ids: list[str],
    bank_id: str | None = None,
    max_depth: int = 2,
    limit: int = 20,
) -> str:
    """Traverse the knowledge graph from seed entity IDs.

    Walks the entity graph up to max_depth hops from each seed entity
    and returns memories attached to discovered entities, scored by
    proximity. Use memory_graph_search first to resolve entity IDs.

    Args:
        entity_ids: Seed entity IDs to start traversal from.
        bank_id: Bank whose graph to traverse. Uses default if omitted.
        max_depth: Maximum traversal depth (default 2).
        limit: Maximum number of memory hits to return.
    """
    # Returns {"hits": [...]} as JSON; on failure the list is empty and
    # "error" names the exception class.
    try:
        bid = _resolve_bank(bank_id)
        # Clamp the caller-supplied limit the same way memory_recall and
        # memory_history clamp max_results.
        # NOTE(review): max_depth is still forwarded unbounded — confirm
        # whether the backend enforces its own ceiling.
        bounded_limit = max(1, min(limit, mcp_cfg.max_results_limit))
        hits = await brain.graph_neighbors(entity_ids, bid, max_depth=max_depth, limit=bounded_limit)
        return json.dumps(
            {
                "hits": [
                    {
                        "memory_id": h.memory_id,
                        "text": h.text,
                        "connected_entities": h.connected_entities,
                        "depth": h.depth,
                        "score": round(h.score, 4),
                    }
                    for h in hits
                ]
            }
        )
    except Exception as exc:
        logger.exception("memory_graph_neighbors failed")
        return json.dumps({"hits": [], "error": type(exc).__name__})
607 # ── admin tools (expose_admin=true) ────────────────────────────
if mcp_cfg.expose_admin:
    # Admin tools are registered only when mcp_cfg.expose_admin is truthy.
    # NOTE(review): none of these forward ``context=`` to the brain —
    # confirm that exposing them is the intended access gate.

    @mcp.tool()
    async def memory_lifecycle(
        bank_id: str | None = None,
    ) -> str:
        """Run TTL lifecycle sweep on a bank — archives and deletes expired memories.

        Args:
            bank_id: Bank to sweep. Uses default if omitted.
        """
        try:
            target_bank = _resolve_bank(bank_id)
            sweep = await brain.run_lifecycle(target_bank)
            payload = {
                "bank_id": target_bank,
                "archived_count": sweep.archived_count,
                "deleted_count": sweep.deleted_count,
                "skipped_count": sweep.skipped_count,
            }
            return json.dumps(payload)
        except Exception as exc:
            logger.exception("memory_lifecycle failed")
            return json.dumps({"archived_count": 0, "deleted_count": 0, "error": type(exc).__name__})

    @mcp.tool()
    async def memory_bank_health(
        bank_id: str | None = None,
    ) -> str:
        """Get health score and issues for a memory bank.

        Args:
            bank_id: Bank to assess. Uses default if omitted. Pass "__all__"
                to get health for every bank that has recorded operations.
        """

        def _issue_rows(issues: Any) -> list[dict[str, Any]]:
            # Flatten issue objects into JSON-ready dicts.
            return [
                {"severity": issue.severity, "code": issue.code, "message": issue.message}
                for issue in issues
            ]

        try:
            if bank_id != "__all__":
                # Single-bank report (includes the metrics field).
                target_bank = _resolve_bank(bank_id)
                report = await brain.bank_health(target_bank)
                single = {
                    "bank_id": report.bank_id,
                    "score": round(report.score, 3),
                    "status": report.status,
                    "issues": _issue_rows(report.issues),
                    "metrics": report.metrics,
                }
                return json.dumps(single)

            # "__all__": one entry per bank, without metrics.
            reports = await brain.all_bank_health()
            every = {
                "banks": [
                    {
                        "bank_id": entry.bank_id,
                        "score": round(entry.score, 3),
                        "status": entry.status,
                        "issues": _issue_rows(entry.issues),
                    }
                    for entry in reports
                ]
            }
            return json.dumps(every)
        except Exception as exc:
            logger.exception("memory_bank_health failed")
            return json.dumps({"score": 0.0, "error": type(exc).__name__})

    @mcp.tool()
    async def memory_hold_set(
        hold_id: str,
        reason: str,
        bank_id: str | None = None,
        set_by: str = "agent:mcp",
    ) -> str:
        """Place a bank under legal hold — blocks memory_forget until released.

        Args:
            hold_id: Unique identifier for this hold (used to release it later).
            reason: Human-readable reason for the hold.
            bank_id: Bank to place on hold. Uses default if omitted.
            set_by: Actor label, default "agent:mcp".
        """
        # NOTE(review): set_by is taken from the caller as-is rather than
        # from the resolved identity — confirm that is intended.
        try:
            target_bank = _resolve_bank(bank_id)
            # set_legal_hold is called synchronously (not awaited).
            placed = brain.set_legal_hold(target_bank, hold_id, reason, set_by=set_by)
            payload = {
                "hold_id": placed.hold_id,
                "bank_id": placed.bank_id,
                "reason": placed.reason,
                "set_by": placed.set_by,
                "set_at": placed.set_at.isoformat(),
            }
            return json.dumps(payload)
        except Exception as exc:
            logger.exception("memory_hold_set failed")
            return json.dumps({"error": type(exc).__name__})

    @mcp.tool()
    async def memory_hold_release(
        hold_id: str,
        bank_id: str | None = None,
    ) -> str:
        """Release a legal hold from a bank.

        Args:
            hold_id: The hold ID to release.
            bank_id: Bank to release the hold from. Uses default if omitted.
        """
        try:
            target_bank = _resolve_bank(bank_id)
            # release_legal_hold is called synchronously (not awaited).
            was_released = brain.release_legal_hold(target_bank, hold_id)
            payload = {"bank_id": target_bank, "hold_id": hold_id, "released": was_released}
            return json.dumps(payload)
        except Exception as exc:
            logger.exception("memory_hold_release failed")
            return json.dumps({"released": False, "error": type(exc).__name__})
730 # ── memory_banks ───────────────────────────────────────────────
@mcp.tool()
async def memory_banks() -> str:
    """List available memory banks."""
    # Banks come from static config for now (not queried from a provider).
    known: list[str] = list(config.banks.keys()) if config.banks else []
    # The default bank is listed first when config does not already name it.
    if default_bank and default_bank not in known:
        known = [default_bank, *known]
    return json.dumps({"banks": known, "default": default_bank})
743 # ── M21 — Mental-model CRUD ─────────────────────────────────────
744 #
745 # Hindsight parity: the 4 mental-model tools mirror Hindsight's
746 # ``_register_create_mental_model`` / ``_update_mental_model`` /
747 # ``_delete_mental_model`` MCP surface, with our addition of
748 # ``memory_list_mental_models`` for inspection. Requires
749 # :meth:`Astrocyte.set_mental_model_store` to have been called
750 # (gateway hosts wire this from the configured Postgres store).
@mcp.tool()
async def memory_list_mental_models(
    bank_id: str | None = None,
    scope: str | None = None,
) -> str:
    """List mental models in a bank.

    Mental models are curated, refreshable summaries — e.g. "user
    prefers async updates", "Project X status: blocked on review".
    Use this to discover what authoritative summaries exist before
    composing an answer.

    Args:
        bank_id: Bank to list. Uses default if omitted.
        scope: Optional scope filter (e.g. ``"person:alice"``).
    """
    # Returns {"models": [...]} as JSON; each entry is a summary row, not
    # the model body. On failure the list is empty and "error" names the
    # exception class.
    try:
        target_bank = _resolve_bank(bank_id)
        found = await brain.list_mental_models(
            bank_id=target_bank,
            scope=scope,
            context=_resolve_context(),
        )
        summaries = []
        for model in found:
            refreshed = model.refreshed_at.isoformat() if model.refreshed_at else None
            summaries.append(
                {
                    "model_id": model.model_id,
                    "title": model.title,
                    "scope": model.scope,
                    "kind": model.kind,
                    "revision": model.revision,
                    "refreshed_at": refreshed,
                    "source_count": len(model.source_ids),
                }
            )
        return json.dumps({"models": summaries})
    except Exception as exc:
        logger.exception("memory_list_mental_models failed")
        return json.dumps({"models": [], "error": type(exc).__name__})
@mcp.tool()
async def memory_create_mental_model(
    model_id: str,
    title: str,
    content: str | None = None,
    sections: list[dict] | None = None,
    scope: str = "bank",
    bank_id: str | None = None,
) -> str:
    """Author a new mental model.

    Provide ONE of:

    - ``content`` — raw markdown string; the structured doc is
      parsed lazily on first refresh (legacy path).
    - ``sections`` — list of section dicts matching the
      ``StructuredDocument`` JSON shape; the structured doc is
      populated immediately so future ``memory_update_mental_model``
      calls can target sections by id (recommended path).

    Use ``memory_create_directive`` instead for hard-rule
    directives (those are mental models with ``kind="directive"``).
    """
    # Returns the created model's id, revision, kind and title as JSON; on
    # failure {"model_id": null, "error": <exception class>}.
    try:
        target_bank = _resolve_bank(bank_id)
        created = await brain.create_mental_model(
            model_id=model_id,
            title=title,
            content=content,
            sections=sections,
            scope=scope,
            bank_id=target_bank,
            context=_resolve_context(),
        )
        payload = {
            "model_id": created.model_id,
            "revision": created.revision,
            "kind": created.kind,
            "title": created.title,
        }
        return json.dumps(payload)
    except Exception as exc:
        logger.exception("memory_create_mental_model failed")
        return json.dumps({"model_id": None, "error": type(exc).__name__})
@mcp.tool()
async def memory_update_mental_model(
    model_id: str,
    operations: list[dict],
    bank_id: str | None = None,
) -> str:
    """Apply structured delta operations to an existing mental model.

    Operations let you modify a document without re-emitting
    unchanged content — invaluable for refresh flows where the
    LLM would otherwise drift on sections it didn't intend to
    touch. Each operation is a dict with an ``op`` key; see
    :mod:`astrocyte.pipeline.delta_ops` for the full schema:

    - ``append_block`` / ``insert_block`` / ``replace_block`` /
      ``remove_block`` — block-level edits within a section.
    - ``add_section`` / ``remove_section`` / ``replace_section_blocks``
      / ``rename_section`` — section-level edits.

    Conservative-failure contract: invalid ops (unknown section
    ids, out-of-range indices, malformed payloads) drop with a
    logged reason in the response's ``skipped`` list; the document
    never gets worse than its input.

    Returns ``{"changed": bool, "revision": int, "applied": [...],
    "skipped": [...]}`` so the caller can audit which ops landed.
    """
    try:
        target_bank = _resolve_bank(bank_id)
        outcome = await brain.update_mental_model(
            model_id=model_id,
            operations=operations,
            bank_id=target_bank,
            context=_resolve_context(),
        )
        # A None result is reported as "model not found".
        if outcome is None:
            return json.dumps({"changed": False, "error": "model not found"})
        # Otherwise the result unpacks into the model and a summary mapping.
        updated, report = outcome
        payload = {
            "changed": report.get("changed", False),
            "revision": updated.revision,
            "applied": report.get("applied", []),
            "skipped": report.get("skipped", []),
        }
        return json.dumps(payload)
    except Exception as exc:
        logger.exception("memory_update_mental_model failed")
        return json.dumps({"changed": False, "error": type(exc).__name__})
@mcp.tool()
async def memory_delete_mental_model(
    model_id: str,
    bank_id: str | None = None,
) -> str:
    """Soft-delete a mental model. Returns whether it existed."""
    # Returns {"deleted": <result of brain.delete_mental_model>} as JSON;
    # on failure {"deleted": false, "error": <exception class>}.
    try:
        target_bank = _resolve_bank(bank_id)
        existed = await brain.delete_mental_model(
            model_id,
            bank_id=target_bank,
            context=_resolve_context(),
        )
        payload = {"deleted": existed}
        return json.dumps(payload)
    except Exception as exc:
        logger.exception("memory_delete_mental_model failed")
        return json.dumps({"deleted": False, "error": type(exc).__name__})
@mcp.tool()
async def memory_refresh_mental_model(
    model_id: str,
    new_source_ids: list[str],
    bank_id: str | None = None,
) -> str:
    """Re-derive a mental model from its sources after new memories have been retained.

    Hindsight parity for ``mental_model.refresh()``. When new
    memories that pertain to an existing model land via retain,
    call this to fold them into the model's ``source_ids`` and
    bump the revision. Use ``memory_update_mental_model`` for
    targeted edits via delta ops; use this when the model should
    be re-derived from an expanded provenance set.

    Returns ``{"model_id", "revision", "source_count"}`` on
    success or ``{"error": "model not found"}`` if the model
    doesn't exist.
    """
    try:
        target_bank = _resolve_bank(bank_id)
        refreshed = await brain.refresh_mental_model(
            model_id,
            new_source_ids,
            bank_id=target_bank,
            context=_resolve_context(),
        )
        if refreshed is not None:
            payload = {
                "model_id": refreshed.model_id,
                "revision": refreshed.revision,
                "source_count": len(refreshed.source_ids),
            }
            return json.dumps(payload)
        # A None result is reported as "model not found", echoing the id.
        return json.dumps({"model_id": model_id, "error": "model not found"})
    except Exception as exc:
        logger.exception("memory_refresh_mental_model failed")
        return json.dumps({"model_id": model_id, "error": type(exc).__name__})
@mcp.tool()
async def memory_create_directive(
    rule_text: str,
    directive_id: str | None = None,
    scope: str = "bank",
    bank_id: str | None = None,
) -> str:
    """Author a user-curated hard-rule directive.

    Directives are mental models with ``kind="directive"``. They
    participate in the same ``search_mental_models`` retrieval as
    general / preference models, but the discriminator signals to
    the answerer that the rule should be applied as a preference
    override rather than as one input among many.

    Architecturally replaces the M18a-2 ``directive_compile``
    auto-extraction path that was deprecated in M19 (auto-compressed
    directives replicated a −30pp SSP regression because they
    overrode original preference nuance). User-authored directives
    avoid that failure mode by definition — the user gets exactly
    the rule they intended.
    """
    # Returns the new directive's model_id, kind and revision as JSON; on
    # failure {"model_id": null, "error": <exception class>}.
    try:
        target_bank = _resolve_bank(bank_id)
        directive = await brain.create_directive(
            rule_text=rule_text,
            directive_id=directive_id,
            scope=scope,
            bank_id=target_bank,
            context=_resolve_context(),
        )
        payload = {
            "model_id": directive.model_id,
            "kind": directive.kind,
            "revision": directive.revision,
        }
        return json.dumps(payload)
    except Exception as exc:
        logger.exception("memory_create_directive failed")
        return json.dumps({"model_id": None, "error": type(exc).__name__})
988 # ── M21 — Observation CRUD ─────────────────────────────────────
@mcp.tool()
async def memory_list_observations(
    bank_id: str | None = None,
    scope: str | None = None,
    trend: str | None = None,
    limit: int = 100,
) -> str:
    """List observations in a bank, optionally filtered by trend.

    Observations are auto-consolidated distillations of raw
    memories (e.g. "user prefers iced coffee in summer"). Each
    carries a computed ``trend`` — one of ``new`` /
    ``strengthening`` / ``stable`` / ``weakening`` / ``stale``
    derived from the evidence timestamps. Use ``trend`` to filter
    for fresh insights (``new``, ``strengthening``) or to audit
    stale patterns that may no longer apply.
    """
    # NOTE(review): the docstring wording is left untouched on purpose —
    # FastMCP presumably publishes it to clients as the tool description.
    #
    # Returns a JSON string ``{"observations": [...]}``; on any failure the
    # list is empty and an ``"error"`` key carries the exception class name.
    try:
        target_bank = _resolve_bank(bank_id)
        observations = await brain.list_observations(
            bank_id=target_bank,
            scope=scope,
            trend=trend,
            limit=limit,
            context=_resolve_context(),
        )
        payload = {"observations": observations}
        return json.dumps(payload)
    except Exception as err:
        # Serialisation errors from json.dumps land here as well, since the
        # dump happens inside the try body.
        logger.exception("memory_list_observations failed")
        failure = {"observations": [], "error": type(err).__name__}
        return json.dumps(failure)
@mcp.tool()
async def memory_get_observation(
    observation_id: str,
    bank_id: str | None = None,
) -> str:
    """Fetch one observation by id. Returns null if not found."""
    # NOTE(review): the docstring wording is left untouched on purpose —
    # FastMCP presumably publishes it to clients as the tool description.
    #
    # Returns a JSON string ``{"observation": <row>}``; on any failure the
    # value is null and an ``"error"`` key carries the exception class name.
    try:
        target_bank = _resolve_bank(bank_id)
        observation = await brain.get_observation(
            observation_id,
            bank_id=target_bank,
            context=_resolve_context(),
        )
        payload = {"observation": observation}
        return json.dumps(payload)
    except Exception as err:
        logger.exception("memory_get_observation failed")
        failure = {"observation": None, "error": type(err).__name__}
        return json.dumps(failure)
@mcp.tool()
async def memory_create_observation(
    text: str,
    source_ids: list[str] | None = None,
    scope: str | None = None,
    confidence: float = 0.9,
    bank_id: str | None = None,
) -> str:
    """Hand-author an observation (bypasses the autonomous consolidator).

    Use this when an agent or user wants to assert a fact directly
    without going through retain → consolidate. The observation
    becomes immediately visible to the recall pipeline's
    ``observation`` strategy and to ``search_observations`` in
    the agentic reflect loop.
    """
    # NOTE(review): the docstring wording is left untouched on purpose —
    # FastMCP presumably publishes it to clients as the tool description.
    #
    # Returns a JSON string ``{"observation": <row>}``; on any failure the
    # value is null and an ``"error"`` key carries the exception class name.
    try:
        target_bank = _resolve_bank(bank_id)
        created = await brain.create_observation(
            text=text,
            source_ids=source_ids,
            scope=scope,
            confidence=confidence,
            bank_id=target_bank,
            context=_resolve_context(),
        )
        payload = {"observation": created}
        return json.dumps(payload)
    except Exception as err:
        logger.exception("memory_create_observation failed")
        failure = {"observation": None, "error": type(err).__name__}
        return json.dumps(failure)
@mcp.tool()
async def memory_delete_observation(
    observation_id: str,
    bank_id: str | None = None,
) -> str:
    """Delete one observation from the dedicated observations bank."""
    # NOTE(review): the docstring wording is left untouched on purpose —
    # FastMCP presumably publishes it to clients as the tool description.
    #
    # Returns a JSON string ``{"deleted": <result of brain.delete_observation>}``;
    # on any failure ``deleted`` is false and an ``"error"`` key carries the
    # exception class name.
    try:
        target_bank = _resolve_bank(bank_id)
        was_deleted = await brain.delete_observation(
            observation_id,
            bank_id=target_bank,
            context=_resolve_context(),
        )
        payload = {"deleted": was_deleted}
        return json.dumps(payload)
    except Exception as err:
        logger.exception("memory_delete_observation failed")
        failure = {"deleted": False, "error": type(err).__name__}
        return json.dumps(failure)
1088 # ── memory_health ──────────────────────────────────────────────
@mcp.tool()
async def memory_health() -> str:
    """Check memory system health."""
    # Returns a JSON string ``{"healthy", "message", "latency_ms"}``.
    # ``latency_ms`` is rounded to two decimals, or null when the backend
    # reports no latency. If the probe itself raises, the tool answers with
    # ``healthy: false`` plus an ``"error"`` key holding the exception class
    # name — the same structured-error convention the other memory tools in
    # this server use — instead of letting the exception escape.
    try:
        status = await brain.health()
        latency_ms = status.latency_ms
        return json.dumps(
            {
                "healthy": status.healthy,
                "message": status.message,
                # Test against None, not truthiness: a measured latency of
                # 0 / 0.0 is a real value and must not be reported as null.
                "latency_ms": round(latency_ms, 2) if latency_ms is not None else None,
            }
        )
    except Exception as exc:
        # Traceback goes to the log; the caller only sees the class name.
        logger.exception("memory_health failed")
        return json.dumps(
            {
                "healthy": False,
                "message": "health check failed",
                "latency_ms": None,
                "error": type(exc).__name__,
            }
        )
1102 return mcp
1105# ---------------------------------------------------------------------------
1106# CLI entry point
1107# ---------------------------------------------------------------------------
def main() -> None:
    """Run the ``astrocyte-mcp`` command-line program.

    Parses ``--config`` (required), ``--transport`` (``stdio`` or ``sse``,
    default ``stdio``) and ``--port`` (default 8080), builds an
    :class:`Astrocyte` from the loaded config, applies any configured access
    grants, and then serves the MCP tools over the chosen transport.
    """
    cli = argparse.ArgumentParser(
        prog="astrocyte-mcp",
        description="Astrocyte MCP server — memory tools for AI agents",
    )
    cli.add_argument("--config", required=True, help="Path to astrocyte.yaml config file")
    cli.add_argument(
        "--transport",
        choices=["stdio", "sse"],
        default="stdio",
        help="MCP transport (default: stdio)",
    )
    cli.add_argument("--port", type=int, default=8080, help="Port for SSE transport (default: 8080)")
    options = cli.parse_args()

    # Build the memory engine from the YAML configuration.
    settings = load_config(options.config)
    astrocyte = Astrocyte(settings)

    # Only install grants when the config actually yields some.
    access_grants = access_grants_for_astrocyte(settings)
    if access_grants:
        astrocyte.set_access_grants(access_grants)

    server = create_mcp_server(astrocyte, settings)

    # argparse restricts --transport to "stdio" or "sse", so anything that
    # is not stdio is SSE; --port is only consulted on that path.
    if args_transport_is_sse := (options.transport != "stdio"):
        server.run(transport="sse", port=options.port)
        return
    server.run(transport="stdio")


if __name__ == "__main__":
    main()