Coverage for astrocyte/_astrocyte.py: 79%

711 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Core Astrocyte class — the main entry point for the framework. 

2 

3Handles tier routing, policy enforcement, capability negotiation, 

4multi-bank orchestration, and hook dispatch. 

5""" 

6 

7from __future__ import annotations 

8 

9import inspect 

10import logging 

11from datetime import datetime, timezone 

12from pathlib import Path 

13from typing import TYPE_CHECKING, Any, Literal 

14 

15from astrocyte._hooks import HookHandler, HookManager 

16from astrocyte._log_safety import safe as _safe_log 

17from astrocyte._multi_bank import MultiBankOrchestrator 

18from astrocyte._output_scanner import OutputScanner 

19from astrocyte._policy import PolicyEnforcer 

20from astrocyte._provider_dispatch import ProviderDispatcher 

21from astrocyte._recall_params import RecallParams 

22from astrocyte._validation import validate_bank_id 

23from astrocyte.analytics import BankMetricsCollector, compute_bank_health, counters_to_quality_point 

24from astrocyte.config import AstrocyteConfig, load_config 

25from astrocyte.errors import ( 

26 AccessDenied, 

27 ConfigError, 

28 MipRoutingError, 

29 ProviderUnavailable, 

30) 

31from astrocyte.identity import context_principal_label 

32from astrocyte.lifecycle import LifecycleManager 

33from astrocyte.mip.router import MipRouter 

34from astrocyte.policy.observability import MetricsCollector, StructuredLogger, span, timed 

35from astrocyte.recall.authority import apply_recall_authority 

36from astrocyte.types import ( 

37 AccessGrant, 

38 AstrocyteContext, 

39 AuditResult, 

40 BankHealth, 

41 CompileResult, 

42 Entity, 

43 ForgetRequest, 

44 ForgetResult, 

45 GraphHit, 

46 HealthStatus, 

47 HistoryResult, 

48 LegalHold, 

49 LifecycleRunResult, 

50 MemoryHit, 

51 MentalModel, 

52 MultiBankStrategy, 

53 QualityDataPoint, 

54 RecallRequest, 

55 RecallResult, 

56 ReflectRequest, 

57 ReflectResult, 

58 RetainRequest, 

59 RetainResult, 

60) 

61 

62if TYPE_CHECKING: 

63 from astrocyte.pipeline.orchestrator import PipelineOrchestrator 

64 from astrocyte.provider import EngineProvider 

65 

# Package-wide logger shared by the Astrocyte core (named, not __name__-based).
logger = logging.getLogger(name="astrocyte")

67 

68 

def _normalize_multi_bank_strategy(
    strategy: Literal["cascade", "parallel", "first_match"] | MultiBankStrategy | None,
) -> MultiBankStrategy:
    """Coerce a caller-supplied multi-bank strategy into a :class:`MultiBankStrategy`.

    An existing ``MultiBankStrategy`` is returned unchanged. A mode name
    (``"cascade"``, ``"parallel"``, ``"first_match"``) is wrapped. ``None``
    is treated as ``"parallel"``.

    Raises:
        ConfigError: If ``strategy`` is anything else.
    """
    if isinstance(strategy, MultiBankStrategy):
        return strategy
    mode = "parallel" if strategy is None else strategy
    if mode not in ("cascade", "parallel", "first_match"):
        raise ConfigError(f"Unknown multi-bank strategy: {strategy!r}")
    return MultiBankStrategy(mode=mode)

79 

80 

81class Astrocyte: 

82 """The Astrocyte memory framework — unified API for AI agent memory. 

83 

84 Usage: 

85 brain = Astrocyte.from_config("astrocyte.yaml") 

86 await brain.retain("Calvin prefers dark mode", bank_id="user-123") 

87 hits = await brain.recall("What are Calvin's preferences?", bank_id="user-123") 

88 """ 

89 

90 def __init__(self, config: AstrocyteConfig) -> None: 

91 self._config = config 

92 self._logger = StructuredLogger(level=config.observability.log_level) 

93 

94 # Extracted subsystems 

95 self._policy = PolicyEnforcer(config) 

96 self._hook_manager = HookManager(self._logger) 

97 self._output_scanner = OutputScanner(config, self._logger) 

98 self._dispatcher = ProviderDispatcher(config) 

99 

100 # Metrics 

101 self._metrics = MetricsCollector(enabled=config.observability.prometheus_enabled) 

102 

103 # Lifecycle 

104 self._lifecycle = LifecycleManager(config.lifecycle) 

105 

106 # MIP router (loaded from mip.yaml if configured) 

107 self._mip_router: MipRouter | None = None 

108 if config.mip_config_path: 

109 from astrocyte.mip.loader import load_mip_config 

110 

111 mip_config = load_mip_config(config.mip_config_path) 

112 self._mip_router = MipRouter(mip_config) 

113 

114 # Analytics 

115 self._analytics = BankMetricsCollector() 

116 

117 # Provider state (managed by dispatcher, exposed for wiring) 

118 self._engine_provider: EngineProvider | None = None 

119 self._pipeline: PipelineOrchestrator | None = None 

120 

121 # Multi-bank orchestrator (wired lazily after provider is set) 

122 self._multi_bank: MultiBankOrchestrator | None = None 

123 

124 # M8: wiki store (optional; enables brain.compile()) 

125 self._wiki_store: object | None = None 

126 # M8 W4: async compile queue (optional; enables automatic threshold triggering) 

127 self._compile_queue: object | None = None 

128 # M9: mental-model store (optional; enables /v1/mental-models endpoints). 

129 # First-class replacement for the prior wiki-piggyback (kind="concept" + 

130 # metadata["_mental_model"] = True) — see provider.MentalModelStore docs. 

131 self._mental_model_store: object | None = None 

132 # M10: source-document / chunk store (optional; enables source-aware 

133 # retain to preserve provenance back to the originating document and 

134 # chunk). Deployments without a SourceStore work exactly as before — 

135 # vectors stay anonymous flat rows. 

136 self._source_store: object | None = None 

137 

138 @property 

139 def config(self) -> AstrocyteConfig: 

140 """Loaded :class:`~astrocyte.config.AstrocyteConfig` (read-only for callers).""" 

141 return self._config 

142 

143 async def __aenter__(self) -> "Astrocyte": 

144 await self.start_background_tasks() 

145 return self 

146 

147 async def __aexit__(self, *exc: object) -> None: 

148 await self.stop_background_tasks() 

149 

150 async def start_background_tasks(self) -> None: 

151 """Start optional background services configured on this instance. 

152 

153 Today this covers the M8 compile queue. Keeping the lifecycle on 

154 ``Astrocyte`` lets library users and the REST gateway start the same 

155 services without reaching into private attributes. 

156 """ 

157 queue = self._compile_queue 

158 if queue is not None and hasattr(queue, "start"): 

159 await queue.start() # type: ignore[union-attr] 

160 

161 async def stop_background_tasks(self) -> None: 

162 """Stop optional background services configured on this instance.""" 

163 queue = self._compile_queue 

164 if queue is not None and hasattr(queue, "stop"): 

165 await queue.stop() # type: ignore[union-attr] 

166 shutdown = getattr(self._pipeline, "shutdown", None) 

167 if shutdown is not None: 

168 result = shutdown() 

169 if inspect.isawaitable(result): 

170 _ = await result 

171 

172 @classmethod 

173 def from_config(cls, path: str | Path) -> "Astrocyte": 

174 """Create an Astrocyte instance from a YAML config file.""" 

175 config = load_config(path) 

176 return cls(config) 

177 

178 @classmethod 

179 def from_config_dict(cls, data: dict[str, str | int | float | bool | None | dict | list]) -> "Astrocyte": 

180 """Create an Astrocyte instance from a config dictionary (for testing).""" 

181 from astrocyte.config import _dict_to_config, validate_astrocyte_config 

182 

183 config = _dict_to_config(data) 

184 validate_astrocyte_config(config) 

185 return cls(config) 

186 

187 def set_engine_provider(self, provider: EngineProvider) -> None: 

188 """Set the engine provider (for programmatic setup).""" 

189 from astrocyte.provider import check_spi_version 

190 

191 check_spi_version(provider, "EngineProvider") 

192 self._engine_provider = provider 

193 self._dispatcher.engine_provider = provider 

194 if hasattr(provider, "capabilities"): 

195 self._dispatcher.capabilities = provider.capabilities() 

196 self._rebuild_tiered_retrieval() 

197 self._rebuild_multi_bank() 

198 

199 def set_wiki_store(self, wiki_store: object) -> None: 

200 """Set the WikiStore provider (M8 wiki compile). Optional. 

201 

202 When a WikiStore is configured, ``brain.compile()`` becomes available. 

203 Compile is disabled by default — banks that don't opt in keep today's 

204 retain/recall behaviour exactly. 

205 

206 Args: 

207 wiki_store: Any object satisfying the 

208 :class:`~astrocyte.provider.WikiStore` protocol. 

209 """ 

210 from astrocyte.provider import check_spi_version 

211 

212 check_spi_version(wiki_store, "WikiStore") 

213 self._wiki_store = wiki_store 

214 

215 def set_mental_model_store(self, store: object) -> None: 

216 """Set the :class:`~astrocyte.provider.MentalModelStore` provider. Optional. 

217 

218 When configured, the gateway's ``/v1/mental-models`` endpoints 

219 (and any in-process consumer) route reads/writes through this 

220 store instead of erroring with HTTP 501. Cuts the prior 

221 wiki-piggyback path (``kind="concept"`` + metadata discriminator) 

222 — mental models now live in their own 

223 ``astrocyte_mental_models`` table with proper revision history. 

224 

225 Also wires the agentic reflect loop so ``search_mental_models`` 

226 becomes available as a tool — the highest-quality tier in the 

227 hierarchical retrieval order. Idempotent: if a pipeline is 

228 already attached, the service is pushed onto it; otherwise 

229 ``set_pipeline`` will pick up the store and build the service 

230 when it runs. 

231 """ 

232 from astrocyte.provider import check_spi_version 

233 

234 check_spi_version(store, "MentalModelStore") 

235 self._mental_model_store = store 

236 if self._pipeline is not None: 

237 from astrocyte.pipeline.mental_model import MentalModelService 

238 

239 self._pipeline.mental_model_service = MentalModelService(store) 

240 

241 def set_source_store(self, store: object) -> None: 

242 """Set the :class:`~astrocyte.provider.SourceStore` provider. Optional. 

243 

244 When configured, ingestion paths can persist 

245 :class:`~astrocyte.types.SourceDocument` and 

246 :class:`~astrocyte.types.SourceChunk` rows alongside vectors so memory 

247 retains a backreference to its originating document. Deployments 

248 without a SourceStore continue to work — vectors remain anonymous 

249 flat rows with ``chunk_id = NULL``. 

250 

251 Args: 

252 store: Any object satisfying the 

253 :class:`~astrocyte.provider.SourceStore` protocol. 

254 """ 

255 from astrocyte.provider import check_spi_version 

256 

257 check_spi_version(store, "SourceStore") 

258 self._source_store = store 

259 

260 def set_compile_queue(self, queue: object) -> None: 

261 """Set the async compile queue (M8 W4 threshold trigger). Optional. 

262 

263 When a :class:`~astrocyte.pipeline.compile_trigger.CompileQueue` is 

264 configured, each successful ``brain.retain()`` call notifies the queue. 

265 The queue fires a background compile job whenever the bank crosses the 

266 configured size or staleness threshold. 

267 

268 The queue must be started (``await queue.start()``) before the first 

269 retain call, and stopped (``await queue.stop()``) on shutdown. 

270 

271 Args: 

272 queue: A :class:`~astrocyte.pipeline.compile_trigger.CompileQueue` 

273 instance (or any object with a compatible ``notify_retain`` 

274 method). 

275 """ 

276 self._compile_queue = queue 

277 

278 def use_pageindex_pipeline( 

279 self, 

280 store: Any, 

281 embedding_provider: Any | None = None, 

282 *, 

283 document_resolver: Any | None = None, 

284 ) -> None: 

285 """M32 — install the PageIndex recall pipeline as the active 

286 retrieval stack. 

287 

