Coverage for astrocyte/pipeline/observation.py: 88%
237 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
"""Observation consolidation — semantic deduplication and synthesis layer.

After each ``retain()`` call, this module fires a background LLM pass that
maintains a deduplicated *observations layer* on top of raw memories. The
design is inspired by the Hindsight memory system (vectorize-io/hindsight,
MIT licence) and adapted to Astrocyte's existing multi-strategy RRF pipeline.

Terminology
-----------
- **Raw memory** — a chunk stored verbatim by ``retain()``; ``fact_type`` is
  ``"world"``, ``"experience"``, or ``None``.
- **Observation** — a distilled, deduplicated atomic fact synthesized from one
  or more raw memories; stored as a ``VectorItem`` with ``fact_type="observation"``
  in the dedicated ``{bank_id}::obs`` bank (see :func:`obs_bank_id`), with extra
  bookkeeping metadata (``_obs_proof_count``, ``_obs_source_ids``,
  ``_obs_confidence``, ...; full list below).

Lifecycle
---------
1. ``retain()`` stores raw chunks as usual.
2. Immediately after, ``ObservationConsolidator.consolidate()`` is called
   (fire-and-forget by default; awaitable for tests).
3. The consolidator fetches the top-K semantically-related existing
   observations, then calls the LLM to produce structured
   ``create`` / ``update`` / ``delete`` actions.
4. Actions are applied to the vector store:
   - ``create`` → new ``VectorItem(fact_type="observation", ...)``
   - ``update`` → the old item is replaced by a revised one with an
     incremented ``_obs_proof_count`` and merged ``_obs_source_ids``
   - ``delete`` → remove the observation from the store
5. During ``recall()``, the orchestrator runs an additional ``observation``
   strategy (``search_similar`` with ``fact_types=["observation"]``) and
   injects its results into the RRF fusion with a configurable weight boost,
   so confirmed multi-evidence observations naturally rank above single-mention
   raw memories.

Prompt design
-------------
The LLM receives a brief of existing observations (with proof counts) and
the new memory, then outputs a JSON array of actions. The prompt is kept
deliberately short to minimise token cost: the consolidator runs after every
retain, so its cost is paid on every call even in fire-and-forget mode. A
``max_obs`` cap (default 10) limits how many existing observations enter the
prompt, bounding context-window growth.

Metadata schema on observation VectorItems
------------------------------------------
``_obs_proof_count``        int — 1 on create, +1 on each update that revises it
``_obs_source_ids``         str (JSON list) — raw memory IDs that contributed
``_obs_source_timestamps``  str (JSON list) — ISO timestamps parallel to source_ids;
                            feeds :func:`astrocyte.pipeline.trend.compute_trend` (M21)
``_obs_confidence``         str (float) — LLM-assigned confidence (prompt asks for 0–1)
``_obs_updated_at``         str (ISO datetime) — last mutation timestamp
``_obs_scope``              str — scope key: ``bank:{bank_id}`` or the caller's
                            tags, sorted and joined with ``|``
``_obs_freshness``          str — "fresh" | "stale"
``_created_at``             str (ISO datetime) — creation time, carried across updates
"""
55from __future__ import annotations
57import json
58import logging
59import uuid
60from dataclasses import dataclass, field
61from datetime import datetime, timezone
62from typing import TYPE_CHECKING, Any
64if TYPE_CHECKING:
65 from astrocyte.pipeline.trend import Trend
66 from astrocyte.provider import LLMProvider, VectorStore
67 from astrocyte.types import VectorHit
logger = logging.getLogger("astrocyte.pipeline.observation")

# ---------------------------------------------------------------------------
# Action types
# ---------------------------------------------------------------------------

# The only action verbs ``_parse_actions`` lets through from the LLM response.
_VALID_ACTIONS = frozenset(["create", "update", "delete"])

# ---------------------------------------------------------------------------
# Observation bank naming
# ---------------------------------------------------------------------------

