Coverage for astrocyte/pipeline/observation.py: 88%

237 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Observation consolidation — semantic deduplication and synthesis layer. 

2 

3After each ``retain()`` call, this module fires a background LLM pass that 

4maintains a deduplicated *observations layer* on top of raw memories. The 

5design is inspired by the Hindsight memory system (vectorize-io/hindsight, 

6MIT licence) and adapted to Astrocyte's existing multi-strategy RRF pipeline. 

7 

8Terminology 

9----------- 

10- **Raw memory** — a chunk stored verbatim by ``retain()``; ``fact_type`` is 

11 ``"world"``, ``"experience"``, or ``None``. 

12- **Observation** — a distilled, deduplicated atomic fact synthesized from one 

13 or more raw memories; stored as a ``VectorItem`` with ``fact_type="observation"`` 

14 and extra bookkeeping metadata (``_obs_proof_count``, ``_obs_source_ids``, 

15 ``_obs_confidence``). 

16 

17Lifecycle 

18--------- 

191. ``retain()`` stores raw chunks as usual. 

202. Immediately after, ``ObservationConsolidator.consolidate()`` is called 

21 (fire-and-forget by default; awaitable for tests). 

223. The consolidator fetches the top-K semantically-related existing 

23 observations, then calls the LLM to produce structured 

24 ``create`` / ``update`` / ``delete`` actions. 

254. Actions are applied to the vector store: 

26 - ``create`` → new ``VectorItem(fact_type="observation", ...)`` 

27 - ``update`` → delete the old item, insert a revised one with incremented 

28 ``_obs_proof_count`` and merged ``_obs_source_ids`` 

29 - ``delete`` → remove the observation from the store 

305. During ``recall()``, the orchestrator runs an additional ``observation`` 

31 strategy (``search_similar`` with ``fact_types=["observation"]``) and 

32 injects its results into the RRF fusion with a configurable weight boost, 

33 so confirmed multi-evidence observations naturally rank above single-mention 

34 raw memories. 

35 

36Prompt design 

37------------- 

38The LLM receives a brief of existing observations (with proof counts) and 

39the new memory, then outputs a JSON array of actions. The prompt is kept 

40deliberately short to minimise token cost — the consolidator runs after

41every retain, so that cost is paid even in fire-and-forget mode. A ``max_obs``

42cap (default 10) limits context window growth. 

43 

44Metadata schema on observation VectorItems 

45------------------------------------------ 

46``_obs_proof_count`` int — how many consolidation passes confirmed it (1 on create, +1 per update)

47``_obs_source_ids`` str (JSON list) — raw memory IDs that contributed 

48``_obs_source_timestamps`` str (JSON list) — ISO timestamps parallel to source_ids; feeds :func:`astrocyte.pipeline.trend.compute_trend` (M21) 

49``_obs_confidence`` str (float, 0–1) — LLM-assigned confidence 

50``_obs_updated_at`` str (ISO datetime) — last mutation timestamp 

51``_obs_scope`` str — scope key (``bank:<bank_id>``, or the sorted caller tags joined by ``|``)

52``_obs_freshness`` str — "fresh" | "stale" 

53""" 

54 

55from __future__ import annotations 

56 

57import json 

58import logging 

59import uuid 

60from dataclasses import dataclass, field 

61from datetime import datetime, timezone 

62from typing import TYPE_CHECKING, Any 

63 

64if TYPE_CHECKING: 

65 from astrocyte.pipeline.trend import Trend 

66 from astrocyte.provider import LLMProvider, VectorStore 

67 from astrocyte.types import VectorHit 

68 

69logger = logging.getLogger("astrocyte.pipeline.observation") 

70 

71# --------------------------------------------------------------------------- 

72# Action types 

73# --------------------------------------------------------------------------- 

74 

75_VALID_ACTIONS = frozenset({"create", "update", "delete"}) 

76 

77# --------------------------------------------------------------------------- 

78# Observation bank naming 

79# --------------------------------------------------------------------------- 

80 

81_OBS_SUFFIX = "::obs" 

82 

83 

84def obs_bank_id(bank_id: str) -> str: 

85 """Return the dedicated observation bank for ``bank_id``. 

