Coverage for astrocyte/mcp.py: 51%

329 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Astrocyte MCP server — exposes memory tools via Model Context Protocol. 

2 

3Usage: 

4 astrocyte-mcp --config astrocyte.yaml 

5 astrocyte-mcp --config astrocyte.yaml --transport sse --port 8080 

6 

7See docs/_design/mcp-server.md for the full specification. 

8 

9Note: Access control in MCP deployments typically uses one principal from 

10``mcp.principal`` (defaults to ``agent:mcp``), or pass 

11``astrocyte_context=...`` to :func:`create_mcp_server` when your host maps 

12callers to identity. Per-caller identity still requires transport-level auth 

13at the MCP host. 

14""" 

15 

16from __future__ import annotations 

17 

18import argparse 

19import json 

20import logging 

21from datetime import datetime 

22from typing import Any 

23 

24from fastmcp import FastMCP 

25 

26from astrocyte._astrocyte import Astrocyte 

27from astrocyte._mcp_identity import JwtIdentityMiddleware, build_jwt_middleware 

28from astrocyte.config import AstrocyteConfig, access_grants_for_astrocyte, load_config 

29from astrocyte.types import AstrocyteContext 

30 

# Shared module logger: every MCP tool handler reports its failures through
# logger.exception(...) before returning a JSON error payload to the caller.
logger = logging.getLogger(name="astrocyte.mcp")

32 

33# --------------------------------------------------------------------------- 

34# Server factory 

35# --------------------------------------------------------------------------- 

36 

37 

38def create_mcp_server( 

39 brain: Astrocyte, 

40 config: AstrocyteConfig, 

41 *, 

42 astrocyte_context: AstrocyteContext | None = None, 

43 jwt_middleware: JwtIdentityMiddleware | None = None, 

44) -> FastMCP: 

45 """Create a FastMCP server wired to an Astrocyte instance. 

46 

47 Identity resolution order (per call): 

48 

49 1. If ``astrocyte_context`` was passed at server-creation time, it is 

50 used as-is (legacy embedding pattern). This takes precedence over 

51 all header-based resolution so tests and embedded deployments that 

52 pre-bind identity are unaffected. 

53 2. Else if ``jwt_middleware`` is set OR 

54 ``config.identity.jwt_middleware.enabled`` is true, per-call 

55 identity is resolved from the inbound request's Authorization 

56 header (identity spec §3 Gap 1 wiring). See 

57 ``docs/_plugins/jwt-identity-middleware.md`` for operator setup. 

58 3. Else the server falls back to the static ``mcp.principal`` label 

59 (default ``agent:mcp``) — the pre-middleware behavior. 

60 

61 Args: 

62 jwt_middleware: Optional pre-built middleware. When None but the 

63 config enables JWT middleware, the server constructs one via 

64 :func:`build_jwt_middleware`. Pass explicitly to inject a 

65 test decoder or bypass PyJWT. 

66 """ 

67 

68 mcp_cfg = config.mcp 

69 mcp = FastMCP( 

70 name="astrocyte-memory", 

71 instructions=( 

72 "Astrocyte memory server. Use memory_retain to store information, " 

73 "memory_recall to search memories, memory_reflect to synthesize answers, " 

74 "and memory_forget to remove memories." 

75 ), 

76 ) 

77 

78 # Static context — still built for the pre-middleware path (legacy 

79 # deployments that don't enable JWT middleware). When middleware is 

80 # enabled, this is only used if header resolution is somehow bypassed. 

81 static_ctx = astrocyte_context or AstrocyteContext( 

82 principal=mcp_cfg.principal or "agent:mcp", 

83 ) 

84 

85 # Build or accept JWT middleware. Explicit > config > none. 

86 if jwt_middleware is None and config.identity.jwt_middleware.enabled: 

87 jwt_middleware = build_jwt_middleware(config.identity.jwt_middleware) 

88 

89 def _resolve_context() -> AstrocyteContext: 

90 """Resolve the AstrocyteContext for the current request. 

91 

92 Called on every tool invocation so the middleware sees each call's 

93 headers. Static ``astrocyte_context`` (when set at server creation) 

94 always wins — that preserves backward compatibility for embedded 

95 hosts that pre-bind identity. 

96 """ 

97 # An explicit pre-bound context always wins (embedded / test use). 

98 if astrocyte_context is not None: 

99 return astrocyte_context 

100 # Middleware path — pull headers, resolve identity, build ctx. 

101 if jwt_middleware is not None: 

102 try: 

103 from fastmcp.server.dependencies import get_http_headers 

104 

105 headers = get_http_headers(include={"authorization"}) 

106 except Exception: 

107 # stdio transport or pre-request context — no headers. 

108 # Middleware policy decides how to treat the missing 

109 # header (fail-closed vs anonymous). 

110 headers = {} 

111 return jwt_middleware.resolve(headers) 

112 # No middleware, no pre-bound ctx — fall through to static. 

113 return static_ctx 

114 

115 # Default bank 

116 default_bank = mcp_cfg.default_bank_id 

117 

118 _MAX_TAGS = 20 

119 _MAX_TAG_LENGTH = 255 

120 

121 def _resolve_bank(bank_id: str | None) -> str: 

122 if bank_id: 

123 return bank_id 

124 if default_bank: 

125 return default_bank 

126 raise ValueError("bank_id is required (no default_bank_id configured)") 

127 

128 def _validate_tags(tags: list[str] | None) -> list[str] | None: 

129 if not tags: 

130 return tags 

131 if len(tags) > _MAX_TAGS: 

132 raise ValueError(f"Too many tags ({len(tags)}), max {_MAX_TAGS}") 

133 for tag in tags: 

134 if not isinstance(tag, str) or len(tag) > _MAX_TAG_LENGTH: 

135 raise ValueError(f"Invalid tag (must be string, max {_MAX_TAG_LENGTH} chars)") 

136 return tags 

137 

138 # ── memory_retain ────────────────────────────────────────────── 

139 

140 @mcp.tool() 

141 async def memory_retain( 

142 content: str, 

143 bank_id: str | None = None, 

144 tags: list[str] | None = None, 

145 metadata: dict[str, str] | None = None, 

146 occurred_at: str | None = None, 

147 ) -> str: 

148 """Store content into memory. 