# Appended to a memory bank id to name that bank's observation bank.
_OBS_SUFFIX = "::obs"
def obs_bank_id(bank_id: str) -> str:
    """Map a memory bank id to the id of its companion observation bank.

    The observation bank is named ``{bank_id}::obs``. Keeping observations in
    a *separate* bank means the main semantic/keyword/temporal strategies
    never retrieve them. Otherwise an observation could surface twice in RRF
    fusion (once via ``semantic``, once via the ``observation`` strategy) and
    crowd out the raw source memories that carry verbatim answers.
    """
    return "{}{}".format(bank_id, _OBS_SUFFIX)
def observation_scope(bank_id: str, tags: list[str] | None = None) -> str:
    """Return a stable observation scope key for a retain/recall context.

    With tags, the key is the stringified tags sorted and joined by ``|``, so
    tag order does not matter. Without tags (``None`` or empty) the key is
    ``bank:{bank_id}``.
    """
    if not tags:
        return f"bank:{bank_id}"
    return "|".join(sorted(map(str, tags)))
# ---------------------------------------------------------------------------
# Result
# ---------------------------------------------------------------------------


@dataclass
class ObservationConsolidationResult:
    """Per-call tally of what :meth:`ObservationConsolidator.consolidate` did."""

    # ``create`` actions that stored a new observation.
    created: int = 0
    # ``update`` actions that stored a revised observation.
    updated: int = 0
    # ``delete`` actions that removed an observation.
    deleted: int = 0
    # Parsed actions that were not applied (e.g. empty text, low confidence).
    skipped: int = 0
    # One message per failure; a fresh list for every instance.
    errors: list[str] = field(default_factory=list)
def _parse_obs_timestamp(value: Any) -> datetime | None:
    """Parse a stored timestamp into a timezone-aware datetime, or ``None``.

    Accepts ISO-8601 strings (a trailing ``Z`` is mapped to ``+00:00`` for
    Pythons whose ``fromisoformat`` rejects it) and ``datetime`` objects.
    Naive values are assumed to be UTC so they compare cleanly with the aware
    timestamps this module writes (``datetime.now(timezone.utc)``).
    NOTE(review): the UTC assumption for naive legacy values is a best guess.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str) and value:
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
    else:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _source_timestamp_entries(raw: Any) -> list[Any]:
    """Return the raw entries of ``_obs_source_timestamps``.

    The field is normally a JSON-encoded list (as written by this module), but
    an already-decoded list/tuple is accepted too. Blank, malformed, or
    non-list values yield ``[]``.
    """
    if isinstance(raw, str):
        if not raw.strip():
            return []
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            # Malformed metadata blob: the caller falls back to the
            # single-timestamp path.
            return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    return []


def compute_observation_trend(
    metadata: dict[str, Any] | None,
    *,
    now: datetime | None = None,
) -> "Trend":
    """Compute :class:`~astrocyte.pipeline.trend.Trend` for a stored observation.

    Reads ``_obs_source_timestamps`` (the per-source ISO-timestamp list
    populated by :meth:`ObservationConsolidator._apply_create` /
    :meth:`ObservationConsolidator._apply_update`) and feeds the parsed
    timestamps to :func:`astrocyte.pipeline.trend.compute_trend`. The field
    may be a JSON string or an already-decoded list. Unparseable entries are
    skipped, and naive timestamps are treated as UTC.

    Fallback when no source timestamp parses (legacy observations created
    before M21, external writers, malformed blobs): use the first parseable
    value of ``_obs_updated_at`` / ``_created_at`` as a single-point
    timestamp. This is coarser than per-source tracking but never returns
    garbage. :func:`compute_trend` will classify it as NEW / STALE based on
    that one timestamp's age.

    Args:
        metadata: Observation metadata; ``None`` is treated as empty.
        now: Reference time forwarded unchanged to ``compute_trend``.

    Returns:
        The computed trend, or ``Trend.STALE`` when no timestamp at all can
        be recovered from the metadata.
    """
    from astrocyte.pipeline.trend import Trend, compute_trend

    meta = metadata or {}
    timestamps: list[datetime] = [
        parsed
        for parsed in map(
            _parse_obs_timestamp,
            _source_timestamp_entries(meta.get("_obs_source_timestamps")),
        )
        if parsed is not None
    ]

    if not timestamps:
        # Legacy fallback: try each single-point candidate in order, so a
        # malformed ``_obs_updated_at`` still lets ``_created_at`` be used.
        for key in ("_obs_updated_at", "_created_at"):
            parsed = _parse_obs_timestamp(meta.get(key))
            if parsed is not None:
                timestamps.append(parsed)
                break

    if not timestamps:
        return Trend.STALE
    return compute_trend(timestamps, now=now)
# ---------------------------------------------------------------------------
# Prompt
# ---------------------------------------------------------------------------

# System message for the consolidation LLM call. It defines the three action
# shapes (create / update / delete, matching the verbs in ``_VALID_ACTIONS``)
# and the reconciliation rules. NOTE(review): the "≤ 30 words" and
# "Maximum 5 actions per call" constraints are only requested from the model;
# nothing visible in this module enforces them on the parsed output.
_SYSTEM_PROMPT = """\
You are a memory consolidation agent. Your job is to maintain a concise, \
deduplicated set of *observations* — atomic facts distilled from raw memories.