86 

87 Observations are stored in a *separate* bank (``{bank_id}::obs``) so that 

88 the main semantic/keyword/temporal strategies never retrieve them. This 

89 prevents observations from double-counting in RRF fusion (they would 

90 otherwise appear in both the ``semantic`` results and the ``observation`` 

91 strategy results, crowding out the raw source memories that carry verbatim 

92 answers). 

93 """ 

94 return f"{bank_id}{_OBS_SUFFIX}" 

95 

96 

def observation_scope(bank_id: str, tags: list[str] | None = None) -> str:
    """Derive a stable scope key for a retain/recall context.

    With no tags (``None`` or empty) the scope is ``"bank:<bank_id>"``.
    Otherwise it is the tags, stringified and sorted so the key does not
    depend on tag order, joined with ``"|"``.
    """
    if not tags:
        return f"bank:{bank_id}"
    return "|".join(sorted(map(str, tags)))

102 

103 

104# --------------------------------------------------------------------------- 

105# Result 

106# --------------------------------------------------------------------------- 

107 

108 

@dataclass
class ObservationConsolidationResult:
    """Tally returned by one ``ObservationConsolidator.consolidate()`` call.

    The counters record LLM actions that were applied (``created``,
    ``updated``, ``deleted``) or rejected (``skipped``). ``errors`` collects
    messages for individual actions, or whole passes, that raised.
    """

    created: int = 0  # new observations stored
    updated: int = 0  # observations replaced by a revised version
    deleted: int = 0  # observations removed from the observation bank
    skipped: int = 0  # actions not applied (e.g. empty text, low confidence)
    # Failure messages; default_factory gives every result its own list.
    errors: list[str] = field(default_factory=list)

116 

117 

def _parse_iso_utc(value: object) -> datetime | None:
    """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    ``datetime`` instances are accepted as-is. Anything that is not a
    non-empty string or a ``datetime``, or that fails to parse, yields
    ``None``. A trailing ``Z`` is accepted even on Python versions whose
    ``fromisoformat`` rejects it.

    Naive results are taken to be UTC. Every timestamp this module writes is
    UTC-aware, so a naive value can only come from an external writer.
    Mixing it with aware values would make datetime comparisons raise
    ``TypeError``.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str) and value:
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
    else:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def compute_observation_trend(
    metadata: dict[str, Any] | None,
    *,
    now: datetime | None = None,
) -> "Trend":
    """Compute :class:`~astrocyte.pipeline.trend.Trend` for a stored observation.

    Reads ``_obs_source_timestamps`` and feeds the parsed timestamps to
    :func:`astrocyte.pipeline.trend.compute_trend`. That field is the
    per-source ISO-timestamp list populated by
    :meth:`ObservationConsolidator._apply_create` /
    :meth:`_apply_update`. It is normally a JSON-encoded string, but an
    already-decoded list is also accepted. Entries that do not parse are
    ignored.

    When no per-source timestamp can be recovered, the function falls back
    to ``_obs_updated_at`` (or ``_created_at``) as a single-point timestamp.
    This covers legacy observations created before M21 and external
    writers. It is coarser than per-source tracking but never returns
    garbage: :func:`compute_trend` classifies it as NEW / STALE based on
    that one timestamp's age.

    Args:
        metadata: The observation's metadata dict (``None`` is treated as
            empty).
        now: Reference time forwarded unchanged to :func:`compute_trend`.

    Returns:
        The computed trend, or ``Trend.STALE`` when no timestamp at all can
        be recovered.
    """
    from astrocyte.pipeline.trend import Trend, compute_trend

    meta = metadata or {}

    raw = meta.get("_obs_source_timestamps")
    entries: list[Any] = []
    if isinstance(raw, list):
        entries = raw
    elif isinstance(raw, str) and raw.strip():
        try:
            decoded = json.loads(raw)
        except json.JSONDecodeError:
            # Malformed metadata blob: fall through to the single-timestamp
            # fallback below.
            decoded = None
        if isinstance(decoded, list):
            entries = decoded

    timestamps = [ts for ts in map(_parse_iso_utc, entries) if ts is not None]

    if not timestamps:
        # Legacy fallback: a single-point timestamp from the mutation time.
        fallback = _parse_iso_utc(meta.get("_obs_updated_at") or meta.get("_created_at"))
        if fallback is not None:
            timestamps.append(fallback)

    if not timestamps:
        return Trend.STALE
    return compute_trend(timestamps, now=now)

