Coverage for astrocyte/policy/observability.py: 68%

100 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Observability policies — OTel spans, structured logging, metrics. 

2 

3OTel and Prometheus are optional dependencies. Gracefully degrade to no-op. 

4See docs/_design/policy-layer.md section 5. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import logging 

11import time 

12from contextlib import contextmanager 

13from dataclasses import asdict, dataclass 

14from datetime import datetime, timezone 

15from typing import Any, Generator 

16 

# Shared module-level logger; StructuredLogger.log emits its JSON entries through it.
logger = logging.getLogger("astrocyte")

18 

19# --------------------------------------------------------------------------- 

20# OTel span manager (optional dependency) 

21# --------------------------------------------------------------------------- 

22 

# Start from the "OTel unavailable" state and upgrade it only when the
# optional dependency imports cleanly.
_HAS_OTEL = False
_tracer = None
try:
    from opentelemetry import trace

    _tracer = trace.get_tracer("astrocyte")
    _HAS_OTEL = True
except ImportError:
    # opentelemetry is optional; span() falls back to a no-op span.
    pass

31 

32 

@contextmanager
def span(name: str, attributes: dict[str, str | int | float | bool] | None = None) -> Generator[Any, None, None]:
    """Open a tracing span around the ``with`` body.

    When OpenTelemetry was imported successfully and a tracer exists, the
    span is started as the current span with ``attributes`` (an empty dict
    when none are given). Otherwise a ``_NoOpSpan`` is yielded, so callers
    can use the same code path either way.

    Usage:
        with span("astrocyte.recall", {"bank_id": "user-123"}) as s:
            result = await provider.recall(request)
            s.set_attribute("result_count", len(result.hits))
    """
    otel_unavailable = not _HAS_OTEL or _tracer is None
    if otel_unavailable:
        yield _NoOpSpan()
        return

    span_attributes = attributes or {}
    with _tracer.start_as_current_span(name, attributes=span_attributes) as active_span:
        yield active_span

47 

48 

class _NoOpSpan:
    """Stand-in span yielded by ``span()`` when OTel is not installed.

    Every method accepts the same arguments as the commonly used methods of
    an OpenTelemetry span and does nothing, so instrumented code does not
    need to branch on whether tracing is available.
    """

    def set_attribute(self, key: str, value: str | int | float | bool) -> None:
        """Ignore a single attribute."""

    def set_attributes(self, attributes: dict[str, Any]) -> None:
        """Ignore a batch of attributes."""

    def add_event(
        self,
        name: str,
        attributes: dict[str, Any] | None = None,
        timestamp: int | None = None,
    ) -> None:
        """Ignore an event."""

    def set_status(self, status: Any, description: str | None = None) -> None:
        """Ignore a status update."""

    def record_exception(
        self,
        exception: BaseException,
        attributes: dict[str, Any] | None = None,
        timestamp: int | None = None,
        escaped: bool = False,
    ) -> None:
        """Ignore an exception; the caller remains responsible for handling it."""

    def update_name(self, name: str) -> None:
        """Ignore a rename."""

    def is_recording(self) -> bool:
        """Return False: nothing is ever recorded by the no-op span."""
        return False

    def end(self, end_time: int | None = None) -> None:
        """Ignore an explicit end; there is nothing to finish."""

60 

61 

62# --------------------------------------------------------------------------- 

63# Structured logger 

64# --------------------------------------------------------------------------- 

65 

66 

@dataclass
class LogEntry:
    """One structured log record, serialised to JSON by StructuredLogger.log.

    Only ``event`` and ``timestamp`` are required; the remaining fields
    default to None and are dropped from the emitted JSON when left unset.
    """

    # Name of the event being logged.
    event: str
    # StructuredLogger.log fills this with the current UTC time in ISO-8601 form.
    timestamp: str
    bank_id: str | None = None
    provider: str | None = None
    operation: str | None = None
    # NOTE(review): StructuredLogger.log never sets this field; presumably
    # meant to carry the active trace id. Confirm with callers.
    trace_id: str | None = None
    # Free-form scalar payload attached to the event.
    data: dict[str, str | int | float | bool | None] | None = None

