Coverage for astrocyte/cli.py: 94%

141 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Astrocyte CLI — operator tooling for MIP configs. 

2 

3Commands: 

4 astrocyte mip lint <path> Validate a mip.yaml, printing errors and warnings. 

5 astrocyte mip explain <path> ... Show which rule fires for a hypothetical input. 

6 

7The CLI is intentionally minimal: argparse-based, no external dependencies, and 

8reuses the same loader / router code that runtime uses. It is the recommended 

9way to verify pipeline overrides (chunker, dedup, rerank, reflect) before 

10deploying a rule change. 

11""" 

12 

13from __future__ import annotations 

14 

15import argparse 

16import dataclasses 

17import sys 

18import warnings 

19from pathlib import Path 

20from typing import Sequence 

21 

22from astrocyte.errors import ConfigError 

23from astrocyte.mip import MipRouter, load_mip_config 

24from astrocyte.mip.rule_engine import RuleEngineInput, evaluate_rules 

25from astrocyte.mip.schema import ForgetSpec, PipelineSpec 

26from astrocyte.types import MetadataValue 

27 

28# --------------------------------------------------------------------------- 

29# astrocyte mip lint 

30# --------------------------------------------------------------------------- 

31 

32 

33def _cmd_mip_lint(args: argparse.Namespace) -> int: 

34 """Load and validate a mip.yaml. Returns 0 if clean, 1 on any error.""" 

35 path = Path(args.path) 

36 print(f"Linting MIP config: {path}") 

37 

38 captured: list[warnings.WarningMessage] = [] 

39 try: 

40 with warnings.catch_warnings(record=True) as caught: 

41 warnings.simplefilter("always") 

42 config = load_mip_config(path) 

43 captured = list(caught) 

44 except ConfigError as exc: 

45 print(f" error: {exc}", file=sys.stderr) 

46 return 1 

47 except FileNotFoundError as exc: 

48 print(f" error: {exc}", file=sys.stderr) 

49 return 1 

50 

51 rule_count = len(config.rules or []) 

52 bank_count = len(config.banks or []) 

53 print(f" ok: {rule_count} rule(s), {bank_count} bank(s)") 

54 

55 if captured: 

56 print(f" {len(captured)} warning(s):") 

57 for w in captured: 

58 print(f" - {w.message}") 

59 return 0 

60 

61 

62# --------------------------------------------------------------------------- 

63# astrocyte mip explain 

64# --------------------------------------------------------------------------- 

65 

66 

67def _parse_kv(items: Sequence[str] | None) -> dict[str, MetadataValue]: 

68 """Parse a list of ``key=value`` strings into a metadata dict. 

69 

70 Numeric and boolean values are coerced; everything else stays as a string. 