288 After this call, ``self.recall(query, ...)`` routes through 

289 :class:`~astrocyte.pipeline.pageindex_pipeline.PageIndexPipeline` 

290 (the same stack the bench harness validates) instead of the 

291 legacy ``orchestrator.recall()`` / vector_store path. 

292 

293 This closes the bench-vs-API parity gap surfaced in the v0.15.0 

294 ship audit (docs/_design/m32-stack-unification.md). All 

295 M14-M31 cycle work (RRF fusion, fact↔chunk pairing, 

296 per-Q-type prompts, M27 fields, M28-M29 coreference, M30 

297 parallelization, M31 session_filter + event_date) now flows 

298 through the public API. 

299 

300 Args: 

301 store: A configured :class:`PageIndexStore` (Postgres or 

302 in-memory). 

303 embedding_provider: An :class:`LLMProvider` to embed query 

304 strings. Defaults to the Astrocyte instance's configured 

305 provider when ``None``. 

306 document_resolver: Optional ``(bank_id) -> document_id`` 

307 callable for single-doc scoping. See 

308 ``PageIndexPipeline.__init__`` for the contract. 

309 """ 

310 from astrocyte.pipeline.pageindex_pipeline import ( # noqa: PLC0415 

311 PageIndexPipeline, 

312 ) 

313 

314 if embedding_provider is None: 

315 embedding_provider = getattr(self, "_provider", None) 

316 if embedding_provider is None: 

317 raise ValueError( 

318 "use_pageindex_pipeline: no embedding_provider supplied and " 

319 "no provider configured on Astrocyte instance", 

320 ) 

321 self._dispatcher.pageindex_pipeline = PageIndexPipeline( 

322 store=store, 

323 embedding_provider=embedding_provider, 

324 config=self._config, 

325 document_resolver=document_resolver, 

326 ) 

327 

328 def set_pipeline(self, pipeline: PipelineOrchestrator) -> None: 

329 """Set the Tier 1 pipeline orchestrator (for programmatic setup).""" 

330 self._pipeline = pipeline 

331 self._dispatcher.pipeline = pipeline 

332 from astrocyte.pipeline.extraction import merged_extraction_profiles 

333 

334 pipeline.extraction_profiles = merged_extraction_profiles(self._config) 

335 pipeline.recall_authority = self._config.recall_authority 

336 

337 # If a mental-model store was set BEFORE the pipeline was 

338 # attached, push the service through now so ``search_mental_models`` 

339 # is wired into the agentic reflect loop. The reverse direction 

340 # (store set after pipeline) is handled in ``set_mental_model_store``. 

341 if self._mental_model_store is not None: 

342 from astrocyte.pipeline.mental_model import MentalModelService 

343 

344 pipeline.mental_model_service = MentalModelService( 

345 self._mental_model_store, # type: ignore[arg-type] 

346 ) 

347 

348 # Cross-encoder reranker (Hindsight parity) — only loaded when the 

349 # operator opts in via config. Lazy load: deferring the model load 

350 # to first-use keeps process startup cheap and keeps the 

351 # ``sentence-transformers`` dep optional. ``None`` here means 

352 # ``_rank_reflect_context`` falls through to the heuristic. 

353 cer_cfg = self._config.cross_encoder_rerank 

354 if cer_cfg.enabled: 

355 from astrocyte.pipeline.cross_encoder_rerank import ( 

356 get_default_cross_encoder, 

357 ) 

358 

359 pipeline.cross_encoder = get_default_cross_encoder( 

360 cer_cfg.model_name, 

361 force_cpu=cer_cfg.force_cpu, 

362 ) 

363 pipeline.cross_encoder_top_k = cer_cfg.top_k 

364 else: 

365 pipeline.cross_encoder = None 

366 

367 # Spreading activation through entity links (Hindsight parity). 

368 # Enabled at recall time when ``spreading_activation.enabled`` 

369 # AND a graph store is configured AND the adapter implements 

370 # ``expand_entities_via_links``. Falls back to seed-only when 

371 # the adapter doesn't support multi-hop traversal. 

372 # Causal-link extraction at retain time (Hindsight parity). 

373 cl_cfg = self._config.causal_links 

374 pipeline.causal_links_enabled = cl_cfg.enabled 

375 pipeline.causal_max_pairs_per_memory = cl_cfg.max_pairs_per_memory 

376 pipeline.causal_min_confidence = cl_cfg.min_confidence 

377 

378 # Semantic-kNN graph at retain time (Hindsight parity, C3a). 

379 slg_cfg = self._config.semantic_link_graph 

380 pipeline.semantic_link_graph_enabled = slg_cfg.enabled 

381 pipeline.semantic_link_graph_top_k = slg_cfg.top_k 

382 pipeline.semantic_link_graph_threshold = slg_cfg.similarity_threshold 

383 

384 # Structured fact extraction at retain time. 

385 sfe_cfg = self._config.structured_fact_extraction 

386 pipeline.structured_fact_extraction_enabled = sfe_cfg.enabled 

387 pipeline.structured_fact_extraction_max_facts = sfe_cfg.max_facts_per_call 

388 pipeline.structured_fact_extraction_mode = sfe_cfg.extraction_mode 

389 pipeline.structured_fact_extraction_chunk_strategy = sfe_cfg.chunk_strategy 

390 pipeline.structured_fact_extraction_chunk_max_size = sfe_cfg.chunk_max_size 

391 pipeline.structured_fact_extraction_parallel_chunks = sfe_cfg.parallel_chunks 

392 pipeline.structured_fact_extraction_parallel_chunks_max_concurrency = sfe_cfg.parallel_chunks_max_concurrency 

393 

394 # Entity co-occurrence link cap (2026-05-06 retain-profile fix). 

395 # The all-pairs Cartesian product was 34% of retain wall on the 

396 # LME profile; capping the entity set bounds it to O(K²) per 

397 # retain regardless of corpus size. 

398 coocc_cfg = self._config.entity_cooccurrence 

399 pipeline.entity_cooccurrence_enabled = coocc_cfg.enabled 

400 pipeline.entity_cooccurrence_max_entities = coocc_cfg.max_entities_per_memory 

401 

402 # Query analyzer (temporal constraint extraction at recall time). 

403 qa_cfg = self._config.query_analyzer 

404 pipeline.query_analyzer_enabled = qa_cfg.enabled 

405 pipeline.query_analyzer_allow_llm_fallback = qa_cfg.allow_llm_fallback 

406 # M18a-1: extended temporal-expansion pattern set. Env-var override 

407 # (``ASTROCYTE_M18_ENABLE_TEMPORAL_EXPANSION=1``) gives bench-time 

408 # control without code change; otherwise the per-bank config wins. 

409 import os as _os # noqa: PLC0415 

410 

411 _env = _os.environ.get("ASTROCYTE_M18_ENABLE_TEMPORAL_EXPANSION", "").lower() 

412 if _env in ("1", "true", "yes"): 

413 pipeline.query_analyzer_enable_temporal_expansion = True 

414 elif _env in ("0", "false", "no"): 

415 pipeline.query_analyzer_enable_temporal_expansion = False 

416 else: 

417 pipeline.query_analyzer_enable_temporal_expansion = qa_cfg.enable_temporal_expansion 

418 

419 # Link expansion (Hindsight parity, C3) — replaces the previous 

420 # spreading-activation BFS with the 3-parallel-signal path. 

421 # Reuses the legacy ``spreading_activation:`` config block so 

422 # benchmarks don't have to rewrite their YAML; the relevant 

423 # knobs (expansion_limit, etc.) translate directly. Block-name 

424 # rename can come later as a backward-incompatible change. 

425 sa_cfg = self._config.spreading_activation 

426 if sa_cfg.enabled: 

427 from astrocyte.pipeline.link_expansion import LinkExpansionParams 

428 

429 pipeline.link_expansion_params = LinkExpansionParams( 

430 expansion_limit=sa_cfg.expansion_limit, 

431 ) 

432 else: 

433 pipeline.link_expansion_params = None 

434 

435 # Adversarial-defense layer. 

436 # M9 BM25-IDF wiring — opt-in flag plumbed through to ``parallel_retrieve``. 

437 pipeline.bm25_idf_enabled = self._config.bm25_idf.enabled 

438 

439 # M10 source-aware retain + recall. The store handle and the three 

440 # behavioural flags are kept independently configurable so a 

441 # deployment can enable provenance ingest without paying for the 

442 # recall-side chunk expansion (which only helps multi-hop / split- 

443 # evidence questions and adds one DB roundtrip per top-K hit). 

444 sar_cfg = self._config.source_aware_retrieval 

445 pipeline.source_store = self._source_store 

446 pipeline.source_retain_provenance = sar_cfg.retain_provenance 

447 pipeline.source_chunk_expansion = sar_cfg.chunk_expansion 

448 pipeline.source_expansion_score_multiplier = sar_cfg.expansion_score_multiplier 

449 pipeline.source_expansion_max_per_hit = sar_cfg.expansion_max_per_hit 

450 

451 ad_cfg = self._config.adversarial_defense 

452 pipeline.adversarial_abstention_enabled = ad_cfg.abstention_enabled 

453 pipeline.adversarial_abstention_floor = ad_cfg.abstention_floor 

454 pipeline.adversarial_premise_verification_enabled = ad_cfg.premise_verification_enabled 

455 pipeline.adversarial_premise_min_confidence = ad_cfg.premise_verification_min_confidence 

456 pipeline.adversarial_prompt_enabled = ad_cfg.adversarial_prompt_enabled 

457 

458 # Agentic reflect loop. 

459 ar_cfg = self._config.agentic_reflect 

460 if ar_cfg.enabled: 

461 from astrocyte.pipeline.agentic_reflect import AgenticReflectParams 

462 

463 pipeline.agentic_reflect_params = AgenticReflectParams( 

464 max_iterations=ar_cfg.max_iterations, 

465 recall_step_max_results=ar_cfg.recall_step_max_results, 

466 max_evidence_pool_size=ar_cfg.max_evidence_pool_size, 

467 # Adversarial-defense rules in the system prompt are 

468 # opt-in via the separate adversarial_defense block — 

469 # the agentic loop's own ``adversarial_defense`` param 

470 # mirrors that flag so a user enabling adversarial 

471 # defense automatically gets the loop-level prompt 

472 # tightening too. 

473 adversarial_defense=ad_cfg.adversarial_prompt_enabled, 

474 ) 

475 else: 

476 pipeline.agentic_reflect_params = None 

477 # Wire the LLM provider to the MIP router for intent-layer escalation 

478 if self._mip_router and hasattr(pipeline, "llm_provider"): 

479 self._mip_router._llm_provider = pipeline.llm_provider 

480 # Expose router for per-bank MIP resolution at recall time (P3) 

481 pipeline.mip_router = self._mip_router 

482 self._rebuild_tiered_retrieval() 

483 self._rebuild_multi_bank() 

484 

485 def _rebuild_tiered_retrieval(self) -> None: 

486 """Construct :class:`~astrocyte.pipeline.tiered_retrieval.TieredRetriever` when enabled.""" 

487 from astrocyte.hybrid import HybridEngineProvider 

488 from astrocyte.pipeline.recall_cache import RecallCache 

489 from astrocyte.pipeline.recent_buffer import RecentMemoryBuffer 

490 from astrocyte.pipeline.tiered_retrieval import TieredRetriever 

491 

492 self._dispatcher.tiered_retriever = None 

493 if not self._pipeline or not self._config.tiered_retrieval.enabled: 

494 return 

495 trc = self._config.tiered_retrieval 

496 if self._engine_provider is not None and trc.full_recall != "hybrid": 

497 return 

498 full_recall_fn = None 

499 if trc.full_recall == "hybrid": 

500 if not isinstance(self._engine_provider, HybridEngineProvider): 

501 logger.warning( 

502 "tiered_retrieval.full_recall=hybrid requires HybridEngineProvider; " 

503 "tiered retrieval disabled until a hybrid provider is set", 

504 ) 

505 return 

506 full_recall_fn = self._engine_provider.recall 

507 rcc = self._config.recall_cache 

508 cache: RecallCache | None = None 

509 if rcc.enabled: 

510 cache = RecallCache( 

511 similarity_threshold=rcc.similarity_threshold, 

512 max_entries=rcc.max_entries, 

513 ttl_seconds=rcc.ttl_seconds, 

514 ) 

515 recent = RecentMemoryBuffer() 

516 self._dispatcher.tiered_retriever = TieredRetriever( 

517 self._pipeline, 

518 recall_cache=cache, 

519 recent_buffer=recent, 

520 min_results=trc.min_results, 

521 min_score=trc.min_score, 

522 max_tier=trc.max_tier, 

523 full_recall=full_recall_fn, 

524 ) 

525 

526 def _rebuild_multi_bank(self) -> None: 

527 """Rebuild MultiBankOrchestrator when provider changes.""" 

528 self._multi_bank = MultiBankOrchestrator( 

529 do_recall=self._dispatcher.recall, 

530 make_request=self._make_recall_request, 

531 circuit_breaker_record_failure=self._policy.record_failure, 

532 metrics=self._metrics, 

533 provider_name=self._dispatcher.provider_name, 

534 ) 

535 

536 def set_access_grants(self, grants: list[AccessGrant]) -> None: 

537 """Configure access grants.""" 

538 self._policy.set_access_grants(grants) 

539 

540 @property 

541 def _rate_limiters(self) -> dict: 

542 """Expose rate limiters for testing/introspection.""" 

543 return self._policy._rate_limiters 

544 

545 def register_hook(self, event_type: str, handler: HookHandler) -> None: 

546 """Register an event hook handler.""" 

547 self._hook_manager.register(event_type, handler) 

548 

549 # --------------------------------------------------------------------------- 

550 # Public API 

551 # --------------------------------------------------------------------------- 

552 

553 async def retain( 

554 self, 

555 content: str, 

556 bank_id: str, 

557 *, 

558 metadata: dict[str, str | int | float | bool | None] | None = None, 

559 tags: list[str] | None = None, 

560 context: AstrocyteContext | None = None, 

561 content_type: str = "text", 

562 extraction_profile: str | None = None, 

563 occurred_at: datetime | None = None, 

564 source: str | None = None, 

565 pii_detected: bool = False, 

566 ) -> RetainResult: 

567 """Store content into memory. 