76 

77 

class StructuredLogger:
    """JSON structured logging for policy events.

    Sync — wraps Python logging module. Each call to ``log`` emits one JSON
    object (a serialised ``LogEntry`` without its None fields) through the
    module-level ``logger``.
    """

    def __init__(self, level: str | int = "info") -> None:
        """Store the default level used when ``log`` is called without one.

        Args:
            level: A logging level name (case-insensitive, e.g. ``"debug"``)
                or a numeric logging level. Unknown names fall back to INFO.
        """
        self._level = self._resolve_level(level)

    @staticmethod
    def _resolve_level(level: str | int) -> int:
        """Translate a level name or number into a numeric logging level."""
        if isinstance(level, int) and not isinstance(level, bool):
            return level
        resolved = getattr(logging, str(level).upper(), None)
        # Some upper-case attributes of ``logging`` are not levels (for
        # example BASIC_FORMAT is a string), so accept only real integers.
        if isinstance(resolved, int) and not isinstance(resolved, bool):
            return resolved
        return logging.INFO

    def log(
        self,
        event: str,
        bank_id: str | None = None,
        provider: str | None = None,
        operation: str | None = None,
        data: dict[str, str | int | float | bool | None] | None = None,
        level: int | None = None,
    ) -> None:
        """Emit a structured log entry.

        Args:
            event: Name of the event.
            bank_id: Optional bank identifier to attach.
            provider: Optional provider name to attach.
            operation: Optional operation name to attach.
            data: Optional scalar payload to attach.
            level: Numeric logging level for this entry; when omitted (or 0)
                the level given to the constructor is used.
        """
        # A falsy level (None or NOTSET) means "use the configured default".
        log_level = level or self._level
        # Skip timestamping, dataclass copying and JSON encoding entirely
        # when the logging configuration would discard the record anyway.
        if not logger.isEnabledFor(log_level):
            return
        entry = LogEntry(
            event=event,
            timestamp=datetime.now(timezone.utc).isoformat(),
            bank_id=bank_id,
            provider=provider,
            operation=operation,
            data=data,
        )
        # Remove None values for cleaner output
        payload = {key: value for key, value in asdict(entry).items() if value is not None}
        logger.log(log_level, json.dumps(payload))

109 

110 

111# --------------------------------------------------------------------------- 

112# Metrics collector (optional Prometheus) 

113# --------------------------------------------------------------------------- 

114 

# prometheus_client is optional: record whether its metric classes are
# importable so MetricsCollector can disable itself when they are not.
try:
    from prometheus_client import Counter, Gauge, Histogram
except ImportError:
    _HAS_PROMETHEUS = False
else:
    _HAS_PROMETHEUS = True

121 

122 

