Coverage for astrocyte/policy/signal_quality.py: 96%

54 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Signal quality policies — deduplication detection. 

2 

3All functions are sync (Rust migration candidates). 

4See docs/_design/policy-layer.md section 3. 

5""" 

6 

7from __future__ import annotations 

8 

9import math 

10 

11 

12def cosine_similarity(a: list[float], b: list[float]) -> float: 

13 """Compute cosine similarity between two vectors. 

14 

15 Returns value in [-1.0, 1.0]. Returns 0.0 for zero vectors. 

16 Sync, pure computation — Rust migration candidate. 

17 """ 

18 if len(a) != len(b): 

19 raise ValueError(f"Vector dimension mismatch: {len(a)} != {len(b)}") 

20 

21 dot = sum(x * y for x, y in zip(a, b)) 

22 norm_a = math.sqrt(sum(x * x for x in a)) 

23 norm_b = math.sqrt(sum(x * x for x in b)) 

24 

25 if norm_a == 0.0 or norm_b == 0.0: 

26 return 0.0 

27 

28 return dot / (norm_a * norm_b) 

29 

30 

31class DedupDetector: 

32 """Detect near-duplicate content via embedding similarity. 

33 

34 Stores recent embeddings per bank for comparison. 

35 Sync, self-contained — Rust migration candidate. 

36 """ 

37 

38 _MAX_BANKS = 1000 

39 

40 def __init__(self, similarity_threshold: float = 0.95, max_cache_per_bank: int = 1000) -> None: 

41 self.threshold = similarity_threshold 

42 self.max_cache = max_cache_per_bank 

43 # bank_id -> list of (memory_id, embedding) 

44 self._cache: dict[str, list[tuple[str, list[float]]]] = {} 

45 

46 def _touch_bank(self, bank_id: str) -> None: 

47 """Move bank to end of dict (most recently used) for LRU eviction.""" 

48 if bank_id in self._cache: 

49 self._cache[bank_id] = self._cache.pop(bank_id) 

50 

51 def is_duplicate( 

52 self, 

53 bank_id: str, 

54 embedding: list[float], 

55 threshold_override: float | None = None, 

56 ) -> tuple[bool, float]: 

57 """Check if embedding is a near-duplicate of cached content. 

58 

59 ``threshold_override`` lets a per-call MIP DedupSpec.threshold take 

60 precedence over the instance-level default. When ``None``, the 

61 configured ``self.threshold`` is used. 

62 

63 Returns (is_dup, max_similarity). 

64 """ 

65 threshold = threshold_override if threshold_override is not None else self.threshold 

66 

67 entries = self._cache.get(bank_id, []) 

68 if entries: 

69 self._touch_bank(bank_id) 

70 max_sim = 0.0 

71 

72 for _, cached_emb in entries: 

73 sim = cosine_similarity(embedding, cached_emb) 

74 max_sim = max(max_sim, sim) 

75 if sim >= threshold: 

76 return True, sim 

77 

78 return False, max_sim 

79 

80 def add(self, bank_id: str, memory_id: str, embedding: list[float]) -> None: 

81 """Add an embedding to the cache for future dedup checks.""" 

82 if bank_id not in self._cache: 

83 if len(self._cache) >= self._MAX_BANKS: 

84 # Evict least-recently-used bank (first key in insertion-ordered dict) 

85 lru_bank = next(iter(self._cache)) 

86 del self._cache[lru_bank] 

87 self._cache[bank_id] = [] 

88 self._touch_bank(bank_id) 

89 

90 entries = self._cache[bank_id] 

91 entries.append((memory_id, embedding)) 

92 

93 # Evict oldest if over capacity 

94 if len(entries) > self.max_cache: 

95 self._cache[bank_id] = entries[-self.max_cache :] 

96 

97 def clear_bank(self, bank_id: str) -> None: 

98 """Clear cache for a bank.""" 

99 self._cache.pop(bank_id, None) 

100 

101 def remove(self, bank_id: str, memory_id: str) -> bool: 

102 """Drop a single ``(memory_id, embedding)`` entry from the cache. 

103 

104 Called from the forget pipeline so a re-retain of similar content 

105 after forget produces a fresh row instead of silently dedup'ing 

106 against the cached embedding of the now-deleted memory. 

107 

108 Returns ``True`` if an entry was removed, ``False`` otherwise. 

109 Idempotent — multiple forgets of the same id are safe. 

110 """ 

111 entries = self._cache.get(bank_id) 

112 if not entries: 

113 return False 

114 

115 before = len(entries) 

116 self._cache[bank_id] = [(mid, emb) for (mid, emb) in entries if mid != memory_id] 

117 

118 # Reclaim the bank slot entirely if it became empty. 

119 if not self._cache[bank_id]: 

120 del self._cache[bank_id] 

121 

122 return len(self._cache.get(bank_id, [])) < before