Coverage for astrocyte/pipeline/curated_retain.py: 76%
58 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""LLM-curated retain — the reasoning LLM decides what/how to store.
3Inspired by ByteRover: instead of mechanical chunk+embed, the LLM analyzes
4incoming content against existing memories and decides ADD/UPDATE/MERGE/SKIP/DELETE.
6Async (requires LLM calls).
7"""
9from __future__ import annotations
11import json
12import logging
13from dataclasses import dataclass
14from typing import TYPE_CHECKING
16from astrocyte.types import MemoryHit, Message
18if TYPE_CHECKING:
19 from astrocyte.provider import LLMProvider
21logger = logging.getLogger("astrocyte.pipeline")
24@dataclass
25class CurationDecision:
26 """Result of LLM curation analysis."""
28 action: str # "add" | "update" | "merge" | "skip" | "delete"
29 content: str # Processed content (may be rewritten by LLM)
30 memory_layer: str # "fact" | "observation" | "model"
31 reasoning: str # LLM's explanation for the decision
32 merge_target_id: str | None = None # Memory ID to merge with (for "merge" and "update")
35_CURATION_SYSTEM_PROMPT = """You are a memory curation agent. Analyze new content against existing memories and decide the best action.
37## Actions available:
38- ADD: Store as a new memory (genuinely new information)
39- UPDATE: Replace an existing memory with updated information (specify which memory_id)
40- MERGE: Combine with an existing memory into a richer entry (specify which memory_id)
41- SKIP: Don't store (redundant, low-value, or noise)
42- DELETE: The new content contradicts/supersedes old info — delete the old memory (specify which memory_id)
44## Memory layers:
45- fact: Raw factual information
46- observation: A pattern or insight derived from multiple facts
47- model: A consolidated understanding or mental model
49Respond with a JSON object:
50{"action": "add|update|merge|skip|delete", "content": "processed content to store", "memory_layer": "fact|observation|model", "reasoning": "why this action", "merge_target_id": "memory_id or null"}"""
53async def curate_retain(
54 new_content: str,
55 existing_memories: list[MemoryHit],
56 llm_provider: LLMProvider,
57 *,
58 model: str | None = None,
59) -> CurationDecision:
60 """Ask the LLM to curate a retain operation.
62 Analyzes new content against existing similar memories and decides
63 the best action (ADD/UPDATE/MERGE/SKIP/DELETE) + memory layer classification.
65 Returns CurationDecision. Falls back to ADD with memory_layer="fact" on failure.
66 """
67 # Format existing memories for the prompt (only IDs and text, no raw interpolation)
68 if existing_memories:
69 existing_text = "\n".join(
70 f"- [{m.memory_id or 'unknown'}] (score={m.score:.2f}): {m.text[:200]}" for m in existing_memories[:5]
71 )
72 else:
73 existing_text = "(no existing memories in this bank)"
75 # Collect valid memory IDs for validation
76 valid_ids = {m.memory_id for m in existing_memories if m.memory_id}
78 user_msg = (
79 f"<existing_memories>\n{existing_text}\n</existing_memories>\n\n"
80 f"<new_content>\n{new_content[:2000]}\n</new_content>"
81 )
83 try:
84 completion = await llm_provider.complete(
85 messages=[
86 Message(role="system", content=_CURATION_SYSTEM_PROMPT),
87 Message(role="user", content=user_msg),
88 ],
89 model=model,
90 max_tokens=500,
91 temperature=0.0,
92 )
93 return _parse_curation_response(completion.text, new_content, valid_ids)
94 except Exception:
95 logger.warning("LLM curation failed, defaulting to ADD")
96 return CurationDecision(
97 action="add",
98 content=new_content,
99 memory_layer="fact",
100 reasoning="LLM curation failed, defaulting to ADD",
101 )
104def _parse_curation_response(response: str, original_content: str, valid_memory_ids: set[str]) -> CurationDecision:
105 """Parse the LLM's curation response JSON.
107 Validates merge_target_id against known memory IDs.
108 Falls back to ADD if destructive action references unknown ID.
109 """
110 try:
111 text = response.strip()
112 # Handle markdown code blocks
113 if "```" in text:
114 start = text.index("```") + 3
115 if text[start:].startswith("json"):
116 start += 4
117 end = text.index("```", start)
118 text = text[start:end].strip()
120 data = json.loads(text)
121 if not isinstance(data, dict):
122 raise ValueError("Expected JSON object")
124 action = data.get("action", "add").lower()
125 if action not in ("add", "update", "merge", "skip", "delete"):
126 action = "add"
128 memory_layer = data.get("memory_layer", "fact").lower()
129 if memory_layer not in ("fact", "observation", "model"):
130 memory_layer = "fact"
132 merge_target_id = data.get("merge_target_id")
134 # Validate merge_target_id for destructive actions
135 if action in ("update", "merge", "delete") and merge_target_id:
136 if merge_target_id not in valid_memory_ids:
137 logger.warning(
138 "LLM returned merge_target_id '%s' not in valid memory IDs, falling back to ADD",
139 merge_target_id,
140 )
141 action = "add"
142 merge_target_id = None
144 # Destructive actions without a target are invalid
145 if action in ("update", "merge", "delete") and not merge_target_id:
146 logger.warning("LLM returned '%s' action without merge_target_id, falling back to ADD", action)
147 action = "add"
149 # Use original content — don't let LLM rewrite stored content
150 content = original_content
152 return CurationDecision(
153 action=action,
154 content=content,
155 memory_layer=memory_layer,
156 reasoning=data.get("reasoning", ""),
157 merge_target_id=merge_target_id,
158 )
159 except (json.JSONDecodeError, ValueError):
160 return CurationDecision(
161 action="add",
162 content=original_content,
163 memory_layer="fact",
164 reasoning="Failed to parse LLM response, defaulting to ADD",
165 )