Coverage for astrocyte/pipeline/consolidation.py: 90%
107 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Memory consolidation — dedup, stale archival, and entity cleanup.
3Async (coordinates I/O operations). See docs/_design/built-in-pipeline.md section 5.
5Tier 1 consolidation supports:
6- Dedup: remove near-duplicate embeddings (cosine similarity)
7- Stale archival: identify memories never recalled within a time window
8- Entity cleanup: remove orphaned entities from the graph store
9"""
11from __future__ import annotations
13import logging
14from collections import defaultdict
15from dataclasses import dataclass
16from datetime import datetime, timezone
17from typing import TYPE_CHECKING
19from astrocyte._log_safety import safe as _safe_log
20from astrocyte.policy.signal_quality import cosine_similarity
22if TYPE_CHECKING:
23 from astrocyte.provider import GraphStore, VectorStore
25logger = logging.getLogger("astrocyte.pipeline")
28class _VectorBuckets:
29 """Multi-band quantization bucketing for near-duplicate detection.
31 Uses multiple offset hash bands to reduce false negatives. A vector is
32 checked against candidates from ALL bands it maps to (union), but only
33 stored once per band. This gives O(n × avg_bucket_size) instead of O(n²).
34 """
36 _NUM_HASH_BANDS = 3 # Number of independent hash bands
38 def __init__(self, num_dims_per_band: int = 8) -> None:
39 self._num_dims = num_dims_per_band
40 # band_index -> bucket_key -> list of (id, vector)
41 self._bands: list[dict[tuple[int, ...], list[tuple[str, list[float]]]]] = [
42 defaultdict(list) for _ in range(self._NUM_HASH_BANDS)
43 ]
45 def _bucket_key(self, vector: list[float], band: int) -> tuple[int, ...]:
46 """Quantize vector into a coarse bucket key for the given band."""
47 if not vector:
48 return ()
49 dims = len(vector)
50 # Each band uses a different offset to sample different dimensions
51 offset = (band * dims) // (self._NUM_HASH_BANDS * 2)
52 step = max(1, dims // self._num_dims)
53 return tuple(
54 (1 if vector[(offset + i) % dims] > 0.1 else (-1 if vector[(offset + i) % dims] < -0.1 else 0))
55 for i in range(0, min(dims, step * self._num_dims), step)
56 )
58 def find_similar(self, vector: list[float], threshold: float) -> bool:
59 """Check if any stored vector is similar above threshold (any band)."""
60 checked: set[str] = set()
61 for band_idx, band in enumerate(self._bands):
62 key = self._bucket_key(vector, band_idx)
63 for item_id, seen_vec in band.get(key, []):
64 if item_id in checked:
65 continue
66 checked.add(item_id)
67 try:
68 if cosine_similarity(vector, seen_vec) >= threshold:
69 return True
70 except ValueError:
71 continue
72 return False
74 def add(self, item_id: str, vector: list[float]) -> None:
75 for band_idx, band in enumerate(self._bands):
76 key = self._bucket_key(vector, band_idx)
77 band[key].append((item_id, vector))
@dataclass()
class ConsolidationResult:
    """Counts reported by one ``run_consolidation`` pass over a bank."""

    # Near-duplicate vectors deleted (sum of the store's delete() results).
    duplicates_removed: int
    # Number of vectors examined during the scan.
    total_scanned: int
    # Vectors deleted for not being recalled within the staleness window.
    stale_archived: int = 0
    # Value returned by the graph store's orphaned-entity cleanup.
    orphaned_entities_removed: int = 0
_MAX_SCAN_VECTORS = 100_000  # Safety cap on vectors scanned per consolidation run.


async def run_consolidation(
    vector_store: VectorStore,
    bank_id: str,
    similarity_threshold: float = 0.95,
    batch_size: int = 100,
    *,
    archive_unretrieved_after_days: int | None = None,
    graph_store: GraphStore | None = None,
) -> ConsolidationResult:
    """Run Tier 1 consolidation on a bank.

    1. **Dedup** — pages through the bank's vectors and deletes every vector
       whose cosine similarity to an already-kept vector is at least
       ``similarity_threshold``. The first occurrence in scan order is kept.
       Candidates come from ``_VectorBuckets``, so this is approximate: similar
       vectors that never share a bucket are not compared.
    2. **Stale archival** — if ``archive_unretrieved_after_days`` is set,
       deletes kept memories whose ``_last_recalled_at`` metadata is at least
       that many whole days old. ``_created_at`` is used when
       ``_last_recalled_at`` is missing or unparseable. Timestamps without
       timezone information are treated as UTC.
    3. **Entity cleanup** — if ``graph_store`` provides
       ``remove_orphaned_entities``, calls it. A failure is logged and does not
       abort the run.

    Deletions are issued only after the scan finishes, so offset-based paging
    is not disturbed by them. The scan stops once at least
    ``_MAX_SCAN_VECTORS`` vectors have been read (rounded up to a whole batch).

    Args:
        vector_store: Store to scan. If it has no ``list_vectors`` method, the
            run is skipped with a warning.
        bank_id: Bank to consolidate.
        similarity_threshold: Cosine similarity at or above which two vectors
            count as duplicates.
        batch_size: Page size for ``list_vectors`` and chunk size for deletes.
        archive_unretrieved_after_days: Staleness window in days. ``None``
            disables stale archival.
        graph_store: Optional graph store for orphaned-entity cleanup.

    Returns:
        A ``ConsolidationResult`` with the counts reported by the stores.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if not hasattr(vector_store, "list_vectors"):
        logger.warning("VectorStore does not support list_vectors; skipping consolidation")
        return ConsolidationResult(duplicates_removed=0, total_scanned=0)
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    total_scanned = 0
    seen_index = _VectorBuckets()
    to_delete_dedup: list[str] = []
    to_delete_stale: list[str] = []
    now = datetime.now(timezone.utc)

    offset = 0
    while True:
        batch = await vector_store.list_vectors(bank_id, offset=offset, limit=batch_size)
        if not batch:
            break

        for item in batch:
            total_scanned += 1

            # -- Dedup check (bucketed: O(n × bucket_size) instead of O(n²)) --
            if seen_index.find_similar(item.vector, similarity_threshold):
                to_delete_dedup.append(item.id)
                continue  # already scheduled for deletion; skip the stale check
            seen_index.add(item.id, item.vector)

            # -- Stale archival check (only for vectors kept by dedup) --
            # NOTE(review): a kept vector can be archived here even if one of
            # its deleted duplicates was recalled recently, so the content may
            # disappear entirely. Consider carrying the newest recall time
            # over to the kept representative.
            if archive_unretrieved_after_days is not None and item.metadata:
                ref_time = _stale_reference_time(item.metadata)
                if ref_time is not None and (now - ref_time).days >= archive_unretrieved_after_days:
                    to_delete_stale.append(item.id)

        offset += len(batch)
        if len(batch) < batch_size:
            break  # short page: end of the bank

        # Safety: prevent runaway scans (>= so the cap is not exceeded by an extra batch).
        if offset >= _MAX_SCAN_VECTORS:
            logger.warning("Consolidation scan capped at 100k vectors for bank %s", _safe_log(bank_id))
            break

    duplicates_removed = await _delete_in_chunks(vector_store, bank_id, to_delete_dedup, batch_size)
    stale_archived = await _delete_in_chunks(vector_store, bank_id, to_delete_stale, batch_size)

    # Entity cleanup
    orphaned_removed = 0
    if graph_store and hasattr(graph_store, "remove_orphaned_entities"):
        try:
            orphaned_removed = await graph_store.remove_orphaned_entities(bank_id)
        except Exception:
            # Best-effort: log and still return the vector-store counts.
            logger.warning("Entity cleanup failed for bank %s", _safe_log(bank_id), exc_info=True)

    return ConsolidationResult(
        duplicates_removed=duplicates_removed,
        total_scanned=total_scanned,
        stale_archived=stale_archived,
        orphaned_entities_removed=orphaned_removed,
    )