Given:
- A list of existing observations (each with an ID, proof count, and text).
- A new memory to integrate.

Output a JSON array of actions. Each action is one of:
  {"action": "create", "text": "<observation>", "confidence": <0.0–1.0>}
  {"action": "update", "obs_id": "<id>", "text": "<revised observation>", \
"confidence": <0.0–1.0>}
  {"action": "delete", "obs_id": "<id>"}

Rules (Hindsight-style three-strategy reconciliation):
1. REDUNDANT — same fact in different words: UPDATE the existing observation \
to the cleaner phrasing and raise confidence. Don't create a near-duplicate.
2. STATE UPDATE — new info supersedes old state: UPDATE the observation to \
preserve the *journey*. Use phrases like "used to X, now Y" or "changed from \
X to Y" so the temporal evolution is captured. Never silently overwrite — \
the agent must be able to answer questions about the prior state.
3. DIRECT CONTRADICTION (rare) — irreconcilable claim with no temporal frame: \
DELETE the old observation only when the new memory definitively supersedes \
it AND no journey can be expressed (e.g. an outright correction of a wrong \
fact). When in doubt, prefer UPDATE-with-journey over DELETE.
4. CREATE if the new memory introduces a fact not already covered.
5. If the new memory is fully redundant *and* the existing observation \
already captures it precisely, respond with [].
6. Preserve stable persona and preference facts: identities, goals, hobbies, \
relationships, repeated activities, values, career plans, and stated likes/dislikes.

