Coverage for astrocyte/policy/signal_quality.py: 96%
54 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Signal quality policies — deduplication detection.
3All functions are sync (Rust migration candidates).
4See docs/_design/policy-layer.md section 3.
5"""
7from __future__ import annotations
9import math
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Compute cosine similarity between two vectors.

    Sync, pure computation — Rust migration candidate.

    Args:
        a: First vector.
        b: Second vector; must have the same length as ``a``.

    Returns:
        The cosine of the angle between ``a`` and ``b``, clamped to
        [-1.0, 1.0]. Returns 0.0 if either vector has zero norm (this
        includes two empty vectors).

    Raises:
        ValueError: If the vectors have different lengths.
    """
    if len(a) != len(b):
        raise ValueError(f"Vector dimension mismatch: {len(a)} != {len(b)}")

    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))

    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0

    similarity = dot / (norm_a * norm_b)

    # Floating-point rounding can push the ratio marginally outside the
    # documented range (e.g. 1.0000000000000002 for parallel vectors).
    # Explicit comparisons are used instead of min()/max() so that a NaN
    # result propagates unchanged rather than being coerced to a bound.
    if similarity > 1.0:
        return 1.0
    if similarity < -1.0:
        return -1.0
    return similarity
class DedupDetector:
    """Detect near-duplicate content via embedding similarity.

    Stores recent ``(memory_id, embedding)`` pairs per bank and compares new
    embeddings against them using cosine similarity. Each bank keeps at most
    ``max_cache_per_bank`` entries (oldest dropped first), and at most
    ``_MAX_BANKS`` banks are tracked (least-recently-used bank dropped first).
    Sync, self-contained — Rust migration candidate.
    """

    # Upper bound on the number of banks held in the cache at once.
    _MAX_BANKS = 1000

    def __init__(self, similarity_threshold: float = 0.95, max_cache_per_bank: int = 1000) -> None:
        """Create a detector.

        Args:
            similarity_threshold: Default similarity at or above which an
                embedding counts as a duplicate.
            max_cache_per_bank: Maximum number of embeddings retained per bank.
        """
        self.threshold = similarity_threshold
        self.max_cache = max_cache_per_bank
        # bank_id -> list of (memory_id, embedding); dict insertion order
        # doubles as the LRU order (first key = least recently used).
        self._cache: dict[str, list[tuple[str, list[float]]]] = {}

    def _touch_bank(self, bank_id: str) -> None:
        """Move bank to end of dict (most recently used) for LRU eviction."""
        if bank_id in self._cache:
            # pop + re-insert moves the key to the end of the insertion order.
            self._cache[bank_id] = self._cache.pop(bank_id)

    def is_duplicate(
        self,
        bank_id: str,
        embedding: list[float],
        threshold_override: float | None = None,
    ) -> tuple[bool, float]:
        """Check if embedding is a near-duplicate of cached content.

        ``threshold_override`` lets a per-call MIP DedupSpec.threshold take
        precedence over the instance-level default. When ``None``, the
        configured ``self.threshold`` is used.

        Returns ``(is_dup, similarity)``. On a match, the scan stops at the
        first cached entry whose similarity reaches the threshold and that
        entry's similarity is returned. Otherwise the highest similarity seen
        is returned, floored at 0.0 (also the value for an empty or unknown
        bank).

        Raises ``ValueError`` (from ``cosine_similarity``) if ``embedding``
        differs in length from a cached embedding.
        """
        threshold = threshold_override if threshold_override is not None else self.threshold

        entries = self._cache.get(bank_id, [])
        if entries:
            # A lookup against a populated bank counts as a use for LRU purposes.
            self._touch_bank(bank_id)
        max_sim = 0.0

        for _, cached_emb in entries:
            sim = cosine_similarity(embedding, cached_emb)
            max_sim = max(max_sim, sim)
            if sim >= threshold:
                return True, sim

        return False, max_sim

    def add(self, bank_id: str, memory_id: str, embedding: list[float]) -> None:
        """Add an embedding to the cache for future dedup checks.

        Creates the bank if needed (evicting the least-recently-used bank when
        ``_MAX_BANKS`` is reached), marks the bank as most recently used, and
        trims the bank's oldest entries so it holds at most ``self.max_cache``.
        """
        if bank_id not in self._cache:
            if len(self._cache) >= self._MAX_BANKS:
                # Evict least-recently-used bank (first key in insertion-ordered dict)
                lru_bank = next(iter(self._cache))
                del self._cache[lru_bank]
            self._cache[bank_id] = []
        self._touch_bank(bank_id)

        entries = self._cache[bank_id]
        entries.append((memory_id, embedding))

        # Evict oldest if over capacity. Deleting the overflow prefix (rather
        # than slicing with ``entries[-max_cache:]``) stays correct when
        # max_cache is 0: ``entries[-0:]`` is the whole list, which would
        # silently disable eviction and let the bank grow without bound.
        overflow = len(entries) - self.max_cache
        if overflow > 0:
            del entries[:overflow]

    def clear_bank(self, bank_id: str) -> None:
        """Clear cache for a bank. Unknown banks are ignored."""
        self._cache.pop(bank_id, None)

    def remove(self, bank_id: str, memory_id: str) -> bool:
        """Drop every cached entry for ``memory_id`` from a bank.

        Called from the forget pipeline so a re-retain of similar content
        after forget produces a fresh row instead of silently dedup'ing
        against the cached embedding of the now-deleted memory.

        Returns ``True`` if an entry was removed, ``False`` otherwise.
        Idempotent — multiple forgets of the same id are safe.
        """
        entries = self._cache.get(bank_id)
        if not entries:
            return False

        kept = [(mid, emb) for (mid, emb) in entries if mid != memory_id]
        if len(kept) == len(entries):
            return False

        if kept:
            # Assigning to an existing key keeps the bank's LRU position.
            self._cache[bank_id] = kept
        else:
            # Reclaim the bank slot entirely if it became empty.
            del self._cache[bank_id]
        return True