172 

173 

174# --------------------------------------------------------------------------- 

175# Prompt 

176# --------------------------------------------------------------------------- 

177 

# System message for the consolidation call made by
# ObservationConsolidator.consolidate(); the user message is built by
# _build_user_prompt(). The model is asked to answer with a bare JSON array
# of create/update/delete actions, which _parse_actions() decodes and
# filters. The wording is behavior, not documentation: any edit here changes
# what the model produces, so treat it like a code change.
# NOTE(review): "Maximum 5 actions per call" is only requested from the
# model; _parse_actions() does not cap the number of actions it returns.
_SYSTEM_PROMPT = """\
You are a memory consolidation agent. Your job is to maintain a concise, \
deduplicated set of *observations* — atomic facts distilled from raw memories.

Given:
- A list of existing observations (each with an ID, proof count, and text).
- A new memory to integrate.

Output a JSON array of actions. Each action is one of:
 {"action": "create", "text": "<observation>", "confidence": <0.0–1.0>}
 {"action": "update", "obs_id": "<id>", "text": "<revised observation>", \
"confidence": <0.0–1.0>}
 {"action": "delete", "obs_id": "<id>"}

Rules (Hindsight-style three-strategy reconciliation):
1. REDUNDANT — same fact in different words: UPDATE the existing observation \
to the cleaner phrasing and raise confidence. Don't create a near-duplicate.
2. STATE UPDATE — new info supersedes old state: UPDATE the observation to \
preserve the *journey*. Use phrases like "used to X, now Y" or "changed from \
X to Y" so the temporal evolution is captured. Never silently overwrite — \
the agent must be able to answer questions about the prior state.
3. DIRECT CONTRADICTION (rare) — irreconcilable claim with no temporal frame: \
DELETE the old observation only when the new memory definitively supersedes \
it AND no journey can be expressed (e.g. an outright correction of a wrong \
fact). When in doubt, prefer UPDATE-with-journey over DELETE.
4. CREATE if the new memory introduces a fact not already covered.
5. If the new memory is fully redundant *and* the existing observation \
already captures it precisely, respond with [].
6. Preserve stable persona and preference facts: identities, goals, hobbies, \
relationships, repeated activities, values, career plans, and stated likes/dislikes.

Constraints:
- Each observation must be a single, independently-useful sentence (≤ 30 words).
- Output valid JSON only. No prose before or after the array.
- Maximum 5 actions per call.
"""

214 

215 

216def _build_user_prompt( 

217 existing_obs: list[VectorHit], 

218 new_memory_text: str, 

219 *, 

220 max_obs: int = 10, 

221) -> str: 

222 lines: list[str] = ["Existing observations:"] 

223 if existing_obs: 

224 for obs in existing_obs[:max_obs]: 

225 proof = (obs.metadata or {}).get("_obs_proof_count", 1) 

226 conf = (obs.metadata or {}).get("_obs_confidence", "?") 

227 lines.append(f"[{obs.id}] (proof_count={proof}, confidence={conf}): {obs.text}") 

228 else: 

229 lines.append("(none)") 

230 lines.append("") 

231 lines.append("New memory:") 

232 lines.append(new_memory_text.strip()) 

233 lines.append("") 

234 lines.append("Actions (JSON array):") 

235 return "\n".join(lines) 

236 

237 

238# --------------------------------------------------------------------------- 

239# Action parsing 

