Coverage for astrocyte/pipeline/consolidation.py: 90%

107 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Memory consolidation — dedup, stale archival, and entity cleanup. 

2 

3Async (coordinates I/O operations). See docs/_design/built-in-pipeline.md section 5. 

4 

5Tier 1 consolidation supports: 

6- Dedup: remove near-duplicate embeddings (cosine similarity) 

7- Stale archival: identify memories never recalled within a time window 

8- Entity cleanup: remove orphaned entities from the graph store 

9""" 

10 

11from __future__ import annotations 

12 

13import logging 

14from collections import defaultdict 

15from dataclasses import dataclass 

16from datetime import datetime, timezone 

17from typing import TYPE_CHECKING 

18 

19from astrocyte._log_safety import safe as _safe_log 

20from astrocyte.policy.signal_quality import cosine_similarity 

21 

22if TYPE_CHECKING: 

23 from astrocyte.provider import GraphStore, VectorStore 

24 

25logger = logging.getLogger("astrocyte.pipeline") 

26 

27 

class _VectorBuckets:
    """Coarse multi-band bucket index used to shortlist near-duplicate vectors.

    Every stored vector is filed under one quantized key per hash band. A
    lookup only compares the query against vectors that share a key with it
    in at least one band, so the number of cosine comparisons depends on
    bucket occupancy rather than on the total number of stored vectors.
    """

    _NUM_HASH_BANDS = 3  # Number of independent hash bands

    def __init__(self, num_dims_per_band: int = 8) -> None:
        """Create an empty index sampling ``num_dims_per_band`` components per key."""
        self._num_dims = num_dims_per_band
        # band_index -> bucket_key -> list of (id, vector)
        self._bands: list[dict[tuple[int, ...], list[tuple[str, list[float]]]]] = []
        for _ in range(self._NUM_HASH_BANDS):
            self._bands.append(defaultdict(list))

    def _bucket_key(self, vector: list[float], band: int) -> tuple[int, ...]:
        """Return the quantized key of ``vector`` for hash band ``band``.

        Sampled components are mapped to 1 (above 0.1), -1 (below -0.1) or 0.
        An empty vector yields the empty tuple.
        """
        if not vector:
            return ()
        width = len(vector)
        # Each band starts sampling at a different position, wrapping around.
        shift = (band * width) // (self._NUM_HASH_BANDS * 2)
        stride = max(1, width // self._num_dims)
        stop = min(width, stride * self._num_dims)

        signs: list[int] = []
        for pos in range(0, stop, stride):
            component = vector[(shift + pos) % width]
            if component > 0.1:
                signs.append(1)
            elif component < -0.1:
                signs.append(-1)
            else:
                signs.append(0)
        return tuple(signs)

    def find_similar(self, vector: list[float], threshold: float) -> bool:
        """Return True if a stored vector sharing a bucket reaches ``threshold``.

        Candidates are gathered from every band; each stored id is compared at
        most once. Candidates for which ``cosine_similarity`` raises
        ``ValueError`` are skipped.
        """
        already_compared: set[str] = set()
        for band_no, buckets in enumerate(self._bands):
            # ``get`` (not indexing) so a lookup never creates an empty bucket.
            candidates = buckets.get(self._bucket_key(vector, band_no), [])
            for candidate_id, candidate_vec in candidates:
                if candidate_id in already_compared:
                    continue
                already_compared.add(candidate_id)
                try:
                    close_enough = cosine_similarity(vector, candidate_vec) >= threshold
                except ValueError:
                    continue
                if close_enough:
                    return True
        return False

    def add(self, item_id: str, vector: list[float]) -> None:
        """File ``(item_id, vector)`` under its bucket key in every band."""
        for band_no, buckets in enumerate(self._bands):
            buckets[self._bucket_key(vector, band_no)].append((item_id, vector))

78 

79 

@dataclass
class ConsolidationResult:
    """Counters reported by one ``run_consolidation`` call."""

    # Sum of the values returned by the vector store's delete calls for duplicates.
    duplicates_removed: int
    # Number of vectors visited during the scan.
    total_scanned: int
    # Sum of the values returned by the vector store's delete calls for stale memories.
    stale_archived: int = 0
    # Value returned by the graph store's orphan cleanup (0 when it did not run).
    orphaned_entities_removed: int = 0

86 

87 

async def run_consolidation(
    vector_store: VectorStore,
    bank_id: str,
    similarity_threshold: float = 0.95,
    batch_size: int = 100,
    *,
    archive_unretrieved_after_days: int | None = None,
    graph_store: GraphStore | None = None,
) -> ConsolidationResult:
    """Run Tier 1 consolidation on a bank.

    1. **Dedup** — pages through the bank with ``vector_store.list_vectors`` and
       checks each embedding against the previously kept embeddings that share
       a ``_VectorBuckets`` bucket with it. Items at or above
       ``similarity_threshold`` are deleted (keeping the first occurrence).
    2. **Stale archival** — if ``archive_unretrieved_after_days`` is set, deletes
       non-duplicate memories whose ``_last_recalled_at`` (or, when that is
       missing, ``_created_at``) metadata timestamp is at least that many days
       old. Items without metadata or without a usable timestamp are kept.
    3. **Entity cleanup** — if ``graph_store`` is given and provides
       ``remove_orphaned_entities``, calls it; a failure is logged and ignored.

    Args:
        vector_store: Store to scan; must provide ``list_vectors`` (otherwise
            consolidation is skipped with a warning) and ``delete``.
        bank_id: Bank to consolidate.
        similarity_threshold: Minimum cosine similarity treated as a duplicate.
        batch_size: Page size for the scan and chunk size for deletions.
        archive_unretrieved_after_days: Age threshold in whole days, or ``None``
            to disable stale archival.
        graph_store: Optional graph store used for entity cleanup.

    Returns:
        A ``ConsolidationResult`` with the scan and deletion counters.
    """
    if not hasattr(vector_store, "list_vectors"):
        logger.warning("VectorStore does not support list_vectors; skipping consolidation")
        return ConsolidationResult(duplicates_removed=0, total_scanned=0)

    total_scanned = 0
    seen_index = _VectorBuckets()
    to_delete_dedup: list[str] = []
    to_delete_stale: list[str] = []

    now = datetime.now(timezone.utc)

    # Deletions are collected here and applied only after the scan has finished.
    offset = 0
    while True:
        batch = await vector_store.list_vectors(bank_id, offset=offset, limit=batch_size)
        if not batch:
            break

        for item in batch:
            total_scanned += 1

            # -- Dedup check (bucketed: O(n × bucket_size) instead of O(n²)) --
            if seen_index.find_similar(item.vector, similarity_threshold):
                to_delete_dedup.append(item.id)
                continue
            seen_index.add(item.id, item.vector)

            # -- Stale archival check (only for items that are not duplicates) --
            # NOTE(review): a stale item is still indexed as the "first
            # occurrence", so its later duplicates are removed as duplicates
            # while it is removed as stale — confirm that losing every copy
            # is the intended outcome.
            if archive_unretrieved_after_days is None or not item.metadata:
                continue
            ref_time = _stale_reference_time(item.metadata)
            if ref_time is None:
                continue
            if (now - ref_time).days >= archive_unretrieved_after_days:
                to_delete_stale.append(item.id)

        offset += len(batch)
        if len(batch) < batch_size:
            break

        # Safety: prevent runaway scans
        if offset > 100000:
            logger.warning("Consolidation scan capped at 100k vectors for bank %s", _safe_log(bank_id))
            break

    duplicates_removed = await _delete_in_batches(vector_store, to_delete_dedup, bank_id, batch_size)
    stale_archived = await _delete_in_batches(vector_store, to_delete_stale, bank_id, batch_size)

    # Entity cleanup (best effort: a failure must not discard the counters above)
    orphaned_removed = 0
    if graph_store and hasattr(graph_store, "remove_orphaned_entities"):
        try:
            orphaned_removed = await graph_store.remove_orphaned_entities(bank_id)
        except Exception:
            logger.warning("Entity cleanup failed for bank %s", _safe_log(bank_id), exc_info=True)

    return ConsolidationResult(
        duplicates_removed=duplicates_removed,
        total_scanned=total_scanned,
        stale_archived=stale_archived,
        orphaned_entities_removed=orphaned_removed,
    )


def _stale_reference_time(metadata: dict[str, object]) -> datetime | None:
    """Return the timezone-aware timestamp used to judge staleness, if any.

    Prefers ``_last_recalled_at`` and falls back to ``_created_at`` only when
    the former is missing or empty. String values are parsed with ``_parse_dt``.
    """
    raw = metadata.get("_last_recalled_at") or metadata.get("_created_at")
    if not raw:
        return None
    moment = _parse_dt(raw) if isinstance(raw, str) else raw
    if not isinstance(moment, datetime):
        return None
    if moment.utcoffset() is None:
        # Subtracting a naive datetime from the aware ``now`` raises TypeError.
        # Assumes naive timestamps were recorded in UTC — TODO confirm with writers.
        moment = moment.replace(tzinfo=timezone.utc)
    return moment


async def _delete_in_batches(
    vector_store: VectorStore,
    ids: list[str],
    bank_id: str,
    batch_size: int,
) -> int:
    """Delete ``ids`` in chunks of ``batch_size`` and return the summed delete results."""
    if not ids:
        return 0
    removed = 0
    for start in range(0, len(ids), batch_size):
        removed += await vector_store.delete(ids[start : start + batch_size], bank_id)
    return removed

189 

190 

def _parse_dt(value: str) -> datetime | None:
    """Best-effort ISO 8601 datetime parse.

    Returns the parsed ``datetime`` (naive or aware, as written in ``value``),
    or ``None`` when ``value`` is not a string or cannot be parsed. A trailing
    ``Z`` UTC designator is accepted even on interpreters whose
    ``datetime.fromisoformat`` rejects it.
    """
    try:
        return datetime.fromisoformat(value)
    except TypeError:
        # Not a string at all.
        return None
    except ValueError:
        pass

    # ``fromisoformat`` only understands the "Z" suffix from Python 3.11 on;
    # retry with the equivalent explicit offset.
    if value[-1:] in ("Z", "z"):
        try:
            return datetime.fromisoformat(value[:-1] + "+00:00")
        except ValueError:
            return None
    return None
196 return None