def _stale_reference_time(metadata: dict) -> datetime | None:
    """Return the aware (UTC-normalized) timestamp that staleness is measured from.

    Prefers ``_last_recalled_at`` and falls back to ``_created_at`` when the
    former is missing or cannot be interpreted as a datetime. Strings are
    parsed with ``_parse_dt``; naive datetimes are assumed to be UTC so they can
    be subtracted from the aware ``now`` in ``run_consolidation``.
    """
    for key in ("_last_recalled_at", "_created_at"):
        raw = metadata.get(key)
        if not raw:
            continue
        value = _parse_dt(raw) if isinstance(raw, str) else raw
        if isinstance(value, datetime):
            return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
    return None


async def _delete_in_chunks(
    vector_store: VectorStore, bank_id: str, ids: list[str], chunk_size: int
) -> int:
    """Delete ``ids`` from ``bank_id`` in chunks of ``chunk_size``; return the summed store counts."""
    if not ids:
        return 0
    deleted = 0
    for start in range(0, len(ids), chunk_size):
        deleted += await vector_store.delete(ids[start : start + chunk_size], bank_id)
    return deleted
191def _parse_dt(value: str) -> datetime | None:
192 """Best-effort ISO datetime parse."""
193 try:
194 return datetime.fromisoformat(value)
195 except (ValueError, TypeError):
196 return None