Constraints:
- Each observation must be a single, independently-useful sentence (≤ 30 words).
- Output valid JSON only. No prose before or after the array.
- Maximum 5 actions per call.
"""
def _build_user_prompt(
    existing_obs: list[VectorHit],
    new_memory_text: str,
    *,
    max_obs: int = 10,
) -> str:
    """Render the user message for the consolidation LLM call.

    Layout, one item per line:

    - an ``Existing observations:`` header;
    - up to ``max_obs`` lines of ``[id] (proof_count=N, confidence=C): text``,
      or ``(none)`` when ``existing_obs`` is empty. Missing metadata renders
      as proof_count=1 / confidence=?;
    - a blank line, ``New memory:``, and the stripped new memory;
    - a blank line and the ``Actions (JSON array):`` cue.
    """
    if not existing_obs:
        listing = ["(none)"]
    else:
        listing = []
        for hit in existing_obs[:max_obs]:
            meta = hit.metadata or {}
            proof = meta.get("_obs_proof_count", 1)
            conf = meta.get("_obs_confidence", "?")
            listing.append(f"[{hit.id}] (proof_count={proof}, confidence={conf}): {hit.text}")

    return "\n".join(
        [
            "Existing observations:",
            *listing,
            "",
            "New memory:",
            new_memory_text.strip(),
            "",
            "Actions (JSON array):",
        ]
    )
# ---------------------------------------------------------------------------
# Action parsing
# ---------------------------------------------------------------------------


def _parse_actions(raw: str) -> list[dict[str, Any]]:
    """Extract and validate the JSON action array from an LLM response.

    The LLM is instructed to output *only* a JSON array, but it may wrap the
    array in whitespace, a markdown code fence, or stray prose. Each ``[`` is
    tried in turn as the start of a JSON value. The first one that decodes to
    an array wins, and anything after that array is ignored, so trailing
    prose containing ``]`` does not break parsing.

    Items that are not JSON objects, or whose ``"action"`` is not a string
    listed in ``_VALID_ACTIONS``, are dropped.

    Args:
        raw: The completion text. ``None``/empty is treated as "no actions".

    Returns:
        The validated action dicts in response order, or ``[]`` when no JSON
        array can be decoded.
    """
    if not raw:
        return []
    text = raw.strip()
    # Strip markdown code fences if present.
    if text.startswith("```"):
        text = "\n".join(
            line for line in text.splitlines() if not line.strip().startswith("```")
        ).strip()

    start = text.find("[")
    if start == -1:
        logger.debug("No JSON array found in consolidation response: %r", raw[:200])
        return []

    # raw_decode parses exactly one JSON value beginning at ``start`` and
    # ignores whatever follows it. If the first "[" is not the action array
    # (e.g. "see [1]: [...]"), move on to the next "[".
    decoder = json.JSONDecoder()
    actions: list[Any] | None = None
    last_error: json.JSONDecodeError | None = None
    while start != -1:
        try:
            candidate, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError as exc:
            last_error = exc
        else:
            if isinstance(candidate, list):
                actions = candidate
                break
        start = text.find("[", start + 1)

    if actions is None:
        logger.warning("JSON parse error in consolidation response: %s — %r", last_error, raw[:200])
        return []

    # ``isinstance(..., str)`` guards the frozenset lookup: an unhashable
    # "action" value (e.g. a list) would otherwise raise TypeError and abort
    # the whole consolidation pass.
    return [
        item
        for item in actions
        if isinstance(item, dict)
        and isinstance(item.get("action"), str)
        and item["action"] in _VALID_ACTIONS
    ]
# ---------------------------------------------------------------------------
# Core consolidator
# ---------------------------------------------------------------------------


def _decode_json_list(value: Any) -> list[Any]:
    """Decode a metadata field that should hold a JSON-encoded list.

    Accepts the JSON string this module writes, or a list/tuple (in case a
    store hands metadata back already deserialized). Missing values,
    malformed JSON, and non-list payloads all yield ``[]``.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return []


def _coerce_confidence(value: Any, default: float = 0.7) -> float:
    """Parse an LLM-supplied confidence and clamp it to ``[0.0, 1.0]``.

    Missing, non-numeric, or NaN values fall back to ``default`` rather than
    raising, so one sloppy field does not abort the whole action.
    """
    try:
        confidence = float(value)
    except (TypeError, ValueError):
        return default
    if confidence != confidence:  # NaN is the only float unequal to itself
        return default
    return min(1.0, max(0.0, confidence))