71 """ 

72 out: dict[str, MetadataValue] = {} 

73 if not items: 

74 return out 

75 for raw in items: 

76 if "=" not in raw: 

77 raise SystemExit(f"Invalid --metadata entry (expected KEY=VALUE): {raw!r}") 

78 key, _, val = raw.partition("=") 

79 key = key.strip() 

80 val = val.strip() 

81 coerced: MetadataValue 

82 if val.lower() in ("true", "false"): 

83 coerced = val.lower() == "true" 

84 else: 

85 try: 

86 coerced = int(val) 

87 except ValueError: 

88 try: 

89 coerced = float(val) 

90 except ValueError: 

91 coerced = val 

92 out[key] = coerced 

93 return out 

94 

95 

96def _format_pipeline(pipeline: PipelineSpec | None) -> list[str]: 

97 """Render a PipelineSpec as a list of indented printable lines.""" 

98 if pipeline is None: 

99 return [] 

100 lines: list[str] = [" pipeline:"] 

101 if pipeline.version is not None: 

102 lines.append(f" version: {pipeline.version}") 

103 for field in ("chunker", "dedup", "rerank", "reflect"): 

104 spec = getattr(pipeline, field) 

105 if spec is None: 

106 continue 

107 # Show only fields that are set 

108 set_fields = { 

109 f.name: getattr(spec, f.name) for f in dataclasses.fields(spec) if getattr(spec, f.name) is not None 

110 } 

111 if set_fields: 

112 lines.append(f" {field}: {set_fields}") 

113 return lines 

114 

115 

116def _format_forget(forget: ForgetSpec | None) -> list[str]: 

117 """Render a ForgetSpec as a list of indented printable lines.""" 

118 if forget is None: 

119 return [] 

120 lines: list[str] = [" forget:"] 

121 set_fields = { 

122 f.name: getattr(forget, f.name) for f in dataclasses.fields(forget) if getattr(forget, f.name) is not None 

123 } 

124 for k, v in set_fields.items(): 

125 lines.append(f" {k}: {v}") 

126 return lines 

127 

128 

129def _cmd_mip_explain(args: argparse.Namespace) -> int: 

130 """Show which rule(s) match a hypothetical input and the resulting decision.""" 

131 path = Path(args.path) 

132 try: 

133 config = load_mip_config(path) 

134 except (ConfigError, FileNotFoundError) as exc: 

135 print(f"error: {exc}", file=sys.stderr) 

136 return 1 

137 

138 metadata = _parse_kv(args.metadata) 

139 rule_input = RuleEngineInput( 

140 content=args.content or "", 

141 content_type=args.content_type, 

142 metadata=metadata or None, 

143 tags=list(args.tag) if args.tag else None, 

144 pii_detected=args.pii_detected, 

145 source=args.source, 

146 ) 

147 

148 print("Input:") 

149 print(f" content_type: {rule_input.content_type!r}") 

150 print(f" metadata: {rule_input.metadata}") 

151 print(f" tags: {rule_input.tags}") 

152 print(f" pii_detected: {rule_input.pii_detected}") 

153 print(f" source: {rule_input.source!r}") 

154 

155 # 1. Show every matching rule (mechanical eval, before escalation policy) 

156 sorted_rules = sorted(config.rules or [], key=lambda r: r.priority) 

157 matches = evaluate_rules(sorted_rules, rule_input) 

158 print(f"\nMatched rules ({len(matches)}):") 

159 if not matches: 

160 print(" (none)") 

161 for m in matches: 

162 print(f" - {m.rule.name} (priority={m.rule.priority}, override={m.rule.override}, confidence={m.confidence})") 

163 

164 # 2. Show what the synchronous router would actually return 

165 router = MipRouter(config) 

166 decision = router.route_sync(rule_input) 

167 print("\nDecision (sync routing):") 

168 if decision is None: 

169 print(" → escalation required (would call intent layer at runtime)") 

170 return 0 

171 

172 print(f" resolved_by: {decision.resolved_by}") 

173 print(f" rule_name: {decision.rule_name}") 

174 print(f" bank_id: {decision.bank_id}") 

175 print(f" tags: {decision.tags}") 

176 print(f" retain_policy: {decision.retain_policy}") 

177 print(f" confidence: {decision.confidence}") 

178 for line in _format_pipeline(decision.pipeline): 

179 print(line) 

180 for line in _format_forget(decision.forget): 

181 print(line) 

182 return 0 

183 

184 

185# --------------------------------------------------------------------------- 

186# argparse plumbing 

187# --------------------------------------------------------------------------- 

188 

189 

190def _build_parser() -> argparse.ArgumentParser: 

191 parser = argparse.ArgumentParser(prog="astrocyte", description="Astrocyte operator CLI") 

192 sub = parser.add_subparsers(dest="command", required=True) 

193 

194 mip = sub.add_parser("mip", help="MIP (Memory Intent Protocol) tools") 

195 mip_sub = mip.add_subparsers(dest="mip_command", required=True) 

196 

197 lint = mip_sub.add_parser("lint", help="Validate a mip.yaml file") 

198 lint.add_argument("path", help="Path to mip.yaml") 

199 lint.set_defaults(func=_cmd_mip_lint) 

200 

201 explain = mip_sub.add_parser( 

202 "explain", 

203 help="Show which rule fires for a hypothetical retain input", 

204 ) 

205 explain.add_argument("path", help="Path to mip.yaml") 

206 explain.add_argument("--content", default="", help="Inbound content (text body)") 

207 explain.add_argument("--content-type", default=None, help="Content type, e.g. text|conversation|document") 

208 explain.add_argument("--metadata", action="append", default=[], help="Metadata KEY=VALUE (repeatable)") 

209 explain.add_argument("--tag", action="append", default=[], help="Tag (repeatable)") 

210 explain.add_argument("--source", default=None, help="Source identifier") 

211 explain.add_argument("--pii-detected", action="store_true", help="Mark PII as already detected") 

212 explain.set_defaults(func=_cmd_mip_explain) 

213 

214 return parser 

215 

216 

217def main(argv: Sequence[str] | None = None) -> int: 

218 parser = _build_parser() 

219 args = parser.parse_args(argv) 

220 return int(args.func(args) or 0) 

221 

222 

223if __name__ == "__main__": 

224 raise SystemExit(main())