Coverage for astrocyte/_astrocyte.py: 79%
711 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Core Astrocyte class — the main entry point for the framework.
3Handles tier routing, policy enforcement, capability negotiation,
4multi-bank orchestration, and hook dispatch.
5"""
7from __future__ import annotations
9import inspect
10import logging
11from datetime import datetime, timezone
12from pathlib import Path
13from typing import TYPE_CHECKING, Any, Literal
15from astrocyte._hooks import HookHandler, HookManager
16from astrocyte._log_safety import safe as _safe_log
17from astrocyte._multi_bank import MultiBankOrchestrator
18from astrocyte._output_scanner import OutputScanner
19from astrocyte._policy import PolicyEnforcer
20from astrocyte._provider_dispatch import ProviderDispatcher
21from astrocyte._recall_params import RecallParams
22from astrocyte._validation import validate_bank_id
23from astrocyte.analytics import BankMetricsCollector, compute_bank_health, counters_to_quality_point
24from astrocyte.config import AstrocyteConfig, load_config
25from astrocyte.errors import (
26 AccessDenied,
27 ConfigError,
28 MipRoutingError,
29 ProviderUnavailable,
30)
31from astrocyte.identity import context_principal_label
32from astrocyte.lifecycle import LifecycleManager
33from astrocyte.mip.router import MipRouter
34from astrocyte.policy.observability import MetricsCollector, StructuredLogger, span, timed
35from astrocyte.recall.authority import apply_recall_authority
36from astrocyte.types import (
37 AccessGrant,
38 AstrocyteContext,
39 AuditResult,
40 BankHealth,
41 CompileResult,
42 Entity,
43 ForgetRequest,
44 ForgetResult,
45 GraphHit,
46 HealthStatus,
47 HistoryResult,
48 LegalHold,
49 LifecycleRunResult,
50 MemoryHit,
51 MentalModel,
52 MultiBankStrategy,
53 QualityDataPoint,
54 RecallRequest,
55 RecallResult,
56 ReflectRequest,
57 ReflectResult,
58 RetainRequest,
59 RetainResult,
60)
62if TYPE_CHECKING:
63 from astrocyte.pipeline.orchestrator import PipelineOrchestrator
64 from astrocyte.provider import EngineProvider
66logger = logging.getLogger("astrocyte")
def _normalize_multi_bank_strategy(
    strategy: Literal["cascade", "parallel", "first_match"] | MultiBankStrategy | None,
) -> MultiBankStrategy:
    """Coerce a caller-supplied strategy into a :class:`MultiBankStrategy`.

    ``None`` selects the ``"parallel"`` mode, an existing
    :class:`MultiBankStrategy` is handed back unchanged, and one of the three
    known mode names is wrapped in a new strategy object.

    Raises:
        ConfigError: If ``strategy`` is none of the accepted values.
    """
    if isinstance(strategy, MultiBankStrategy):
        return strategy
    mode = "parallel" if strategy is None else strategy
    if mode not in ("cascade", "parallel", "first_match"):
        raise ConfigError(f"Unknown multi-bank strategy: {strategy!r}")
    return MultiBankStrategy(mode=mode)
81class Astrocyte:
82 """The Astrocyte memory framework — unified API for AI agent memory.
84 Usage:
85 brain = Astrocyte.from_config("astrocyte.yaml")
86 await brain.retain("Calvin prefers dark mode", bank_id="user-123")
87 hits = await brain.recall("What are Calvin's preferences?", bank_id="user-123")
88 """
90 def __init__(self, config: AstrocyteConfig) -> None:
91 self._config = config
92 self._logger = StructuredLogger(level=config.observability.log_level)
94 # Extracted subsystems
95 self._policy = PolicyEnforcer(config)
96 self._hook_manager = HookManager(self._logger)
97 self._output_scanner = OutputScanner(config, self._logger)
98 self._dispatcher = ProviderDispatcher(config)
100 # Metrics
101 self._metrics = MetricsCollector(enabled=config.observability.prometheus_enabled)
103 # Lifecycle
104 self._lifecycle = LifecycleManager(config.lifecycle)
106 # MIP router (loaded from mip.yaml if configured)
107 self._mip_router: MipRouter | None = None
108 if config.mip_config_path:
109 from astrocyte.mip.loader import load_mip_config
111 mip_config = load_mip_config(config.mip_config_path)
112 self._mip_router = MipRouter(mip_config)
114 # Analytics
115 self._analytics = BankMetricsCollector()
117 # Provider state (managed by dispatcher, exposed for wiring)
118 self._engine_provider: EngineProvider | None = None
119 self._pipeline: PipelineOrchestrator | None = None
121 # Multi-bank orchestrator (wired lazily after provider is set)
122 self._multi_bank: MultiBankOrchestrator | None = None
124 # M8: wiki store (optional; enables brain.compile())
125 self._wiki_store: object | None = None
126 # M8 W4: async compile queue (optional; enables automatic threshold triggering)
127 self._compile_queue: object | None = None
128 # M9: mental-model store (optional; enables /v1/mental-models endpoints).
129 # First-class replacement for the prior wiki-piggyback (kind="concept" +
130 # metadata["_mental_model"] = True) — see provider.MentalModelStore docs.
131 self._mental_model_store: object | None = None
132 # M10: source-document / chunk store (optional; enables source-aware
133 # retain to preserve provenance back to the originating document and
134 # chunk). Deployments without a SourceStore work exactly as before —
135 # vectors stay anonymous flat rows.
136 self._source_store: object | None = None
138 @property
139 def config(self) -> AstrocyteConfig:
140 """Loaded :class:`~astrocyte.config.AstrocyteConfig` (read-only for callers)."""
141 return self._config
143 async def __aenter__(self) -> "Astrocyte":
144 await self.start_background_tasks()
145 return self
147 async def __aexit__(self, *exc: object) -> None:
148 await self.stop_background_tasks()
150 async def start_background_tasks(self) -> None:
151 """Start optional background services configured on this instance.
153 Today this covers the M8 compile queue. Keeping the lifecycle on
154 ``Astrocyte`` lets library users and the REST gateway start the same
155 services without reaching into private attributes.
156 """
157 queue = self._compile_queue
158 if queue is not None and hasattr(queue, "start"):
159 await queue.start() # type: ignore[union-attr]
161 async def stop_background_tasks(self) -> None:
162 """Stop optional background services configured on this instance."""
163 queue = self._compile_queue
164 if queue is not None and hasattr(queue, "stop"):
165 await queue.stop() # type: ignore[union-attr]
166 shutdown = getattr(self._pipeline, "shutdown", None)
167 if shutdown is not None:
168 result = shutdown()
169 if inspect.isawaitable(result):
170 _ = await result
172 @classmethod
173 def from_config(cls, path: str | Path) -> "Astrocyte":
174 """Create an Astrocyte instance from a YAML config file."""
175 config = load_config(path)
176 return cls(config)
178 @classmethod
179 def from_config_dict(cls, data: dict[str, str | int | float | bool | None | dict | list]) -> "Astrocyte":
180 """Create an Astrocyte instance from a config dictionary (for testing)."""
181 from astrocyte.config import _dict_to_config, validate_astrocyte_config
183 config = _dict_to_config(data)
184 validate_astrocyte_config(config)
185 return cls(config)
187 def set_engine_provider(self, provider: EngineProvider) -> None:
188 """Set the engine provider (for programmatic setup)."""
189 from astrocyte.provider import check_spi_version
191 check_spi_version(provider, "EngineProvider")
192 self._engine_provider = provider
193 self._dispatcher.engine_provider = provider
194 if hasattr(provider, "capabilities"):
195 self._dispatcher.capabilities = provider.capabilities()
196 self._rebuild_tiered_retrieval()
197 self._rebuild_multi_bank()
199 def set_wiki_store(self, wiki_store: object) -> None:
200 """Set the WikiStore provider (M8 wiki compile). Optional.
202 When a WikiStore is configured, ``brain.compile()`` becomes available.
203 Compile is disabled by default — banks that don't opt in keep today's
204 retain/recall behaviour exactly.
206 Args:
207 wiki_store: Any object satisfying the
208 :class:`~astrocyte.provider.WikiStore` protocol.
209 """
210 from astrocyte.provider import check_spi_version
212 check_spi_version(wiki_store, "WikiStore")
213 self._wiki_store = wiki_store
215 def set_mental_model_store(self, store: object) -> None:
216 """Set the :class:`~astrocyte.provider.MentalModelStore` provider. Optional.
218 When configured, the gateway's ``/v1/mental-models`` endpoints
219 (and any in-process consumer) route reads/writes through this
220 store instead of erroring with HTTP 501. Cuts the prior
221 wiki-piggyback path (``kind="concept"`` + metadata discriminator)
222 — mental models now live in their own
223 ``astrocyte_mental_models`` table with proper revision history.
225 Also wires the agentic reflect loop so ``search_mental_models``
226 becomes available as a tool — the highest-quality tier in the
227 hierarchical retrieval order. Idempotent: if a pipeline is
228 already attached, the service is pushed onto it; otherwise
229 ``set_pipeline`` will pick up the store and build the service
230 when it runs.
231 """
232 from astrocyte.provider import check_spi_version
234 check_spi_version(store, "MentalModelStore")
235 self._mental_model_store = store
236 if self._pipeline is not None:
237 from astrocyte.pipeline.mental_model import MentalModelService
239 self._pipeline.mental_model_service = MentalModelService(store)
241 def set_source_store(self, store: object) -> None:
242 """Set the :class:`~astrocyte.provider.SourceStore` provider. Optional.
244 When configured, ingestion paths can persist
245 :class:`~astrocyte.types.SourceDocument` and
246 :class:`~astrocyte.types.SourceChunk` rows alongside vectors so memory
247 retains a backreference to its originating document. Deployments
248 without a SourceStore continue to work — vectors remain anonymous
249 flat rows with ``chunk_id = NULL``.
251 Args:
252 store: Any object satisfying the
253 :class:`~astrocyte.provider.SourceStore` protocol.
254 """
255 from astrocyte.provider import check_spi_version
257 check_spi_version(store, "SourceStore")
258 self._source_store = store
260 def set_compile_queue(self, queue: object) -> None:
261 """Set the async compile queue (M8 W4 threshold trigger). Optional.
263 When a :class:`~astrocyte.pipeline.compile_trigger.CompileQueue` is
264 configured, each successful ``brain.retain()`` call notifies the queue.
265 The queue fires a background compile job whenever the bank crosses the
266 configured size or staleness threshold.
268 The queue must be started (``await queue.start()``) before the first
269 retain call, and stopped (``await queue.stop()``) on shutdown.
271 Args:
272 queue: A :class:`~astrocyte.pipeline.compile_trigger.CompileQueue`
273 instance (or any object with a compatible ``notify_retain``
274 method).
275 """
276 self._compile_queue = queue
278 def use_pageindex_pipeline(
279 self,
280 store: Any,
281 embedding_provider: Any | None = None,
282 *,
283 document_resolver: Any | None = None,
284 ) -> None:
285 """M32 — install the PageIndex recall pipeline as the active
286 retrieval stack.
288 After this call, ``self.recall(query, ...)`` routes through
289 :class:`~astrocyte.pipeline.pageindex_pipeline.PageIndexPipeline`
290 (the same stack the bench harness validates) instead of the
291 legacy ``orchestrator.recall()`` / vector_store path.
293 This closes the bench-vs-API parity gap surfaced in the v0.15.0
294 ship audit (docs/_design/m32-stack-unification.md). All
295 M14-M31 cycle work (RRF fusion, fact↔chunk pairing,
296 per-Q-type prompts, M27 fields, M28-M29 coreference, M30
297 parallelization, M31 session_filter + event_date) now flows
298 through the public API.
300 Args:
301 store: A configured :class:`PageIndexStore` (Postgres or
302 in-memory).
303 embedding_provider: An :class:`LLMProvider` to embed query
304 strings. Defaults to the Astrocyte instance's configured
305 provider when ``None``.
306 document_resolver: Optional ``(bank_id) -> document_id``
307 callable for single-doc scoping. See
308 ``PageIndexPipeline.__init__`` for the contract.
309 """
310 from astrocyte.pipeline.pageindex_pipeline import ( # noqa: PLC0415
311 PageIndexPipeline,
312 )
314 if embedding_provider is None:
315 embedding_provider = getattr(self, "_provider", None)
316 if embedding_provider is None:
317 raise ValueError(
318 "use_pageindex_pipeline: no embedding_provider supplied and "
319 "no provider configured on Astrocyte instance",
320 )
321 self._dispatcher.pageindex_pipeline = PageIndexPipeline(
322 store=store,
323 embedding_provider=embedding_provider,
324 config=self._config,
325 document_resolver=document_resolver,
326 )
328 def set_pipeline(self, pipeline: PipelineOrchestrator) -> None:
329 """Set the Tier 1 pipeline orchestrator (for programmatic setup)."""
330 self._pipeline = pipeline
331 self._dispatcher.pipeline = pipeline
332 from astrocyte.pipeline.extraction import merged_extraction_profiles
334 pipeline.extraction_profiles = merged_extraction_profiles(self._config)
335 pipeline.recall_authority = self._config.recall_authority
337 # If a mental-model store was set BEFORE the pipeline was
338 # attached, push the service through now so ``search_mental_models``
339 # is wired into the agentic reflect loop. The reverse direction
340 # (store set after pipeline) is handled in ``set_mental_model_store``.
341 if self._mental_model_store is not None:
342 from astrocyte.pipeline.mental_model import MentalModelService
344 pipeline.mental_model_service = MentalModelService(
345 self._mental_model_store, # type: ignore[arg-type]
346 )
348 # Cross-encoder reranker (Hindsight parity) — only loaded when the
349 # operator opts in via config. Lazy load: deferring the model load
350 # to first-use keeps process startup cheap and keeps the
351 # ``sentence-transformers`` dep optional. ``None`` here means
352 # ``_rank_reflect_context`` falls through to the heuristic.
353 cer_cfg = self._config.cross_encoder_rerank
354 if cer_cfg.enabled:
355 from astrocyte.pipeline.cross_encoder_rerank import (
356 get_default_cross_encoder,
357 )
359 pipeline.cross_encoder = get_default_cross_encoder(
360 cer_cfg.model_name,
361 force_cpu=cer_cfg.force_cpu,
362 )
363 pipeline.cross_encoder_top_k = cer_cfg.top_k
364 else:
365 pipeline.cross_encoder = None
367 # Spreading activation through entity links (Hindsight parity).
368 # Enabled at recall time when ``spreading_activation.enabled``
369 # AND a graph store is configured AND the adapter implements
370 # ``expand_entities_via_links``. Falls back to seed-only when
371 # the adapter doesn't support multi-hop traversal.
372 # Causal-link extraction at retain time (Hindsight parity).
373 cl_cfg = self._config.causal_links
374 pipeline.causal_links_enabled = cl_cfg.enabled
375 pipeline.causal_max_pairs_per_memory = cl_cfg.max_pairs_per_memory
376 pipeline.causal_min_confidence = cl_cfg.min_confidence
378 # Semantic-kNN graph at retain time (Hindsight parity, C3a).
379 slg_cfg = self._config.semantic_link_graph
380 pipeline.semantic_link_graph_enabled = slg_cfg.enabled
381 pipeline.semantic_link_graph_top_k = slg_cfg.top_k
382 pipeline.semantic_link_graph_threshold = slg_cfg.similarity_threshold
384 # Structured fact extraction at retain time.
385 sfe_cfg = self._config.structured_fact_extraction
386 pipeline.structured_fact_extraction_enabled = sfe_cfg.enabled
387 pipeline.structured_fact_extraction_max_facts = sfe_cfg.max_facts_per_call
388 pipeline.structured_fact_extraction_mode = sfe_cfg.extraction_mode
389 pipeline.structured_fact_extraction_chunk_strategy = sfe_cfg.chunk_strategy
390 pipeline.structured_fact_extraction_chunk_max_size = sfe_cfg.chunk_max_size
391 pipeline.structured_fact_extraction_parallel_chunks = sfe_cfg.parallel_chunks
392 pipeline.structured_fact_extraction_parallel_chunks_max_concurrency = sfe_cfg.parallel_chunks_max_concurrency
394 # Entity co-occurrence link cap (2026-05-06 retain-profile fix).
395 # The all-pairs Cartesian product was 34% of retain wall on the
396 # LME profile; capping the entity set bounds it to O(K²) per
397 # retain regardless of corpus size.
398 coocc_cfg = self._config.entity_cooccurrence
399 pipeline.entity_cooccurrence_enabled = coocc_cfg.enabled
400 pipeline.entity_cooccurrence_max_entities = coocc_cfg.max_entities_per_memory
402 # Query analyzer (temporal constraint extraction at recall time).
403 qa_cfg = self._config.query_analyzer
404 pipeline.query_analyzer_enabled = qa_cfg.enabled
405 pipeline.query_analyzer_allow_llm_fallback = qa_cfg.allow_llm_fallback
406 # M18a-1: extended temporal-expansion pattern set. Env-var override
407 # (``ASTROCYTE_M18_ENABLE_TEMPORAL_EXPANSION=1``) gives bench-time
408 # control without code change; otherwise the per-bank config wins.
409 import os as _os # noqa: PLC0415
411 _env = _os.environ.get("ASTROCYTE_M18_ENABLE_TEMPORAL_EXPANSION", "").lower()
412 if _env in ("1", "true", "yes"):
413 pipeline.query_analyzer_enable_temporal_expansion = True
414 elif _env in ("0", "false", "no"):
415 pipeline.query_analyzer_enable_temporal_expansion = False
416 else:
417 pipeline.query_analyzer_enable_temporal_expansion = qa_cfg.enable_temporal_expansion
419 # Link expansion (Hindsight parity, C3) — replaces the previous
420 # spreading-activation BFS with the 3-parallel-signal path.
421 # Reuses the legacy ``spreading_activation:`` config block so
422 # benchmarks don't have to rewrite their YAML; the relevant
423 # knobs (expansion_limit, etc.) translate directly. Block-name
424 # rename can come later as a backward-incompatible change.
425 sa_cfg = self._config.spreading_activation
426 if sa_cfg.enabled:
427 from astrocyte.pipeline.link_expansion import LinkExpansionParams
429 pipeline.link_expansion_params = LinkExpansionParams(
430 expansion_limit=sa_cfg.expansion_limit,
431 )
432 else:
433 pipeline.link_expansion_params = None
435 # Adversarial-defense layer.
436 # M9 BM25-IDF wiring — opt-in flag plumbed through to ``parallel_retrieve``.
437 pipeline.bm25_idf_enabled = self._config.bm25_idf.enabled
439 # M10 source-aware retain + recall. The store handle and the three
440 # behavioural flags are kept independently configurable so a
441 # deployment can enable provenance ingest without paying for the
442 # recall-side chunk expansion (which only helps multi-hop / split-
443 # evidence questions and adds one DB roundtrip per top-K hit).
444 sar_cfg = self._config.source_aware_retrieval
445 pipeline.source_store = self._source_store
446 pipeline.source_retain_provenance = sar_cfg.retain_provenance
447 pipeline.source_chunk_expansion = sar_cfg.chunk_expansion
448 pipeline.source_expansion_score_multiplier = sar_cfg.expansion_score_multiplier
449 pipeline.source_expansion_max_per_hit = sar_cfg.expansion_max_per_hit
451 ad_cfg = self._config.adversarial_defense
452 pipeline.adversarial_abstention_enabled = ad_cfg.abstention_enabled
453 pipeline.adversarial_abstention_floor = ad_cfg.abstention_floor
454 pipeline.adversarial_premise_verification_enabled = ad_cfg.premise_verification_enabled
455 pipeline.adversarial_premise_min_confidence = ad_cfg.premise_verification_min_confidence
456 pipeline.adversarial_prompt_enabled = ad_cfg.adversarial_prompt_enabled
458 # Agentic reflect loop.
459 ar_cfg = self._config.agentic_reflect
460 if ar_cfg.enabled:
461 from astrocyte.pipeline.agentic_reflect import AgenticReflectParams
463 pipeline.agentic_reflect_params = AgenticReflectParams(
464 max_iterations=ar_cfg.max_iterations,
465 recall_step_max_results=ar_cfg.recall_step_max_results,
466 max_evidence_pool_size=ar_cfg.max_evidence_pool_size,
467 # Adversarial-defense rules in the system prompt are
468 # opt-in via the separate adversarial_defense block —
469 # the agentic loop's own ``adversarial_defense`` param
470 # mirrors that flag so a user enabling adversarial
471 # defense automatically gets the loop-level prompt
472 # tightening too.
473 adversarial_defense=ad_cfg.adversarial_prompt_enabled,
474 )
475 else:
476 pipeline.agentic_reflect_params = None
477 # Wire the LLM provider to the MIP router for intent-layer escalation
478 if self._mip_router and hasattr(pipeline, "llm_provider"):
479 self._mip_router._llm_provider = pipeline.llm_provider
480 # Expose router for per-bank MIP resolution at recall time (P3)
481 pipeline.mip_router = self._mip_router
482 self._rebuild_tiered_retrieval()
483 self._rebuild_multi_bank()
485 def _rebuild_tiered_retrieval(self) -> None:
486 """Construct :class:`~astrocyte.pipeline.tiered_retrieval.TieredRetriever` when enabled."""
487 from astrocyte.hybrid import HybridEngineProvider
488 from astrocyte.pipeline.recall_cache import RecallCache
489 from astrocyte.pipeline.recent_buffer import RecentMemoryBuffer
490 from astrocyte.pipeline.tiered_retrieval import TieredRetriever
492 self._dispatcher.tiered_retriever = None
493 if not self._pipeline or not self._config.tiered_retrieval.enabled:
494 return
495 trc = self._config.tiered_retrieval
496 if self._engine_provider is not None and trc.full_recall != "hybrid":
497 return
498 full_recall_fn = None
499 if trc.full_recall == "hybrid":
500 if not isinstance(self._engine_provider, HybridEngineProvider):
501 logger.warning(
502 "tiered_retrieval.full_recall=hybrid requires HybridEngineProvider; "
503 "tiered retrieval disabled until a hybrid provider is set",
504 )
505 return
506 full_recall_fn = self._engine_provider.recall
507 rcc = self._config.recall_cache
508 cache: RecallCache | None = None
509 if rcc.enabled:
510 cache = RecallCache(
511 similarity_threshold=rcc.similarity_threshold,
512 max_entries=rcc.max_entries,
513 ttl_seconds=rcc.ttl_seconds,
514 )
515 recent = RecentMemoryBuffer()
516 self._dispatcher.tiered_retriever = TieredRetriever(
517 self._pipeline,
518 recall_cache=cache,
519 recent_buffer=recent,
520 min_results=trc.min_results,
521 min_score=trc.min_score,
522 max_tier=trc.max_tier,
523 full_recall=full_recall_fn,
524 )
526 def _rebuild_multi_bank(self) -> None:
527 """Rebuild MultiBankOrchestrator when provider changes."""
528 self._multi_bank = MultiBankOrchestrator(
529 do_recall=self._dispatcher.recall,
530 make_request=self._make_recall_request,
531 circuit_breaker_record_failure=self._policy.record_failure,
532 metrics=self._metrics,
533 provider_name=self._dispatcher.provider_name,
534 )
536 def set_access_grants(self, grants: list[AccessGrant]) -> None:
537 """Configure access grants."""
538 self._policy.set_access_grants(grants)
540 @property
541 def _rate_limiters(self) -> dict:
542 """Expose rate limiters for testing/introspection."""
543 return self._policy._rate_limiters
545 def register_hook(self, event_type: str, handler: HookHandler) -> None:
546 """Register an event hook handler."""
547 self._hook_manager.register(event_type, handler)
549 # ---------------------------------------------------------------------------
550 # Public API
551 # ---------------------------------------------------------------------------
553 async def retain(
554 self,
555 content: str,
556 bank_id: str,
557 *,
558 metadata: dict[str, str | int | float | bool | None] | None = None,
559 tags: list[str] | None = None,
560 context: AstrocyteContext | None = None,
561 content_type: str = "text",
562 extraction_profile: str | None = None,
563 occurred_at: datetime | None = None,
564 source: str | None = None,
565 pii_detected: bool = False,
566 ) -> RetainResult:
567 """Store content into memory.
569 Args:
570 content_type: ``text``, ``conversation``, ``document``, etc.
571 extraction_profile: Name under YAML ``extraction_profiles:``.
572 occurred_at: When the content originally occurred.
573 source: Origin identifier for the content.
574 pii_detected: Whether PII was already detected upstream.
575 """
576 validate_bank_id(bank_id)
577 with span("astrocyte.retain", {"astrocyte.bank_id": bank_id}):
578 # Input size validation — reject oversized content before pipeline processing
579 input_error = self._policy.validate_retain_input(content, tags)
580 if input_error:
581 return RetainResult(stored=False, error=input_error)
583 # Access control
584 self._policy.check_access(bank_id, "write", context)
586 # MIP routing (before policy layer)
587 mip_pipeline = None
588 mip_rule_name = None
589 if self._mip_router:
590 from astrocyte.identity import resolve_actor
591 from astrocyte.mip.rule_engine import RuleEngineInput
593 # Identity spec §3 Gap 2: resolved actor flows into MIP so
594 # rules can branch on principal_type / principal_id / etc.
595 # When no context is supplied (legacy callers), actor is
596 # None and principal_* match conditions simply never fire.
597 actor_identity = resolve_actor(context) if context else None
599 mip_input = RuleEngineInput(
600 content=content,
601 content_type=content_type,
602 metadata=metadata,
603 tags=tags,
604 pii_detected=pii_detected,
605 source=source,
606 actor_identity=actor_identity,
607 )
608 routing = await self._mip_router.route(mip_input)
609 if routing.bank_id:
610 # Re-check access for new bank
611 if routing.bank_id != bank_id:
612 self._policy.check_access(routing.bank_id, "write", context)
613 bank_id = routing.bank_id
614 if routing.tags is not None:
615 tags = routing.tags
616 if routing.retain_policy == "reject":
617 return RetainResult(stored=False, error="Rejected by MIP routing rule")
618 mip_pipeline = routing.pipeline
619 mip_rule_name = routing.rule_name
621 # Rate limiting + quota (atomic to prevent TOCTOU)
622 self._policy.check_rate_and_quota(bank_id, "retain")
624 # Content validation
625 errors = self._policy.validate_content(content, content_type)
626 if errors:
627 return RetainResult(stored=False, error="; ".join(errors))
629 # PII scanning (async for LLM/rules_then_llm modes)
630 content, pii_matches = await self._policy.scan_pii(content, self._config.barriers.pii.mode)
631 if pii_matches:
632 pii_action = self._policy.pii_action
633 self._logger.log(
634 "astrocyte.policy.pii_detected",
635 bank_id=bank_id,
636 operation="retain",
637 data={"pii_types": ",".join(m.pii_type for m in pii_matches), "action": pii_action},
638 )
639 self._metrics.inc_counter(
640 "astrocyte_pii_detected_total",
641 {"bank_id": bank_id, "action": pii_action},
642 )
643 await self._hook_manager.fire(
644 "on_pii_detected",
645 bank_id=bank_id,
646 data={
647 "pii_types": ",".join(m.pii_type for m in pii_matches),
648 "action": pii_action,
649 },
650 )
652 # Metadata sanitization
653 metadata, meta_warnings = self._policy.sanitize_metadata(metadata)
655 # Build request
656 request = RetainRequest(
657 content=content,
658 bank_id=bank_id,
659 metadata=metadata,
660 tags=tags,
661 occurred_at=occurred_at,
662 source=source,
663 content_type=content_type,
664 extraction_profile=extraction_profile,
665 mip_pipeline=mip_pipeline,
666 mip_rule_name=mip_rule_name,
667 )
669 # Route to provider
670 try:
671 self._policy.check_circuit(self._provider_name)
672 with timed() as t:
673 result = await self._dispatcher.retain(request)
674 self._policy.record_success()
675 self._policy.record_quota(bank_id, "retain")
676 self._metrics.inc_counter(
677 "astrocyte_retain_total",
678 {"bank_id": bank_id, "provider": self._provider_name, "status": "ok"},
679 )
680 self._metrics.observe_histogram(
681 "astrocyte_retain_duration_seconds",
682 t["elapsed_ms"] / 1000,
683 {"bank_id": bank_id, "provider": self._provider_name},
684 )
685 self._analytics.record_retain(
686 bank_id,
687 len(content),
688 deduplicated=getattr(result, "deduplicated", False),
689 )
690 await self._hook_manager.fire(
691 "on_retain",
692 bank_id=bank_id,
693 data={
694 "memory_id": result.memory_id or "",
695 "content_length": len(content),
696 },
697 )
698 # M8 W4: notify the compile queue so it can trigger a background
699 # compile when the bank crosses the configured threshold.
700 if self._compile_queue is not None and result.stored:
701 self._compile_queue.notify_retain(bank_id) # type: ignore[union-attr]
702 return result
703 except ProviderUnavailable:
704 self._policy.handle_degraded_retain(self._provider_name)
705 return RetainResult(stored=False, error="Provider unavailable (degraded mode)")
706 except Exception:
707 self._policy.record_failure()
708 self._metrics.inc_counter(
709 "astrocyte_retain_total",
710 {"bank_id": bank_id, "provider": self._provider_name, "status": "error"},
711 )
712 raise
714 async def _make_recall_request(
715 self,
716 query: str,
717 bank_id: str,
718 max_results: int,
719 max_tokens: int | None,
720 tags: list[str] | None,
721 params: RecallParams,
722 ) -> RecallRequest:
723 from astrocyte.recall.proxy import merge_manual_and_proxy_hits
725 ext = await merge_manual_and_proxy_hits(
726 self._config,
727 query=query,
728 bank_id=bank_id,
729 manual=params.external_context,
730 metrics=self._metrics,
731 )
732 return RecallRequest(
733 query=query,
734 bank_id=bank_id,
735 max_results=max_results,
736 max_tokens=max_tokens,
737 tags=tags,
738 fact_types=params.fact_types,
739 time_range=params.time_range,
740 include_sources=params.include_sources,
741 layer_weights=params.layer_weights,
742 detail_level=params.detail_level,
743 external_context=ext,
744 as_of=params.as_of, # M9
745 query_reference_date=params.query_reference_date,
746 session_id=params.session_id, # M31 Fix 2
747 )
749 async def recall(
750 self,
751 query: str,
752 bank_id: str | None = None,
753 *,
754 banks: list[str] | None = None,
755 strategy: Literal["cascade", "parallel", "first_match"] | MultiBankStrategy | None = None,
756 max_results: int = 10,
757 max_tokens: int | None = None,
758 tags: list[str] | None = None,
759 context: AstrocyteContext | None = None,
760 external_context: list[MemoryHit] | None = None,
761 fact_types: list[str] | None = None,
762 time_range: tuple[datetime, datetime] | None = None,
763 include_sources: bool = False,
764 layer_weights: dict[str, float] | None = None,
765 detail_level: str | None = None,
766 as_of: datetime | None = None,
767 query_reference_date: datetime | None = None,
768 session_id: str | None = None,
769 ) -> RecallResult:
770 """Retrieve relevant memories for a query.
772 With multiple ``banks``, use ``strategy`` (or a :class:`MultiBankStrategy`) to choose
773 ``parallel`` (default), ``cascade`` (widen until enough hits), or ``first_match``.
775 Args:
776 external_context: Extra :class:`MemoryHit` items merged with retrieval via RRF.
777 fact_types: Filter by fact type (e.g. ``["preference", "event"]``).
778 time_range: ``(start, end)`` datetime tuple to scope retrieval.
779 include_sources: Include source metadata in results.
780 layer_weights: Per-layer scoring weights for tiered retrieval.
781 detail_level: Granularity hint (``"summary"`` / ``"full"``).
783 When ``tiered_retrieval.enabled`` is set and a pipeline is configured, recall uses
784 :class:`~astrocyte.pipeline.tiered_retrieval.TieredRetriever` for cheap tiers.
785 """
786 # Resolve bank(s)
787 bank_ids = self._policy.resolve_read_bank_ids(bank_id, banks, context)
789 max_tokens = max_tokens or self._config.homeostasis.recall_max_tokens
791 with span(
792 "astrocyte.recall",
793 {
794 "astrocyte.bank_count": len(bank_ids),
795 "astrocyte.bank_id": bank_ids[0] if len(bank_ids) == 1 else f"{bank_ids[0]}+{len(bank_ids) - 1}",
796 },
797 ):
798 # Access control for all banks
799 for bid in bank_ids:
800 self._policy.check_access(bid, "read", context)
802 # Rate limiting + quota (atomic per-bank)
803 for bid in bank_ids:
804 self._policy.check_rate_and_quota(bid, "recall")
806 # Build typed params for internal helpers
807 _rp = RecallParams(
808 external_context=external_context,
809 fact_types=fact_types,
810 time_range=time_range,
811 include_sources=include_sources,
812 layer_weights=layer_weights,
813 detail_level=detail_level,
814 as_of=as_of, # M9
815 query_reference_date=query_reference_date,
816 session_id=session_id, # M31 Fix 2
817 )
819 # Single bank — direct
820 if len(bank_ids) == 1:
821 request = await self._make_recall_request(
822 query,
823 bank_ids[0],
824 max_results,
825 max_tokens,
826 tags,
827 _rp,
828 )
829 try:
830 self._policy.check_circuit(self._provider_name)
831 with timed() as t:
832 result = await self._dispatcher.recall(request)
833 self._policy.record_success()
834 self._metrics.inc_counter(
835 "astrocyte_recall_total",
836 {"bank_id": bank_ids[0], "provider": self._provider_name, "status": "ok"},
837 )
838 self._metrics.observe_histogram(
839 "astrocyte_recall_duration_seconds",
840 t["elapsed_ms"] / 1000,
841 {"bank_id": bank_ids[0], "provider": self._provider_name},
842 )
843 top_score = result.hits[0].score if result.hits else 0.0
844 self._analytics.record_recall(
845 bank_ids[0],
846 len(result.hits),
847 top_score,
848 )
849 await self._hook_manager.fire(
850 "on_recall",
851 bank_id=bank_ids[0],
852 data={
853 "query_length": len(query),
854 "result_count": len(result.hits),
855 },
856 )
857 if self._config.dlp.scan_recall_output:
858 result = self._output_scanner.scan_recall(result)
859 return apply_recall_authority(result, self._config.recall_authority)
860 except ProviderUnavailable:
861 return self._policy.handle_degraded_recall(self._provider_name)
862 except Exception:
863 self._policy.record_failure()
864 raise
866 strat = _normalize_multi_bank_strategy(strategy)
867 result = await self._multi_bank.recall(
868 query,
869 bank_ids,
870 max_results,
871 max_tokens,
872 tags,
873 _rp,
874 strat,
875 )
876 if self._config.dlp.scan_recall_output:
877 result = self._output_scanner.scan_recall(result)
878 return apply_recall_authority(result, self._config.recall_authority)
async def reflect(
    self,
    query: str,
    bank_id: str | None = None,
    *,
    banks: list[str] | None = None,
    strategy: Literal["cascade", "parallel", "first_match"] | MultiBankStrategy | None = None,
    max_tokens: int | None = None,
    tags: list[str] | None = None,
    context: AstrocyteContext | None = None,
    include_sources: bool = True,
    dispositions: Any | None = None,
    as_of: datetime | None = None,
    query_reference_date: datetime | None = None,
) -> ReflectResult:
    """Answer ``query`` by synthesizing over remembered content.

    When exactly one bank is resolved, the request is handed to the
    dispatcher's ``reflect``. When several banks are resolved (``banks``,
    optionally with ``strategy``), hits are recalled across all of them
    first and the answer is synthesized from the fused hits.

    Args:
        query: The question to answer.
        bank_id: Single bank to reflect over.
        banks: Several banks to reflect over together.
        strategy: Multi-bank fusion strategy; only used when more than one
            bank is resolved.
        max_tokens: Token budget. A falsy value falls back to
            ``homeostasis.reflect_max_tokens`` from the config.
        tags: Restrict the underlying recall to these tags.
        context: Caller identity used for bank resolution and access checks.
        include_sources: Include source memories in the result.
        dispositions: Emotional/tonal dispositions for synthesis.
        as_of: M9 time-travel anchor — the underlying recall is limited to
            ``retained_at <= as_of`` (legal-hold / audit-replay: "what did
            the system know at time X"). Do NOT use it to anchor relative
            phrases on benchmarks whose retain happens after the dataset's
            question dates; pass ``query_reference_date`` for that.
        query_reference_date: Reference date for resolving relative
            temporal phrases (``"yesterday"``, ``"last week"``,
            ``"3 days ago"``). Needed when a benchmark dataset predates
            the wall clock — e.g. LongMemEval (2023-vintage) measured in
            2026 must pass the question's contemporaneous date. Unlike
            ``as_of``, this alone does NOT exclude memories retained after
            the date — the whole corpus stays in scope.

    Returns:
        The synthesized :class:`ReflectResult`. In the single-bank path a
        ``ProviderUnavailable`` error is converted into a result whose
        answer is ``"Memory unavailable"`` with no sources.
    """
    # Resolve the readable bank set first; everything else keys off it.
    bank_ids = self._policy.resolve_read_bank_ids(bank_id, banks, context)

    max_tokens = max_tokens or self._config.homeostasis.reflect_max_tokens
    primary_bank = bank_ids[0]

    bank_count = len(bank_ids)
    single_bank = bank_count == 1
    span_bank_label = primary_bank if single_bank else f"{primary_bank}+{bank_count - 1}"

    with span(
        "astrocyte.reflect",
        {
            "astrocyte.bank_count": bank_count,
            "astrocyte.bank_id": span_bank_label,
        },
    ):
        # Every bank must pass the access check before any rate/quota
        # check runs, so the two passes stay separate loops.
        for candidate in bank_ids:
            self._policy.check_access(candidate, "read", context)

        for candidate in bank_ids:
            self._policy.check_rate_and_quota(candidate, "reflect")

        if single_bank:
            # ── Single bank: the provider/pipeline does the reflect ──
            request = ReflectRequest(
                query=query,
                bank_id=primary_bank,
                max_tokens=max_tokens,
                include_sources=include_sources,
                dispositions=dispositions,
                tags=tags,
                as_of=as_of,
                query_reference_date=query_reference_date,
            )
            try:
                self._policy.check_circuit(self._provider_name)
                with timed() as timer:
                    result = await self._dispatcher.reflect(request)
                self._policy.record_success()
            except ProviderUnavailable:
                return ReflectResult(answer="Memory unavailable", sources=[])
            except Exception:
                self._policy.record_failure()
                raise
        else:
            # ── Multi-bank: recall across banks, then synthesize ──
            fusion_strategy = _normalize_multi_bank_strategy(strategy)
            with timed() as timer:
                recall_params = RecallParams(
                    include_sources=include_sources,
                    as_of=as_of,
                    query_reference_date=query_reference_date,
                )
                recall_result = await self._multi_bank.recall(
                    query,
                    bank_ids,
                    max_results=20,  # Larger set for synthesis context
                    max_tokens=None,  # Budget applied after synthesis
                    tags=tags,
                    params=recall_params,
                    strategy=fusion_strategy,
                )
                authority_context: str | None = None
                authority_cfg = self._config.recall_authority
                if authority_cfg.enabled and authority_cfg.apply_to_reflect:
                    recall_result = apply_recall_authority(recall_result, authority_cfg)
                    authority_context = recall_result.authority_context
                result = await self._dispatcher.reflect_from_hits(
                    query=query,
                    hits=recall_result.hits,
                    bank_id=primary_bank,
                    max_tokens=max_tokens,
                    dispositions=dispositions,
                    authority_context=authority_context,
                )

        # Bookkeeping shared by both paths. Analytics, quota and the hook
        # are attributed to the first bank; metrics carry the full list.
        self._analytics.record_reflect(
            primary_bank,
            success=bool(result.answer.strip()),
        )
        self._policy.record_quota(primary_bank, "reflect")

        joined_banks = ",".join(bank_ids)
        self._metrics.inc_counter(
            "astrocyte_reflect_total",
            {"bank_id": joined_banks, "provider": self._provider_name, "status": "ok"},
        )
        self._metrics.observe_histogram(
            "astrocyte_reflect_duration_seconds",
            timer["elapsed_ms"] / 1000,
            {"bank_id": joined_banks, "provider": self._provider_name},
        )
        await self._hook_manager.fire(
            "on_reflect",
            bank_id=primary_bank,
            data={
                "query_length": len(query),
                "answer_length": len(result.answer),
                "bank_count": bank_count,
            },
        )
        if self._config.dlp.scan_reflect_output:
            result = self._output_scanner.scan_reflect(result)
        return result
1021 # ── M21 — Mental model CRUD (Hindsight parity) ────────────────────
def _mental_model_service(self) -> Any:
    """Build a ``MentalModelService`` over the configured store.

    Raises:
        RuntimeError: If no mental-model store has been set. Failing here
            gives callers a hint instead of a crash deeper in the pipeline.
    """
    store = self._mental_model_store
    if store is not None:
        # Imported lazily, only once a store is known to exist.
        from astrocyte.pipeline.mental_model import MentalModelService

        return MentalModelService(store)  # type: ignore[arg-type]

    raise RuntimeError(
        "mental_model_store not configured — call "
        "Astrocyte.set_mental_model_store(store) before using "
        "mental-model CRUD."
    )
async def list_mental_models(
    self,
    bank_id: str | None = None,
    *,
    scope: str | None = None,
    context: AstrocyteContext | None = None,
) -> "list[MentalModel]":
    """Return the mental models of one bank, optionally narrowed by ``scope``.

    The bank is the first id resolved by the policy, and the caller must
    pass the "read" access check on it.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "read", context)
    service = self._mental_model_service()
    return await service.list(target_bank, scope=scope)