149 

150 Args: 

151 content: The text to memorize. 

152 bank_id: Memory bank to store in. Uses default if omitted. 

153 tags: Optional tags for filtering. 

154 metadata: Optional key-value metadata. 

155 occurred_at: ISO 8601 timestamp of when the event happened. 

156 """ 

157 try: 

158 bid = _resolve_bank(bank_id) 

159 tags = _validate_tags(tags) 

160 kwargs: dict[str, Any] = {} 

161 if occurred_at: 

162 try: 

163 kwargs["occurred_at"] = datetime.fromisoformat(occurred_at) 

164 except ValueError: 

165 return json.dumps({"stored": False, "error": "Invalid occurred_at format; expected ISO 8601"}) 

166 result = await brain.retain( 

167 content, 

168 bank_id=bid, 

169 tags=tags, 

170 metadata=metadata, 

171 context=_resolve_context(), 

172 **kwargs, 

173 ) 

174 if result.stored: 

175 return json.dumps({"stored": True, "memory_id": result.memory_id}) 

176 return json.dumps({"stored": False, "error": "Failed to store memory"}) 

177 except Exception as exc: 

178 logger.exception("memory_retain failed") 

179 return json.dumps({"stored": False, "error": type(exc).__name__}) 

180 

181 # ── memory_recall ────────────────────────────────────────────── 

182 

183 @mcp.tool() 

184 async def memory_recall( 

185 query: str, 

186 bank_id: str | None = None, 

187 banks: list[str] | None = None, 

188 strategy: str | None = None, 

189 max_results: int = 10, 

190 tags: list[str] | None = None, 

191 session_id: str | None = None, 

192 ) -> str: 

193 """Retrieve relevant memories for a query — RAW retrieval, no synthesis. 

194 

195 Returns a ranked list of memory ``hits``. The caller is expected to 