240# --------------------------------------------------------------------------- 

241 

242 

def _parse_actions(raw: str) -> list[dict[str, Any]]:
    """Extract the validated action list from a consolidation LLM response.

    The LLM is instructed to output *only* a JSON array, but may add
    surrounding whitespace or wrap the array in a markdown code fence.
    Fence lines are dropped, then the text from the first ``[`` to the last
    ``]`` is decoded as JSON.

    Returns:
        The decoded items that are dicts whose ``"action"`` is one of the
        strings in ``_VALID_ACTIONS``, in their original order. Everything
        else is dropped. Returns ``[]`` when the response is empty or holds
        no decodable JSON array.
    """
    text = (raw or "").strip()
    # Strip markdown code fences (``` or ```json) if present.
    if text.startswith("```"):
        text = "\n".join(
            line for line in text.splitlines() if not line.strip().startswith("```")
        ).strip()

    start = text.find("[")
    end = text.rfind("]")
    if start == -1 or end == -1 or end < start:
        logger.debug("No JSON array found in consolidation response: %r", (raw or "")[:200])
        return []

    try:
        actions = json.loads(text[start : end + 1])
    except json.JSONDecodeError as exc:
        logger.warning("JSON parse error in consolidation response: %s — %r", exc, raw[:200])
        return []

    if not isinstance(actions, list):
        return []

    # Check that "action" is a string before the set lookup. An unhashable
    # value (e.g. a list or dict) would make ``in _VALID_ACTIONS`` raise
    # TypeError and abort the whole pass instead of dropping one bad item.
    return [
        item
        for item in actions
        if isinstance(item, dict)
        and isinstance(item.get("action"), str)
        and item["action"] in _VALID_ACTIONS
    ]

282 

283 

284# --------------------------------------------------------------------------- 

285# Core consolidator 

286# --------------------------------------------------------------------------- 

287 

288 