async def get_mental_model(
    self,
    model_id: str,
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> "MentalModel | None":
    """Look up a single mental model by id.

    Requires "read" access on the resolved bank. Returns ``None`` when the
    model is not found or has been deleted.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "read", context)
    service = self._mental_model_service()
    return await service.get(bank_id=target_bank, model_id=model_id)
async def create_mental_model(
    self,
    *,
    model_id: str,
    title: str,
    content: str | None = None,
    sections: list[dict] | None = None,
    scope: str = "bank",
    source_ids: list[str] | None = None,
    bank_id: str | None = None,
    context: AstrocyteContext | None = None,
) -> "MentalModel":
    """Create or refresh a mental model.

    Two content shapes are accepted — pass exactly one:

    - ``content``: raw markdown string. Parsed on first refresh into
      a structured doc; legacy-shape path.
    - ``sections``: list of section dicts matching the
      :class:`astrocyte.pipeline.structured_doc.Section` JSON shape.
      Modern path — the structured doc is populated immediately and
      ``content`` is rendered from it.

    If both are provided, ``sections`` wins.

    Requires "write" access on the resolved bank and is checked against
    the "retain" rate/quota.

    Raises:
        ValueError: If neither ``content`` nor ``sections`` is given. This
            is raised before the rate/quota check so a malformed call is
            not counted against the limiter.
    """
    bid = self._policy.resolve_read_bank_ids(bank_id, None, context)[0]
    self._policy.check_access(bid, "write", context)

    # Reject a malformed call before the rate/quota check runs.
    if sections is None and content is None:
        raise ValueError("create_mental_model requires either `content` or `sections`")

    self._policy.check_rate_and_quota(bid, "retain")

    service = self._mental_model_service()

    create_kwargs: dict[str, Any] = {
        "bank_id": bid,
        "model_id": model_id,
        "title": title,
        "content": content,
        "scope": scope,
        "source_ids": source_ids,
    }

    if sections is not None:
        from astrocyte.pipeline.structured_doc import (
            StructuredDocument,
            render_document,
        )

        # The structured doc is authoritative: the stored markdown is
        # rendered from it, and the doc itself is persisted alongside.
        doc = StructuredDocument.model_validate({"sections": sections})
        create_kwargs["content"] = render_document(doc)
        create_kwargs["structured_doc"] = doc.model_dump()

    return await service.create(**create_kwargs)
async def update_mental_model(
    self,
    *,
    model_id: str,
    operations: list[dict],
    bank_id: str | None = None,
    context: AstrocyteContext | None = None,
) -> "tuple[MentalModel, dict] | None":
    """Patch a mental model with structured delta operations.

    The operation schema is described by
    :func:`astrocyte.pipeline.delta_ops.apply_operations`. The contract is
    conservative on failure: an invalid op is dropped with a logged
    reason, so the document never ends up worse than its input.

    Requires "write" access on the resolved bank and is checked against
    the "retain" rate/quota.

    Returns:
        ``(updated_model, summary)``, where ``summary`` is
        ``{"applied": [...], "skipped": [...], "changed": bool}`` — the
        audit trail of which ops landed — or ``None`` when the model
        doesn't exist.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "write", context)
    self._policy.check_rate_and_quota(target_bank, "retain")

    service = self._mental_model_service()
    outcome = await service.update_via_ops(
        bank_id=target_bank,
        model_id=model_id,
        operations=operations,
    )
    return outcome
async def delete_mental_model(
    self,
    model_id: str,
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> bool:
    """Soft-delete one mental model; ``True`` iff it existed.

    Requires "forget" access on the resolved bank.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "forget", context)
    service = self._mental_model_service()
    return await service.delete(bank_id=target_bank, model_id=model_id)
async def refresh_mental_model(
    self,
    model_id: str,
    new_source_ids: list[str],
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> "MentalModel | None":
    """M28 — fold new/expanded source memories into an existing model.

    Hindsight parity for ``mental_model.refresh()``: once retain has added
    memories relevant to a model, this merges them into the model's
    ``source_ids`` and bumps its revision.

    The store-level implementation merges ``new_source_ids`` into the
    existing set (order-preserving dedup). A future enhancement runs the
    LLM compile pipeline over the merged sources to also rewrite
    ``content`` and ``structured_doc`` — the SPI surface stays the same.

    Requires "write" access on the resolved bank and is checked against
    the "retain" rate/quota.

    Returns:
        The refreshed :class:`MentalModel`, or ``None`` if the model
        doesn't exist.
    """
    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "write", context)
    self._policy.check_rate_and_quota(target_bank, "retain")

    # Hand the service its own list so the caller's sequence is untouched.
    sources = list(new_source_ids)
    service = self._mental_model_service()
    return await service.refresh_from_sources(
        bank_id=target_bank,
        model_id=model_id,
        new_source_ids=sources,
    )
async def create_directive(
    self,
    *,
    rule_text: str,
    directive_id: str | None = None,
    scope: str = "bank",
    bank_id: str | None = None,
    context: AstrocyteContext | None = None,
) -> "MentalModel":
    """Create a user-authored hard rule (M18b/M19 deferred item).

    Directives are stored as :class:`MentalModel` rows with
    ``kind="directive"``. They participate in the agentic reflect
    loop's ``search_mental_models`` tool exactly like ``general``
    / ``preference`` models, but the ``directive`` discriminator
    signals to the answerer that the rule should be applied as a
    hard preference override rather than as one input among many.

    Mirrors Hindsight's ``mental_models.subtype="directive"``
    rows. Architecturally replaces the M18a-2 ``directive_compile``
    auto-extraction path that was deprecated in M19 (it replicated
    a −30pp SSP regression because compressed auto-directives
    overrode original preference nuance).

    Args:
        rule_text: The rule; surrounding whitespace is stripped.
        directive_id: Explicit model id. When falsy, an id of the form
            ``directive:<12 hex chars>`` is generated.
        scope: Scope passed through to the mental-model service.
        bank_id: Bank to store the directive in.
        context: Caller identity; "write" access is required.

    Raises:
        ValueError: If ``rule_text`` is empty or whitespace-only. This is
            raised before the rate/quota check so a malformed call is
            not counted against the limiter.
    """
    import uuid as _uuid

    bid = self._policy.resolve_read_bank_ids(bank_id, None, context)[0]
    self._policy.check_access(bid, "write", context)

    # Validate the input before the rate/quota check runs.
    rule = rule_text.strip()
    if not rule:
        raise ValueError("create_directive requires non-empty rule_text")

    self._policy.check_rate_and_quota(bid, "retain")

    mid = directive_id or f"directive:{_uuid.uuid4().hex[:12]}"
    service = self._mental_model_service()
    # Author the directive with a single-paragraph structured doc so
    # subsequent refreshes can use delta_ops cleanly.
    from astrocyte.pipeline.structured_doc import (
        ParagraphBlock,
        Section,
        StructuredDocument,
        render_document,
    )

    doc = StructuredDocument(
        sections=[
            Section(
                id="rule",
                heading="Rule",
                level=2,
                blocks=[ParagraphBlock(text=rule)],
            ),
        ],
    )
    rendered = render_document(doc)
    return await service.create(
        bank_id=bid,
        model_id=mid,
        title="Directive",
        content=rendered,
        scope=scope,
        kind="directive",
        structured_doc=doc.model_dump(),
    )
1265 # ── M21 — Observation CRUD (Hindsight parity) ────────────────────
async def list_observations(
    self,
    bank_id: str | None = None,
    *,
    scope: str | None = None,
    trend: "str | None" = None,
    limit: int = 100,
    context: AstrocyteContext | None = None,
) -> list[dict]:
    """List the observations stored for a bank.

    Each row is a plain dict with the keys ``id``, ``text``, ``trend``,
    ``proof_count``, ``source_ids``, ``confidence``, ``updated_at`` and
    ``scope``, so callers (including the MCP tool) get JSON-shaped output
    without round-tripping through ``VectorHit``.

    Args:
        scope: Keep only rows whose ``_obs_scope`` metadata equals this
            value. Rows without that key compare as ``""`` here.
            NOTE(review): the earlier docs described a default of
            ``"bank"`` for untagged observations — confirm where that
            default is applied.
        trend: Keep only rows with this computed trend (``"new"`` /
            ``"stable"`` / ``"strengthening"`` / ``"weakening"`` /
            ``"stale"``). The trend is computed at call time from each
            row's metadata; see
            :func:`astrocyte.pipeline.observation.compute_observation_trend`.
        limit: Max rows fetched from the store, clamped to 1..1000. The
            ``scope`` / ``trend`` filters run after the fetch, so fewer
            rows may be returned.

    Raises:
        RuntimeError: If no pipeline / vector store is available.
    """
    from astrocyte.pipeline.observation import (
        compute_observation_trend,
        obs_bank_id,
    )

    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "read", context)

    if self._pipeline is None or not hasattr(self._pipeline, "vector_store"):
        raise RuntimeError("vector_store not configured on the pipeline")
    vector_store = self._pipeline.vector_store  # type: ignore[attr-defined]
    if vector_store is None:
        raise RuntimeError("vector_store is not set")

    observation_bank = obs_bank_id(target_bank)
    fetch_limit = max(1, min(int(limit), 1000))
    vectors = await vector_store.list_vectors(
        observation_bank,
        offset=0,
        limit=fetch_limit,
    )

    rows: list[dict] = []
    for vec in vectors:
        meta = vec.metadata or {}
        scope_mismatch = scope is not None and str(meta.get("_obs_scope", "")) != scope
        if scope_mismatch:
            continue
        computed_trend = compute_observation_trend(dict(meta)).value
        if trend is not None and computed_trend != trend:
            continue
        row = {
            "id": vec.id,
            "text": vec.text,
            "trend": computed_trend,
            "proof_count": int(meta.get("_obs_proof_count", 1)),
            "source_ids": meta.get("_obs_source_ids", "[]"),
            "confidence": meta.get("_obs_confidence"),
            "updated_at": meta.get("_obs_updated_at"),
            "scope": meta.get("_obs_scope"),
        }
        rows.append(row)
    return rows
async def get_observation(
    self,
    observation_id: str,
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> dict | None:
    """Return the observation row with ``observation_id``, or ``None``.

    Implemented as a scan over :meth:`list_observations` with
    ``limit=1000``, so only rows within that listing can be found.
    """
    candidates = await self.list_observations(
        bank_id=bank_id,
        limit=1000,
        context=context,
    )
    return next(
        (row for row in candidates if row["id"] == observation_id),
        None,
    )
async def create_observation(
    self,
    *,
    text: str,
    source_ids: list[str] | None = None,
    scope: str | None = None,
    confidence: float = 0.9,
    bank_id: str | None = None,
    context: AstrocyteContext | None = None,
) -> dict:
    """Store a user-authored observation directly.

    Skips the autonomous consolidator — for an agent or user who wants to
    assert a fact without going through retain → consolidate. The
    observation goes into the dedicated ``::obs`` bank with proof_count=1
    and the given ``source_ids`` (may be empty for hand-authored entries).

    Requires "write" access on the resolved bank and is checked against
    the "retain" rate/quota.

    Returns:
        The stored row as listed by :meth:`list_observations`, or a
        minimal ``{"text", "scope", "trend"}`` dict if the row cannot be
        found in that listing.

    Raises:
        RuntimeError: If the pipeline's vector store / LLM provider is
            missing.
        ValueError: If the consolidator rejects the create.
    """
    from astrocyte.pipeline.observation import (
        ObservationConsolidator,
        observation_scope,
    )

    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "write", context)
    self._policy.check_rate_and_quota(target_bank, "retain")

    if self._pipeline is None or not hasattr(self._pipeline, "vector_store"):
        raise RuntimeError("vector_store not configured on the pipeline")
    vector_store = self._pipeline.vector_store  # type: ignore[attr-defined]
    llm_provider = self._pipeline.llm_provider  # type: ignore[attr-defined]
    if vector_store is None or llm_provider is None:
        raise RuntimeError("vector_store / llm_provider not set")

    consolidator = ObservationConsolidator()
    effective_scope = scope or observation_scope(target_bank)
    clean_text = text.strip()
    created = await consolidator._apply_create(  # noqa: SLF001 — single-step CRUD
        {"text": clean_text, "confidence": confidence},
        target_bank,
        vector_store,
        llm_provider,
        source_ids=list(source_ids or []),
        now_iso=datetime.now(timezone.utc).isoformat(),
        scope=effective_scope,
    )
    if not created:
        raise ValueError("observation create rejected (empty text or confidence below threshold)")

    # The freshly-created row is at the top of the obs bank — list and
    # find it. (For multi-writer setups, callers can use the returned
    # text + scope as the identity check.)
    # NOTE(review): list_observations re-checks "read" access with the same
    # context; a caller allowed to write but not read would see that check
    # raise after the row is already stored — confirm this is intended.
    listed = await self.list_observations(bank_id=target_bank, scope=effective_scope, context=context)
    match = next((row for row in listed if row["text"] == clean_text), None)
    if match is not None:
        return match
    # Fall back: return a minimal dict so the caller doesn't get None
    return {"text": clean_text, "scope": effective_scope, "trend": "new"}
async def delete_observation(
    self,
    observation_id: str,
    bank_id: str | None = None,
    *,
    context: AstrocyteContext | None = None,
) -> bool:
    """Remove a single observation from the dedicated obs bank.

    Requires "forget" access on the resolved bank.

    Returns:
        ``True`` when the store reports at least one deleted row;
        ``False`` when nothing was deleted or the pipeline's vector store
        is ``None``.

    Raises:
        RuntimeError: If there is no pipeline, or the pipeline has no
            ``vector_store`` attribute.
    """
    from astrocyte.pipeline.observation import obs_bank_id

    resolved = self._policy.resolve_read_bank_ids(bank_id, None, context)
    target_bank = resolved[0]
    self._policy.check_access(target_bank, "forget", context)

    if self._pipeline is None or not hasattr(self._pipeline, "vector_store"):
        raise RuntimeError("vector_store not configured on the pipeline")
    vector_store = self._pipeline.vector_store  # type: ignore[attr-defined]
    if vector_store is None:
        return False

    removed = await vector_store.delete([observation_id], obs_bank_id(target_bank))
    return removed > 0
async def clear_bank(
    self,
    bank_id: str,
    *,
    context: AstrocyteContext | None = None,
) -> ForgetResult:
    """Wipe every memory in ``bank_id``.

    Thin wrapper over :meth:`forget` with ``scope="all"``; admin access is
    required when ACL is enabled.
    """
    outcome = await self.forget(bank_id, scope="all", context=context)
    return outcome
async def forget(
    self,
    bank_id: str,
    *,
    memory_ids: list[str] | None = None,
    tags: list[str] | None = None,
    scope: str | None = None,
    context: AstrocyteContext | None = None,
    compliance: bool = False,
    reason: str | None = None,
    before_date: datetime | None = None,
) -> ForgetResult:
    """Remove memories from a bank.

    Order of enforcement: access check, MIP forget policy (``max_per_call``,
    ``min_age_days``, audit log, ``respect_legal_hold``), legal-hold /
    compliance handling, then either a soft delete or a dispatcher forget.

    Args:
        bank_id: Bank to delete from.
        memory_ids: Specific memories to delete.
        tags: Delete by tag (passed through to the forget request).
        scope: ``"all"`` to delete everything in a bank (requires admin).
        context: Caller identity for access checks and audit logs.
        compliance: Bypass legal holds for right-to-forget (requires context).
        reason: Audit reason when ``compliance=True``.
        before_date: Delete memories created before this date.

    Raises:
        MipRoutingError: If the bank's MIP forget policy rejects the call
            (too many ids, or targeted records younger than
            ``min_age_days``).
        AccessDenied: If ``compliance=True`` is used without a context.
    """
    validate_bank_id(bank_id)
    with span("astrocyte.forget", {"astrocyte.bank_id": bank_id}):
        # scope="all" requires admin permission (checked only when access
        # control is enabled); every other forget needs "forget".
        if scope != "all":
            self._policy.check_access(bank_id, "forget", context)
        elif self._config.access_control.enabled:
            self._policy.check_access(bank_id, "admin", context)

        # MIP forget policy resolution (Phase 4) — runs BEFORE the existing
        # legal-hold check so that a rule with respect_legal_hold=True wins
        # over the caller's compliance bypass, and so audit logs fire even
        # on policy refusal.
        forget_policy = self._mip_router.resolve_forget_for_bank(bank_id) if self._mip_router else None
        if forget_policy is not None:
            # max_per_call: cap blast radius for selective deletes by id
            per_call_cap = forget_policy.max_per_call
            if per_call_cap is not None and memory_ids is not None and len(memory_ids) > per_call_cap:
                raise MipRoutingError(
                    f"forget rejected: {len(memory_ids)} memory_ids exceeds "
                    f"forget.max_per_call={forget_policy.max_per_call} for bank {bank_id!r}"
                )

            # min_age_days: refuse forget when any targeted record is younger.
            # Best-effort: requires the engine to populate `_created_at` in
            # metadata at retain time. Records lacking this stamp are skipped
            # with a warning (degraded enforcement, not a hard failure).
            age_floor = forget_policy.min_age_days
            if age_floor is not None and age_floor > 0 and memory_ids:
                too_young = await self._collect_too_young_ids(
                    bank_id,
                    memory_ids,
                    age_floor,
                )
                if too_young:
                    raise MipRoutingError(
                        f"forget rejected: {len(too_young)} record(s) in {bank_id!r} "
                        f"younger than forget.min_age_days={forget_policy.min_age_days} "
                        f"({sorted(too_young)[:5]}{'...' if len(too_young) > 5 else ''})"
                    )

            # audit: emit a structured log line before any deletion occurs
            if forget_policy.audit in ("required", "recommended"):
                audit_payload = {
                    "mode": str(forget_policy.mode or "soft"),
                    "audit": str(forget_policy.audit),
                    "cascade": bool(forget_policy.cascade) if forget_policy.cascade is not None else True,
                    "memory_ids_count": len(memory_ids) if memory_ids else 0,
                    "scope": scope or "selective",
                    "actor": context_principal_label(context) if context else "anonymous",
                    "reason": str(reason) if reason else None,
                }
                self._logger.log(
                    "astrocyte.mip.forget.audit",
                    bank_id=bank_id,
                    data=audit_payload,
                    level=logging.WARNING,
                )

            # respect_legal_hold: when True, override the compliance bypass.
            # The MIP rule is the source of truth; compliance flag cannot
            # circumvent a rule that explicitly demands legal hold respect.
            if forget_policy.respect_legal_hold:
                self._lifecycle.check_forget_allowed(bank_id)

        # Legal hold check — compliance=True bypasses it for right-to-forget.
        # Even when access_control is disabled, the compliance bypass
        # requires explicit context (caller must identify themselves).
        if compliance:
            if context is None:
                raise AccessDenied("anonymous", bank_id, "compliance_forget")
            # Log compliance forget with actor identity for audit trail
            principal_label = context_principal_label(context)
            audit_reason = reason or "compliance_forget_request"
            self._logger.log(
                "astrocyte.compliance.forget",
                bank_id=bank_id,
                data={
                    "actor": principal_label,
                    "reason": str(audit_reason),
                    "scope": scope or "selective",
                },
                level=logging.WARNING,
            )
            # When access control is enabled, also require admin permission
            if self._config.access_control.enabled:
                self._policy.check_access(bank_id, "admin", context)
        elif forget_policy is None or not forget_policy.respect_legal_hold:
            # Skipped when MIP already enforced the legal hold above.
            self._lifecycle.check_forget_allowed(bank_id)

        # Soft-delete path (forget policy mode == "soft"): mark records with
        # `_deleted: true` instead of physically removing. Recall is
        # responsible for filtering them out. Falls through to hard delete
        # with a warning if the engine doesn't expose `soft_delete`.
        soft_mode = forget_policy is not None and forget_policy.mode == "soft" and memory_ids is not None
        if soft_mode:
            soft_delete = getattr(self._engine_provider, "soft_delete", None)
            if soft_delete is not None:
                soft_deleted = await soft_delete(bank_id, memory_ids)

                # Same dedup-cache invalidation as the hard-delete path —
                # a soft-deleted memory is no longer reachable on recall,
                # so its embedding shouldn't keep blocking re-retains.
                if self._pipeline is not None and soft_deleted > 0:
                    self._pipeline.invalidate_dedup_cache(bank_id, memory_ids)

                await self._hook_manager.fire(
                    "on_forget",
                    bank_id=bank_id,
                    data={"deleted_count": soft_deleted, "archived_count": 0, "soft": True},
                )
                return ForgetResult(deleted_count=soft_deleted)

            engine_label = type(self._engine_provider).__name__ if self._engine_provider else "pipeline"
            logging.getLogger("astrocyte.mip").warning(
                "forget.mode=soft requested for bank=%s but engine %s does not "
                "implement soft_delete(); falling back to hard delete",
                _safe_log(bank_id),
                engine_label,
            )

        # Hard delete through the dispatcher.
        request = ForgetRequest(
            bank_id=bank_id,
            memory_ids=memory_ids,
            tags=tags,
            before_date=before_date,
            scope=scope,
        )
        result = await self._dispatcher.forget(request)

        # Drop the forgotten memories from the in-memory dedup cache so
        # a re-retain of similar content produces a fresh row instead of
        # silently no-op'ing with "All chunks are near-duplicates". The
        # DedupDetector lives on the pipeline; safe to skip when no
        # pipeline is configured (provider-direct deployments).
        if self._pipeline is not None and result.deleted_count > 0:
            self._pipeline.invalidate_dedup_cache(bank_id, memory_ids)

        hook_payload = {
            "deleted_count": result.deleted_count,
            "archived_count": result.archived_count,
        }
        await self._hook_manager.fire(
            "on_forget",
            bank_id=bank_id,
            data=hook_payload,
        )
        return result
async def _collect_too_young_ids(
    self,
    bank_id: str,
    memory_ids: list[str],
    min_age_days: int,
) -> list[str]:
    """Return the subset of ``memory_ids`` younger than ``min_age_days``.

    Best-effort enforcement of ``forget.min_age_days``: relies on records
    carrying a ``_created_at`` timestamp (ISO string or ``datetime``) in
    metadata, stamped by the engine at retain time. Records lacking a
    usable stamp are skipped with a warning — degraded enforcement, not a
    hard failure, since older data predating the stamp shouldn't
    permanently block legitimate forgets.

    Records are looked up with a single wildcard recall capped at 10000
    results. Requested ids that the recall does not return cannot be
    age-checked; they are skipped too, with their own warning.

    Args:
        bank_id: Bank the records live in.
        memory_ids: Ids targeted by the forget call.
        min_age_days: Minimum age, in days, a record must have to be
            forgettable.

    Returns:
        Each too-young id once, in the order first encountered. Empty when
        the recall itself fails (the check is skipped with a warning).
    """
    from datetime import timedelta

    wanted = set(memory_ids)
    try:
        result = await self._dispatcher.recall(
            RecallRequest(query="*", bank_id=bank_id, max_results=10000),
        )
    except Exception as exc:  # pragma: no cover — defensive
        logger.warning(
            "min_age_days check skipped: recall failed for bank=%s: %s",
            _safe_log(bank_id),
            _safe_log(exc),
        )
        return []

    now = datetime.now(timezone.utc)
    threshold = timedelta(days=min_age_days)
    # Everything is tracked per memory id: the same id may come back in
    # more than one hit, and counting hits would inflate both the
    # rejection message and the "N/M lack a stamp" warning.
    too_young: dict[str, None] = {}  # dict used as an insertion-ordered set
    seen: set[str] = set()
    stamped: set[str] = set()
    unstamped: set[str] = set()

    for hit in result.hits:
        memory_id = hit.memory_id
        if memory_id is None or memory_id not in wanted:
            continue
        seen.add(memory_id)

        stamp = (hit.metadata or {}).get("_created_at")
        created_at: datetime | None = None
        if isinstance(stamp, datetime):
            created_at = stamp
        elif isinstance(stamp, str):
            try:
                created_at = datetime.fromisoformat(stamp)
            except ValueError:
                created_at = None
        if created_at is None:
            # Missing, unparsable, or of an unsupported type.
            unstamped.add(memory_id)
            continue

        stamped.add(memory_id)
        if created_at.tzinfo is None:
            # Naive stamps are treated as UTC so they compare with `now`.
            created_at = created_at.replace(tzinfo=timezone.utc)
        if (now - created_at) < threshold:
            too_young.setdefault(memory_id)

    mip_logger = logging.getLogger("astrocyte.mip")

    # An id counts as unstamped only if no hit carried a usable stamp for it.
    unstamped -= stamped
    if unstamped:
        mip_logger.warning(
            "min_age_days enforcement degraded: %d/%d record(s) in bank=%s "
            "lack `_created_at` metadata and were skipped",
            len(unstamped),
            len(seen),
            _safe_log(bank_id),
        )
        # One increment per forget call that encountered unstamped records.
        # The warning log above carries the exact count; this counter
        # fires dashboards / alerts when the condition occurs at all.
        self._metrics.inc_counter(
            "astrocyte_forget_unstamped_records_total",
            {"bank_id": bank_id},
        )

    # Ids the recall never returned also escape the age check; say so
    # instead of skipping them silently.
    unseen_count = len(wanted - seen)
    if unseen_count:
        mip_logger.warning(
            "min_age_days enforcement degraded: %d/%d requested record(s) in "
            "bank=%s were not returned by recall and could not be age-checked",
            unseen_count,
            len(wanted),
            _safe_log(bank_id),
        )

    return list(too_young)
async def compile(
    self,
    bank_id: str,
    scope: str | None = None,
) -> CompileResult:
    """Compile raw memories into WikiPages for ``bank_id`` (M8).

    An LLM synthesises one structured wiki page per detected topic scope.
    Compiled pages are stored in the WikiStore and embedded back into the
    VectorStore (``memory_layer="compiled"``) so the recall pipeline can
    surface them ahead of raw memory fragments.

    Args:
        bank_id: Bank to compile.
        scope: If provided, compile only memories tagged with this scope
            string. If ``None``, trigger full scope discovery:

            1. Tagged memories are grouped by tag (each tag = one page).
            2. Untagged memories are clustered by embedding similarity
               (DBSCAN); each cluster is labelled with a lightweight LLM
               call. Noise points are held for the next compile cycle.

    Returns:
        :class:`~astrocyte.types.CompileResult` with pages created/updated,
        noise count, and token usage. A result carrying ``error`` is
        logged as a warning and still returned.

    Raises:
        :class:`~astrocyte.errors.ConfigError`: If no WikiStore has been
            configured (call ``set_wiki_store()`` before ``compile()``).
        :class:`~astrocyte.errors.ProviderUnavailable`: If the Tier 1
            pipeline has not been configured.

    Example::

        # Explicit — compile memories tagged "incident-response"
        result = await brain.compile("eng-team", scope="incident-response")

        # Automatic — discover scopes from tags and embedding clusters
        result = await brain.compile("eng-team")

        print(result.pages_created, result.pages_updated, result.tokens_used)
    """
    validate_bank_id(bank_id)

    wiki_store = self._wiki_store
    if wiki_store is None:
        raise ConfigError(
            "brain.compile() requires a WikiStore. Call brain.set_wiki_store(store) before compiling."
        )

    pipeline = self._pipeline
    if pipeline is None:
        raise ProviderUnavailable(
            "brain.compile() requires a Tier 1 pipeline. Call brain.set_pipeline(pipeline) before compiling."
        )

    from astrocyte.pipeline.compile import CompileEngine

    engine = CompileEngine(
        vector_store=pipeline.vector_store,
        llm_provider=pipeline.llm_provider,
        wiki_store=wiki_store,  # type: ignore[arg-type]
    )

    # "auto" is the label used when scope discovery is left to the engine.
    scope_label = scope or "auto"
    with span("compile", {"bank_id": bank_id, "scope": scope_label}):
        result = await engine.run(bank_id, scope=scope)

    if not result.error:
        logger.info(
            "compile complete bank=%s scope=%s pages_created=%d pages_updated=%d noise=%d tokens=%d elapsed_ms=%d",
            _safe_log(bank_id),
            _safe_log(scope_label),
            result.pages_created,
            result.pages_updated,
            result.noise_memories,
            result.tokens_used,
            result.elapsed_ms,
        )
    else:
        logger.warning(
            "compile failed for bank %s scope %s: %s",
            _safe_log(bank_id),
            _safe_log(scope_label),
            _safe_log(result.error),
        )

    return result
1774 # ---------------------------------------------------------------------------
1775 # Graph traversal (public API)
1776 # ---------------------------------------------------------------------------
async def graph_search(
    self,
    query: str,
    bank_id: str,
    limit: int = 10,
) -> list[Entity]:
    """Find knowledge-graph entities matching *query* in a bank.

    The lookup is delegated to the pipeline graph store's
    ``query_entities``. The returned entities can be used as seeds for
    :meth:`graph_neighbors`.

    Args:
        query: Name or partial name to search for.
        bank_id: Memory bank whose graph to search.
        limit: Maximum number of entities to return.

    Returns:
        Whatever the graph store returns for the query (a list of
        :class:`~astrocyte.types.Entity`).

    Raises:
        :class:`~astrocyte.errors.ConfigError`: If there is no pipeline or
            the pipeline exposes no ``graph_store``.
    """
    from astrocyte.errors import ConfigError

    validate_bank_id(bank_id)

    # A missing pipeline and a pipeline without a graph store are treated alike.
    pipeline = self._pipeline
    store = getattr(pipeline, "graph_store", None) if pipeline else None
    if store is not None:
        return await store.query_entities(query, bank_id, limit=limit)

    raise ConfigError(
        "graph_search() requires a GraphStore. Configure a graph_store provider in astrocyte.yaml."
    )
async def graph_neighbors(
    self,
    entity_ids: list[str],
    bank_id: str,
    max_depth: int = 2,
    limit: int = 20,
) -> list[GraphHit]:
    """Return memories connected to the seed entities in the knowledge graph.

    The traversal itself is delegated to the pipeline graph store's
    ``query_neighbors``.

    Args:
        entity_ids: Seed entity IDs to start traversal from.
        bank_id: Memory bank whose graph to traverse.
        max_depth: Maximum traversal depth (default 2).
        limit: Maximum number of memory hits to return.

    Returns:
        The graph store's :class:`~astrocyte.types.GraphHit` results, or an
        empty list when *entity_ids* is empty.

    Raises:
        :class:`~astrocyte.errors.ConfigError`: If seeds were supplied but no
            graph store is configured.
    """
    from astrocyte.errors import ConfigError

    validate_bank_id(bank_id)

    if entity_ids:
        pipeline = self._pipeline
        store = getattr(pipeline, "graph_store", None) if pipeline else None
        if store is None:
            raise ConfigError(
                "graph_neighbors() requires a GraphStore. Configure a graph_store provider in astrocyte.yaml."
            )
        return await store.query_neighbors(entity_ids, bank_id, max_depth=max_depth, limit=limit)

    # No seeds: nothing to traverse, and no graph store is required.
    return []
async def history(
    self,
    query: str,
    bank_id: str,
    as_of: datetime,
    *,
    max_results: int = 10,
    max_tokens: int | None = None,
    tags: list[str] | None = None,
) -> HistoryResult:
    """Recall against a past snapshot of a bank (M9 time travel).

    Runs :meth:`recall` with ``as_of`` set and repackages the outcome as a
    :class:`~astrocyte.types.HistoryResult`. The time filtering itself
    (hiding memories retained after *as_of*) is performed by the recall
    path, not here.

    Args:
        query: The recall query to run against the historical snapshot.
        bank_id: Bank to query.
        as_of: Cut-off instant; intended to be a UTC datetime.
        max_results: Maximum number of hits to return.
        max_tokens: Optional token budget for the result set.
        tags: Optional tag filter (applied on top of the time filter).

    Returns:
        :class:`~astrocyte.types.HistoryResult` carrying the recall hits,
        counts and trace, plus ``as_of`` and ``bank_id`` for traceability.

    Raises:
        Any error raised by :meth:`recall` propagates unchanged.

    Example::

        from datetime import datetime, timezone

        snapshot = await brain.history(
            "What did we know about Alice?",
            bank_id="user-alice",
            as_of=datetime(2025, 1, 1, tzinfo=timezone.utc),
        )
        for hit in snapshot.hits:
            print(hit.retained_at, hit.text)
    """
    snapshot = await self.recall(
        query,
        bank_id=bank_id,
        max_results=max_results,
        max_tokens=max_tokens,
        tags=tags,
        as_of=as_of,
    )

    # Echo the request's as_of/bank_id so the result is self-describing.
    return HistoryResult(
        hits=snapshot.hits,
        total_available=snapshot.total_available,
        truncated=snapshot.truncated,
        as_of=as_of,
        bank_id=bank_id,
        trace=snapshot.trace,
    )
async def audit(
    self,
    scope: str,
    bank_id: str,
    *,
    max_memories: int = 50,
    max_tokens: int | None = None,
    tags: list[str] | None = None,
) -> AuditResult:
    """Identify knowledge gaps for a topic in a memory bank (M10 gap analysis).

    Recalls up to *max_memories* memories for *scope* via :meth:`recall`,
    then hands them to ``astrocyte.pipeline.audit.run_audit`` together with
    the pipeline's LLM provider. This method itself writes nothing to the
    bank; the judge call consumes LLM tokens.

    Args:
        scope: Natural-language description of the topic to audit
            (e.g. ``"Alice's employment history"``).
        bank_id: Bank to audit.
        max_memories: Maximum number of memories to retrieve and pass to
            the audit judge. Defaults to ``50``.
        max_tokens: Optional token budget applied to retrieved memories
            before the judge call.
        tags: Optional tag filter to narrow which memories are retrieved.

    Returns:
        :class:`~astrocyte.types.AuditResult` produced by ``run_audit``.

    Raises:
        :class:`~astrocyte.errors.ConfigError`: If no pipeline is configured.

    Example::

        result = await brain.audit(
            "Alice's employment history",
            bank_id="user-alice",
        )
        print(f"Coverage: {result.coverage_score:.0%}")
        for gap in result.gaps:
            print(f"[{gap.severity}] {gap.topic}: {gap.reason}")
    """
    from astrocyte.pipeline.audit import run_audit

    recall_result = await self.recall(
        scope,
        bank_id=bank_id,
        max_results=max_memories,
        max_tokens=max_tokens,
        tags=tags,
    )

    pipeline = self._pipeline
    if pipeline is None:
        # ConfigError lives in astrocyte.errors (as used by the rest of this
        # file); importing it from another module would mask the intended
        # error with an import failure.
        from astrocyte.errors import ConfigError

        raise ConfigError("No pipeline configured — call set_pipeline() first.")

    return await run_audit(
        scope=scope,
        bank_id=bank_id,
        memories=recall_result.hits,
        llm_provider=pipeline.llm_provider,
        trace=recall_result.trace,
    )
async def health(self) -> HealthStatus:
    """Check system health.

    Asks the engine provider for its health when one is set; otherwise asks
    the pipeline's vector store. With neither configured, reports a healthy
    status with the message "No provider configured". The elapsed time
    measured by ``timed()`` is stored on the status as ``latency_ms``.

    Returns:
        The :class:`HealthStatus` from the selected backend (or the
        synthetic one), with ``latency_ms`` filled in.
    """
    with timed() as t:
        # Engine provider takes precedence over the pipeline's vector store.
        if self._engine_provider:
            status = await self._engine_provider.health()
        elif self._pipeline:
            status = await self._pipeline.vector_store.health()
        else:
            status = HealthStatus(healthy=True, message="No provider configured")
    # NOTE(review): presumably t["elapsed_ms"] is populated when the timed()
    # context exits, hence the read after the block — confirm against timed().
    status.latency_ms = t["elapsed_ms"]
    return status
1991 # ---------------------------------------------------------------------------
1992 # Lifecycle — legal hold + TTL
1993 # ---------------------------------------------------------------------------
def set_legal_hold(self, bank_id: str, hold_id: str, reason: str, *, set_by: str = "user:api") -> LegalHold:
    """Place a bank under legal hold; forget() is blocked until the hold is released.

    Args:
        bank_id: Bank to place under hold.
        hold_id: Identifier for this hold.
        reason: Why the hold is being placed.
        set_by: Label of the actor placing the hold.

    Returns:
        The hold record returned by the lifecycle manager.
    """
    hold = self._lifecycle.set_legal_hold(bank_id, hold_id, reason, set_by=set_by)
    return hold
def release_legal_hold(self, bank_id: str, hold_id: str) -> bool:
    """Release a legal hold from a bank.

    Returns:
        The lifecycle manager's answer: True if the hold existed.
    """
    released = self._lifecycle.release_legal_hold(bank_id, hold_id)
    return released
def is_under_hold(self, bank_id: str) -> bool:
    """Report whether the lifecycle manager considers *bank_id* under legal hold."""
    held = self._lifecycle.is_under_hold(bank_id)
    return held
async def run_lifecycle(self, bank_id: str) -> LifecycleRunResult:
    """Run the TTL lifecycle check on a bank, then dedup consolidation.

    Every scanned memory is evaluated with the lifecycle manager's
    ``evaluate_memory_ttl``; memories whose action is ``"delete"`` or
    ``"archive"`` are removed in one batched ``forget`` call.

    Note: v1 treats "archive" as delete — no separate archive storage yet.
    The LifecycleAction.action distinguishes the reason (ttl_unretrieved vs
    ttl_expired) so callers can differentiate, but both result in deletion
    from the provider.

    Args:
        bank_id: Bank to scan.

    Returns:
        :class:`LifecycleRunResult` where ``archived_count`` counts
        ``"archive"`` actions, ``skipped_count`` counts ``"keep"`` actions,
        ``deleted_count`` is the forget count plus duplicates removed by
        consolidation, and ``actions`` lists every evaluated action. When
        lifecycle is disabled in config, all counts are zero.
    """
    from astrocyte.types import LifecycleAction

    if not self._config.lifecycle.enabled:
        return LifecycleRunResult(archived_count=0, deleted_count=0, skipped_count=0, actions=[])

    now = datetime.now(timezone.utc)
    actions: list[LifecycleAction] = []
    to_delete: list[str] = []

    def _as_datetime(value: Any) -> Any:
        """Parse ISO-8601 strings; pass any other value through untouched."""
        return datetime.fromisoformat(value) if isinstance(value, str) else value

    def _evaluate(
        memory_id: str,
        metadata: dict[str, Any] | None,
        tags: list[str] | None,
        fact_type: str | None,
    ) -> None:
        """Evaluate one memory's TTL and queue it for deletion if required."""
        meta = metadata or {}
        action = self._lifecycle.evaluate_memory_ttl(
            memory_id=memory_id,
            bank_id=bank_id,
            created_at=_as_datetime(meta.get("_created_at")),
            last_recalled_at=_as_datetime(meta.get("_last_recalled_at")),
            tags=tags,
            fact_type=fact_type,
            now=now,
        )
        actions.append(action)
        # A memory without an ID cannot be targeted by forget(); keep the
        # action in the report but never send an empty ID to the provider.
        if action.action in ("delete", "archive") and memory_id:
            to_delete.append(memory_id)

    # Scan memories via paginated list_vectors (avoids query="*" + 10K limit)
    vector_store = self._pipeline.vector_store if self._pipeline else None
    if vector_store and hasattr(vector_store, "list_vectors"):
        scan_offset = 0
        scan_batch = 200
        while True:
            items = await vector_store.list_vectors(bank_id, offset=scan_offset, limit=scan_batch)
            if not items:
                break
            for item in items:
                meta = item.metadata or {}
                raw_tags = meta.get("_tags")
                # Tags are read from metadata as one comma-separated string.
                _evaluate(
                    item.id,
                    item.metadata,
                    raw_tags.split(",") if raw_tags else None,
                    meta.get("_fact_type"),
                )
            scan_offset += len(items)
            if len(items) < scan_batch:
                # Short page: the store has no more vectors for this bank.
                break
            if scan_offset > 100_000:
                logger.warning("Lifecycle scan capped at 100k vectors for bank %s", _safe_log(bank_id))
                break
    else:
        # Fallback for engine-only or stores without list_vectors
        logger.warning(
            "Lifecycle scan using query='*' fallback for bank %s — "
            "results may be incomplete. Use a pipeline with list_vectors support for full coverage.",
            _safe_log(bank_id),
        )
        result = await self._dispatcher.recall(RecallRequest(query="*", bank_id=bank_id, max_results=10000))
        for hit in result.hits:
            _evaluate(hit.memory_id or "", hit.metadata, hit.tags, hit.fact_type)

    # Batch delete/archive
    deleted = 0
    if to_delete:
        forget_result = await self._dispatcher.forget(ForgetRequest(bank_id=bank_id, memory_ids=to_delete))
        deleted = forget_result.deleted_count

    archived = sum(1 for a in actions if a.action == "archive")
    skipped = sum(1 for a in actions if a.action == "keep")

    # Run dedup consolidation if pipeline has a vector store
    consolidation_removed = 0
    if self._pipeline and self._pipeline.vector_store:
        from astrocyte.pipeline.consolidation import run_consolidation

        cons_result = await run_consolidation(
            self._pipeline.vector_store,
            bank_id,
            graph_store=getattr(self._pipeline, "graph_store", None),
        )
        consolidation_removed = cons_result.duplicates_removed

    return LifecycleRunResult(
        archived_count=archived,
        deleted_count=deleted + consolidation_removed,
        skipped_count=skipped,
        actions=actions,
    )
2115 # ---------------------------------------------------------------------------
2116 # Bank health & analytics
2117 # ---------------------------------------------------------------------------
async def bank_health(self, bank_id: str) -> "BankHealth":
    """Compute the health score and issues for one bank.

    Combines the analytics collector's counters for *bank_id* with a memory
    count taken from the pipeline's vector store when one is available. A
    failure while counting is logged at debug level and the count stays 0.

    Args:
        bank_id: Bank to report on.

    Returns:
        The result of ``compute_bank_health`` for the counters and count.
    """
    counters = self._analytics.get_counters(bank_id)

    memory_count = 0
    if self._pipeline and self._pipeline.vector_store:
        store = self._pipeline.vector_store
        try:
            # NOTE(review): the count is the length of a limit=0 listing —
            # confirm the store treats limit=0 as "no limit" rather than
            # "return nothing".
            memory_count = len(await store.list_vectors(bank_id, limit=0))
        except Exception:
            # Best-effort enrichment: fall back to a count of 0.
            logger.debug(
                "list_vectors(limit=0) failed or unsupported; bank_health memory_count=0",
                exc_info=True,
            )

    return compute_bank_health(bank_id, counters, memory_count)
async def all_bank_health(self) -> list["BankHealth"]:
    """Compute health for every bank ID the analytics collector reports.

    Banks are processed one at a time, in the order ``bank_ids()`` yields them.
    """
    return [await self.bank_health(bank) for bank in self._analytics.bank_ids()]
def bank_quality_snapshot(self, bank_id: str) -> "QualityDataPoint":
    """Convert the bank's current analytics counters into a QualityDataPoint for trend tracking."""
    return counters_to_quality_point(self._analytics.get_counters(bank_id))
2150 # ---------------------------------------------------------------------------
2151 # Memory portability
2152 # ---------------------------------------------------------------------------
async def export_bank(
    self,
    bank_id: str,
    path: str,
    *,
    include_embeddings: bool = False,
    include_entities: bool = True,
    allowed_roots: list[str] | None = None,
    allow_uncontained: bool = False,
    context: AstrocyteContext | None = None,
) -> int:
    """Export a memory bank to AMA (Astrocyte Memory Archive) JSONL format.

    Requires ``"admin"`` access on the bank. The export itself is done by
    ``astrocyte.portability.export_bank``; afterwards the ``on_export`` hook
    is fired with the memory count and path.

    ``allowed_roots`` / ``allow_uncontained`` are forwarded to the path
    containment check in ``astrocyte.portability``. By default (no roots
    configured and ``allow_uncontained=False``) the export refuses to run —
    set ``ASTROCYTE_PORTABILITY_ROOTS``, pass ``allowed_roots=[<dir>]``, or
    set ``allow_uncontained=True``.

    Args:
        bank_id: Bank to export.
        path: Destination file path.
        include_embeddings: Forwarded to the exporter.
        include_entities: Forwarded to the exporter.
        allowed_roots: Directories the path may live under.
        allow_uncontained: Opt out of path containment.
        context: Caller context used for the access check.

    Returns:
        The number of memories exported.
    """
    from astrocyte.portability import export_bank as _export

    # Access is checked before anything is read or written.
    self._policy.check_access(bank_id, "admin", context)

    exported = await _export(
        recall_fn=self._dispatcher.recall,
        bank_id=bank_id,
        path=path,
        provider_name=self._provider_name,
        include_embeddings=include_embeddings,
        include_entities=include_entities,
        allowed_roots=allowed_roots,
        allow_uncontained=allow_uncontained,
    )

    hook_payload = {"memory_count": exported, "path": path}
    await self._hook_manager.fire("on_export", bank_id=bank_id, data=hook_payload)
    return exported
async def import_bank(
    self,
    bank_id: str,
    path: str,
    *,
    on_conflict: str = "skip",
    allowed_roots: list[str] | None = None,
    allow_uncontained: bool = False,
    context: AstrocyteContext | None = None,
    progress_fn: Any = None,
) -> Any:
    """Import memories from an AMA JSONL file into a bank.

    Requires ``"admin"`` access on the bank. The import itself is done by
    ``astrocyte.portability.import_bank``; afterwards the ``on_import`` hook
    is fired with the imported/skipped/errors values from the result.

    Args:
        bank_id: Bank to import into.
        path: Source file path.
        on_conflict: Conflict policy forwarded to the importer.
        allowed_roots: Directories the path may live under; forwarded to
            the importer together with ``allow_uncontained``.
        allow_uncontained: Opt out of path containment.
        context: Caller context used for the access check.
        progress_fn: Optional progress callback forwarded to the importer.

    Returns:
        The importer's ``ImportResult``.
    """
    from astrocyte.portability import ImportResult
    from astrocyte.portability import import_bank as _import

    # Access is checked before anything is read or written.
    self._policy.check_access(bank_id, "admin", context)

    outcome: ImportResult = await _import(
        retain_fn=self._dispatcher.retain,
        bank_id=bank_id,
        path=path,
        on_conflict=on_conflict,
        progress_fn=progress_fn,
        allowed_roots=allowed_roots,
        allow_uncontained=allow_uncontained,
    )

    hook_payload = {
        "imported": outcome.imported,
        "skipped": outcome.skipped,
        "errors": outcome.errors,
    }
    await self._hook_manager.fire("on_import", bank_id=bank_id, data=hook_payload)
    return outcome
2234 # ---------------------------------------------------------------------------
2235 # Internal routing
2236 # ---------------------------------------------------------------------------
@property
def _provider_name(self) -> str:
    """Configured provider name, or ``"pipeline"`` when the config value is falsy."""
    configured = self._config.provider
    if configured:
        return configured
    return "pipeline"
2242 # ---------------------------------------------------------------------------
2243 # Backwards-compat thin wrappers (tests access these directly)
2244 # ---------------------------------------------------------------------------
@property
def _tiered_retriever(self):
    """The dispatcher's tiered retriever, exposed for testing/introspection."""
    dispatcher = self._dispatcher
    return dispatcher.tiered_retriever
async def _do_retain(self, request: RetainRequest) -> RetainResult:
    """Backwards-compat wrapper: forward *request* to the dispatcher's ``retain``."""
    outcome = await self._dispatcher.retain(request)
    return outcome
async def _do_recall(self, request: RecallRequest) -> RecallResult:
    """Backwards-compat wrapper: forward *request* to the dispatcher's ``recall``."""
    outcome = await self._dispatcher.recall(request)
    return outcome
async def _do_forget(self, request: ForgetRequest) -> ForgetResult:
    """Backwards-compat wrapper: forward *request* to the dispatcher's ``forget``."""
    outcome = await self._dispatcher.forget(request)
    return outcome