Coverage for astrocyte/pipeline/audit.py: 94%

52 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M10: Gap-analysis audit pipeline. 

2 

3``run_audit()`` takes a set of memories already retrieved for a scope, 

4calls the LLM audit judge, and returns a structured ``AuditResult``. 

5 

6The judge is a single-shot LLM call that: 

71. Receives all retrieved memories as context. 

82. Identifies topics that are absent or under-covered. 

93. Returns a JSON object with ``gaps`` (list) and ``coverage_score`` (float). 

10 

11The module is intentionally narrow: memory retrieval and token budgeting 

12happen in the orchestrator; this module owns only the prompt + parse logic 

13so it can be unit-tested against a ``MockLLMProvider`` without any store. 

14""" 

15 

16from __future__ import annotations 

17 

18import json 

19import logging 

20from typing import TYPE_CHECKING, Literal 

21 

22from astrocyte.types import AuditResult, GapItem, MemoryHit, Message, RecallTrace 

23 

24if TYPE_CHECKING: 

25 from astrocyte.provider import LLMProvider 

26 

27_logger = logging.getLogger("astrocyte.audit") 

28 

29# --------------------------------------------------------------------------- 

30# Prompt 

31# --------------------------------------------------------------------------- 

32 

33_SYSTEM_PROMPT = """\ 

34You are a memory-gap analyst. You will be given: 

35- A SCOPE: the topic or question the user cares about. 

36- MEMORIES: a numbered list of facts the agent currently knows. 

37 

38Your task is to identify what is MISSING or UNDER-COVERED — knowledge that 

39would be needed to answer questions about the scope reliably but is absent 

40or too thin in the provided memories. 

41 

42Return ONLY valid JSON in this exact shape (no markdown, no preamble): 

43 

44{ 

45 "coverage_score": <float 0.0–1.0>, 

46 "gaps": [ 

47 {"topic": "<short label>", "severity": "<high|medium|low>", "reason": "<one sentence>"}, 

48 ... 

49 ] 

50} 

51 

52Scoring guide for coverage_score: 

53 1.0 — comprehensive; the scope is well covered from multiple angles. 

54 0.7 — good; most key facts are present, minor gaps only. 

55 0.5 — partial; useful but material gaps exist. 

56 0.3 — sparse; only surface-level coverage. 

57 0.0 — no relevant memories at all. 

58 

59Gap severity guide: 

60 high — absence would likely produce a wrong or confidently-wrong answer. 

61 medium — partial coverage; answer would be incomplete. 

62 low — nuance or context is missing but a reasonable answer is still possible. 

63 

64If there are no gaps, return an empty list for "gaps". 

65Do not fabricate memories. Only identify gaps relative to what was provided.\ 

66""" 

67 

68 

69def _render_memories(memories: list[MemoryHit]) -> str: 

70 if not memories: 

71 return "(no memories retrieved)" 

72 lines: list[str] = [] 

73 for i, m in enumerate(memories, 1): 

74 ts = f" [{m.retained_at:%Y-%m-%d}]" if m.retained_at else "" 

75 lines.append(f"[{i}]{ts} {m.text}") 

76 return "\n".join(lines) 

77 

78 

79def _parse_response( 

80 raw: str, scope: str, bank_id: str, memories_scanned: int, trace: RecallTrace | None 

81) -> AuditResult: 

82 """Parse LLM JSON response into AuditResult, with graceful fallback.""" 

83 raw = raw.strip() 

84 # Strip markdown fences if the model wrapped the JSON 

85 if raw.startswith("```"): 

86 lines = raw.splitlines() 

87 raw = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]) 

88 

89 try: 

90 data = json.loads(raw) 

91 except json.JSONDecodeError: 

92 _logger.warning("audit judge returned non-JSON: %r", raw[:200]) 

93 return AuditResult( 

94 scope=scope, 

95 bank_id=bank_id, 

96 gaps=[GapItem(topic="(parse error)", severity="low", reason="Audit judge returned non-JSON output.")], 

97 coverage_score=0.0 if not memories_scanned else 0.5, 

98 memories_scanned=memories_scanned, 

99 trace=trace, 

100 ) 

101 

102 raw_score = data.get("coverage_score", 0.5) 

103 try: 

104 coverage_score = max(0.0, min(1.0, float(raw_score))) 

105 except (TypeError, ValueError): 

106 coverage_score = 0.5 

107 

108 gaps: list[GapItem] = [] 

109 for g in data.get("gaps", []): 

110 if not isinstance(g, dict): 

111 continue 

112 topic = str(g.get("topic", "unknown")) 

113 severity_raw = str(g.get("severity", "low")).lower() 

114 severity: Literal["high", "medium", "low"] = ( 

115 severity_raw if severity_raw in ("high", "medium", "low") else "low" 

116 ) 

117 reason = str(g.get("reason", "")) 

118 gaps.append(GapItem(topic=topic, severity=severity, reason=reason)) 

119 

120 return AuditResult( 

121 scope=scope, 

122 bank_id=bank_id, 

123 gaps=gaps, 

124 coverage_score=coverage_score, 

125 memories_scanned=memories_scanned, 

126 trace=trace, 

127 ) 

128 

129 

130# --------------------------------------------------------------------------- 

131# Public entry point 

132# --------------------------------------------------------------------------- 

133 

134 

135async def run_audit( 

136 scope: str, 

137 bank_id: str, 

138 memories: list[MemoryHit], 

139 llm_provider: LLMProvider, 

140 *, 

141 trace: RecallTrace | None = None, 

142) -> AuditResult: 

143 """Call the LLM audit judge and return a structured ``AuditResult``. 

144 

145 Args: 

146 scope: The topic or question to audit coverage for. 

147 bank_id: The bank that was searched (echoed into result). 

148 memories: Retrieved memories to pass as context to the judge. 

149 The caller is responsible for retrieving and budget-trimming them. 

150 llm_provider: LLM to use for the audit judge call. 

151 trace: Optional recall trace to embed in the result. 

152 

153 Returns: 

154 :class:`~astrocyte.types.AuditResult` with gaps and coverage score. 

155 """ 

156 if not memories: 

157 # No memories at all → zero coverage, one high-severity gap 

158 return AuditResult( 

159 scope=scope, 

160 bank_id=bank_id, 

161 gaps=[ 

162 GapItem( 

163 topic=scope, 

164 severity="high", 

165 reason="No memories were found in this bank for the given scope.", 

166 ) 

167 ], 

168 coverage_score=0.0, 

169 memories_scanned=0, 

170 trace=trace, 

171 ) 

172 

173 memory_block = _render_memories(memories) 

174 user_content = f"SCOPE: {scope}\n\nMEMORIES:\n{memory_block}" 

175 

176 messages = [ 

177 Message(role="system", content=_SYSTEM_PROMPT), 

178 Message(role="user", content=user_content), 

179 ] 

180 

181 try: 

182 completion = await llm_provider.complete(messages, max_tokens=1024, temperature=0.0) 

183 raw = completion.text or "" 

184 except Exception as exc: 

185 _logger.warning("audit judge LLM call failed: %s", exc) 

186 raw = "" 

187 

188 return _parse_response(raw, scope, bank_id, len(memories), trace)