Coverage for astrocyte/policy/observability.py: 68%
100 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Observability policies — OTel spans, structured logging, metrics.
3OTel and Prometheus are optional dependencies. Gracefully degrade to no-op.
4See docs/_design/policy-layer.md section 5.
5"""
7from __future__ import annotations
9import json
10import logging
11import time
12from contextlib import contextmanager
13from dataclasses import asdict, dataclass
14from datetime import datetime, timezone
15from typing import Any, Generator
17logger = logging.getLogger("astrocyte")
19# ---------------------------------------------------------------------------
20# OTel span manager (optional dependency)
21# ---------------------------------------------------------------------------
# Start from the "OTel unavailable" state; it is only overwritten when both the
# import and the tracer lookup succeed.
_tracer = None
_HAS_OTEL = False
try:
    from opentelemetry import trace

    _tracer = trace.get_tracer("astrocyte")
    _HAS_OTEL = True
except ImportError:
    # opentelemetry is an optional dependency: keep the no-op defaults above.
    pass
@contextmanager
def span(name: str, attributes: dict[str, str | int | float | bool] | None = None) -> Generator[Any, None, None]:
    """Open a tracing span around the wrapped block.

    When OpenTelemetry was imported successfully, the span is started on the
    module tracer via ``start_as_current_span``. Otherwise a ``_NoOpSpan`` is
    yielded so callers can invoke span methods unconditionally.

    Args:
        name: Span name, e.g. ``"astrocyte.recall"``.
        attributes: Initial span attributes; ``None`` is treated as empty.

    Yields:
        The started OTel span, or a ``_NoOpSpan`` stand-in.

    Usage:
        with span("astrocyte.recall", {"bank_id": "user-123"}) as s:
            result = await provider.recall(request)
            s.set_attribute("result_count", len(result.hits))
    """
    # Guard clause: without a usable tracer, hand out the inert stand-in.
    if not _HAS_OTEL or _tracer is None:
        yield _NoOpSpan()
        return

    initial_attributes = attributes or {}
    with _tracer.start_as_current_span(name, attributes=initial_attributes) as active_span:
        yield active_span
class _NoOpSpan:
    """Inert span stand-in yielded by ``span`` when OTel is not installed.

    Each method accepts its arguments and discards them.
    """

    def set_attribute(self, key: str, value: str | int | float | bool) -> None:
        """Accept and discard a span attribute."""
        return None

    def add_event(self, name: str, attributes: dict[str, Any] | None = None) -> None:
        """Accept and discard a span event."""
        return None

    def set_status(self, status: Any, description: str | None = None) -> None:
        """Accept and discard a span status."""
        return None
62# ---------------------------------------------------------------------------
63# Structured logger
64# ---------------------------------------------------------------------------
@dataclass
class LogEntry:
    """One structured log record.

    Only ``event`` and ``timestamp`` are required; every other field
    defaults to ``None``.
    """

    # Name of the event being recorded.
    event: str
    # Timestamp string; StructuredLogger.log supplies a UTC ISO-8601 value.
    timestamp: str
    # Optional context identifying where the event happened.
    bank_id: str | None = None
    provider: str | None = None
    operation: str | None = None
    # NOTE(review): nothing visible in this module sets trace_id; presumably
    # intended for trace correlation. Confirm against callers.
    trace_id: str | None = None
    # Free-form payload of JSON-friendly scalar values.
    data: dict[str, str | int | float | bool | None] | None = None
class StructuredLogger:
    """JSON structured logging for policy events.

    Sync — wraps the Python ``logging`` module. Each call to ``log`` emits one
    JSON object as the message of a record on the module-level ``logger``.
    """

    def __init__(self, level: str = "info") -> None:
        """Resolve the default level from its name.

        Args:
            level: Case-insensitive logging level name such as ``"debug"`` or
                ``"warning"``. Names that do not resolve to an integer level
                fall back to ``logging.INFO``.
        """
        resolved = getattr(logging, level.upper(), None)
        # ``logging`` also exposes uppercase attributes that are not levels
        # (e.g. BASIC_FORMAT is a string). Accept integers only so that
        # ``logger.log`` never receives a non-level value.
        if isinstance(resolved, int) and not isinstance(resolved, bool):
            self._level = resolved
        else:
            self._level = logging.INFO

    def log(
        self,
        event: str,
        bank_id: str | None = None,
        provider: str | None = None,
        operation: str | None = None,
        data: dict[str, str | int | float | bool | None] | None = None,
        level: int | None = None,
    ) -> None:
        """Emit a structured log entry.

        Args:
            event: Event name.
            bank_id: Optional bank identifier.
            provider: Optional provider name.
            operation: Optional operation name.
            data: Optional payload of JSON-friendly values.
            level: Numeric logging level for this entry; the instance default
                is used when omitted (or falsy).
        """
        log_level = level or self._level
        # Skip timestamping, copying and serialisation when the record would
        # be discarded anyway.
        if not logger.isEnabledFor(log_level):
            return

        entry = LogEntry(
            event=event,
            timestamp=datetime.now(timezone.utc).isoformat(),
            bank_id=bank_id,
            provider=provider,
            operation=operation,
            data=data,
        )
        # Remove None values for cleaner output
        entry_dict = {k: v for k, v in asdict(entry).items() if v is not None}
        # default=str is deliberate: a stray non-JSON value in ``data`` must
        # not raise out of a logging call; it is rendered as its string form.
        logger.log(log_level, json.dumps(entry_dict, default=str))
111# ---------------------------------------------------------------------------
112# Metrics collector (optional Prometheus)
113# ---------------------------------------------------------------------------
# prometheus_client is an optional dependency; the flag records whether the
# metric classes were importable.
_HAS_PROMETHEUS = False
try:
    from prometheus_client import Counter, Gauge, Histogram
except ImportError:
    pass
else:
    _HAS_PROMETHEUS = True
class MetricsCollector:
    """Prometheus metrics collection. No-op if prometheus_client is not installed.

    Metric objects are created on first use and cached on this collector by
    metric name, one cache per metric type.
    """

    def __init__(self, enabled: bool = True) -> None:
        """Create a collector.

        Args:
            enabled: Request metrics collection. The collector is only active
                when this is true and prometheus_client was importable.
        """
        self.enabled = enabled and _HAS_PROMETHEUS
        self._counters: dict[str, Any] = {}
        self._histograms: dict[str, Any] = {}
        self._gauges: dict[str, Any] = {}

    def _get_or_create(self, cache: dict[str, Any], name: str, factory: Any) -> Any:
        """Return the cached metric for ``name``, building it with ``factory`` once.

        ``factory`` is a zero-argument callable so the metric class is only
        looked up when the collector is enabled and the metric is missing.
        Returns ``None`` when the collector is disabled.
        """
        if not self.enabled:
            return None
        if name not in cache:
            cache[name] = factory()
        return cache[name]

    def _get_counter(self, name: str, description: str, labels: list[str]) -> Any:
        """Return the counter called ``name``, or ``None`` when disabled."""
        return self._get_or_create(self._counters, name, lambda: Counter(name, description, labels))

    def _get_histogram(self, name: str, description: str, labels: list[str]) -> Any:
        """Return the histogram called ``name``, or ``None`` when disabled."""
        return self._get_or_create(self._histograms, name, lambda: Histogram(name, description, labels))

    def _get_gauge(self, name: str, description: str, labels: list[str]) -> Any:
        """Return the gauge called ``name``, or ``None`` when disabled."""
        return self._get_or_create(self._gauges, name, lambda: Gauge(name, description, labels))

    @staticmethod
    def _bind(metric: Any, labels: dict[str, str]) -> Any:
        """Return the labelled child of ``metric``, or ``metric`` itself without labels.

        A metric built with no label names rejects ``.labels()``, so an empty
        label mapping must record on the metric directly.
        """
        return metric.labels(**labels) if labels else metric

    def inc_counter(self, name: str, labels: dict[str, str], description: str = "") -> None:
        """Increment a counter."""
        if not self.enabled:
            return
        counter = self._get_counter(name, description, list(labels))
        # Identity check: do not rely on the truthiness of a metric object.
        if counter is not None:
            self._bind(counter, labels).inc()

    def observe_histogram(self, name: str, value: float, labels: dict[str, str], description: str = "") -> None:
        """Record a histogram observation."""
        if not self.enabled:
            return
        histogram = self._get_histogram(name, description, list(labels))
        if histogram is not None:
            self._bind(histogram, labels).observe(value)

    def set_gauge(self, name: str, value: float, labels: dict[str, str], description: str = "") -> None:
        """Set a gauge value."""
        if not self.enabled:
            return
        gauge = self._get_gauge(name, description, list(labels))
        if gauge is not None:
            self._bind(gauge, labels).set(value)
178# ---------------------------------------------------------------------------
179# Timer utility
180# ---------------------------------------------------------------------------
@contextmanager
def timed() -> Generator[dict[str, float], None, None]:
    """Context manager that tracks elapsed time in milliseconds.

    Yields:
        A dict whose ``"elapsed_ms"`` entry is ``0.0`` inside the block and is
        set to the elapsed wall time when the block exits, including when it
        exits by raising.

    Usage:
        with timed() as t:
            do_work()
        print(f"Took {t['elapsed_ms']:.1f}ms")
    """
    result: dict[str, float] = {"elapsed_ms": 0.0}
    # perf_counter is the monotonic clock documented for timing short
    # durations; time.monotonic may tick too coarsely for sub-millisecond work.
    start = time.perf_counter()
    try:
        yield result
    finally:
        # Runs on both normal exit and exception, so the timing is always set.
        result["elapsed_ms"] = (time.perf_counter() - start) * 1000