196 compose its own answer from these hits (this is the "retrieval 

197 primitive" — you, the calling agent, are the answerer). 

198 

199 **Use `memory_recall` when**: you want to read the underlying 

200 memories and synthesize them yourself, OR you need to show citations 

201 / quotes verbatim, OR you need fine control over which memories 

202 end up in your final response. 

203 

204 **Use `memory_reflect` instead when**: you want a finished answer 

205 you can pass straight through to the user. Reflect does its own 

206 synthesis via an agentic loop (multi-step retrieval + reasoning) 

207 and returns one composed answer with cited sources. Cheaper for 

208 you than calling `memory_recall` + a follow-up LLM round-trip. 

209 

210 Args: 

211 query: Natural language query. 

212 bank_id: Single memory bank to search. Uses default if omitted. 

213 banks: Multiple banks to search (overrides bank_id). 

214 strategy: Multi-bank strategy: "cascade", "parallel", or "first_match". 

215 max_results: Maximum number of results. 

216 tags: Optional tag filter. 

217 session_id: M31 Fix 2 — opaque session identifier. When the 

218 calling application knows which conversation session the 

219 query targets, pass it to scope retrieval to that 

220 session's facts/sections. ``None`` (default) preserves 

221 cross-session retrieval. 

222 """ 

223 try: 

224 max_results = max(1, min(max_results, mcp_cfg.max_results_limit)) 

225 tags = _validate_tags(tags) 

226 

227 if banks: 

228 result = await brain.recall( 

229 query, 

230 banks=banks, 

231 strategy=strategy, 

232 max_results=max_results, 

233 tags=tags, 

234 context=_resolve_context(), 

235 session_id=session_id, # M31 Fix 2 

236 ) 

237 else: 

238 bid = _resolve_bank(bank_id) 

239 result = await brain.recall( 

240 query, 

241 bank_id=bid, 

242 max_results=max_results, 

243 tags=tags, 

244 context=_resolve_context(), 

245 session_id=session_id, # M31 Fix 2 

246 ) 

247 

248 hits = [ 

249 { 

250 "text": h.text, 

251 "score": round(h.score, 4), 

252 "fact_type": h.fact_type, 

253 "bank_id": h.bank_id, 

254 "memory_id": h.memory_id, 

255 } 

256 for h in result.hits 

257 ] 

258 return json.dumps( 

259 { 

260 "hits": hits, 

261 "total_available": result.total_available, 

262 "truncated": result.truncated, 

263 } 

264 ) 

265 except Exception as exc: 

266 logger.exception("memory_recall failed") 

267 return json.dumps({"hits": [], "total_available": 0, "error": type(exc).__name__}) 

268 

269 # ── memory_reflect ───────────────────────────────────────────── 

270 

271 if mcp_cfg.expose_reflect: 

272 

273 @mcp.tool() 

274 async def memory_reflect( 

275 query: str, 

276 bank_id: str | None = None, 

277 banks: list[str] | None = None, 

278 strategy: str | None = None, 

279 max_tokens: int | None = None, 

280 include_sources: bool = True, 

281 ) -> str: 

282 """Get a finished, cited answer to a question from memory. 

283 

284 Returns ``{"answer": <synthesized text>, "sources": [...]}``. 

285 The ``answer`` IS the response — Astrocyte's internal agentic 

286 loop ran multi-step retrieval + reasoning and composed it. 

287 

288 **How to use the `answer` field:** 

289 - Quote it verbatim, OR paraphrase it into your own voice, OR 

290 return it directly to the user. 

291 - **Do NOT pass it back through another LLM for re-synthesis.** 

292 That double-synthesis pattern loses citation fidelity, doubles 

293 cost, and produces strictly worse answers than either pure 

294 path. If you want raw memories to compose yourself, use 

295 `memory_recall` instead. 

296 - `sources` carries the cited memory chunks for traceability / 

297 UI rendering — show them as citations next to the answer, 

298 don't re-feed them into a synth prompt. 

299 

300 **Use `memory_reflect` when**: the user asked a question and you 

301 want a complete, citation-grounded answer back. Especially good 

302 for multi-hop questions ("how did X relate to Y?"), synthesis 

303 questions ("summarize what we discussed about Z"), and any 

304 question where the answer requires reasoning across several 

305 memories. 

306 

307 **Use `memory_recall` instead when**: you want raw memories 

308 (e.g. to show as a search result list) or you want to do your 

309 own synthesis with strong opinions on prompt / format. 

310 

311 Args: 

312 query: The question to answer from memory. 

313 bank_id: Single memory bank. Uses default if omitted. 

314 banks: Multiple banks (overrides bank_id). 

315 strategy: Multi-bank strategy: "cascade", "parallel", or "first_match". 

316 max_tokens: Maximum tokens for the synthesized answer. 

317 include_sources: Whether to include source memories 

318 (default True — needed for showing citations). 

319 """ 

320 try: 

321 if banks: 

322 result = await brain.reflect( 

323 query, 

324 banks=banks, 

325 strategy=strategy, 

326 max_tokens=max_tokens, 

327 context=_resolve_context(), 

328 include_sources=include_sources, 

329 ) 

330 else: 

331 bid = _resolve_bank(bank_id) 

332 result = await brain.reflect( 

333 query, 

334 bank_id=bid, 

335 max_tokens=max_tokens, 

336 context=_resolve_context(), 

337 include_sources=include_sources, 

338 ) 

339 

340 out: dict[str, Any] = {"answer": result.answer} 

341 if include_sources and result.sources: 

342 out["sources"] = [ 

343 {"text": s.text, "score": round(s.score, 4), "bank_id": s.bank_id} for s in result.sources 

344 ] 

345 return json.dumps(out) 

346 except Exception as exc: 

347 logger.exception("memory_reflect failed") 

348 return json.dumps({"answer": "", "error": type(exc).__name__}) 

349 

350 # ── memory_forget ────────────────────────────────────────────── 

351 

352 if mcp_cfg.expose_forget: 

353 

354 @mcp.tool() 

355 async def memory_forget( 

356 bank_id: str | None = None, 

357 memory_ids: list[str] | None = None, 

358 tags: list[str] | None = None, 

359 ) -> str: 

360 """Remove memories from a bank. 

361 

362 Args: 

363 bank_id: Memory bank. Uses default if omitted. 

364 memory_ids: Specific memory IDs to delete. 

365 tags: Delete memories matching these tags. 

366 """ 

367 try: 

368 bid = _resolve_bank(bank_id) 

369 tags = _validate_tags(tags) 

370 result = await brain.forget( 

371 bid, 

372 memory_ids=memory_ids, 

373 tags=tags, 

374 context=_resolve_context(), 

375 ) 

376 return json.dumps( 

377 { 

378 "deleted_count": result.deleted_count, 

379 "archived_count": result.archived_count, 

380 } 

381 ) 

382 except Exception as exc: 

383 logger.exception("memory_forget failed") 

384 return json.dumps({"deleted_count": 0, "error": type(exc).__name__}) 

385 

386 # ── memory_history ───────────────────────────────────────────── 

387 

388 @mcp.tool() 

389 async def memory_history( 

390 query: str, 

391 bank_id: str | None = None, 

392 as_of: str | None = None, 

393 max_results: int = 10, 

394 tags: list[str] | None = None, 

395 ) -> str: 

396 """Reconstruct what the agent knew at a past point in time (M9 time travel). 

397 

398 Args: 

399 query: Recall query to run against the historical snapshot. 

400 bank_id: Bank to query. Uses default if omitted. 

401 as_of: ISO 8601 UTC datetime — memories retained after this moment 

402 are hidden (e.g. "2025-01-01T00:00:00Z"). 

403 max_results: Maximum hits to return (default 10). 

404 tags: Optional tag filter applied on top of the time filter. 

405 """ 

406 try: 

407 bid = _resolve_bank(bank_id) 

408 tags = _validate_tags(tags) 

409 if not as_of: 

410 return json.dumps({"hits": [], "error": "as_of (ISO 8601) is required"}) 

411 try: 

412 as_of_dt = datetime.fromisoformat(as_of) 

413 except ValueError: 

414 return json.dumps({"hits": [], "error": "Invalid as_of format; expected ISO 8601"}) 

415 result = await brain.history( 

416 query, 

417 bid, 

418 as_of_dt, 

419 max_results=max(1, min(max_results, mcp_cfg.max_results_limit)), 

420 tags=tags, 

421 context=_resolve_context(), 

422 ) 

423 hits = [ 

424 { 

425 "text": h.text, 

426 "score": round(h.score, 4), 

427 "fact_type": h.fact_type, 

428 "bank_id": h.bank_id, 

429 "memory_id": h.memory_id, 

430 } 

431 for h in result.hits 

432 ] 

433 return json.dumps( 

434 { 

435 "hits": hits, 

436 "total_available": result.total_available, 

437 "truncated": result.truncated, 

438 "as_of": result.as_of.isoformat(), 

439 "bank_id": result.bank_id, 

440 } 

441 ) 

442 except Exception as exc: 

443 logger.exception("memory_history failed") 

444 return json.dumps({"hits": [], "error": type(exc).__name__}) 

445 

446 # ── memory_audit ──────────────────────────────────────────────── 

447 

448 @mcp.tool() 

449 async def memory_audit( 

450 scope: str, 

451 bank_id: str | None = None, 

452 max_memories: int = 50, 

453 tags: list[str] | None = None, 

454 ) -> str: 

455 """Identify knowledge gaps for a topic in a memory bank (M10). 

456 

457 Use this to discover what the agent does not know about a subject 

458 before relying on recall for important decisions. 

459 

460 Args: 

461 scope: Natural-language topic to audit (e.g. "Alice's employment history"). 

462 bank_id: Bank to audit. Uses default if omitted. 

463 max_memories: Memories to retrieve and scan (default 50). 

464 tags: Optional tag filter to narrow the retrieved memories. 

465 """ 

466 try: 

467 bid = _resolve_bank(bank_id) 

468 tags = _validate_tags(tags) 

469 result = await brain.audit( 

470 scope, 

471 bid, 

472 max_memories=max(1, min(max_memories, 200)), 

473 tags=tags, 

474 ) 

475 gaps = [{"topic": g.topic, "severity": g.severity, "reason": g.reason} for g in result.gaps] 

476 return json.dumps( 

477 { 

478 "scope": result.scope, 

479 "bank_id": result.bank_id, 

480 "coverage_score": round(result.coverage_score, 3), 

481 "memories_scanned": result.memories_scanned, 

482 "gaps": gaps, 

483 } 

484 ) 

485 except Exception as exc: 

486 logger.exception("memory_audit failed") 

487 return json.dumps({"gaps": [], "coverage_score": 0.0, "error": type(exc).__name__}) 

488 

489 # ── memory_compile ───────────────────────────────────────────── 

490 

491 @mcp.tool() 

492 async def memory_compile( 

493 bank_id: str | None = None, 

494 scope: str | None = None, 

495 ) -> str: 

496 """Compile raw memories into structured wiki pages (M8). 

497 

498 Synthesises a wiki page for each detected topic scope using the LLM. 

499 Call this periodically to distil accumulated memories into a curated 

500 knowledge base that recall can surface ahead of raw fragments. 

501 

502 Args: 

503 bank_id: Bank to compile. Uses default if omitted. 

504 scope: Compile only memories tagged with this scope string. 

505 Omit to trigger full scope discovery (tag grouping + 

506 embedding cluster labelling across the whole bank). 

507 """ 

508 try: 

509 bid = _resolve_bank(bank_id) 

510 result = await brain.compile(bid, scope=scope) 

511 out: dict[str, Any] = { 

512 "bank_id": result.bank_id, 

513 "pages_created": result.pages_created, 

514 "pages_updated": result.pages_updated, 

515 "scopes_compiled": result.scopes_compiled, 

516 "noise_memories": result.noise_memories, 

517 "tokens_used": result.tokens_used, 

518 "elapsed_ms": result.elapsed_ms, 

519 } 

520 if result.error: 

521 out["error"] = result.error 

522 return json.dumps(out) 

523 except Exception as exc: 

524 logger.exception("memory_compile failed") 

525 return json.dumps({"pages_created": 0, "pages_updated": 0, "error": type(exc).__name__}) 

526 

527 # ── memory_graph_search ──────────────────────────────────────── 

528 

529 @mcp.tool() 

530 async def memory_graph_search( 

531 query: str, 

532 bank_id: str | None = None, 

533 limit: int = 10, 

534 ) -> str: 

535 """Search the knowledge graph for entities matching a name. 

536 

537 Returns matching entities with their IDs. Use the IDs with 

538 memory_graph_neighbors to traverse connected memories. 

539 

540 Args: 

541 query: Entity name or partial name to search for. 

542 bank_id: Bank whose graph to search. Uses default if omitted. 

543 limit: Maximum number of entities to return. 

544 """ 

545 try: 

546 bid = _resolve_bank(bank_id) 

547 entities = await brain.graph_search(query, bid, limit=limit) 

548 return json.dumps( 

549 { 

550 "entities": [ 

551 { 

552 "id": e.id, 

553 "name": e.name, 

554 "entity_type": e.entity_type, 

555 "aliases": e.aliases or [], 

556 } 

557 for e in entities 

558 ] 

559 } 

560 ) 

561 except Exception as exc: 

562 logger.exception("memory_graph_search failed") 

563 return json.dumps({"entities": [], "error": type(exc).__name__}) 

564 

565 # ── memory_graph_neighbors ───────────────────────────────────── 

566 

567 @mcp.tool() 

568 async def memory_graph_neighbors( 

569 entity_ids: list[str], 

570 bank_id: str | None = None, 

571 max_depth: int = 2, 

572 limit: int = 20, 

573 ) -> str: 

574 """Traverse the knowledge graph from seed entity IDs. 

575 

576 Walks the entity graph up to max_depth hops from each seed entity 

577 and returns memories attached to discovered entities, scored by 

578 proximity. Use memory_graph_search first to resolve entity IDs. 

579 

580 Args: 

581 entity_ids: Seed entity IDs to start traversal from. 

582 bank_id: Bank whose graph to traverse. Uses default if omitted. 

583 max_depth: Maximum traversal depth (default 2). 

584 limit: Maximum number of memory hits to return. 

585 """ 

586 try: 

587 bid = _resolve_bank(bank_id) 

588 hits = await brain.graph_neighbors(entity_ids, bid, max_depth=max_depth, limit=limit) 

589 return json.dumps( 

590 { 

591 "hits": [ 

592 { 

593 "memory_id": h.memory_id, 

594 "text": h.text, 

595 "connected_entities": h.connected_entities, 

596 "depth": h.depth, 

597 "score": round(h.score, 4), 

598 } 

599 for h in hits 

600 ] 

601 } 

602 ) 

603 except Exception as exc: 

604 logger.exception("memory_graph_neighbors failed") 

605 return json.dumps({"hits": [], "error": type(exc).__name__}) 

606 

607 # ── admin tools (expose_admin=true) ──────────────────────────── 

608 

609 if mcp_cfg.expose_admin: 

610 

611 @mcp.tool() 

612 async def memory_lifecycle( 

613 bank_id: str | None = None, 

614 ) -> str: 

615 """Run TTL lifecycle sweep on a bank — archives and deletes expired memories. 

616 

617 Args: 

618 bank_id: Bank to sweep. Uses default if omitted. 

619 """ 

620 try: 

621 bid = _resolve_bank(bank_id) 

622 result = await brain.run_lifecycle(bid) 

623 return json.dumps( 

624 { 

625 "bank_id": bid, 

626 "archived_count": result.archived_count, 

627 "deleted_count": result.deleted_count, 

628 "skipped_count": result.skipped_count, 

629 } 

630 ) 

631 except Exception as exc: 

632 logger.exception("memory_lifecycle failed") 

633 return json.dumps({"archived_count": 0, "deleted_count": 0, "error": type(exc).__name__}) 

634 

635 @mcp.tool() 

636 async def memory_bank_health( 

637 bank_id: str | None = None, 

638 ) -> str: 

639 """Get health score and issues for a memory bank. 

640 

641 Args: 

642 bank_id: Bank to assess. Uses default if omitted. Pass "__all__" 

643 to get health for every bank that has recorded operations. 

644 """ 

645 try: 

646 if bank_id == "__all__": 

647 results = await brain.all_bank_health() 

648 return json.dumps( 

649 { 

650 "banks": [ 

651 { 

652 "bank_id": r.bank_id, 

653 "score": round(r.score, 3), 

654 "status": r.status, 

655 "issues": [ 

656 {"severity": i.severity, "code": i.code, "message": i.message} for i in r.issues 

657 ], 

658 } 

659 for r in results 

660 ] 

661 } 

662 ) 

663 bid = _resolve_bank(bank_id) 

664 result = await brain.bank_health(bid) 

665 return json.dumps( 

666 { 

667 "bank_id": result.bank_id, 

668 "score": round(result.score, 3), 

669 "status": result.status, 

670 "issues": [ 

671 {"severity": i.severity, "code": i.code, "message": i.message} for i in result.issues 

672 ], 

673 "metrics": result.metrics, 

674 } 

675 ) 

676 except Exception as exc: 

677 logger.exception("memory_bank_health failed") 

678 return json.dumps({"score": 0.0, "error": type(exc).__name__}) 

679 

680 @mcp.tool() 

681 async def memory_hold_set( 

682 hold_id: str, 

683 reason: str, 

684 bank_id: str | None = None, 

685 set_by: str = "agent:mcp", 

686 ) -> str: 

687 """Place a bank under legal hold — blocks memory_forget until released. 

688 

689 Args: 

690 hold_id: Unique identifier for this hold (used to release it later). 

691 reason: Human-readable reason for the hold. 

692 bank_id: Bank to place on hold. Uses default if omitted. 

693 set_by: Actor label, default "agent:mcp". 

694 """ 

695 try: 

696 bid = _resolve_bank(bank_id) 

697 hold = brain.set_legal_hold(bid, hold_id, reason, set_by=set_by) 

698 return json.dumps( 

699 { 

700 "hold_id": hold.hold_id, 

701 "bank_id": hold.bank_id, 

702 "reason": hold.reason, 

703 "set_by": hold.set_by, 

704 "set_at": hold.set_at.isoformat(), 

705 } 

706 ) 

707 except Exception as exc: 

708 logger.exception("memory_hold_set failed") 

709 return json.dumps({"error": type(exc).__name__}) 

710 

711 @mcp.tool() 

712 async def memory_hold_release( 

713 hold_id: str, 

714 bank_id: str | None = None, 

715 ) -> str: 

716 """Release a legal hold from a bank. 

717 

718 Args: 

719 hold_id: The hold ID to release. 

720 bank_id: Bank to release the hold from. Uses default if omitted. 

721 """ 

722 try: 

723 bid = _resolve_bank(bank_id) 

724 released = brain.release_legal_hold(bid, hold_id) 

725 return json.dumps({"bank_id": bid, "hold_id": hold_id, "released": released}) 

726 except Exception as exc: 

727 logger.exception("memory_hold_release failed") 

728 return json.dumps({"released": False, "error": type(exc).__name__}) 

729 

730 # ── memory_banks ─────────────────────────────────────────────── 

731 

732 @mcp.tool() 

733 async def memory_banks() -> str: 

734 """List available memory banks.""" 

735 # For now, return banks from config. In future, query provider. 

736 bank_ids: list[str] = [] 

737 if config.banks: 

738 bank_ids = list(config.banks.keys()) 

739 if default_bank and default_bank not in bank_ids: 

740 bank_ids.insert(0, default_bank) 

741 return json.dumps({"banks": bank_ids, "default": default_bank}) 

742 

743 # ── M21 — Mental-model CRUD ───────────────────────────────────── 

744 # 

745 # Hindsight parity: the 4 mental-model tools mirror Hindsight's 

746 # ``_register_create_mental_model`` / ``_update_mental_model`` / 

747 # ``_delete_mental_model`` MCP surface, with our addition of 

748 # ``memory_list_mental_models`` for inspection. Requires 

749 # :meth:`Astrocyte.set_mental_model_store` to have been called 

750 # (gateway hosts wire this from the configured Postgres store). 

751 

752 @mcp.tool() 

753 async def memory_list_mental_models( 

754 bank_id: str | None = None, 

755 scope: str | None = None, 

756 ) -> str: 

757 """List mental models in a bank. 

758 

759 Mental models are curated, refreshable summaries — e.g. "user 

760 prefers async updates", "Project X status: blocked on review". 

761 Use this to discover what authoritative summaries exist before 

762 composing an answer. 

763 

764 Args: 

765 bank_id: Bank to list. Uses default if omitted. 

766 scope: Optional scope filter (e.g. ``"person:alice"``). 

767 """ 

768 try: 

769 bid = _resolve_bank(bank_id) 

770 models = await brain.list_mental_models( 

771 bank_id=bid, 

772 scope=scope, 

773 context=_resolve_context(), 

774 ) 

775 out = [ 

776 { 

777 "model_id": m.model_id, 

778 "title": m.title, 

779 "scope": m.scope, 

780 "kind": m.kind, 

781 "revision": m.revision, 

782 "refreshed_at": m.refreshed_at.isoformat() if m.refreshed_at else None, 

783 "source_count": len(m.source_ids), 

784 } 

785 for m in models 

786 ] 

787 return json.dumps({"models": out}) 

788 except Exception as exc: 

789 logger.exception("memory_list_mental_models failed") 

790 return json.dumps({"models": [], "error": type(exc).__name__}) 

791 

792 @mcp.tool() 

793 async def memory_create_mental_model( 

794 model_id: str, 

795 title: str, 

796 content: str | None = None, 

797 sections: list[dict] | None = None, 

798 scope: str = "bank", 

799 bank_id: str | None = None, 

800 ) -> str: 

801 """Author a new mental model. 

802 

803 Provide ONE of: 

804 

805 - ``content`` — raw markdown string; the structured doc is 

806 parsed lazily on first refresh (legacy path). 

807 - ``sections`` — list of section dicts matching the 

808 ``StructuredDocument`` JSON shape; the structured doc is 

809 populated immediately so future ``memory_update_mental_model`` 

810 calls can target sections by id (recommended path). 

811 

812 Use ``memory_create_directive`` instead for hard-rule 

813 directives (those are mental models with ``kind="directive"``). 

814 """ 

815 try: 

816 bid = _resolve_bank(bank_id) 

817 model = await brain.create_mental_model( 

818 model_id=model_id, 

819 title=title, 

820 content=content, 

821 sections=sections, 

822 scope=scope, 

823 bank_id=bid, 

824 context=_resolve_context(), 

825 ) 

826 return json.dumps( 

827 { 

828 "model_id": model.model_id, 

829 "revision": model.revision, 

830 "kind": model.kind, 

831 "title": model.title, 

832 } 

833 ) 

834 except Exception as exc: 

835 logger.exception("memory_create_mental_model failed") 

836 return json.dumps({"model_id": None, "error": type(exc).__name__}) 

837 

838 @mcp.tool() 

839 async def memory_update_mental_model( 

840 model_id: str, 

841 operations: list[dict], 

842 bank_id: str | None = None, 

843 ) -> str: 

844 """Apply structured delta operations to an existing mental model. 

845 

846 Operations let you modify a document without re-emitting 

847 unchanged content — invaluable for refresh flows where the 

848 LLM would otherwise drift on sections it didn't intend to 

849 touch. Each operation is a dict with an ``op`` key; see 

850 :mod:`astrocyte.pipeline.delta_ops` for the full schema: 

851 

852 - ``append_block`` / ``insert_block`` / ``replace_block`` / 

853 ``remove_block`` — block-level edits within a section. 

854 - ``add_section`` / ``remove_section`` / ``replace_section_blocks`` 

855 / ``rename_section`` — section-level edits. 

856 

857 Conservative-failure contract: invalid ops (unknown section 

858 ids, out-of-range indices, malformed payloads) drop with a 

859 logged reason in the response's ``skipped`` list; the document 

860 never gets worse than its input. 

861 

862 Returns ``{"changed": bool, "revision": int, "applied": [...], 

863 "skipped": [...]}`` so the caller can audit which ops landed. 

864 """ 

865 try: 

866 bid = _resolve_bank(bank_id) 

867 result = await brain.update_mental_model( 

868 model_id=model_id, 

869 operations=operations, 

870 bank_id=bid, 

871 context=_resolve_context(), 

872 ) 

873 if result is None: 

874 return json.dumps({"changed": False, "error": "model not found"}) 

875 model, summary = result 

876 return json.dumps( 

877 { 

878 "changed": summary.get("changed", False), 

879 "revision": model.revision, 

880 "applied": summary.get("applied", []), 

881 "skipped": summary.get("skipped", []), 

882 } 

883 ) 

884 except Exception as exc: 

885 logger.exception("memory_update_mental_model failed") 

886 return json.dumps({"changed": False, "error": type(exc).__name__}) 

887 

888 @mcp.tool() 

889 async def memory_delete_mental_model( 

890 model_id: str, 

891 bank_id: str | None = None, 

892 ) -> str: 

893 """Soft-delete a mental model. Returns whether it existed.""" 

894 try: 

895 bid = _resolve_bank(bank_id) 

896 ok = await brain.delete_mental_model( 

897 model_id, 

898 bank_id=bid, 

899 context=_resolve_context(), 

900 ) 

901 return json.dumps({"deleted": ok}) 

902 except Exception as exc: 

903 logger.exception("memory_delete_mental_model failed") 

904 return json.dumps({"deleted": False, "error": type(exc).__name__}) 

905 

906 @mcp.tool() 

907 async def memory_refresh_mental_model( 

908 model_id: str, 

909 new_source_ids: list[str], 

910 bank_id: str | None = None, 

911 ) -> str: 

912 """Re-derive a mental model from its sources after new memories have been retained. 

913 

914 Hindsight parity for ``mental_model.refresh()``. When new 

915 memories that pertain to an existing model land via retain, 

916 call this to fold them into the model's ``source_ids`` and 

917 bump the revision. Use ``memory_update_mental_model`` for 

918 targeted edits via delta ops; use this when the model should 

919 be re-derived from an expanded provenance set. 

920 

921 Returns ``{"model_id", "revision", "source_count"}`` on 

922 success or ``{"error": "model not found"}`` if the model 

923 doesn't exist. 

924 """ 

925 try: 

926 bid = _resolve_bank(bank_id) 

927 model = await brain.refresh_mental_model( 

928 model_id, 

929 new_source_ids, 

930 bank_id=bid, 

931 context=_resolve_context(), 

932 ) 

933 if model is None: 

934 return json.dumps({"model_id": model_id, "error": "model not found"}) 

935 return json.dumps( 

936 { 

937 "model_id": model.model_id, 

938 "revision": model.revision, 

939 "source_count": len(model.source_ids), 

940 } 

941 ) 

942 except Exception as exc: 

943 logger.exception("memory_refresh_mental_model failed") 

944 return json.dumps({"model_id": model_id, "error": type(exc).__name__}) 

945 

946 @mcp.tool() 

947 async def memory_create_directive( 

948 rule_text: str, 

949 directive_id: str | None = None, 

950 scope: str = "bank", 

951 bank_id: str | None = None, 

952 ) -> str: 

953 """Author a user-curated hard-rule directive. 

954 

955 Directives are mental models with ``kind="directive"``. They 

956 participate in the same ``search_mental_models`` retrieval as 

957 general / preference models, but the discriminator signals to 

958 the answerer that the rule should be applied as a preference 

959 override rather than as one input among many. 

960 

961 Architecturally replaces the M18a-2 ``directive_compile`` 

962 auto-extraction path that was deprecated in M19 (auto-compressed 

963 directives replicated a −30pp SSP regression because they 

964 overrode original preference nuance). User-authored directives 

965 avoid that failure mode by definition — the user gets exactly 

966 the rule they intended. 

967 """ 

968 try: 

969 bid = _resolve_bank(bank_id) 

970 model = await brain.create_directive( 

971 rule_text=rule_text, 

972 directive_id=directive_id, 

973 scope=scope, 

974 bank_id=bid, 

975 context=_resolve_context(), 

976 ) 

977 return json.dumps( 

978 { 

979 "model_id": model.model_id, 

980 "kind": model.kind, 

981 "revision": model.revision, 

982 } 

983 ) 

984 except Exception as exc: 

985 logger.exception("memory_create_directive failed") 

986 return json.dumps({"model_id": None, "error": type(exc).__name__}) 

987 

988 # ── M21 — Observation CRUD ───────────────────────────────────── 

989 

990 @mcp.tool() 

991 async def memory_list_observations( 

992 bank_id: str | None = None, 

993 scope: str | None = None, 

994 trend: str | None = None, 

995 limit: int = 100, 

996 ) -> str: 

997 """List observations in a bank, optionally filtered by trend. 

998 

999 Observations are auto-consolidated distillations of raw 

1000 memories (e.g. "user prefers iced coffee in summer"). Each 

1001 carries a computed ``trend`` — one of ``new`` / 

1002 ``strengthening`` / ``stable`` / ``weakening`` / ``stale`` 

1003 derived from the evidence timestamps. Use ``trend`` to filter 

1004 for fresh insights (``new``, ``strengthening``) or to audit 

1005 stale patterns that may no longer apply. 

1006 """ 

1007 try: 

1008 bid = _resolve_bank(bank_id) 

1009 rows = await brain.list_observations( 

1010 bank_id=bid, 

1011 scope=scope, 

1012 trend=trend, 

1013 limit=limit, 

1014 context=_resolve_context(), 

1015 ) 

1016 return json.dumps({"observations": rows}) 

1017 except Exception as exc: 

1018 logger.exception("memory_list_observations failed") 

1019 return json.dumps({"observations": [], "error": type(exc).__name__}) 

1020 

1021 @mcp.tool() 

1022 async def memory_get_observation( 

1023 observation_id: str, 

1024 bank_id: str | None = None, 

1025 ) -> str: 

1026 """Fetch one observation by id. Returns null if not found.""" 

1027 try: 

1028 bid = _resolve_bank(bank_id) 

1029 row = await brain.get_observation( 

1030 observation_id, 

1031 bank_id=bid, 

1032 context=_resolve_context(), 

1033 ) 

1034 return json.dumps({"observation": row}) 

1035 except Exception as exc: 

1036 logger.exception("memory_get_observation failed") 

1037 return json.dumps({"observation": None, "error": type(exc).__name__}) 

1038 

1039 @mcp.tool() 

1040 async def memory_create_observation( 

1041 text: str, 

1042 source_ids: list[str] | None = None, 

1043 scope: str | None = None, 

1044 confidence: float = 0.9, 

1045 bank_id: str | None = None, 

1046 ) -> str: 

1047 """Hand-author an observation (bypasses the autonomous consolidator). 

1048 

1049 Use this when an agent or user wants to assert a fact directly 

1050 without going through retain → consolidate. The observation 

1051 becomes immediately visible to the recall pipeline's 

1052 ``observation`` strategy and to ``search_observations`` in 

1053 the agentic reflect loop. 

1054 """ 

1055 try: 

1056 bid = _resolve_bank(bank_id) 

1057 row = await brain.create_observation( 

1058 text=text, 

1059 source_ids=source_ids, 

1060 scope=scope, 

1061 confidence=confidence, 

1062 bank_id=bid, 

1063 context=_resolve_context(), 

1064 ) 

1065 return json.dumps({"observation": row}) 

1066 except Exception as exc: 

1067 logger.exception("memory_create_observation failed") 

1068 return json.dumps({"observation": None, "error": type(exc).__name__}) 

1069 

1070 @mcp.tool() 

1071 async def memory_delete_observation( 

1072 observation_id: str, 

1073 bank_id: str | None = None, 

1074 ) -> str: 

1075 """Delete one observation from the dedicated observations bank.""" 

1076 try: 

1077 bid = _resolve_bank(bank_id) 

1078 ok = await brain.delete_observation( 

1079 observation_id, 

1080 bank_id=bid, 

1081 context=_resolve_context(), 

1082 ) 

1083 return json.dumps({"deleted": ok}) 

1084 except Exception as exc: 

1085 logger.exception("memory_delete_observation failed") 

1086 return json.dumps({"deleted": False, "error": type(exc).__name__}) 

1087 

1088 # ── memory_health ────────────────────────────────────────────── 

1089 

1090 @mcp.tool() 

1091 async def memory_health() -> str: 

1092 """Check memory system health.""" 

1093 status = await brain.health() 

1094 return json.dumps( 

1095 { 

1096 "healthy": status.healthy, 

1097 "message": status.message, 

1098 "latency_ms": round(status.latency_ms, 2) if status.latency_ms else None, 

1099 } 

1100 ) 

1101 

1102 return mcp 

1103 

1104 

1105# --------------------------------------------------------------------------- 

1106# CLI entry point 

1107# --------------------------------------------------------------------------- 

1108 

1109 

def main() -> None:
    """CLI entry point for astrocyte-mcp.

    Parses command-line arguments, loads the config file, builds an
    :class:`Astrocyte` instance (installing any access grants derived from
    the config), creates the MCP server and runs it on the chosen transport.

    ``--host`` is optional and applies only to the SSE transport. When it is
    omitted, no host is passed to ``FastMCP.run``, so FastMCP's own default
    bind address is used exactly as before.
    """
    parser = argparse.ArgumentParser(
        prog="astrocyte-mcp",
        description="Astrocyte MCP server — memory tools for AI agents",
    )
    parser.add_argument(
        "--config",
        required=True,
        help="Path to astrocyte.yaml config file",
    )
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse"],
        default="stdio",
        help="MCP transport (default: stdio)",
    )
    parser.add_argument(
        "--host",
        default=None,
        help="Bind address for SSE transport (default: FastMCP's default)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8080,
        help="Port for SSE transport (default: 8080)",
    )
    args = parser.parse_args()

    # Load config and create Astrocyte
    config = load_config(args.config)
    brain = Astrocyte(config)

    # Wire access grants from config
    grants = access_grants_for_astrocyte(config)
    if grants:
        brain.set_access_grants(grants)

    # Create MCP server
    mcp = create_mcp_server(brain, config)

    # Run
    if args.transport == "stdio":
        mcp.run(transport="stdio")
    else:
        sse_kwargs: dict[str, Any] = {"port": args.port}
        # Only forward a host when the user asked for one, so the default
        # invocation stays identical to ``mcp.run(transport="sse", port=...)``.
        if args.host is not None:
            sse_kwargs["host"] = args.host
        mcp.run(transport="sse", **sse_kwargs)

1152 

1153 

# Allow running the module directly (e.g. ``python -m astrocyte.mcp``) in
# addition to the ``astrocyte-mcp`` console entry point.
if __name__ == "__main__":
    main()