class ObservationConsolidator:
    """Maintains the observations layer via post-retain LLM consolidation.

    Observations are read from and written to the dedicated observation bank
    returned by :func:`obs_bank_id`, never to the raw memory bank itself.

    Parameters
    ----------
    max_context_obs:
        Maximum number of existing observations included in the LLM prompt.
        Higher values give the LLM more context but increase token cost.
    min_confidence:
        ``create`` actions whose confidence is below this threshold are
        skipped. ``update`` actions are applied regardless of confidence.
    observation_recall_limit:
        How many existing observations to fetch per consolidation call.
        This is the ``limit`` passed to ``vector_store.search_similar``.
    """

    # Page size for scanning the observation bank in invalidate_sources();
    # also the batch size for the deletes that follow the scan.
    _SCAN_PAGE_SIZE = 200

    # Confidence assumed when an LLM action omits it or sends null.
    _DEFAULT_CONFIDENCE = 0.7

    def __init__(
        self,
        *,
        max_context_obs: int = 10,
        min_confidence: float = 0.5,
        observation_recall_limit: int = 15,
    ) -> None:
        self.max_context_obs = max_context_obs
        self.min_confidence = min_confidence
        self.observation_recall_limit = observation_recall_limit

    async def consolidate(
        self,
        new_memory_text: str,
        new_memory_ids: list[str],
        bank_id: str,
        vector_store: VectorStore,
        llm_provider: LLMProvider,
        *,
        query_vector: list[float] | None = None,
        scope: str | None = None,
    ) -> ObservationConsolidationResult:
        """Integrate a new memory into the observations layer.

        Args:
            new_memory_text: The raw text of the newly retained memory (or
                the first chunk when a retain produces multiple chunks — the
                consolidator works at the retain-call level, not chunk level).
            new_memory_ids: IDs of the vector items stored in this retain
                call. They are recorded as provenance (``_obs_source_ids``) on
                every observation created or updated here.
            bank_id: The raw memory bank being updated. Observations are
                read from and written to ``obs_bank_id(bank_id)``.
            vector_store: Backing store (uses ``search_similar``,
                ``store_vectors`` and ``delete``).
            llm_provider: LLM used for embeddings and the consolidation call.
            query_vector: Pre-computed embedding of ``new_memory_text``. When
                ``None``, the method embeds the text itself. Pass the vector
                already computed during retain to avoid a redundant embedding
                call.
            scope: Value stored as ``_obs_scope`` on newly created
                observations; defaults to ``observation_scope(bank_id)``.
                Updated observations keep the scope they already had.

        Returns:
            Counts of applied and skipped actions. Failures are logged and
            appended to ``errors`` rather than raised. A failing action does
            not stop the remaining ones.
        """
        result = ObservationConsolidationResult()
        obs_scope = scope or observation_scope(bank_id)

        try:
            # 1. Embed the new memory (or reuse the caller's vector).
            if query_vector is None:
                vecs = await llm_provider.embed([new_memory_text])
                query_vector = vecs[0]

            # 2. Fetch semantically-related existing observations from the
            #    dedicated observation bank (separate from the raw memory bank).
            from astrocyte.types import VectorFilters

            obs_bank = obs_bank_id(bank_id)
            existing_obs = await vector_store.search_similar(
                query_vector,
                obs_bank,
                limit=self.observation_recall_limit,
                filters=VectorFilters(bank_id=obs_bank),
            )

            # 3. Build and execute the LLM prompt.
            user_prompt = _build_user_prompt(
                existing_obs,
                new_memory_text,
                max_obs=self.max_context_obs,
            )
            from astrocyte.types import Message

            completion = await llm_provider.complete(
                messages=[
                    Message(role="system", content=_SYSTEM_PROMPT),
                    Message(role="user", content=user_prompt),
                ],
                max_tokens=512,
                temperature=0.0,
            )

            # 4. Parse actions.
            actions = _parse_actions(completion.text)
            if not actions:
                return result

            # Only observations returned by the search can be updated or
            # deleted; IDs the LLM invents are simply absent from this map.
            obs_by_id: dict[str, VectorHit] = {h.id: h for h in existing_obs}

            # 5. Apply actions. One timestamp for the whole batch keeps the
            #    metadata of everything written in this call consistent.
            now_iso = datetime.now(timezone.utc).isoformat()
            new_ids_json = json.dumps(new_memory_ids)

            for action in actions:
                act = action.get("action")
                try:
                    if act == "create":
                        applied = await self._apply_create(
                            action,
                            bank_id,
                            vector_store,
                            llm_provider,
                            source_ids=new_memory_ids,
                            now_iso=now_iso,
                            scope=obs_scope,
                        )
                        if applied:
                            result.created += 1
                        else:
                            result.skipped += 1

                    elif act == "update":
                        existing = obs_by_id.get(action.get("obs_id", ""))
                        applied = await self._apply_update(
                            action,
                            existing,
                            bank_id,
                            vector_store,
                            llm_provider,
                            new_source_ids=new_memory_ids,
                            now_iso=now_iso,
                            new_ids_json=new_ids_json,
                            scope=obs_scope,
                        )
                        if applied:
                            result.updated += 1
                        else:
                            result.skipped += 1

                    elif act == "delete":
                        obs_id = action.get("obs_id", "")
                        removed = 0
                        if obs_id and obs_id in obs_by_id:
                            removed = await vector_store.delete([obs_id], obs_bank)
                        # Unknown IDs and deletes that removed nothing are both
                        # reported as skipped, so every delete is counted.
                        if removed:
                            result.deleted += 1
                        else:
                            result.skipped += 1

                except Exception as exc:
                    msg = f"action={act} failed: {exc}"
                    logger.warning("Observation consolidation %s in bank %s: %s", act, bank_id, exc)
                    result.errors.append(msg)

        except Exception as exc:
            logger.warning(
                "Observation consolidation failed for bank %s: %s",
                bank_id,
                exc,
                exc_info=True,
            )
            result.errors.append(str(exc))

        logger.debug(
            "Observation consolidation bank=%s created=%d updated=%d deleted=%d skipped=%d errors=%d",
            bank_id,
            result.created,
            result.updated,
            result.deleted,
            result.skipped,
            len(result.errors),
        )
        return result

    async def invalidate_sources(
        self,
        source_ids: list[str],
        bank_id: str,
        vector_store: VectorStore,
    ) -> int:
        """Delete observations whose provenance references any source ID.

        The whole observation bank is scanned page by page. Matching
        observation IDs are collected first and deleted only after the scan.
        Deleting while paging by offset can shift later items into offsets
        that were already visited (when the store's offset reflects its
        current contents), and those matches would silently be skipped.

        Returns:
            The sum of the counts returned by ``vector_store.delete``.
        """
        if not source_ids:
            return 0
        wanted = {str(sid) for sid in source_ids}
        obs_bank = obs_bank_id(bank_id)
        page_size = self._SCAN_PAGE_SIZE

        matches: list[str] = []
        offset = 0
        while True:
            page = await vector_store.list_vectors(obs_bank, offset=offset, limit=page_size)
            if not page:
                break
            for item in page:
                raw_sources = (item.metadata or {}).get("_obs_source_ids")
                sources = {str(sid) for sid in self._decode_json_list(raw_sources)}
                if sources & wanted:
                    matches.append(item.id)
            if len(page) < page_size:
                break
            offset += len(page)

        deleted = 0
        for start in range(0, len(matches), page_size):
            deleted += await vector_store.delete(matches[start : start + page_size], obs_bank)
        return deleted

    @staticmethod
    def _decode_json_list(raw: Any) -> list[Any]:
        """Decode a metadata field that stores a JSON-encoded list.

        An already-decoded list is returned unchanged. ``None``, malformed
        JSON, and JSON that is not a list all yield ``[]``, so one corrupt
        field cannot abort consolidation.
        """
        if raw is None:
            return []
        if isinstance(raw, list):
            return raw
        try:
            decoded = json.loads(str(raw))
        except (json.JSONDecodeError, TypeError):
            return []
        return decoded if isinstance(decoded, list) else []

    @classmethod
    def _action_confidence(cls, action: dict[str, Any]) -> float:
        """Read ``confidence`` from an LLM action.

        A missing or null value falls back to ``_DEFAULT_CONFIDENCE``. A
        non-numeric value still raises (ValueError/TypeError), which
        consolidate() records as a per-action error.
        """
        value = action.get("confidence")
        return cls._DEFAULT_CONFIDENCE if value is None else float(value)

    async def _apply_create(
        self,
        action: dict[str, Any],
        bank_id: str,
        vector_store: VectorStore,
        llm_provider: LLMProvider,
        *,
        source_ids: list[str],
        now_iso: str,
        scope: str,
    ) -> bool:
        """Store a new observation built from a ``create`` action.

        Returns:
            ``True`` when stored. ``False`` when the action has no text or its
            confidence is below ``min_confidence``.
        """
        text = (action.get("text") or "").strip()
        if not text:
            return False
        confidence = self._action_confidence(action)
        if confidence < self.min_confidence:
            return False

        vecs = await llm_provider.embed([text])

        from astrocyte.types import VectorItem

        # M21: per-source timestamps feed compute_trend. At create time, every
        # source was retained in the current call, so each gets now_iso; the
        # list grows across later updates via _apply_update.
        source_timestamps = [now_iso] * len(source_ids)

        item = VectorItem(
            id=uuid.uuid4().hex[:16],
            bank_id=obs_bank_id(bank_id),
            vector=vecs[0],
            text=text,
            fact_type="observation",
            metadata={
                "_obs_proof_count": 1,
                "_obs_source_ids": json.dumps(source_ids),
                "_obs_source_timestamps": json.dumps(source_timestamps),
                "_obs_confidence": str(round(confidence, 3)),
                "_obs_updated_at": now_iso,
                "_obs_scope": scope,
                "_obs_freshness": "fresh",
                "_created_at": now_iso,
            },
            retained_at=datetime.now(timezone.utc),
        )
        await vector_store.store_vectors([item])
        return True

    async def _apply_update(
        self,
        action: dict[str, Any],
        existing: VectorHit | None,
        bank_id: str,
        vector_store: VectorStore,
        llm_provider: LLMProvider,
        *,
        new_source_ids: list[str],
        now_iso: str,
        new_ids_json: str,
        scope: str,
    ) -> bool:
        """Replace an observation with the LLM's revised text.

        The revision is stored under a new ID with these fields:

        - ``_obs_proof_count``: the previous value plus one.
        - ``_obs_source_ids``: ``existing``'s source IDs followed by any new
          ones, deduplicated.
        - ``_obs_source_timestamps``: a parallel list of timestamps.
        - ``_obs_scope`` and ``_created_at``: carried over from ``existing``.

        The replacement is embedded and stored *before* the old observation
        is deleted, so a failing embed or store call cannot lose the
        original.

        When ``existing`` is ``None`` (the LLM referenced an ID that was not
        in the fetched context), nothing is deleted. The revision is then
        stored as a fresh observation with proof count 1, ``scope`` and
        ``now_iso``.

        ``new_ids_json`` is accepted for call compatibility but unused.

        Returns:
            ``True`` when stored, ``False`` when the action has no text.
        """
        text = (action.get("text") or "").strip()
        if not text:
            return False
        confidence = self._action_confidence(action)
        obs_id = action.get("obs_id", "")
        target_bank = obs_bank_id(bank_id)

        # Provenance carried over from the observation being replaced.
        old_proof = 0
        old_sources: list[str] = []
        old_timestamps: list[str] = []
        old_created_at = now_iso
        old_scope = scope
        if existing is not None:
            meta = existing.metadata or {}
            try:
                old_proof = int(meta.get("_obs_proof_count", 1))
            except (TypeError, ValueError):
                # A corrupt count must not block every future update.
                old_proof = 1
            old_created_at = str(meta.get("_created_at", now_iso))
            old_scope = str(meta.get("_obs_scope", scope))
            old_sources = [str(sid) for sid in self._decode_json_list(meta.get("_obs_source_ids"))]
            old_timestamps = [
                str(ts) for ts in self._decode_json_list(meta.get("_obs_source_timestamps")) if ts
            ]

        # Keep timestamps parallel to sources. Lists written before M21 (or by
        # other writers) may be missing or shorter than the source list; pad
        # with the creation time instead of letting zip() truncate, which
        # would silently drop source IDs that invalidate_sources relies on.
        old_timestamps = old_timestamps[: len(old_sources)]
        old_timestamps += [old_created_at] * (len(old_sources) - len(old_timestamps))

        # Merge old sources first, then new ones; first occurrence wins.
        merged_sources: list[str] = []
        merged_timestamps: list[str] = []
        seen: set[str] = set()
        pairs = list(zip(old_sources, old_timestamps)) + [(sid, now_iso) for sid in new_source_ids]
        for sid, ts in pairs:
            if sid in seen:
                continue
            seen.add(sid)
            merged_sources.append(sid)
            merged_timestamps.append(ts)

        vecs = await llm_provider.embed([text])

        from astrocyte.types import VectorItem

        item = VectorItem(
            id=uuid.uuid4().hex[:16],
            bank_id=target_bank,
            vector=vecs[0],
            text=text,
            fact_type="observation",
            metadata={
                "_obs_proof_count": old_proof + 1,
                "_obs_source_ids": json.dumps(merged_sources),
                "_obs_source_timestamps": json.dumps(merged_timestamps),
                "_obs_confidence": str(round(confidence, 3)),
                "_obs_updated_at": now_iso,
                "_obs_scope": old_scope,
                "_obs_freshness": "fresh",
                "_created_at": old_created_at,
            },
            retained_at=datetime.now(timezone.utc),
        )
        await vector_store.store_vectors([item])

        # Remove the superseded observation only after its replacement is safe.
        if existing is not None:
            await vector_store.delete([obs_id], target_bank)
        return True