class MetricsCollector:
    """Prometheus metrics collection. No-op if prometheus_client is not installed.

    Metrics are created lazily on first use and cached by name. The caches
    are shared by every collector in the process: prometheus_client
    registers each metric name in its default registry and rejects a second
    registration of the same name, so a per-instance cache would make a
    second collector fail as soon as it touched an already-known metric.
    """

    # Process-wide caches, keyed by metric name (see class docstring).
    _shared_counters: dict[str, Any] = {}
    _shared_histograms: dict[str, Any] = {}
    _shared_gauges: dict[str, Any] = {}

    def __init__(self, enabled: bool = True) -> None:
        """Create a collector.

        Args:
            enabled: Set to False to turn every method into a no-op. The
                collector is also disabled when prometheus_client could not
                be imported.
        """
        self.enabled = enabled and _HAS_PROMETHEUS
        self._counters: dict[str, Any] = MetricsCollector._shared_counters
        self._histograms: dict[str, Any] = MetricsCollector._shared_histograms
        self._gauges: dict[str, Any] = MetricsCollector._shared_gauges

    @staticmethod
    def _get_or_create(cache: dict[str, Any], factory: Any, name: str, description: str, labels: list[str]) -> Any:
        """Return the cached metric called ``name``, creating it on first use."""
        metric = cache.get(name)
        if metric is None:
            metric = cache[name] = factory(name, description, labels)
        return metric

    @staticmethod
    def _bind(metric: Any, labels: dict[str, str]) -> Any:
        """Return the labelled child of ``metric``, or ``metric`` itself when unlabelled."""
        # A metric created without label names must be used directly;
        # calling .labels() on it is an error in prometheus_client.
        return metric.labels(**labels) if labels else metric

    def _get_counter(self, name: str, description: str, labels: list[str]) -> Any:
        """Return the Counter called ``name``, or None when disabled."""
        # Check ``enabled`` before touching ``Counter``: that name only
        # exists when prometheus_client was imported.
        if not self.enabled:
            return None
        return self._get_or_create(self._counters, Counter, name, description, labels)

    def _get_histogram(self, name: str, description: str, labels: list[str]) -> Any:
        """Return the Histogram called ``name``, or None when disabled."""
        if not self.enabled:
            return None
        return self._get_or_create(self._histograms, Histogram, name, description, labels)

    def _get_gauge(self, name: str, description: str, labels: list[str]) -> Any:
        """Return the Gauge called ``name``, or None when disabled."""
        if not self.enabled:
            return None
        return self._get_or_create(self._gauges, Gauge, name, description, labels)

    def inc_counter(self, name: str, labels: dict[str, str], description: str = "") -> None:
        """Increment a counter.

        Args:
            name: Metric name.
            labels: Label names mapped to their values for this sample; may
                be empty for an unlabelled metric.
            description: Help text, used only when the metric is first created.
        """
        counter = self._get_counter(name, description, list(labels))
        if counter is not None:
            self._bind(counter, labels).inc()

    def observe_histogram(self, name: str, value: float, labels: dict[str, str], description: str = "") -> None:
        """Record a histogram observation.

        Args:
            name: Metric name.
            value: Observed value.
            labels: Label names mapped to their values for this sample; may
                be empty for an unlabelled metric.
            description: Help text, used only when the metric is first created.
        """
        histogram = self._get_histogram(name, description, list(labels))
        if histogram is not None:
            self._bind(histogram, labels).observe(value)

    def set_gauge(self, name: str, value: float, labels: dict[str, str], description: str = "") -> None:
        """Set a gauge value.

        Args:
            name: Metric name.
            value: New gauge value.
            labels: Label names mapped to their values for this sample; may
                be empty for an unlabelled metric.
            description: Help text, used only when the metric is first created.
        """
        gauge = self._get_gauge(name, description, list(labels))
        if gauge is not None:
            self._bind(gauge, labels).set(value)

176 

177 

178# --------------------------------------------------------------------------- 

179# Timer utility 

180# --------------------------------------------------------------------------- 

181 

182 

@contextmanager
def timed() -> Generator[dict[str, float], None, None]:
    """Context manager that tracks elapsed time in milliseconds.

    Yields a dict whose ``"elapsed_ms"`` entry is 0.0 inside the ``with``
    body and is filled in when the body exits, whether it finishes normally
    or raises.

    Usage:
        with timed() as t:
            do_work()
        print(f"Took {t['elapsed_ms']:.1f}ms")
    """
    result: dict[str, float] = {"elapsed_ms": 0.0}
    # perf_counter is the highest-resolution monotonic clock available, so
    # short sections are not rounded to 0 on platforms with a coarse
    # time.monotonic().
    start = time.perf_counter()
    try:
        yield result
    finally:
        result["elapsed_ms"] = (time.perf_counter() - start) * 1000