568 

569 Args: 

570 content_type: ``text``, ``conversation``, ``document``, etc. 

571 extraction_profile: Name under YAML ``extraction_profiles:``. 

572 occurred_at: When the content originally occurred. 

573 source: Origin identifier for the content. 

574 pii_detected: Whether PII was already detected upstream. 

575 """ 

576 validate_bank_id(bank_id) 

577 with span("astrocyte.retain", {"astrocyte.bank_id": bank_id}): 

578 # Input size validation — reject oversized content before pipeline processing 

579 input_error = self._policy.validate_retain_input(content, tags) 

580 if input_error: 

581 return RetainResult(stored=False, error=input_error) 

582 

583 # Access control 

584 self._policy.check_access(bank_id, "write", context) 

585 

586 # MIP routing (before policy layer) 

587 mip_pipeline = None 

588 mip_rule_name = None 

589 if self._mip_router: 

590 from astrocyte.identity import resolve_actor 

591 from astrocyte.mip.rule_engine import RuleEngineInput 

592 

593 # Identity spec §3 Gap 2: resolved actor flows into MIP so 

594 # rules can branch on principal_type / principal_id / etc. 

595 # When no context is supplied (legacy callers), actor is 

596 # None and principal_* match conditions simply never fire. 

597 actor_identity = resolve_actor(context) if context else None 

598 

599 mip_input = RuleEngineInput( 

600 content=content, 

601 content_type=content_type, 

602 metadata=metadata, 

603 tags=tags, 

604 pii_detected=pii_detected, 

605 source=source, 

606 actor_identity=actor_identity, 

607 ) 

608 routing = await self._mip_router.route(mip_input) 

609 if routing.bank_id: 

610 # Re-check access for new bank 

611 if routing.bank_id != bank_id: 

612 self._policy.check_access(routing.bank_id, "write", context) 

613 bank_id = routing.bank_id 

614 if routing.tags is not None: 

615 tags = routing.tags 

616 if routing.retain_policy == "reject": 

617 return RetainResult(stored=False, error="Rejected by MIP routing rule") 

618 mip_pipeline = routing.pipeline 

619 mip_rule_name = routing.rule_name 

620 

621 # Rate limiting + quota (atomic to prevent TOCTOU) 

622 self._policy.check_rate_and_quota(bank_id, "retain") 

623 

624 # Content validation 

625 errors = self._policy.validate_content(content, content_type) 

626 if errors: 

627 return RetainResult(stored=False, error="; ".join(errors)) 

628 

629 # PII scanning (async for LLM/rules_then_llm modes) 

630 content, pii_matches = await self._policy.scan_pii(content, self._config.barriers.pii.mode) 

631 if pii_matches: 

632 pii_action = self._policy.pii_action 

633 self._logger.log( 

634 "astrocyte.policy.pii_detected", 

635 bank_id=bank_id, 

636 operation="retain", 

637 data={"pii_types": ",".join(m.pii_type for m in pii_matches), "action": pii_action}, 

638 ) 

639 self._metrics.inc_counter( 

640 "astrocyte_pii_detected_total", 

641 {"bank_id": bank_id, "action": pii_action}, 

642 ) 

643 await self._hook_manager.fire( 

644 "on_pii_detected", 

645 bank_id=bank_id, 

646 data={ 

647 "pii_types": ",".join(m.pii_type for m in pii_matches), 

648 "action": pii_action, 

649 }, 

650 ) 

651 

652 # Metadata sanitization 

653 metadata, meta_warnings = self._policy.sanitize_metadata(metadata) 

654 

655 # Build request 

656 request = RetainRequest( 

657 content=content, 

658 bank_id=bank_id, 

659 metadata=metadata, 

660 tags=tags, 

661 occurred_at=occurred_at, 

662 source=source, 

663 content_type=content_type, 

664 extraction_profile=extraction_profile, 

665 mip_pipeline=mip_pipeline, 

666 mip_rule_name=mip_rule_name, 

667 ) 

668 

669 # Route to provider 

670 try: 

671 self._policy.check_circuit(self._provider_name) 

672 with timed() as t: 

673 result = await self._dispatcher.retain(request) 

674 self._policy.record_success() 

675 self._policy.record_quota(bank_id, "retain") 

676 self._metrics.inc_counter( 

677 "astrocyte_retain_total", 

678 {"bank_id": bank_id, "provider": self._provider_name, "status": "ok"}, 

679 ) 

680 self._metrics.observe_histogram( 

681 "astrocyte_retain_duration_seconds", 

682 t["elapsed_ms"] / 1000, 

683 {"bank_id": bank_id, "provider": self._provider_name}, 

684 ) 

685 self._analytics.record_retain( 

686 bank_id, 

687 len(content), 

688 deduplicated=getattr(result, "deduplicated", False), 

689 ) 

690 await self._hook_manager.fire( 

691 "on_retain", 

692 bank_id=bank_id, 

693 data={ 

694 "memory_id": result.memory_id or "", 

695 "content_length": len(content), 

696 }, 

697 ) 

698 # M8 W4: notify the compile queue so it can trigger a background 

699 # compile when the bank crosses the configured threshold. 

700 if self._compile_queue is not None and result.stored: 

701 self._compile_queue.notify_retain(bank_id) # type: ignore[union-attr] 

702 return result 

703 except ProviderUnavailable: 

704 self._policy.handle_degraded_retain(self._provider_name) 

705 return RetainResult(stored=False, error="Provider unavailable (degraded mode)") 

706 except Exception: 

707 self._policy.record_failure() 

708 self._metrics.inc_counter( 

709 "astrocyte_retain_total", 

710 {"bank_id": bank_id, "provider": self._provider_name, "status": "error"}, 

711 ) 

712 raise 

713 

714 async def _make_recall_request( 

715 self, 

716 query: str, 

717 bank_id: str, 

718 max_results: int, 

719 max_tokens: int | None, 

720 tags: list[str] | None, 

721 params: RecallParams, 

722 ) -> RecallRequest: 

723 from astrocyte.recall.proxy import merge_manual_and_proxy_hits 

724 

725 ext = await merge_manual_and_proxy_hits( 

726 self._config, 

727 query=query, 

728 bank_id=bank_id, 

729 manual=params.external_context, 

730 metrics=self._metrics, 

731 ) 

732 return RecallRequest( 

733 query=query, 

734 bank_id=bank_id, 

735 max_results=max_results, 

736 max_tokens=max_tokens, 

737 tags=tags, 

738 fact_types=params.fact_types, 

739 time_range=params.time_range, 

740 include_sources=params.include_sources, 

741 layer_weights=params.layer_weights, 

742 detail_level=params.detail_level, 

743 external_context=ext, 

744 as_of=params.as_of, # M9 

745 query_reference_date=params.query_reference_date, 

746 session_id=params.session_id, # M31 Fix 2 

747 ) 

748 

async def recall(
    self,
    query: str,
    bank_id: str | None = None,
    *,
    banks: list[str] | None = None,
    strategy: Literal["cascade", "parallel", "first_match"] | MultiBankStrategy | None = None,
    max_results: int = 10,
    max_tokens: int | None = None,
    tags: list[str] | None = None,
    context: AstrocyteContext | None = None,
    external_context: list[MemoryHit] | None = None,
    fact_types: list[str] | None = None,
    time_range: tuple[datetime, datetime] | None = None,
    include_sources: bool = False,
    layer_weights: dict[str, float] | None = None,
    detail_level: str | None = None,
    as_of: datetime | None = None,
    query_reference_date: datetime | None = None,
    session_id: str | None = None,
) -> RecallResult:
    """Retrieve relevant memories for a query.

    With multiple ``banks``, use ``strategy`` (or a :class:`MultiBankStrategy`) to choose
    ``parallel`` (default), ``cascade`` (widen until enough hits), or ``first_match``.

    Args:
        query: Free-text query.
        bank_id: Single target bank; resolved together with ``banks`` and
            ``context`` by the read policy.
        banks: Several target banks (multi-bank recall).
        strategy: Multi-bank strategy; only consulted when more than one bank
            is resolved.
        max_results: Maximum number of hits requested.
        max_tokens: Token budget; a falsy value falls back to
            ``homeostasis.recall_max_tokens``.
        tags: Tags forwarded to the recall request.
        context: Caller identity used for bank resolution and ``read`` access checks.
        external_context: Extra :class:`MemoryHit` items merged with retrieval via RRF.
        fact_types: Filter by fact type (e.g. ``["preference", "event"]``).
        time_range: ``(start, end)`` datetime tuple to scope retrieval.
        include_sources: Include source metadata in results.
        layer_weights: Per-layer scoring weights for tiered retrieval.
        detail_level: Granularity hint (``"summary"`` / ``"full"``).
        as_of: Time-travel anchor forwarded to the recall request.
        query_reference_date: Reference date for relative temporal phrases,
            forwarded to the recall request.
        session_id: Session identifier forwarded to the recall request.

    Returns:
        The recall result after optional DLP output scanning and
        recall-authority post-processing. On the single-bank path, a
        :class:`ProviderUnavailable` from the circuit check or provider call
        yields the policy's degraded recall result instead of raising.

    When ``tiered_retrieval.enabled`` is set and a pipeline is configured, recall uses
    :class:`~astrocyte.pipeline.tiered_retrieval.TieredRetriever` for cheap tiers.
    """
    # Resolve bank(s)
    bank_ids = self._policy.resolve_read_bank_ids(bank_id, banks, context)

    max_tokens = max_tokens or self._config.homeostasis.recall_max_tokens

    with span(
        "astrocyte.recall",
        {
            "astrocyte.bank_count": len(bank_ids),
            "astrocyte.bank_id": bank_ids[0] if len(bank_ids) == 1 else f"{bank_ids[0]}+{len(bank_ids) - 1}",
        },
    ):
        # Access control for all banks
        for bid in bank_ids:
            self._policy.check_access(bid, "read", context)

        # Rate limiting + quota (atomic per-bank)
        for bid in bank_ids:
            self._policy.check_rate_and_quota(bid, "recall")

        # Build typed params for internal helpers
        _rp = RecallParams(
            external_context=external_context,
            fact_types=fact_types,
            time_range=time_range,
            include_sources=include_sources,
            layer_weights=layer_weights,
            detail_level=detail_level,
            as_of=as_of,  # M9
            query_reference_date=query_reference_date,
            session_id=session_id,  # M31 Fix 2
        )

        # Single bank — direct
        if len(bank_ids) == 1:
            primary = bank_ids[0]
            request = await self._make_recall_request(
                query,
                primary,
                max_results,
                max_tokens,
                tags,
                _rp,
            )
            # Only the circuit check and the provider call feed the circuit
            # breaker (same shape as reflect()). Failures in metrics, hooks or
            # output scanning below must not be recorded as provider failures
            # after record_success() has already run.
            try:
                self._policy.check_circuit(self._provider_name)
                with timed() as t:
                    result = await self._dispatcher.recall(request)
                self._policy.record_success()
            except ProviderUnavailable:
                return self._policy.handle_degraded_recall(self._provider_name)
            except Exception:
                self._policy.record_failure()
                raise

            self._metrics.inc_counter(
                "astrocyte_recall_total",
                {"bank_id": primary, "provider": self._provider_name, "status": "ok"},
            )
            self._metrics.observe_histogram(
                "astrocyte_recall_duration_seconds",
                t["elapsed_ms"] / 1000,
                {"bank_id": primary, "provider": self._provider_name},
            )
            top_score = result.hits[0].score if result.hits else 0.0
            self._analytics.record_recall(
                primary,
                len(result.hits),
                top_score,
            )
            await self._hook_manager.fire(
                "on_recall",
                bank_id=primary,
                data={
                    "query_length": len(query),
                    "result_count": len(result.hits),
                },
            )
            if self._config.dlp.scan_recall_output:
                result = self._output_scanner.scan_recall(result)
            return apply_recall_authority(result, self._config.recall_authority)

        # Multi-bank — fan out through the orchestrator with the chosen strategy.
        strat = _normalize_multi_bank_strategy(strategy)
        result = await self._multi_bank.recall(
            query,
            bank_ids,
            max_results,
            max_tokens,
            tags,
            _rp,
            strat,
        )
        if self._config.dlp.scan_recall_output:
            result = self._output_scanner.scan_recall(result)
        return apply_recall_authority(result, self._config.recall_authority)

879 

async def reflect(
    self,
    query: str,
    bank_id: str | None = None,
    *,
    banks: list[str] | None = None,
    strategy: Literal["cascade", "parallel", "first_match"] | MultiBankStrategy | None = None,
    max_tokens: int | None = None,
    tags: list[str] | None = None,
    context: AstrocyteContext | None = None,
    include_sources: bool = True,
    dispositions: Any | None = None,
    as_of: datetime | None = None,
    query_reference_date: datetime | None = None,
) -> ReflectResult:
    """Synthesize an answer from memory.

    Single bank: builds a :class:`ReflectRequest` and delegates to the
    dispatcher's ``reflect`` behind the policy's circuit check. A
    :class:`ProviderUnavailable` yields ``ReflectResult(answer="Memory
    unavailable")`` instead of raising.

    Multiple banks: recalls up to 20 hits across ``banks`` via the multi-bank
    orchestrator and optionally applies recall authority (when
    ``recall_authority.enabled`` and ``apply_to_reflect`` are both set). It
    then synthesizes with ``reflect_from_hits``, attributed to the first
    resolved bank.

    Args:
        query: Question to answer.
        bank_id: Single target bank; resolved with ``banks`` / ``context``
            by the read policy.
        banks: Several target banks (multi-bank reflect).
        strategy: Multi-bank strategy; only used when more than one bank is
            resolved.
        max_tokens: Answer token budget; a falsy value falls back to
            ``homeostasis.reflect_max_tokens``.
        tags: Filter recall by tags during reflect.
        context: Caller identity for bank resolution and ``read`` access checks.
        include_sources: Include source memories in the result.
        dispositions: Emotional/tonal dispositions for synthesis.
        as_of: M9 time-travel anchor — filters underlying recall to
            ``retained_at <= as_of``. For legal-hold / audit-replay
            use cases ("show me what the system knew at time X"). Do
            NOT pass for relative-phrase anchoring on benchmarks
            whose retain happens after the dataset's question dates;
            use ``query_reference_date`` instead.
        query_reference_date: Reference date for resolving relative
            temporal phrases (``"yesterday"``, ``"last week"``,
            ``"3 days ago"``). Required when running a benchmark
            whose dataset predates the wall-clock — e.g. LongMemEval
            (2023-vintage) measured in 2026 must pass the question's
            contemporaneous date here. Unlike ``as_of``, setting
            this alone does NOT filter out memories retained after
            this date — the entire corpus stays in scope.

    Returns:
        The reflect result, passed through the output scanner when
        ``dlp.scan_reflect_output`` is enabled.

    Raises:
        Whatever the policy's access / rate checks raise, plus any
        non-``ProviderUnavailable`` error from the single-bank provider call
        (after ``record_failure()``).

    Supports multi-bank reflect: pass ``banks`` (and optionally ``strategy``) to
    recall across multiple banks and synthesize over the fused results.
    """
    # Resolve bank(s)
    bank_ids = self._policy.resolve_read_bank_ids(bank_id, banks, context)

    # Falsy max_tokens (None or 0) falls back to the configured reflect budget.
    max_tokens = max_tokens or self._config.homeostasis.reflect_max_tokens
    # The first resolved bank is used for attribution: request bank,
    # analytics, quota recording and the on_reflect hook.
    primary_bank = bank_ids[0]

    with span(
        "astrocyte.reflect",
        {
            "astrocyte.bank_count": len(bank_ids),
            "astrocyte.bank_id": bank_ids[0] if len(bank_ids) == 1 else f"{bank_ids[0]}+{len(bank_ids) - 1}",
        },
    ):
        # Access control for all banks
        for bid in bank_ids:
            self._policy.check_access(bid, "read", context)

        # Rate limiting + quota for every bank, after all access checks pass.
        for bid in bank_ids:
            self._policy.check_rate_and_quota(bid, "reflect")

        # ── Single bank: delegate to provider/pipeline reflect ──
        if len(bank_ids) == 1:
            request = ReflectRequest(
                query=query,
                bank_id=primary_bank,
                max_tokens=max_tokens,
                include_sources=include_sources,
                dispositions=dispositions,
                tags=tags,
                as_of=as_of,
                query_reference_date=query_reference_date,
            )
            # Only the circuit check and provider call are guarded, so only
            # provider errors feed the circuit breaker.
            try:
                self._policy.check_circuit(self._provider_name)
                with timed() as t:
                    result = await self._dispatcher.reflect(request)
                self._policy.record_success()
            except ProviderUnavailable:
                # Degraded early return: skips analytics, metrics, hooks and
                # DLP scanning below.
                return ReflectResult(answer="Memory unavailable", sources=[])
            except Exception:
                self._policy.record_failure()
                raise

        # ── Multi-bank: recall across banks, then synthesize ──
        # NOTE(review): this branch has no check_circuit / record_success /
        # record_failure handling, unlike the single-bank branch — confirm
        # that is intended.
        else:
            strat = _normalize_multi_bank_strategy(strategy)
            with timed() as t:
                _rp = RecallParams(
                    include_sources=include_sources,
                    as_of=as_of,
                    query_reference_date=query_reference_date,
                )
                recall_result = await self._multi_bank.recall(
                    query,
                    bank_ids,
                    max_results=20,  # Larger set for synthesis context
                    max_tokens=None,  # Budget applied after synthesis
                    tags=tags,
                    params=_rp,
                    strategy=strat,
                )
                # Authority framing is only applied to reflect when both
                # recall_authority.enabled and apply_to_reflect are set.
                auth_ctx: str | None = None
                ra = self._config.recall_authority
                if ra.enabled and ra.apply_to_reflect:
                    recall_result = apply_recall_authority(recall_result, ra)
                    auth_ctx = recall_result.authority_context
                result = await self._dispatcher.reflect_from_hits(
                    query=query,
                    hits=recall_result.hits,
                    bank_id=primary_bank,
                    max_tokens=max_tokens,
                    dispositions=dispositions,
                    authority_context=auth_ctx,
                )

        # An answer that is empty after stripping whitespace is recorded as
        # an unsuccessful reflect.
        self._analytics.record_reflect(
            primary_bank,
            success=bool(result.answer.strip()),
        )
        # NOTE(review): check_rate_and_quota() already ran for every bank
        # above; confirm it does not also record quota, otherwise the
        # primary bank's reflect quota is counted twice here.
        self._policy.record_quota(primary_bank, "reflect")
        # Metric labels use the comma-joined list of all banks involved.
        self._metrics.inc_counter(
            "astrocyte_reflect_total",
            {"bank_id": ",".join(bank_ids), "provider": self._provider_name, "status": "ok"},
        )
        self._metrics.observe_histogram(
            "astrocyte_reflect_duration_seconds",
            t["elapsed_ms"] / 1000,
            {"bank_id": ",".join(bank_ids), "provider": self._provider_name},
        )
        await self._hook_manager.fire(
            "on_reflect",
            bank_id=primary_bank,
            data={
                "query_length": len(query),
                "answer_length": len(result.answer),
                "bank_count": len(bank_ids),
            },
        )
        if self._config.dlp.scan_reflect_output:
            result = self._output_scanner.scan_reflect(result)
        return result

1020 

1021 # ── M21 — Mental model CRUD (Hindsight parity) ──────────────────── 

1022 

def _mental_model_service(self) -> Any:
    """Build a ``MentalModelService`` over the registered mental-model store.

    Mental-model CRUD needs a store registered through
    :meth:`set_mental_model_store`. Failing here, with a hint, is clearer
    than letting the call crash somewhere deeper in the pipeline.

    Raises:
        RuntimeError: if no mental-model store has been configured.
    """
    backing_store = self._mental_model_store
    if backing_store is not None:
        from astrocyte.pipeline.mental_model import MentalModelService

        return MentalModelService(backing_store)  # type: ignore[arg-type]
    raise RuntimeError(
        "mental_model_store not configured — call "
        "Astrocyte.set_mental_model_store(store) before using "
        "mental-model CRUD."
    )

1041 

async def list_mental_models(
    self,
    bank_id: str | None = None,
    *,
    scope: str | None = None,
    context: AstrocyteContext | None = None,
) -> "list[MentalModel]":
    """Return the mental models stored in one bank.

    Args:
        bank_id: Target bank; resolved via the read-bank policy and the
            first resolved bank is used.
        scope: Optional scope filter forwarded to the service.
        context: Caller identity for bank resolution and the ``read`` check.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "read", context)
    service = self._mental_model_service()
    return await service.list(target_bank, scope=scope)

