Coverage for astrocyte/pipeline/curated_recall.py: 100%

36 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Curated recall — post-retrieval re-scoring by freshness, reliability, and salience. 

2 

3Applied after retrieval and fusion, before returning to the caller. 

4Provider-agnostic — works with any storage or engine backend. 

5 

6Sync, pure computation — Rust migration candidate. 

7""" 

8 

9from __future__ import annotations 

10 

11import math 

12from datetime import datetime, timezone 

13 

14from astrocyte.types import MemoryHit 

15 

16 

def curate_recall_hits(
    hits: list[MemoryHit],
    *,
    freshness_weight: float = 0.3,
    reliability_weight: float = 0.2,
    salience_weight: float = 0.2,
    original_score_weight: float = 0.3,
    freshness_half_life_days: float = 30.0,
    min_score: float | None = None,
) -> list[MemoryHit]:
    """Re-score and re-rank recall hits by freshness, reliability, and salience.

    Each hit receives a composite score that is the weighted sum of:

    - its original retrieval score (``hit.score``),
    - freshness: exponential decay of the age derived from ``occurred_at``
      (exactly 0.5 at one half-life; 0.5 as a neutral value when
      ``occurred_at`` is missing),
    - reliability: see ``_reliability_score`` (fact_type and source),
    - salience: see ``_salience_score`` (memory_layer).

    Args:
        hits: Hits to re-score. The input list and its items are not mutated.
        freshness_weight: Weight of the freshness component.
        reliability_weight: Weight of the reliability component.
        salience_weight: Weight of the salience component.
        original_score_weight: Weight of the original retrieval score.
        freshness_half_life_days: Age in days at which freshness halves.
            The derived half-life is clamped to at least one second.
        min_score: When given, hits whose composite score is below this
            value are dropped from the result.

    Returns:
        New ``MemoryHit`` copies with ``score`` replaced by the composite
        score, sorted by that score in descending order. Hits with equal
        composite scores keep their input order.

    Sync, pure computation — Rust migration candidate.
    """
    # Function-scope import, done once per call rather than once per hit.
    from dataclasses import replace

    if not hits:
        return []

    now = datetime.now(timezone.utc)
    # Clamp so a zero or negative half-life cannot divide by zero or turn
    # the decay into growth.
    half_life_seconds = max(freshness_half_life_days * 86400.0, 1.0)
    # ln(2) / half-life gives exactly 0.5 freshness at one half-life.
    decay_rate = math.log(2.0) / half_life_seconds

    scored: list[tuple[float, MemoryHit]] = []
    for hit in hits:
        occurred_at = hit.occurred_at
        if occurred_at:
            # A naive timestamp cannot be subtracted from the aware ``now``
            # (TypeError); interpret it as UTC instead of failing the recall.
            if occurred_at.utcoffset() is None:
                occurred_at = occurred_at.replace(tzinfo=timezone.utc)
            # Future-dated hits are treated as age zero (maximally fresh).
            age_seconds = max(0.0, (now - occurred_at).total_seconds())
            freshness = math.exp(-decay_rate * age_seconds)
        else:
            freshness = 0.5  # Unknown age → neutral

        composite = (
            original_score_weight * hit.score
            + freshness_weight * freshness
            + reliability_weight * _reliability_score(hit)
            + salience_weight * _salience_score(hit)
        )
        scored.append((composite, hit))

    # Sort on the score only, so MemoryHit objects are never compared;
    # the sort is stable, so ties keep their input order.
    scored.sort(key=lambda pair: pair[0], reverse=True)

    # Return copies carrying the composite score; all other fields are preserved.
    return [
        replace(hit, score=composite)
        for composite, hit in scored
        if min_score is None or composite >= min_score
    ]

83 

84 

def _reliability_score(hit: MemoryHit) -> float:
    """Return a reliability score derived from the hit's fact_type and source.

    Base values rank experience (first-hand) above world (general) above
    observation (derived); "model" and any unknown or missing fact_type get
    0.5. A hit with a truthy ``source`` gains a 0.1 provenance bonus, with
    the result capped at 1.0.
    """
    base_by_type = {
        "experience": 0.9,
        "world": 0.7,
        "observation": 0.6,
        "model": 0.5,
    }
    score = base_by_type.get(hit.fact_type or "", 0.5)

    # Known provenance earns a small bonus, never pushing the score past 1.0.
    return min(1.0, score + 0.1) if hit.source else score

103 

104 

def _salience_score(hit: MemoryHit) -> float:
    """Return a salience score derived from the hit's memory_layer.

    Higher layers hold more curated knowledge: model (1.0) > observation
    (0.75) > fact (0.5). A missing or unrecognised layer scores 0.5, the
    same as a plain fact.
    """
    weight_by_layer = {
        "model": 1.0,
        "observation": 0.75,
        "fact": 0.5,
    }
    layer = hit.memory_layer or ""
    try:
        return weight_by_layer[layer]
    except KeyError:
        # Unknown or missing layer falls back to the fact-level weight.
        return 0.5