Coverage for astrocyte/analytics.py: 98%
105 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Bank health analytics — in-memory metric collection and health scoring.
3Collects per-bank operation counters and computes composite health scores.
4See docs/_design/memory-analytics.md for the full specification.
5"""
7from __future__ import annotations
9import time
10from collections import defaultdict
11from dataclasses import dataclass, field
12from datetime import date, datetime, timezone
14from astrocyte.types import BankHealth, HealthIssue, QualityDataPoint
# ---------------------------------------------------------------------------
# Per-bank operation counters (in-memory, reset on restart)
# ---------------------------------------------------------------------------
21@dataclass
22class _BankCounters:
23 """Rolling counters for a single bank."""
25 retain_count: int = 0
26 recall_count: int = 0
27 reflect_count: int = 0
28 recall_hits: int = 0 # recalls that returned >= 1 result
29 dedup_count: int = 0 # retains that were near-duplicates
30 reflect_success: int = 0 # reflects that produced non-empty answer
31 total_recall_score: float = 0.0 # sum of top-hit scores
32 total_content_length: int = 0 # sum of retained content lengths
33 created_at: float = field(default_factory=time.monotonic)
class BankMetricsCollector:
    """Collects per-bank operation metrics in memory.

    Thread-safe for single-event-loop async usage (no locks needed).
    Counters reset on process restart — this is operational analytics,
    not durable history.
    """

    def __init__(self) -> None:
        # defaultdict: the first access to an unseen bank id creates a fresh,
        # zeroed _BankCounters entry for it.
        self._banks: dict[str, _BankCounters] = defaultdict(_BankCounters)

    def record_retain(self, bank_id: str, content_length: int, *, deduplicated: bool = False) -> None:
        """Count one retain of ``content_length`` characters for ``bank_id``.

        ``deduplicated`` additionally bumps the bank's near-duplicate tally.
        """
        c = self._banks[bank_id]
        c.retain_count += 1
        c.total_content_length += content_length
        if deduplicated:
            c.dedup_count += 1

    def record_recall(self, bank_id: str, hit_count: int, top_score: float = 0.0) -> None:
        """Count one recall for ``bank_id``.

        The recall is a "hit" when ``hit_count`` is positive; only then is
        ``top_score`` added to the running score total, so misses do not
        dilute the average.
        """
        c = self._banks[bank_id]
        c.recall_count += 1
        if hit_count > 0:
            c.recall_hits += 1
            c.total_recall_score += top_score

    def record_reflect(self, bank_id: str, *, success: bool) -> None:
        """Count one reflect for ``bank_id``, noting whether it succeeded."""
        c = self._banks[bank_id]
        c.reflect_count += 1
        if success:
            c.reflect_success += 1

    def get_counters(self, bank_id: str) -> _BankCounters:
        """Return the live counters object for ``bank_id``.

        An unseen ``bank_id`` gets a zeroed entry created and stored, so it
        will subsequently appear in ``bank_ids()``.
        """
        return self._banks[bank_id]

    def bank_ids(self) -> list[str]:
        """Return the ids of all banks that currently have a counters entry."""
        return list(self._banks)

    def reset(self, bank_id: str | None = None) -> None:
        """Drop the counters of one bank, or of every bank.

        Args:
            bank_id: Bank to forget. ``None`` (the default) clears all banks.
                An unknown id is ignored.
        """
        # Compare against None explicitly: an empty-string bank id is a
        # specific bank and must not be mistaken for "clear everything".
        if bank_id is None:
            self._banks.clear()
        else:
            self._banks.pop(bank_id, None)
# ---------------------------------------------------------------------------
# Health scoring
# ---------------------------------------------------------------------------
84# Metric weights for composite score (sum to 1.0)
85_WEIGHTS = {
86 "recall_hit_rate": 0.30,
87 "avg_recall_score": 0.20,
88 "dedup_rate_inv": 0.15, # lower dedup = healthier
89 "reflect_success_rate": 0.15,
90 "avg_content_length_norm": 0.10,
91 "recall_to_retain_ratio": 0.10,
92}
def compute_bank_health(bank_id: str, counters: _BankCounters, memory_count: int = 0) -> BankHealth:
    """Compute a composite health score for a bank from its counters.

    Args:
        bank_id: Identifier copied into the returned ``BankHealth``.
        counters: Operation counters the rates and averages are derived from.
        memory_count: Reported verbatim as ``metrics["memory_count"]``; it
            does not influence the score.

    Returns:
        BankHealth with a score in 0.0–1.0 (rounded to 4 decimals), a status
        ("healthy" at >= 0.7, "warning" at >= 0.4, otherwise "unhealthy"),
        the issues detected, and the raw (unclamped) metrics. A bank with no
        recorded operations gets the neutral score 0.5.
    """
    issues: list[HealthIssue] = []
    metrics: dict[str, float] = {}

    # -- Recall hit rate --
    if counters.recall_count > 0:
        hit_rate = counters.recall_hits / counters.recall_count
    else:
        hit_rate = 0.0
    metrics["recall_hit_rate"] = hit_rate
    # Only judge the hit rate once there are enough recalls to be meaningful.
    if counters.recall_count >= 5:
        if hit_rate < 0.3:
            issues.append(
                HealthIssue(
                    severity="critical",
                    code="LOW_RECALL_HIT_RATE",
                    message=f"Recall hit rate is {hit_rate:.0%} (< 30%)",
                    recommendation="Check that retained content matches expected recall queries. "
                    "Review embedding model quality.",
                )
            )
        elif hit_rate < 0.6:
            issues.append(
                HealthIssue(
                    severity="warning",
                    code="LOW_RECALL_HIT_RATE",
                    message=f"Recall hit rate is {hit_rate:.0%} (< 60%)",
                    recommendation="Review retained content relevance and embedding quality.",
                )
            )

    # -- Average recall score --
    # Averaged over hits only: the score total is the sum of top-hit scores.
    if counters.recall_hits > 0:
        avg_score = counters.total_recall_score / counters.recall_hits
    else:
        avg_score = 0.0
    metrics["avg_recall_score"] = avg_score

    # -- Dedup rate --
    if counters.retain_count > 0:
        dedup_rate = counters.dedup_count / counters.retain_count
    else:
        dedup_rate = 0.0
    metrics["dedup_rate"] = dedup_rate
    if counters.retain_count >= 10 and dedup_rate > 0.5:
        issues.append(
            HealthIssue(
                severity="warning",
                code="HIGH_DEDUP_RATE",
                message=f"Dedup rate is {dedup_rate:.0%} (> 50%)",
                recommendation="Agent may be retaining duplicate content. "
                "Check for retain loops or missing dedup at the application layer.",
            )
        )

    # -- Reflect success rate --
    if counters.reflect_count > 0:
        reflect_rate = counters.reflect_success / counters.reflect_count
    else:
        reflect_rate = 1.0  # no reflects = no failures
    metrics["reflect_success_rate"] = reflect_rate
    if counters.reflect_count >= 3 and reflect_rate < 0.5:
        issues.append(
            HealthIssue(
                severity="warning",
                code="LOW_REFLECT_SUCCESS",
                message=f"Reflect success rate is {reflect_rate:.0%} (< 50%)",
                recommendation="Check that the bank has enough relevant content for synthesis.",
            )
        )

    # -- Average content length --
    if counters.retain_count > 0:
        avg_len = counters.total_content_length / counters.retain_count
    else:
        avg_len = 0.0
    metrics["avg_content_length"] = avg_len
    if counters.retain_count >= 5 and avg_len < 20:
        issues.append(
            HealthIssue(
                severity="info",
                code="SHORT_CONTENT",
                message=f"Average content length is {avg_len:.0f} chars (< 20)",
                recommendation="Very short content may not embed well. Consider batching related facts.",
            )
        )

    # -- Recall-to-retain ratio --
    if counters.retain_count > 0:
        rr_ratio = counters.recall_count / counters.retain_count
    else:
        rr_ratio = 0.0
    metrics["recall_to_retain_ratio"] = rr_ratio

    metrics["retain_count"] = float(counters.retain_count)
    metrics["recall_count"] = float(counters.recall_count)
    metrics["reflect_count"] = float(counters.reflect_count)
    metrics["memory_count"] = float(memory_count)

    # -- Composite score --
    # Normalize each metric to 0.0–1.0, then weight. Both bounds are enforced:
    # a negative average recall score must not drag the composite below the
    # documented 0.0 floor.
    components = {
        "recall_hit_rate": _clamp_unit(hit_rate),
        "avg_recall_score": _clamp_unit(avg_score),
        "dedup_rate_inv": 1.0 - _clamp_unit(dedup_rate),
        "reflect_success_rate": _clamp_unit(reflect_rate),
        "avg_content_length_norm": _clamp_unit(avg_len / 200.0),  # 200 chars = perfect
        "recall_to_retain_ratio": _clamp_unit(rr_ratio / 2.0),  # 2:1 ratio = perfect
    }

    # If no operations yet, score is neutral
    total_ops = counters.retain_count + counters.recall_count + counters.reflect_count
    if total_ops == 0:
        score = 0.5
    else:
        score = sum(components[k] * _WEIGHTS[k] for k in _WEIGHTS)

    # Map to status
    if score >= 0.7:
        status: str = "healthy"
    elif score >= 0.4:
        status = "warning"
    else:
        status = "unhealthy"

    return BankHealth(
        bank_id=bank_id,
        score=round(score, 4),
        status=status,  # type: ignore[arg-type]
        issues=issues,
        metrics=metrics,
        assessed_at=datetime.now(timezone.utc),
    )


def _clamp_unit(value: float) -> float:
    """Clamp ``value`` into the closed interval [0.0, 1.0]."""
    return max(0.0, min(value, 1.0))
def counters_to_quality_point(counters: _BankCounters) -> QualityDataPoint:
    """Freeze the current counters into a QualityDataPoint for trend tracking.

    Each rate is rounded to 4 decimals. A rate whose denominator is zero
    falls back to 0.0, except the reflect success rate, which falls back to
    1.0 when no reflects were recorded.
    """

    def _ratio(numerator: float, denominator: float, fallback: float) -> float:
        # Guarded division: use the fallback when there is nothing to divide by.
        if denominator > 0:
            return numerator / denominator
        return fallback

    recall_hit_rate = _ratio(counters.recall_hits, counters.recall_count, 0.0)
    mean_top_score = _ratio(counters.total_recall_score, counters.recall_hits, 0.0)
    duplicate_rate = _ratio(counters.dedup_count, counters.retain_count, 0.0)
    reflect_ok_rate = _ratio(counters.reflect_success, counters.reflect_count, 1.0)

    # NOTE(review): date.today() is the local calendar date, whereas
    # compute_bank_health stamps its result in UTC — confirm this is intended.
    snapshot_day = date.today()

    return QualityDataPoint(
        date=snapshot_day,
        retain_count=counters.retain_count,
        recall_count=counters.recall_count,
        recall_hit_rate=round(recall_hit_rate, 4),
        avg_recall_score=round(mean_top_score, 4),
        dedup_rate=round(duplicate_rate, 4),
        reflect_success_rate=round(reflect_ok_rate, 4),
    )