1053 

async def get_mental_model(
    self,
    model_id: str,
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> "MentalModel | None":
    """Look up one mental model by id.

    Returns whatever the service returns for the lookup — ``None`` when the
    model does not exist or has been deleted.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "read", context)
    service = self._mental_model_service()
    return await service.get(bank_id=target_bank, model_id=model_id)

1065 

async def create_mental_model(
    self,
    *,
    model_id: str,
    title: str,
    content: str | None = None,
    sections: list[dict] | None = None,
    scope: str = "bank",
    source_ids: list[str] | None = None,
    bank_id: str | None = None,
    context: AstrocyteContext | None = None,
) -> "MentalModel":
    """Create or refresh a mental model.

    Two content shapes (mutually exclusive — provide one):

    - ``content``: raw markdown string. Parsed on first refresh into
      a structured doc; legacy-shape path.
    - ``sections``: list of section dicts matching
      :class:`astrocyte.pipeline.structured_doc.Section` JSON shape.
      Modern path — the structured doc is populated immediately and
      ``content`` is rendered from it.

    Pass exactly one. If both are provided, ``sections`` wins.

    Raises:
        ValueError: if neither ``content`` nor ``sections`` is given. This is
            checked after the write-access check but before rate limiting,
            so a malformed request does not consume rate/quota budget.
        RuntimeError: if no mental-model store is configured.
    """
    bid = self._policy.resolve_read_bank_ids(bank_id, None, context)[0]
    self._policy.check_access(bid, "write", context)

    # Reject an empty payload before charging the caller's rate/quota budget.
    if sections is None and content is None:
        raise ValueError("create_mental_model requires either `content` or `sections`")

    self._policy.check_rate_and_quota(bid, "retain")

    service = self._mental_model_service()

    if sections is not None:
        from astrocyte.pipeline.structured_doc import (
            StructuredDocument,
            render_document,
        )

        doc = StructuredDocument.model_validate({"sections": sections})
        rendered = render_document(doc)
        return await service.create(
            bank_id=bid,
            model_id=model_id,
            title=title,
            content=rendered,
            scope=scope,
            source_ids=source_ids,
            structured_doc=doc.model_dump(),
        )

    return await service.create(
        bank_id=bid,
        model_id=model_id,
        title=title,
        content=content,
        scope=scope,
        source_ids=source_ids,
    )

1126 

async def update_mental_model(
    self,
    *,
    model_id: str,
    operations: list[dict],
    bank_id: str | None = None,
    context: AstrocyteContext | None = None,
) -> "tuple[MentalModel, dict] | None":
    """Apply structured delta operations to an existing mental model.

    The operation schema is defined by
    :func:`astrocyte.pipeline.delta_ops.apply_operations`. Invalid ops are
    dropped with a logged reason rather than corrupting the document.

    Returns:
        ``(updated_model, summary)`` where ``summary`` looks like
        ``{"applied": [...], "skipped": [...], "changed": bool}``, or
        ``None`` when the model does not exist.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    policy = self._policy
    policy.check_access(target_bank, "write", context)
    policy.check_rate_and_quota(target_bank, "retain")
    service = self._mental_model_service()
    return await service.update_via_ops(
        bank_id=target_bank,
        model_id=model_id,
        operations=operations,
    )

1155 

async def delete_mental_model(
    self,
    model_id: str,
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> bool:
    """Soft-delete one mental model; ``True`` only if it existed.

    Requires ``forget`` access on the resolved bank.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "forget", context)
    service = self._mental_model_service()
    return await service.delete(bank_id=target_bank, model_id=model_id)

1167 

async def refresh_mental_model(
    self,
    model_id: str,
    new_source_ids: list[str],
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> "MentalModel | None":
    """M28 — re-derive a mental model from new/expanded source memories.

    Hindsight parity for ``mental_model.refresh()``: once retain has added
    memories relevant to an existing model, this folds them into the
    model's ``source_ids`` and bumps its revision.

    The store implementation merges ``new_source_ids`` into the existing
    set (order-preserving dedup). A future enhancement will also run the
    LLM compile pipeline over the merged sources to rewrite ``content`` and
    ``structured_doc``; the SPI surface stays the same.

    Returns:
        The refreshed :class:`MentalModel`, or ``None`` if the model does
        not exist.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    policy = self._policy
    policy.check_access(target_bank, "write", context)
    policy.check_rate_and_quota(target_bank, "retain")
    service = self._mental_model_service()
    # Pass a copy so the caller's list is never shared with the store.
    source_copy = list(new_source_ids)
    return await service.refresh_from_sources(
        bank_id=target_bank,
        model_id=model_id,
        new_source_ids=source_copy,
    )

1199 

async def create_directive(
    self,
    *,
    rule_text: str,
    directive_id: str | None = None,
    scope: str = "bank",
    bank_id: str | None = None,
    context: AstrocyteContext | None = None,
) -> "MentalModel":
    """Create a user-authored hard rule (M18b/M19 deferred item).

    Directives are stored as :class:`MentalModel` rows with
    ``kind="directive"``. They participate in the agentic reflect
    loop's ``search_mental_models`` tool exactly like ``general``
    / ``preference`` models, but the ``directive`` discriminator
    signals to the answerer that the rule should be applied as a
    hard preference override rather than as one input among many.

    Mirrors Hindsight's ``mental_models.subtype="directive"``
    rows. Architecturally replaces the M18a-2 ``directive_compile``
    auto-extraction path that was deprecated in M19 (it replicated
    a −30pp SSP regression because compressed auto-directives
    overrode original preference nuance).

    Args:
        rule_text: The rule; surrounding whitespace is stripped.
        directive_id: Explicit model id; when falsy, a
            ``"directive:<12 hex chars>"`` id is generated.
        scope: Scope stored on the model.
        bank_id: Target bank (resolved via the policy).
        context: Caller identity for bank resolution and the ``write`` check.

    Raises:
        ValueError: if ``rule_text`` is empty or whitespace-only. This is
            checked before rate limiting, so a rejected request does not
            consume rate/quota budget.
        RuntimeError: if no mental-model store is configured.
    """
    import uuid as _uuid

    bid = self._policy.resolve_read_bank_ids(bank_id, None, context)[0]
    self._policy.check_access(bid, "write", context)

    # Validate the input before charging rate/quota budget.
    rule = rule_text.strip()
    if not rule:
        raise ValueError("create_directive requires non-empty rule_text")

    self._policy.check_rate_and_quota(bid, "retain")

    mid = directive_id or f"directive:{_uuid.uuid4().hex[:12]}"
    service = self._mental_model_service()
    # Author the directive with a single-paragraph structured doc so
    # subsequent refreshes can use delta_ops cleanly.
    from astrocyte.pipeline.structured_doc import (
        ParagraphBlock,
        Section,
        StructuredDocument,
        render_document,
    )

    doc = StructuredDocument(
        sections=[
            Section(
                id="rule",
                heading="Rule",
                level=2,
                blocks=[ParagraphBlock(text=rule)],
            ),
        ],
    )
    rendered = render_document(doc)
    return await service.create(
        bank_id=bid,
        model_id=mid,
        title="Directive",
        content=rendered,
        scope=scope,
        kind="directive",
        structured_doc=doc.model_dump(),
    )

1264 

1265 # ── M21 — Observation CRUD (Hindsight parity) ──────────────────── 

1266 

async def list_observations(
    self,
    bank_id: str | None = None,
    *,
    scope: str | None = None,
    trend: "str | None" = None,
    limit: int = 100,
    context: AstrocyteContext | None = None,
) -> list[dict]:
    """List observations in a bank.

    Returns a list of dicts ``{id, text, trend, proof_count,
    source_ids, confidence, updated_at, scope}`` so callers (incl. the
    MCP tool) get a JSON-shaped output without round-tripping
    through ``VectorHit``.

    Args:
        bank_id: Target bank (resolved via the read policy).
        scope: Filter by ``_obs_scope`` metadata (default ``"bank"``
            if observations were created without a tag).
        trend: Filter by computed trend (``"new"`` / ``"stable"`` /
            ``"strengthening"`` / ``"weakening"`` / ``"stale"``).
            Trend is computed at call time from each observation's
            source-timestamp metadata; see
            :func:`astrocyte.pipeline.observation.compute_observation_trend`.
        limit: Max rows *returned* (after filtering), clamped to
            ``[1, 1000]``. Without filters, this is a single
            ``list_vectors`` page. With ``scope`` / ``trend`` filters, the
            obs bank is paged until ``limit`` matches are collected or the
            bank is exhausted. Previously the filters ran over one page
            only and silently returned fewer rows than existed.
        context: Caller identity for bank resolution and the ``read`` check.

    Raises:
        RuntimeError: if the pipeline has no vector store configured.
    """
    from astrocyte.pipeline.observation import (
        compute_observation_trend,
        obs_bank_id,
    )

    bid = self._policy.resolve_read_bank_ids(bank_id, None, context)[0]
    self._policy.check_access(bid, "read", context)

    if self._pipeline is None or not hasattr(self._pipeline, "vector_store"):
        raise RuntimeError("vector_store not configured on the pipeline")
    store = self._pipeline.vector_store  # type: ignore[attr-defined]
    if store is None:
        raise RuntimeError("vector_store is not set")

    obs_bank = obs_bank_id(bid)
    cap = max(1, min(int(limit), 1000))
    filtering = scope is not None or trend is not None

    out: list[dict] = []
    scanned_ids: set = set()
    offset = 0
    while True:
        items = await store.list_vectors(
            obs_bank,
            offset=offset,
            limit=cap,
        )
        if not items:
            break
        # Guard against backends that ignore ``offset``: a page with no new
        # ids means no progress, so stop instead of looping forever.
        page_ids = {item.id for item in items}
        if page_ids <= scanned_ids:
            break
        scanned_ids |= page_ids

        for item in items:
            meta = item.metadata or {}
            if scope is not None and str(meta.get("_obs_scope", "")) != scope:
                continue
            t = compute_observation_trend(dict(meta)).value
            if trend is not None and t != trend:
                continue
            out.append(
                {
                    "id": item.id,
                    "text": item.text,
                    "trend": t,
                    "proof_count": int(meta.get("_obs_proof_count", 1)),
                    "source_ids": meta.get("_obs_source_ids", "[]"),
                    "confidence": meta.get("_obs_confidence"),
                    "updated_at": meta.get("_obs_updated_at"),
                    "scope": meta.get("_obs_scope"),
                }
            )
            if len(out) >= cap:
                return out

        # Unfiltered listings are a single page. A short page means the
        # bank is exhausted.
        if not filtering or len(items) < cap:
            break
        offset += len(items)
    return out

1335 

async def get_observation(
    self,
    observation_id: str,
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> dict | None:
    """Return the observation whose ``id`` equals ``observation_id``, else ``None``.

    Searches the rows produced by :meth:`list_observations` with
    ``limit=1000`` and no filters.
    """
    candidates = await self.list_observations(
        bank_id=bank_id,
        limit=1000,
        context=context,
    )
    return next(
        (row for row in candidates if row["id"] == observation_id),
        None,
    )

1353 

async def create_observation(
    self,
    *,
    text: str,
    source_ids: list[str] | None = None,
    scope: str | None = None,
    confidence: float = 0.9,
    bank_id: str | None = None,
    context: AstrocyteContext | None = None,
) -> dict:
    """Store a user-authored observation directly.

    Skips the autonomous retain → consolidate flow so an agent or user can
    assert a fact directly. The write goes through
    ``ObservationConsolidator._apply_create`` into the dedicated ``::obs``
    bank, with the given ``source_ids`` (possibly empty for hand-authored
    entries).

    Returns:
        The matching row from :meth:`list_observations` (first row whose
        text equals the stripped input), or a minimal
        ``{"text", "scope", "trend": "new"}`` dict if none is found.

    Raises:
        RuntimeError: if the pipeline has no vector store / LLM provider.
        ValueError: if the consolidator rejects the create.
    """
    from astrocyte.pipeline.observation import (
        ObservationConsolidator,
        observation_scope,
    )

    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    bid = resolved[0]
    self._policy.check_access(bid, "write", context)
    self._policy.check_rate_and_quota(bid, "retain")

    pipeline = self._pipeline
    if pipeline is None or not hasattr(pipeline, "vector_store"):
        raise RuntimeError("vector_store not configured on the pipeline")
    vector_store = pipeline.vector_store  # type: ignore[attr-defined]
    llm_provider = pipeline.llm_provider  # type: ignore[attr-defined]
    if vector_store is None or llm_provider is None:
        raise RuntimeError("vector_store / llm_provider not set")

    clean_text = text.strip()
    consolidator = ObservationConsolidator()
    effective_scope = scope or observation_scope(bid)
    created = await consolidator._apply_create(  # noqa: SLF001 — single-step CRUD
        {"text": clean_text, "confidence": confidence},
        bid,
        vector_store,
        llm_provider,
        source_ids=list(source_ids or []),
        now_iso=datetime.now(timezone.utc).isoformat(),
        scope=effective_scope,
    )
    if not created:
        raise ValueError("observation create rejected (empty text or confidence below threshold)")

    # Look the new row up by text within its scope. In multi-writer setups,
    # callers can treat text + scope as the identity check.
    rows = await self.list_observations(bank_id=bid, scope=effective_scope, context=context)
    found = next((row for row in rows if row["text"] == clean_text), None)
    if found is not None:
        return found
    # Fallback so the caller never receives None.
    return {"text": clean_text, "scope": effective_scope, "trend": "new"}

1410 

async def delete_observation(
    self,
    observation_id: str,
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> bool:
    """Remove a single observation from the bank's dedicated obs bank.

    Returns ``True`` when the store reports at least one deleted row, and
    ``False`` when nothing was deleted or the vector store is unset.

    Raises:
        RuntimeError: if the pipeline lacks a ``vector_store`` attribute.
    """
    from astrocyte.pipeline.observation import obs_bank_id

    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    bid = resolved[0]
    self._policy.check_access(bid, "forget", context)

    pipeline = self._pipeline
    if pipeline is None or not hasattr(pipeline, "vector_store"):
        raise RuntimeError("vector_store not configured on the pipeline")
    vector_store = pipeline.vector_store  # type: ignore[attr-defined]
    if vector_store is None:
        return False
    target_bank = obs_bank_id(bid)
    removed = await vector_store.delete([observation_id], target_bank)
    return removed > 0

1432 

async def clear_bank(
    self,
    bank_id: str,
    *,
    context: AstrocyteContext | None = None,
) -> ForgetResult:
    """Wipe every memory in ``bank_id``.

    Thin wrapper over :meth:`forget` with ``scope="all"``. On that path,
    ``admin`` permission is checked when access control is enabled.
    """
    return await self.forget(bank_id, context=context, scope="all")

1441 

async def forget(
    self,
    bank_id: str,
    *,
    memory_ids: list[str] | None = None,
    tags: list[str] | None = None,
    scope: str | None = None,
    context: AstrocyteContext | None = None,
    compliance: bool = False,
    reason: str | None = None,
    before_date: datetime | None = None,
) -> ForgetResult:
    """Remove memories.

    Order of operations: bank-id validation → access check → MIP forget
    policy (``max_per_call``, ``min_age_days``, audit log, legal hold) →
    legal-hold / compliance handling → soft delete (when MIP requests it
    and the engine supports it) or hard delete through the dispatcher →
    dedup-cache invalidation → ``on_forget`` hook.

    Args:
        bank_id: Bank to delete from (validated first).
        memory_ids: Specific memory ids to delete.
        tags: Tags forwarded to the forget request.
        scope: ``"all"`` to delete everything in a bank (requires admin).
        context: Caller identity; required when ``compliance=True``.
        compliance: Bypass legal holds for right-to-forget (requires context).
        reason: Audit reason when ``compliance=True``.
        before_date: Delete memories created before this date.

    Returns:
        A :class:`ForgetResult`. On the soft-delete path only
        ``deleted_count`` is set.

    Raises:
        MipRoutingError: if a MIP forget rule's ``max_per_call`` or
            ``min_age_days`` limit rejects the request.
        AccessDenied: if ``compliance=True`` without a ``context``; the
            policy's access checks may also raise it.
    """
    validate_bank_id(bank_id)
    with span("astrocyte.forget", {"astrocyte.bank_id": bank_id}):
        # scope="all" requires admin permission (only when ACL is enabled);
        # any other scope requires "forget" permission.
        if scope == "all":
            if self._config.access_control.enabled:
                self._policy.check_access(bank_id, "admin", context)
        else:
            self._policy.check_access(bank_id, "forget", context)

        # MIP forget policy resolution (Phase 4) — runs BEFORE the
        # legal-hold / compliance handling below. A rule with
        # respect_legal_hold=True therefore wins over the caller's compliance
        # bypass, and audit logs fire even when the policy refuses.
        mip_forget = self._mip_router.resolve_forget_for_bank(bank_id) if self._mip_router else None
        if mip_forget is not None:
            # max_per_call: cap blast radius for selective deletes by id
            if (
                mip_forget.max_per_call is not None
                and memory_ids is not None
                and len(memory_ids) > mip_forget.max_per_call
            ):
                raise MipRoutingError(
                    f"forget rejected: {len(memory_ids)} memory_ids exceeds "
                    f"forget.max_per_call={mip_forget.max_per_call} for bank {bank_id!r}"
                )

            # min_age_days: refuse forget when any targeted record is younger.
            # Best-effort: requires the engine to populate `_created_at` in
            # metadata at retain time. Records lacking this stamp are skipped
            # with a warning (degraded enforcement, not a hard failure).
            # Only applied when explicit memory_ids are given; tag /
            # before_date / scope-based forgets are not age-checked here.
            if mip_forget.min_age_days is not None and mip_forget.min_age_days > 0 and memory_ids:
                too_young = await self._collect_too_young_ids(
                    bank_id,
                    memory_ids,
                    mip_forget.min_age_days,
                )
                if too_young:
                    raise MipRoutingError(
                        f"forget rejected: {len(too_young)} record(s) in {bank_id!r} "
                        f"younger than forget.min_age_days={mip_forget.min_age_days} "
                        f"({sorted(too_young)[:5]}{'...' if len(too_young) > 5 else ''})"
                    )

            # audit: emit a structured log line before any deletion occurs
            if mip_forget.audit in ("required", "recommended"):
                self._logger.log(
                    "astrocyte.mip.forget.audit",
                    bank_id=bank_id,
                    data={
                        "mode": str(mip_forget.mode or "soft"),
                        "audit": str(mip_forget.audit),
                        "cascade": bool(mip_forget.cascade) if mip_forget.cascade is not None else True,
                        "memory_ids_count": len(memory_ids) if memory_ids else 0,
                        "scope": scope or "selective",
                        "actor": context_principal_label(context) if context else "anonymous",
                        "reason": str(reason) if reason else None,
                    },
                    level=logging.WARNING,
                )

            # respect_legal_hold: when True, override the compliance bypass.
            # The MIP rule is the source of truth; compliance flag cannot
            # circumvent a rule that explicitly demands legal hold respect.
            if mip_forget.respect_legal_hold:
                self._lifecycle.check_forget_allowed(bank_id)

        # Legal hold check — compliance=True bypasses for right-to-forget.
        # Even when access_control is disabled, compliance bypass requires
        # explicit context (caller must identify themselves).
        if not compliance:
            # Skip duplicate check if MIP already enforced legal hold above.
            if mip_forget is None or not mip_forget.respect_legal_hold:
                self._lifecycle.check_forget_allowed(bank_id)
        else:
            if context is None:
                raise AccessDenied("anonymous", bank_id, "compliance_forget")
            # Log compliance forget with actor identity for audit trail.
            # Emitted before the admin check below, so a denied attempt is
            # still logged.
            principal_label = context_principal_label(context)
            audit_reason = reason or "compliance_forget_request"
            self._logger.log(
                "astrocyte.compliance.forget",
                bank_id=bank_id,
                data={
                    "actor": principal_label,
                    "reason": str(audit_reason),
                    "scope": scope or "selective",
                },
                level=logging.WARNING,
            )
            # When access control is enabled, also require admin permission
            if self._config.access_control.enabled:
                self._policy.check_access(bank_id, "admin", context)

        # Soft-delete path (mip_forget.mode == "soft"): mark records with
        # `_deleted: true` instead of physically removing. Recall is
        # responsible for filtering them out. Falls through to hard delete
        # with a warning if the engine doesn't expose `soft_delete`.
        # Only taken for id-targeted forgets (memory_ids is not None).
        soft_mode = mip_forget is not None and mip_forget.mode == "soft" and memory_ids is not None
        if soft_mode:
            soft_fn = getattr(self._engine_provider, "soft_delete", None)
            if soft_fn is not None:
                deleted = await soft_fn(bank_id, memory_ids)

                # Same dedup-cache invalidation as the hard-delete path —
                # a soft-deleted memory is no longer reachable on recall,
                # so its embedding shouldn't keep blocking re-retains.
                if self._pipeline is not None and deleted > 0:
                    self._pipeline.invalidate_dedup_cache(bank_id, memory_ids)

                await self._hook_manager.fire(
                    "on_forget",
                    bank_id=bank_id,
                    data={"deleted_count": deleted, "archived_count": 0, "soft": True},
                )
                return ForgetResult(deleted_count=deleted)
            logging.getLogger("astrocyte.mip").warning(
                "forget.mode=soft requested for bank=%s but engine %s does not "
                "implement soft_delete(); falling back to hard delete",
                _safe_log(bank_id),
                type(self._engine_provider).__name__ if self._engine_provider else "pipeline",
            )

        # Hard delete through the provider dispatcher.
        request = ForgetRequest(
            bank_id=bank_id,
            memory_ids=memory_ids,
            tags=tags,
            before_date=before_date,
            scope=scope,
        )
        result = await self._dispatcher.forget(request)

        # Drop the forgotten memories from the in-memory dedup cache so
        # a re-retain of similar content produces a fresh row instead of
        # silently no-op'ing with "All chunks are near-duplicates". The
        # DedupDetector lives on the pipeline; safe to skip when no
        # pipeline is configured (provider-direct deployments).
        if self._pipeline is not None and result.deleted_count > 0:
            self._pipeline.invalidate_dedup_cache(bank_id, memory_ids)

        await self._hook_manager.fire(
            "on_forget",
            bank_id=bank_id,
            data={
                "deleted_count": result.deleted_count,
                "archived_count": result.archived_count,
            },
        )
        return result

1610 

async def _collect_too_young_ids(
    self,
    bank_id: str,
    memory_ids: list[str],
    min_age_days: int,
) -> list[str]:
    """Return the subset of ``memory_ids`` younger than ``min_age_days``.

    Best-effort enforcement of ``forget.min_age_days``. It relies on records
    carrying a ``_created_at`` ISO timestamp (or ``datetime``) in metadata,
    stamped by the engine at retain time. Records lacking a usable stamp
    are skipped with a warning: degraded enforcement, not a hard failure,
    since older data predating the stamp shouldn't permanently block
    legitimate forgets.

    Timestamps with a trailing ``"Z"`` are normalized to ``"+00:00"``,
    because ``datetime.fromisoformat`` rejects ``"Z"`` before Python 3.11.
    Naive timestamps are treated as UTC. Requested ids that the age-check
    recall did not return are reported in a warning, since they could not
    be checked.

    Returns:
        Ids whose age is below the threshold. Empty if the recall fails
        (fail-open, logged).
    """
    from datetime import timedelta

    wanted = set(memory_ids)
    try:
        result = await self._dispatcher.recall(
            RecallRequest(query="*", bank_id=bank_id, max_results=10000),
        )
    except Exception as exc:  # pragma: no cover — defensive
        logger.warning(
            "min_age_days check skipped: recall failed for bank=%s: %s",
            _safe_log(bank_id),
            _safe_log(exc),
        )
        return []

    now = datetime.now(timezone.utc)
    threshold = timedelta(days=min_age_days)
    too_young: list[str] = []
    seen: set[str] = set()
    missing_stamp = 0

    for hit in result.hits:
        if hit.memory_id is None or hit.memory_id not in wanted:
            continue
        seen.add(hit.memory_id)
        stamp = (hit.metadata or {}).get("_created_at")
        if isinstance(stamp, str):
            iso_text = stamp
            # fromisoformat() only accepts a trailing "Z" from Python 3.11.
            if iso_text.endswith("Z"):
                iso_text = iso_text[:-1] + "+00:00"
            try:
                created_at = datetime.fromisoformat(iso_text)
            except ValueError:
                missing_stamp += 1
                continue
        elif isinstance(stamp, datetime):
            created_at = stamp
        else:
            # None or an unsupported type: cannot enforce for this record.
            missing_stamp += 1
            continue
        if created_at.tzinfo is None:
            created_at = created_at.replace(tzinfo=timezone.utc)
        if (now - created_at) < threshold:
            too_young.append(hit.memory_id)

    mip_logger = logging.getLogger("astrocyte.mip")

    if missing_stamp:
        mip_logger.warning(
            "min_age_days enforcement degraded: %d/%d record(s) in bank=%s "
            "lack `_created_at` metadata and were skipped",
            missing_stamp,
            len(seen),
            _safe_log(bank_id),
        )
        # One increment per forget call that encountered unstamped records.
        # The warning log above carries the exact count; this counter
        # fires dashboards / alerts when the condition occurs at all.
        self._metrics.inc_counter(
            "astrocyte_forget_unstamped_records_total",
            {"bank_id": bank_id},
        )

    # Ids absent from the recall results were never age-checked. Say so
    # instead of silently treating them as old enough.
    unchecked = wanted - seen
    if unchecked:
        mip_logger.warning(
            "min_age_days enforcement degraded: %d/%d requested record(s) in bank=%s "
            "were not returned by the age-check recall and were not checked",
            len(unchecked),
            len(wanted),
            _safe_log(bank_id),
        )
    return too_young

1687 

async def compile(
    self,
    bank_id: str,
    scope: str | None = None,
) -> CompileResult:
    """Synthesise WikiPages from the raw memories of ``bank_id`` (M8).

    A :class:`CompileEngine` is built from the pipeline's vector store and
    LLM provider plus the configured WikiStore, then run for the bank. The
    docstring contract of the engine: compiled pages land in the WikiStore
    and are embedded back into the VectorStore (``memory_layer="compiled"``)
    so recall can surface them ahead of raw fragments.

    Args:
        bank_id: Bank whose memories are compiled.
        scope: Restrict compilation to memories tagged with this scope. When
            ``None``, scopes are discovered automatically:

            1. Tagged memories are grouped by tag (one page per tag).
            2. Untagged memories are clustered by embedding similarity
               (DBSCAN) and each cluster is labelled with a lightweight LLM
               call; noise points wait for the next compile cycle.

    Returns:
        The engine's :class:`~astrocyte.types.CompileResult` (pages created /
        updated, noise count, token usage). A result carrying ``error`` is
        logged as a warning and still returned, not raised.

    Raises:
        :class:`~astrocyte.errors.ConfigError`: No WikiStore configured
            (call ``set_wiki_store()`` first). Checked before the pipeline.
        :class:`~astrocyte.errors.ProviderUnavailable`: No Tier 1 pipeline
            configured (call ``set_pipeline()`` first).

    Example::

        # Explicit — compile memories tagged "incident-response"
        result = await brain.compile("eng-team", scope="incident-response")

        # Automatic — discover scopes from tags and embedding clusters
        result = await brain.compile("eng-team")

        print(result.pages_created, result.pages_updated, result.tokens_used)
    """
    validate_bank_id(bank_id)

    wiki_store = self._wiki_store
    if wiki_store is None:
        raise ConfigError(
            "brain.compile() requires a WikiStore. Call brain.set_wiki_store(store) before compiling."
        )

    pipeline = self._pipeline
    if pipeline is None:
        raise ProviderUnavailable(
            "brain.compile() requires a Tier 1 pipeline. Call brain.set_pipeline(pipeline) before compiling."
        )

    from astrocyte.pipeline.compile import CompileEngine

    scope_label = scope or "auto"
    compile_engine = CompileEngine(
        vector_store=pipeline.vector_store,
        llm_provider=pipeline.llm_provider,
        wiki_store=wiki_store,  # type: ignore[arg-type]
    )

    with span("compile", {"bank_id": bank_id, "scope": scope_label}):
        outcome = await compile_engine.run(bank_id, scope=scope)

    if not outcome.error:
        logger.info(
            "compile complete bank=%s scope=%s pages_created=%d pages_updated=%d noise=%d tokens=%d elapsed_ms=%d",
            _safe_log(bank_id),
            _safe_log(scope_label),
            outcome.pages_created,
            outcome.pages_updated,
            outcome.noise_memories,
            outcome.tokens_used,
            outcome.elapsed_ms,
        )
    else:
        logger.warning(
            "compile failed for bank %s scope %s: %s",
            _safe_log(bank_id),
            _safe_log(scope_label),
            _safe_log(outcome.error),
        )

    return outcome

1773 

1774 # --------------------------------------------------------------------------- 

1775 # Graph traversal (public API) 

1776 # --------------------------------------------------------------------------- 

1777 

async def graph_search(
    self,
    query: str,
    bank_id: str,
    limit: int = 10,
) -> list[Entity]:
    """Find knowledge-graph entities whose name or alias matches *query*.

    Handy for resolving entity references before calling
    :meth:`graph_neighbors`. The lookup is delegated to the pipeline's graph
    store (``query_entities``).

    Args:
        query: Full or partial entity name to look up.
        bank_id: Bank whose graph is searched.
        limit: Upper bound on the number of entities returned.

    Returns:
        Matching :class:`~astrocyte.types.Entity` objects, in the order the
        graph store returns them.

    Raises:
        :class:`~astrocyte.errors.ConfigError`: When no pipeline or no
            ``graph_store`` on the pipeline is available.
    """
    validate_bank_id(bank_id)
    pipeline = self._pipeline
    store = getattr(pipeline, "graph_store", None) if pipeline else None
    if store is not None:
        return await store.query_entities(query, bank_id, limit=limit)
    raise ConfigError(
        "graph_search() requires a GraphStore. Configure a graph_store provider in astrocyte.yaml."
    )

1811 

async def graph_neighbors(
    self,
    entity_ids: list[str],
    bank_id: str,
    max_depth: int = 2,
    limit: int = 20,
) -> list[GraphHit]:
    """Walk the knowledge graph outward from *entity_ids*.

    Traversal itself is delegated to the pipeline graph store's
    ``query_neighbors``; per its contract, the memories attached to entities
    within *max_depth* hops are returned, scored by proximity.

    Args:
        entity_ids: Seed entities to start from. An empty list short-circuits
            to ``[]`` without touching the graph store.
        bank_id: Bank whose graph is traversed.
        max_depth: Hop limit for the traversal (default 2).
        limit: Upper bound on the number of hits returned.

    Returns:
        :class:`~astrocyte.types.GraphHit` objects as produced by the store.

    Raises:
        :class:`~astrocyte.errors.ConfigError`: When seeds are given but no
            pipeline / ``graph_store`` is available.
    """
    validate_bank_id(bank_id)
    if len(entity_ids or []) == 0:
        return []
    pipeline = self._pipeline
    store = getattr(pipeline, "graph_store", None) if pipeline else None
    if store is None:
        raise ConfigError(
            "graph_neighbors() requires a GraphStore. Configure a graph_store provider in astrocyte.yaml."
        )
    return await store.query_neighbors(
        entity_ids,
        bank_id,
        max_depth=max_depth,
        limit=limit,
    )

1849 

async def history(
    self,
    query: str,
    bank_id: str,
    as_of: datetime,
    *,
    max_results: int = 10,
    max_tokens: int | None = None,
    tags: list[str] | None = None,
) -> HistoryResult:
    """Answer *query* against the bank as it stood at *as_of* (M9 time travel).

    This is :meth:`recall` with ``as_of`` forwarded, so only memories
    retained on or before that instant are considered; the recall outcome is
    repackaged as a :class:`~astrocyte.types.HistoryResult` that also
    records ``as_of`` and ``bank_id`` for traceability.

    Args:
        query: Recall query run against the historical snapshot.
        bank_id: Bank to query.
        as_of: Cut-off instant (UTC). Memories retained later are hidden.
        max_results: Maximum number of hits to return.
        max_tokens: Optional token budget for the result set.
        tags: Optional tag filter applied on top of the time filter.

    Returns:
        The snapshot's hits, ``total_available``, ``truncated`` flag and
        trace, plus the ``as_of`` / ``bank_id`` used.

    Raises:
        Whatever :meth:`recall` raises (e.g. when no backend is configured)
        propagates unchanged.

    Example::

        from datetime import datetime, UTC
        snapshot = await brain.history(
            "What did we know about Alice?",
            bank_id="user-alice",
            as_of=datetime(2025, 1, 1, tzinfo=UTC),
        )
        for hit in snapshot.hits:
            print(hit.retained_at, hit.text)
    """
    snapshot = await self.recall(
        query,
        bank_id=bank_id,
        as_of=as_of,
        max_results=max_results,
        max_tokens=max_tokens,
        tags=tags,
    )
    return HistoryResult(
        bank_id=bank_id,
        as_of=as_of,
        hits=snapshot.hits,
        total_available=snapshot.total_available,
        truncated=snapshot.truncated,
        trace=snapshot.trace,
    )

1909 

async def audit(
    self,
    scope: str,
    bank_id: str,
    *,
    max_memories: int = 50,
    max_tokens: int | None = None,
    tags: list[str] | None = None,
) -> AuditResult:
    """Identify knowledge gaps for a topic in a memory bank (M10 gap analysis).

    Recalls up to *max_memories* relevant memories for *scope*, then hands
    them to the LLM audit judge (``run_audit``) together with the pipeline's
    LLM provider and the recall trace. The result includes a list of
    :class:`~astrocyte.types.GapItem` objects and a ``coverage_score``
    between 0 (empty bank) and 1 (comprehensive).

    This is a diagnostic operation — it does not modify any stored memory.
    It does consume LLM tokens proportional to the number of memories scanned.

    Args:
        scope: Natural-language description of the topic to audit
            (e.g. ``"Alice's employment history"``).
        bank_id: Bank to audit.
        max_memories: Maximum number of memories to retrieve and pass to
            the audit judge. Defaults to ``50``.
        max_tokens: Optional token budget applied to retrieved memories
            before the judge call.
        tags: Optional tag filter to narrow which memories are retrieved.

    Returns:
        :class:`~astrocyte.types.AuditResult` with gaps and coverage score.

    Raises:
        ConfigError: If no pipeline is configured. Checked before any recall
            so a misconfigured instance fails fast without doing work.

    Example::

        result = await brain.audit(
            "Alice's employment history",
            bank_id="user-alice",
        )
        print(f"Coverage: {result.coverage_score:.0%}")
        for gap in result.gaps:
            print(f"[{gap.severity}] {gap.topic}: {gap.reason}")
    """
    pipeline = self._pipeline
    if pipeline is None:
        # The judge needs the pipeline's LLM provider; raise the same
        # astrocyte.errors.ConfigError (module-level import) used elsewhere
        # in this class, before spending a recall on a doomed call.
        raise ConfigError("No pipeline configured — call set_pipeline() first.")

    from astrocyte.pipeline.audit import run_audit

    recall_result = await self.recall(
        scope,
        bank_id=bank_id,
        max_results=max_memories,
        max_tokens=max_tokens,
        tags=tags,
    )

    return await run_audit(
        scope=scope,
        bank_id=bank_id,
        memories=recall_result.hits,
        llm_provider=pipeline.llm_provider,
        trace=recall_result.trace,
    )

1978 

async def health(self) -> HealthStatus:
    """Probe backend health and stamp the probe's latency on the result.

    The engine provider is probed when set; otherwise the pipeline's vector
    store; with neither, a healthy status saying no provider is configured
    is returned. ``latency_ms`` is overwritten with the ``timed()`` reading.
    """
    with timed() as timer:
        if not self._engine_provider:
            if self._pipeline:
                status = await self._pipeline.vector_store.health()
            else:
                status = HealthStatus(healthy=True, message="No provider configured")
        else:
            status = await self._engine_provider.health()
    status.latency_ms = timer["elapsed_ms"]
    return status

1990 

1991 # --------------------------------------------------------------------------- 

1992 # Lifecycle — legal hold + TTL 

1993 # --------------------------------------------------------------------------- 

1994 

def set_legal_hold(self, bank_id: str, hold_id: str, reason: str, *, set_by: str = "user:api") -> LegalHold:
    """Put ``bank_id`` under legal hold ``hold_id``; forget() is blocked until it is released.

    Delegates to the lifecycle manager and returns the hold it records.
    """
    lifecycle = self._lifecycle
    return lifecycle.set_legal_hold(bank_id, hold_id, reason, set_by=set_by)

1998 

def release_legal_hold(self, bank_id: str, hold_id: str) -> bool:
    """Lift legal hold ``hold_id`` from ``bank_id``; ``True`` when such a hold existed."""
    lifecycle = self._lifecycle
    return lifecycle.release_legal_hold(bank_id, hold_id)

2002 

def is_under_hold(self, bank_id: str) -> bool:
    """Report whether ``bank_id`` currently has a legal hold, per the lifecycle manager."""
    lifecycle = self._lifecycle
    return lifecycle.is_under_hold(bank_id)

2006 

async def run_lifecycle(self, bank_id: str) -> LifecycleRunResult:
    """Run the TTL lifecycle check on ``bank_id`` and delete expired memories.

    Every memory found is handed to the lifecycle manager's
    ``evaluate_memory_ttl``; memories whose verdict is ``"delete"`` or
    ``"archive"`` are removed with one batched forget. When a pipeline with
    a vector store is configured, duplicate consolidation runs afterwards.

    Scanning prefers the vector store's paginated ``list_vectors`` (capped at
    roughly 100k vectors). Engine-only deployments and stores without
    ``list_vectors`` fall back to a wildcard recall capped at 10,000 hits,
    which may miss memories; hits without a memory ID are skipped there.

    ``_created_at`` / ``_last_recalled_at`` metadata strings are parsed as
    ISO-8601 (a trailing ``Z`` is accepted); naive datetimes are treated as
    UTC, and unparseable strings are treated as missing (with one warning per
    run) instead of aborting the whole scan.

    Note: v1 treats "archive" as delete — no separate archive storage yet.
    The LifecycleAction.action distinguishes the reason (ttl_unretrieved vs
    ttl_expired) so callers can differentiate, but both result in deletion
    from the provider.

    Returns:
        A LifecycleRunResult. ``deleted_count`` is what the batched forget
        reported plus the duplicates consolidation removed;
        ``archived_count`` / ``skipped_count`` count ``"archive"`` /
        ``"keep"`` verdicts. All zero when lifecycle is disabled in config.
    """
    from astrocyte.types import LifecycleAction

    if not self._config.lifecycle.enabled:
        return LifecycleRunResult(archived_count=0, deleted_count=0, skipped_count=0, actions=[])

    now = datetime.now(timezone.utc)
    actions: list[LifecycleAction] = []
    to_delete: list[str] = []
    malformed_stamps = 0

    def _parse_stamp(value: Any) -> Any:
        # Normalise a metadata timestamp so it is comparable with the aware
        # ``now`` passed to evaluate_memory_ttl. Other value types pass through.
        nonlocal malformed_stamps
        if isinstance(value, str):
            text = value.strip()
            # datetime.fromisoformat() only understands a "Z" suffix from 3.11 on.
            if text.endswith(("Z", "z")):
                text = text[:-1] + "+00:00"
            try:
                value = datetime.fromisoformat(text)
            except ValueError:
                malformed_stamps += 1
                return None
        if isinstance(value, datetime) and value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return value

    def _evaluate(memory_id: str, metadata: Any, tags: Any, fact_type: Any) -> None:
        # Ask the lifecycle manager for a verdict and queue removals.
        meta = metadata or {}
        action = self._lifecycle.evaluate_memory_ttl(
            memory_id=memory_id,
            bank_id=bank_id,
            created_at=_parse_stamp(meta.get("_created_at")),
            last_recalled_at=_parse_stamp(meta.get("_last_recalled_at")),
            tags=tags,
            fact_type=fact_type,
            now=now,
        )
        actions.append(action)
        if action.action in ("delete", "archive"):
            to_delete.append(memory_id)

    # Scan memories via paginated list_vectors (avoids query="*" + 10K limit)
    vector_store = self._pipeline.vector_store if self._pipeline else None
    if vector_store and hasattr(vector_store, "list_vectors"):
        scan_offset = 0
        scan_batch = 200
        while True:
            items = await vector_store.list_vectors(bank_id, offset=scan_offset, limit=scan_batch)
            if not items:
                break
            for item in items:
                meta = item.metadata
                raw_tags = meta.get("_tags") if meta else None
                _evaluate(
                    item.id,
                    meta,
                    raw_tags.split(",") if raw_tags else None,
                    meta.get("_fact_type") if meta else None,
                )
            scan_offset += len(items)
            if len(items) < scan_batch:
                break
            if scan_offset > 100_000:
                logger.warning("Lifecycle scan capped at 100k vectors for bank %s", _safe_log(bank_id))
                break
    else:
        # Fallback for engine-only or stores without list_vectors
        logger.warning(
            "Lifecycle scan using query='*' fallback for bank %s — "
            "results may be incomplete. Use a pipeline with list_vectors support for full coverage.",
            _safe_log(bank_id),
        )
        result = await self._dispatcher.recall(RecallRequest(query="*", bank_id=bank_id, max_results=10000))
        for hit in result.hits:
            if not hit.memory_id:
                # Nothing addressable to evaluate or delete.
                continue
            _evaluate(hit.memory_id, hit.metadata, hit.tags, hit.fact_type)

    if malformed_stamps:
        logger.warning(
            "Lifecycle scan for bank %s found %d unparseable timestamp(s); treated as missing",
            _safe_log(bank_id),
            malformed_stamps,
        )

    # Batch delete/archive.
    # NOTE(review): deletion goes straight to the dispatcher rather than
    # through forget(); confirm evaluate_memory_ttl honours legal holds.
    deleted = 0
    if to_delete:
        forget_result = await self._dispatcher.forget(ForgetRequest(bank_id=bank_id, memory_ids=to_delete))
        deleted = forget_result.deleted_count
        # Like forget(): evict removed memories from the pipeline's dedup
        # cache so re-retaining similar content is not skipped as a duplicate.
        if self._pipeline is not None and deleted > 0:
            self._pipeline.invalidate_dedup_cache(bank_id, to_delete)

    archived = sum(1 for a in actions if a.action == "archive")
    skipped = sum(1 for a in actions if a.action == "keep")

    # Run dedup consolidation if pipeline has a vector store
    consolidation_removed = 0
    if self._pipeline and self._pipeline.vector_store:
        from astrocyte.pipeline.consolidation import run_consolidation

        cons_result = await run_consolidation(
            self._pipeline.vector_store,
            bank_id,
            graph_store=getattr(self._pipeline, "graph_store", None),
        )
        consolidation_removed = cons_result.duplicates_removed

    return LifecycleRunResult(
        archived_count=archived,
        deleted_count=deleted + consolidation_removed,
        skipped_count=skipped,
        actions=actions,
    )

2114 

2115 # --------------------------------------------------------------------------- 

2116 # Bank health & analytics 

2117 # --------------------------------------------------------------------------- 

2118 

async def bank_health(self, bank_id: str) -> "BankHealth":
    """Compute health score and issues for a bank.

    Uses in-memory operation counters collected since process start. When a
    pipeline vector store is configured, the bank's stored vectors are also
    counted by paging through ``list_vectors(bank_id, offset=..., limit=...)``
    — the same offset/limit paging :meth:`run_lifecycle` uses, where
    ``limit`` is a page size (so ``limit=0`` would yield an empty page).
    Counting stops at 100,000. If listing fails or is unsupported, the count
    falls back to 0.
    """
    counters = self._analytics.get_counters(bank_id)
    memory_count = 0
    vector_store = self._pipeline.vector_store if self._pipeline else None
    if vector_store:
        page_size = 1000
        max_count = 100_000
        try:
            while memory_count < max_count:
                page = await vector_store.list_vectors(bank_id, offset=memory_count, limit=page_size)
                memory_count += len(page)
                # A short (or empty) page means the bank is exhausted.
                if len(page) < page_size:
                    break
        except Exception:
            memory_count = 0
            logger.debug(
                "list_vectors failed or unsupported; bank_health memory_count=0",
                exc_info=True,
            )
    return compute_bank_health(bank_id, counters, memory_count)

2137 

async def all_bank_health(self) -> list["BankHealth"]:
    """Return :meth:`bank_health` for each bank with recorded operations.

    Banks are evaluated one after another, in the order the analytics
    collector lists them.
    """
    return [await self.bank_health(known_bank) for known_bank in self._analytics.bank_ids()]

2144 

def bank_quality_snapshot(self, bank_id: str) -> "QualityDataPoint":
    """Convert ``bank_id``'s current analytics counters into a QualityDataPoint for trend tracking."""
    return counters_to_quality_point(self._analytics.get_counters(bank_id))

2149 

2150 # --------------------------------------------------------------------------- 

2151 # Memory portability 

2152 # --------------------------------------------------------------------------- 

2153 

async def export_bank(
    self,
    bank_id: str,
    path: str,
    *,
    include_embeddings: bool = False,
    include_entities: bool = True,
    allowed_roots: list[str] | None = None,
    allow_uncontained: bool = False,
    context: AstrocyteContext | None = None,
) -> int:
    """Write ``bank_id`` to *path* as an AMA (Astrocyte Memory Archive) JSONL file.

    Requires ``admin`` access to the bank for *context*. The export reads
    memories through the dispatcher's recall and, once done, fires the
    ``on_export`` hook with the memory count and path.

    ``allowed_roots`` / ``allow_uncontained`` are forwarded to the path
    containment check in ``astrocyte.portability``. With no roots configured
    and ``allow_uncontained=False`` the export refuses to run — set
    ``ASTROCYTE_PORTABILITY_ROOTS``, pass ``allowed_roots=[<dir>]``, or set
    ``allow_uncontained=True``.

    Returns:
        Number of memories exported.
    """
    from astrocyte.portability import export_bank as _export

    self._policy.check_access(bank_id, "admin", context)

    exported = await _export(
        recall_fn=self._dispatcher.recall,
        bank_id=bank_id,
        path=path,
        provider_name=self._provider_name,
        include_embeddings=include_embeddings,
        include_entities=include_entities,
        allowed_roots=allowed_roots,
        allow_uncontained=allow_uncontained,
    )
    hook_payload = {"memory_count": exported, "path": path}
    await self._hook_manager.fire("on_export", bank_id=bank_id, data=hook_payload)
    return exported

2191 

async def import_bank(
    self,
    bank_id: str,
    path: str,
    *,
    on_conflict: str = "skip",
    allowed_roots: list[str] | None = None,
    allow_uncontained: bool = False,
    context: AstrocyteContext | None = None,
    progress_fn: Any = None,
) -> Any:
    """Load memories from the AMA JSONL file at *path* into ``bank_id``.

    Requires ``admin`` access to the bank for *context*. Records are written
    through the dispatcher's retain; ``on_conflict`` and ``progress_fn`` are
    forwarded to ``astrocyte.portability.import_bank``. Afterwards the
    ``on_import`` hook fires with the imported / skipped / errors figures.
    ``allowed_roots`` / ``allow_uncontained`` behave as in ``export_bank``.

    Returns:
        The portability layer's ImportResult (imported / skipped / errors).
    """
    from astrocyte.portability import ImportResult
    from astrocyte.portability import import_bank as _import

    self._policy.check_access(bank_id, "admin", context)

    outcome: ImportResult = await _import(
        retain_fn=self._dispatcher.retain,
        bank_id=bank_id,
        path=path,
        on_conflict=on_conflict,
        progress_fn=progress_fn,
        allowed_roots=allowed_roots,
        allow_uncontained=allow_uncontained,
    )
    summary = {
        "imported": outcome.imported,
        "skipped": outcome.skipped,
        "errors": outcome.errors,
    }
    await self._hook_manager.fire("on_import", bank_id=bank_id, data=summary)
    return outcome

2233 

2234 # --------------------------------------------------------------------------- 

2235 # Internal routing 

2236 # --------------------------------------------------------------------------- 

2237 

@property
def _provider_name(self) -> str:
    """Backend label: the configured provider name, or ``"pipeline"`` when none is set."""
    configured = self._config.provider
    return configured if configured else "pipeline"

2241 

2242 # --------------------------------------------------------------------------- 

2243 # Backwards-compat thin wrappers (tests access these directly) 

2244 # --------------------------------------------------------------------------- 

2245 

@property
def _tiered_retriever(self):
    """Introspection hook (used by tests): the dispatcher's tiered retriever."""
    dispatcher = self._dispatcher
    return dispatcher.tiered_retriever

2250 

async def _do_retain(self, request: RetainRequest) -> RetainResult:
    """Backwards-compat shim: hand ``request`` to the dispatcher's retain."""
    dispatcher = self._dispatcher
    return await dispatcher.retain(request)

2253 

async def _do_recall(self, request: RecallRequest) -> RecallResult:
    """Backwards-compat shim: hand ``request`` to the dispatcher's recall."""
    dispatcher = self._dispatcher
    return await dispatcher.recall(request)

2256 

async def _do_forget(self, request: ForgetRequest) -> ForgetResult:
    """Backwards-compat shim: hand ``request`` to the dispatcher's forget."""
    dispatcher = self._dispatcher
    return await dispatcher.forget(request)