def _coerce_proof_count(value: Any) -> int:
    """Read a stored ``_obs_proof_count`` as an int, defaulting to 1.

    Tolerates numeric strings such as ``"3"`` or ``"3.0"``.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return 1


def _action_text(action: dict[str, Any]) -> str:
    """Return the stripped ``text`` of an action; ``""`` if missing or not a string."""
    value = action.get("text")
    return value.strip() if isinstance(value, str) else ""


def _merge_provenance(
    old_sources: list[str],
    old_timestamps: list[str],
    new_sources: list[str],
    *,
    fallback_ts: str,
    now_iso: str,
) -> tuple[list[str], list[str]]:
    """Merge source IDs (deduped, old first) with a parallel timestamp list.

    ``old_timestamps`` is padded with ``fallback_ts`` when it is shorter than
    ``old_sources`` (legacy or partially written metadata), so no old source
    ID is dropped. Surplus timestamps are ignored. Each new source ID not
    already present is stamped ``now_iso``.
    """
    padded = list(old_timestamps[: len(old_sources)])
    padded.extend([fallback_ts] * (len(old_sources) - len(padded)))

    merged_sources: list[str] = []
    merged_timestamps: list[str] = []
    seen: set[str] = set()
    pairs = [*zip(old_sources, padded), *((sid, now_iso) for sid in new_sources)]
    for sid, ts in pairs:
        if sid in seen:
            continue
        seen.add(sid)
        merged_sources.append(sid)
        merged_timestamps.append(ts)
    return merged_sources, merged_timestamps


class ObservationConsolidator:
    """Maintains the observations layer via post-retain LLM consolidation.

    Parameters
    ----------
    max_context_obs:
        Maximum number of the fetched observations shown to the LLM in the
        prompt. Higher values give the LLM more context but increase token
        cost.
    min_confidence:
        ``create`` actions whose confidence is below this threshold are not
        stored. ``update`` actions are applied regardless of confidence.
    observation_recall_limit:
        How many existing observations to fetch per consolidation call.
        This is the ``limit`` passed to ``vector_store.search_similar``.
    """

    def __init__(
        self,
        *,
        max_context_obs: int = 10,
        min_confidence: float = 0.5,
        observation_recall_limit: int = 15,
    ) -> None:
        self.max_context_obs = max_context_obs
        self.min_confidence = min_confidence
        self.observation_recall_limit = observation_recall_limit

    async def consolidate(
        self,
        new_memory_text: str,
        new_memory_ids: list[str],
        bank_id: str,
        vector_store: VectorStore,
        llm_provider: LLMProvider,
        *,
        query_vector: list[float] | None = None,
        scope: str | None = None,
    ) -> ObservationConsolidationResult:
        """Integrate a new memory into the observations layer.

        Args:
            new_memory_text: The raw text of the newly retained memory (or
                the first chunk when a retain produces multiple chunks — the
                consolidator works at the retain-call level, not chunk level).
            new_memory_ids: IDs of the vector items stored in this retain call.
            bank_id: The memory bank being updated.
            vector_store: Backing store (must support ``search_similar``,
                ``store_vectors`` and ``delete``).
            llm_provider: LLM used for the consolidation call.
            query_vector: Pre-computed embedding of ``new_memory_text``. When
                ``None``, the method embeds the text itself. Pass the vector
                already computed during retain to avoid a redundant embedding
                call.
            scope: ``_obs_scope`` stored on newly created observations.
                Defaults to ``observation_scope(bank_id)``. Revised
                observations keep the scope they already had.

        Returns:
            Per-action tallies. ``Exception``s are not propagated. A failing
            action is recorded in ``errors`` and the remaining actions still
            run. A failure before any action is applied (embedding, search,
            LLM call) is recorded in ``errors`` of an otherwise empty result.
        """
        result = ObservationConsolidationResult()
        obs_scope = scope or observation_scope(bank_id)

        try:
            # 1. Embed the new memory (or reuse the caller's vector).
            if query_vector is None:
                vecs = await llm_provider.embed([new_memory_text])
                query_vector = vecs[0]

            # 2. Fetch semantically-related existing observations from the
            #    dedicated observation bank (separate from the raw memory bank).
            from astrocyte.types import Message, VectorFilters

            obs_bank = obs_bank_id(bank_id)
            existing_obs = await vector_store.search_similar(
                query_vector,
                obs_bank,
                limit=self.observation_recall_limit,
                filters=VectorFilters(bank_id=obs_bank),
            )

            # 3. Build and execute the LLM prompt.
            user_prompt = _build_user_prompt(
                existing_obs,
                new_memory_text,
                max_obs=self.max_context_obs,
            )
            completion = await llm_provider.complete(
                messages=[
                    Message(role="system", content=_SYSTEM_PROMPT),
                    Message(role="user", content=user_prompt),
                ],
                max_tokens=512,
                temperature=0.0,
            )

            # 4. Parse actions.
            actions = _parse_actions(completion.text)
            if not actions:
                return result

            obs_by_id: dict[str, VectorHit] = {h.id: h for h in existing_obs}
            # IDs already revised or deleted in this batch. A second action on
            # the same ID would otherwise duplicate or resurrect it.
            consumed: set[str] = set()

            # 5. Apply actions.
            now_iso = datetime.now(timezone.utc).isoformat()
            new_ids_json = json.dumps(new_memory_ids)

            for action in actions:
                act = action.get("action")
                obs_id = str(action.get("obs_id") or "")
                try:
                    if act == "create":
                        stored = await self._apply_create(
                            action,
                            bank_id,
                            vector_store,
                            llm_provider,
                            source_ids=new_memory_ids,
                            now_iso=now_iso,
                            scope=obs_scope,
                        )
                        if stored:
                            result.created += 1
                        else:
                            result.skipped += 1

                    elif act == "update":
                        if obs_id in consumed:
                            result.skipped += 1
                            continue
                        stored = await self._apply_update(
                            action,
                            obs_by_id.get(obs_id),
                            bank_id,
                            vector_store,
                            llm_provider,
                            new_source_ids=new_memory_ids,
                            now_iso=now_iso,
                            new_ids_json=new_ids_json,
                            scope=obs_scope,
                        )
                        if stored:
                            result.updated += 1
                            if obs_id in obs_by_id:
                                consumed.add(obs_id)
                        else:
                            result.skipped += 1

                    elif act == "delete":
                        deleted = 0
                        # Only delete observations the LLM was actually given.
                        if obs_id in obs_by_id and obs_id not in consumed:
                            deleted = await vector_store.delete([obs_id], obs_bank)
                        if deleted:
                            result.deleted += 1
                            consumed.add(obs_id)
                        else:
                            result.skipped += 1

                except Exception as exc:
                    msg = f"action={act} failed: {exc}"
                    logger.warning("Observation consolidation %s in bank %s: %s", act, bank_id, exc)
                    result.errors.append(msg)

        except Exception as exc:
            logger.warning(
                "Observation consolidation failed for bank %s: %s",
                bank_id,
                exc,
                exc_info=True,
            )
            result.errors.append(str(exc))

        logger.debug(
            "Observation consolidation bank=%s created=%d updated=%d deleted=%d skipped=%d errors=%d",
            bank_id,
            result.created,
            result.updated,
            result.deleted,
            result.skipped,
            len(result.errors),
        )
        return result

    async def invalidate_sources(
        self,
        source_ids: list[str],
        bank_id: str,
        vector_store: VectorStore,
    ) -> int:
        """Delete observations whose provenance references any source ID.

        The observation bank is scanned page by page first, and matching IDs
        are deleted only after the scan finishes. Deleting while paginating
        by offset would shift later items into already-visited offsets, so
        they would be skipped (assuming the store lists live data by offset).

        Returns:
            The sum of the counts returned by ``vector_store.delete``.
        """
        if not source_ids:
            return 0
        source_set = set(source_ids)
        obs_bank = obs_bank_id(bank_id)
        page_size = 200

        to_delete: list[str] = []
        offset = 0
        while True:
            page = await vector_store.list_vectors(obs_bank, offset=offset, limit=page_size)
            if not page:
                break
            for item in page:
                sources = _decode_json_list((item.metadata or {}).get("_obs_source_ids"))
                if any(isinstance(sid, str) and sid in source_set for sid in sources):
                    to_delete.append(item.id)
            if len(page) < page_size:
                break
            offset += len(page)

        deleted = 0
        for start in range(0, len(to_delete), page_size):
            deleted += await vector_store.delete(to_delete[start : start + page_size], obs_bank)
        return deleted

    async def _apply_create(
        self,
        action: dict[str, Any],
        bank_id: str,
        vector_store: VectorStore,
        llm_provider: LLMProvider,
        *,
        source_ids: list[str],
        now_iso: str,
        scope: str,
    ) -> bool:
        """Store a brand-new observation from a ``create`` action.

        Returns ``False`` (nothing stored) when the action has no usable text
        or its confidence is below ``min_confidence``.
        """
        text = _action_text(action)
        if not text:
            return False
        confidence = _coerce_confidence(action.get("confidence"))
        if confidence < self.min_confidence:
            return False

        vecs = await llm_provider.embed([text])
        obs_id = uuid.uuid4().hex[:16]
        target_bank = obs_bank_id(bank_id)

        from astrocyte.types import VectorItem

        # M21: per-source timestamps feed compute_trend. At create time each
        # source contributes its retain timestamp (≈ now). The list grows
        # across future updates via _apply_update.
        source_timestamps = [now_iso] * len(source_ids)

        item = VectorItem(
            id=obs_id,
            bank_id=target_bank,
            vector=vecs[0],
            text=text,
            fact_type="observation",
            metadata={
                "_obs_proof_count": 1,
                "_obs_source_ids": json.dumps(source_ids),
                "_obs_source_timestamps": json.dumps(source_timestamps),
                "_obs_confidence": str(round(confidence, 3)),
                "_obs_updated_at": now_iso,
                "_obs_scope": scope,
                "_obs_freshness": "fresh",
                "_created_at": now_iso,
            },
            retained_at=datetime.now(timezone.utc),
        )
        await vector_store.store_vectors([item])
        return True

    async def _apply_update(
        self,
        action: dict[str, Any],
        existing: VectorHit | None,
        bank_id: str,
        vector_store: VectorStore,
        llm_provider: LLMProvider,
        *,
        new_source_ids: list[str],
        now_iso: str,
        new_ids_json: str,
        scope: str,
    ) -> bool:
        """Store the revised observation, then delete the one it replaces.

        The revision is embedded and stored *before* the old item is deleted.
        If embedding or storing fails, the original observation survives
        instead of being lost; if only the final delete fails, a duplicate
        remains (and the error propagates to the caller's per-action handler).

        When ``existing`` is ``None`` (the LLM referenced an ID that was not
        among the fetched observations), there is nothing to merge or delete.
        The revised text is stored as a fresh observation with
        ``_obs_proof_count=1``.

        ``new_ids_json`` is accepted for call-site compatibility but unused.

        Returns ``False`` (nothing stored) when the action has no usable text.
        """
        text = _action_text(action)
        if not text:
            return False
        confidence = _coerce_confidence(action.get("confidence"))

        old_proof = 0
        old_sources: list[str] = []
        old_timestamps: list[str] = []
        created_at = now_iso
        obs_scope = scope
        if existing is not None:
            meta = existing.metadata or {}
            old_proof = _coerce_proof_count(meta.get("_obs_proof_count", 1))
            created_at = str(meta.get("_created_at", now_iso))
            obs_scope = str(meta.get("_obs_scope", scope))
            old_sources = [str(s) for s in _decode_json_list(meta.get("_obs_source_ids"))]
            old_timestamps = [str(t) for t in _decode_json_list(meta.get("_obs_source_timestamps")) if t]

        merged_sources, merged_timestamps = _merge_provenance(
            old_sources,
            old_timestamps,
            new_source_ids,
            fallback_ts=created_at,
            now_iso=now_iso,
        )

        target_bank = obs_bank_id(bank_id)
        vecs = await llm_provider.embed([text])

        from astrocyte.types import VectorItem

        item = VectorItem(
            id=uuid.uuid4().hex[:16],
            bank_id=target_bank,
            vector=vecs[0],
            text=text,
            fact_type="observation",
            metadata={
                "_obs_proof_count": old_proof + 1,
                "_obs_source_ids": json.dumps(merged_sources),
                "_obs_source_timestamps": json.dumps(merged_timestamps),
                "_obs_confidence": str(round(confidence, 3)),
                "_obs_updated_at": now_iso,
                "_obs_scope": obs_scope,
                "_obs_freshness": "fresh",
                "_created_at": created_at,
            },
            retained_at=datetime.now(timezone.utc),
        )
        await vector_store.store_vectors([item])

        if existing is not None:
            await vector_store.delete([existing.id], target_bank)
        return True