Coverage for astrocyte/cli.py: 94%
141 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Astrocyte CLI — operator tooling for MIP configs.

Commands:
    astrocyte mip lint <path>           Validate a mip.yaml, printing errors and warnings.
    astrocyte mip explain <path> ...    Show which rule fires for a hypothetical input.

The CLI is intentionally minimal: argparse-based, no external dependencies, and
reuses the same loader / router code that the runtime uses. It is the
recommended way to verify pipeline overrides (chunker, dedup, rerank, reflect)
before deploying a rule change.
11"""
13from __future__ import annotations
15import argparse
16import dataclasses
17import sys
18import warnings
19from pathlib import Path
20from typing import Sequence
22from astrocyte.errors import ConfigError
23from astrocyte.mip import MipRouter, load_mip_config
24from astrocyte.mip.rule_engine import RuleEngineInput, evaluate_rules
25from astrocyte.mip.schema import ForgetSpec, PipelineSpec
26from astrocyte.types import MetadataValue
28# ---------------------------------------------------------------------------
29# astrocyte mip lint
30# ---------------------------------------------------------------------------
def _cmd_mip_lint(args: argparse.Namespace) -> int:
    """Load a mip.yaml and report whether it validates.

    Warnings raised while loading are recorded and listed after the summary
    line; they do not affect the exit code.

    Args:
        args: Parsed CLI namespace; only ``args.path`` is read.

    Returns:
        1 if loading raises ``ConfigError`` or ``FileNotFoundError`` (the
        message goes to stderr), otherwise 0.
    """
    config_path = Path(args.path)
    print(f"Linting MIP config: {config_path}")

    try:
        with warnings.catch_warnings(record=True) as recorded:
            # "always" defeats the once-per-location filter so that every
            # warning emitted by the loader is recorded.
            warnings.simplefilter("always")
            config = load_mip_config(config_path)
            emitted: list[warnings.WarningMessage] = list(recorded)
    except (ConfigError, FileNotFoundError) as exc:
        print(f" error: {exc}", file=sys.stderr)
        return 1

    print(f" ok: {len(config.rules or [])} rule(s), {len(config.banks or [])} bank(s)")

    if not emitted:
        return 0
    print(f" {len(emitted)} warning(s):")
    for entry in emitted:
        print(f" - {entry.message}")
    return 0
62# ---------------------------------------------------------------------------
63# astrocyte mip explain
64# ---------------------------------------------------------------------------
def _parse_kv(items: Sequence[str] | None) -> dict[str, MetadataValue]:
    """Parse a list of ``key=value`` strings into a metadata dict.

    Each entry is split on its first ``=``; surrounding whitespace is stripped
    from both key and value. ``true`` / ``false`` (any case) become booleans,
    values accepted by ``int()`` or ``float()`` become numbers, and everything
    else stays a string. A later entry with the same key replaces an earlier
    one.

    Args:
        items: Raw ``KEY=VALUE`` strings, or ``None`` / empty for no metadata.

    Returns:
        A dict mapping each key to its coerced value; empty when ``items`` is
        ``None`` or empty.

    Raises:
        SystemExit: If an entry has no ``=`` or its key is empty after
            stripping.
    """
    out: dict[str, MetadataValue] = {}
    if not items:
        return out
    for raw in items:
        key, sep, val = raw.partition("=")
        key = key.strip()
        # An entry such as "=value" has a separator but no key; storing it
        # under "" would contradict the KEY=VALUE contract, so reject it.
        if not sep or not key:
            raise SystemExit(f"Invalid --metadata entry (expected KEY=VALUE): {raw!r}")
        val = val.strip()
        coerced: MetadataValue
        if val.lower() in ("true", "false"):
            coerced = val.lower() == "true"
        else:
            # Try int before float so that "3" stays an int rather than 3.0.
            try:
                coerced = int(val)
            except ValueError:
                try:
                    coerced = float(val)
                except ValueError:
                    coerced = val
        out[key] = coerced
    return out
def _format_pipeline(pipeline: PipelineSpec | None) -> list[str]:
    """Render a PipelineSpec as printable lines.

    Args:
        pipeline: The spec to render, or ``None``.

    Returns:
        An empty list when ``pipeline`` is ``None``. Otherwise a header line,
        a version line when ``pipeline.version`` is set, and one line for each
        of the chunker / dedup / rerank / reflect sub-specs that has at least
        one non-``None`` field.
    """
    if pipeline is None:
        return []

    rendered: list[str] = [" pipeline:"]
    if pipeline.version is not None:
        rendered.append(f" version: {pipeline.version}")

    for section in ("chunker", "dedup", "rerank", "reflect"):
        section_spec = getattr(pipeline, section)
        if section_spec is None:
            continue
        # Keep only the fields that carry a value so unset ones add no noise.
        populated = {}
        for spec_field in dataclasses.fields(section_spec):
            value = getattr(section_spec, spec_field.name)
            if value is not None:
                populated[spec_field.name] = value
        if populated:
            rendered.append(f" {section}: {populated}")
    return rendered
def _format_forget(forget: ForgetSpec | None) -> list[str]:
    """Render a ForgetSpec as printable lines.

    Args:
        forget: The spec to render, or ``None``.

    Returns:
        An empty list when ``forget`` is ``None``; otherwise a header line
        followed by one ``name: value`` line per field whose value is not
        ``None``, in dataclass field order.
    """
    if forget is None:
        return []

    rendered: list[str] = [" forget:"]
    for spec_field in dataclasses.fields(forget):
        value = getattr(forget, spec_field.name)
        if value is not None:
            rendered.append(f" {spec_field.name}: {value}")
    return rendered
def _cmd_mip_explain(args: argparse.Namespace) -> int:
    """Show which rule(s) match a hypothetical input and the routing decision.

    Prints three sections to stdout: the input that was built from the CLI
    flags, every rule that matches it, and the result of synchronous routing.

    Args:
        args: Parsed CLI namespace. Reads ``path``, ``content``,
            ``content_type``, ``metadata``, ``tag``, ``pii_detected`` and
            ``source``.

    Returns:
        1 if the config cannot be loaded (``ConfigError`` or
        ``FileNotFoundError``; the message goes to stderr), otherwise 0,
        including when the router returns no decision.
    """
    try:
        config = load_mip_config(Path(args.path))
    except (ConfigError, FileNotFoundError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    parsed_metadata = _parse_kv(args.metadata)
    tag_list = list(args.tag) if args.tag else None
    probe = RuleEngineInput(
        content=args.content or "",
        content_type=args.content_type,
        # Empty metadata is passed as None rather than as an empty dict.
        metadata=parsed_metadata or None,
        tags=tag_list,
        pii_detected=args.pii_detected,
        source=args.source,
    )

    input_report = [
        "Input:",
        f" content_type: {probe.content_type!r}",
        f" metadata: {probe.metadata}",
        f" tags: {probe.tags}",
        f" pii_detected: {probe.pii_detected}",
        f" source: {probe.source!r}",
    ]
    print("\n".join(input_report))

    # Step 1: list every matching rule (mechanical eval, before escalation policy).
    by_priority = sorted(config.rules or [], key=lambda rule: rule.priority)
    matched = evaluate_rules(by_priority, probe)
    print(f"\nMatched rules ({len(matched)}):")
    if not matched:
        print(" (none)")
    for hit in matched:
        print(
            f" - {hit.rule.name} (priority={hit.rule.priority}, "
            f"override={hit.rule.override}, confidence={hit.confidence})"
        )

    # Step 2: show what the synchronous router would actually return.
    decision = MipRouter(config).route_sync(probe)
    print("\nDecision (sync routing):")
    if decision is None:
        print(" → escalation required (would call intent layer at runtime)")
        return 0

    decision_report = [
        f" resolved_by: {decision.resolved_by}",
        f" rule_name: {decision.rule_name}",
        f" bank_id: {decision.bank_id}",
        f" tags: {decision.tags}",
        f" retain_policy: {decision.retain_policy}",
        f" confidence: {decision.confidence}",
    ]
    print("\n".join(decision_report))
    for pipeline_line in _format_pipeline(decision.pipeline):
        print(pipeline_line)
    for forget_line in _format_forget(decision.forget):
        print(forget_line)
    return 0
185# ---------------------------------------------------------------------------
186# argparse plumbing
187# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
    """Build the ``astrocyte`` argument parser.

    The parser has a required ``mip`` command with two required subcommands,
    ``lint`` and ``explain``. Each subcommand stores its handler in the
    ``func`` default so the caller can dispatch on ``args.func``.

    Returns:
        The fully configured top-level parser.
    """
    root = argparse.ArgumentParser(prog="astrocyte", description="Astrocyte operator CLI")
    commands = root.add_subparsers(dest="command", required=True)

    mip_parser = commands.add_parser("mip", help="MIP (Memory Intent Protocol) tools")
    mip_commands = mip_parser.add_subparsers(dest="mip_command", required=True)

    # astrocyte mip lint <path>
    lint_parser = mip_commands.add_parser("lint", help="Validate a mip.yaml file")
    lint_parser.add_argument("path", help="Path to mip.yaml")
    lint_parser.set_defaults(func=_cmd_mip_lint)

    # astrocyte mip explain <path> [options]
    explain_parser = mip_commands.add_parser(
        "explain",
        help="Show which rule fires for a hypothetical retain input",
    )
    explain_parser.add_argument("path", help="Path to mip.yaml")
    # Table order is the order the options are registered in.
    explain_options = (
        ("--content", {"default": "", "help": "Inbound content (text body)"}),
        ("--content-type", {"default": None, "help": "Content type, e.g. text|conversation|document"}),
        ("--metadata", {"action": "append", "default": [], "help": "Metadata KEY=VALUE (repeatable)"}),
        ("--tag", {"action": "append", "default": [], "help": "Tag (repeatable)"}),
        ("--source", {"default": None, "help": "Source identifier"}),
        ("--pii-detected", {"action": "store_true", "help": "Mark PII as already detected"}),
    )
    for flag, options in explain_options:
        explain_parser.add_argument(flag, **options)
    explain_parser.set_defaults(func=_cmd_mip_explain)

    return root
def main(argv: Sequence[str] | None = None) -> int:
    """Parse ``argv`` and run the selected subcommand.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        The handler's result as an int; a falsy result is reported as 0.
    """
    namespace = _build_parser().parse_args(argv)
    exit_code = namespace.func(namespace)
    return int(exit_code or 0)
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())