Coverage for astrocyte/pipeline/entity_resolution.py: 82%
254 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""M11: Entity resolution pipeline stage.
3``EntityResolver.resolve()`` takes newly-extracted entities from a retain
4call and, for each one, looks for existing candidates in the graph store
5that might be the same entity under a different surface form. Resolution
6runs as a **tiered cascade** so the LLM is called only for genuinely
7ambiguous pairs:
91. ``find_entity_candidates`` produces a list of candidates whose names
10 pass the adapter's first-stage similarity filter.
112. **String-similarity tier** (this module): each (new, candidate) pair is
12 scored with :func:`difflib.SequenceMatcher` ratio on lowercase-stripped
13 names.
14 - ``ratio >= autolink_threshold`` (default 0.95) — names are effectively
15 identical (case differences, trailing whitespace, minor variants).
16 Link is written **without** an LLM call. This is the common case for
17 conversational benchmarks where the same person is mentioned by the
18 same name across many sessions.
19 - ``ratio < skip_threshold`` (default 0.5) — names are clearly distinct
20 despite the candidate filter matching them. Skip without an LLM call.
21 - Otherwise the pair is genuinely ambiguous and falls through to tier 3.
223. **LLM disambiguation tier**: only the pairs in the ambiguous middle
23 band are sent to the LLM with a verbatim-evidence prompt.
25This cuts LLM call volume by 95-99% on workloads with consistent naming
26(LoCoMo, conversational memory benchmarks) while preserving full LLM
27disambiguation on the genuinely hard cases.
29Design goals
30------------
31- **Opt-in**: resolver is instantiated only when
32 ``entity_resolution: enabled: true`` is in config. A ``None`` resolver
33 in the orchestrator means the stage is a no-op.
34- **Bounded cost**: at most ``max_candidates_per_entity`` LLM calls per
35 newly-extracted entity, controlled by the caller.
36- **Testable**: all I/O goes through the ``GraphStore`` and ``LLMProvider``
37 SPIs; ``InMemoryGraphStore`` + ``MockLLMProvider`` cover the full path.
38"""
40from __future__ import annotations
42import asyncio
43import logging
44from datetime import datetime, timezone
45from difflib import SequenceMatcher
46from typing import TYPE_CHECKING
48from astrocyte.types import Entity, EntityCandidateMatch, EntityLink, Message
50if TYPE_CHECKING:
51 from astrocyte.provider import GraphStore, LLMProvider
53_logger = logging.getLogger("astrocyte.entity_resolution")
55# ---------------------------------------------------------------------------
56# Prompt
57# ---------------------------------------------------------------------------
59_SYSTEM_PROMPT = """\
60You are an entity resolution assistant.
62You will be given:
63- ENTITY A: a named entity extracted from a new piece of text.
64- ENTITY B: a candidate entity already in the knowledge graph.
65- SOURCE TEXT: the text that contained Entity A.
67Your task: decide whether Entity A and Entity B refer to the same real-world
68person, place, organisation, or concept.
70Respond with ONLY valid JSON (no markdown, no preamble):
72{
73 "same_entity": true | false,
74 "confidence": <float 0.0–1.0>,
75 "evidence": "<verbatim quote from SOURCE TEXT that supports your decision>"
76}
78Rules:
79- Set same_entity=true only when you are confident they refer to the same thing.
80- confidence should reflect how certain you are (1.0 = certain, 0.5 = plausible).
81- evidence must be a direct quote from SOURCE TEXT, not a paraphrase.
82- If same_entity=false, set evidence to an empty string.\
83"""
86def _name_similarity(a: str, b: str) -> float:
87 """Case-insensitive string-similarity ratio in ``[0.0, 1.0]``.
89 Uses :class:`difflib.SequenceMatcher` over lowercase-stripped strings.
90 Pure stdlib, microseconds per call. Used as the gate that decides
91 whether a pair needs LLM disambiguation:
93 - ``1.0`` — identical after normalisation
94 - ``> 0.95`` — effectively the same name (e.g. ``"Alice"`` vs ``"Alice "``)
95 - ``0.6 – 0.95`` — ambiguous (e.g. ``"Bob"`` vs ``"Robert"``)
96 - ``< 0.6`` — clearly different (e.g. ``"Alice"`` vs ``"Charlie"``)
97 """
98 a_norm = (a or "").strip().lower()
99 b_norm = (b or "").strip().lower()
100 if not a_norm or not b_norm:
101 return 0.0
102 return SequenceMatcher(None, a_norm, b_norm).ratio()
105def _build_user_message(entity_a: Entity, entity_b: Entity, source_text: str) -> str:
106 aliases_a = f" (also known as: {', '.join(entity_a.aliases)})" if entity_a.aliases else ""
107 aliases_b = f" (also known as: {', '.join(entity_b.aliases)})" if entity_b.aliases else ""
108 return f"ENTITY A: {entity_a.name}{aliases_a}\nENTITY B: {entity_b.name}{aliases_b}\n\nSOURCE TEXT:\n{source_text}"
111# ---------------------------------------------------------------------------
112# EntityResolver
113# ---------------------------------------------------------------------------
116class EntityResolver:
117 """Identifies and persists alias-of links between entities (M11).
119 Args:
120 similarity_threshold: Minimum name-similarity required for a
121 candidate to be sent to the LLM confirmation step.
122 In the in-memory store this is a substring-match gate (0 or 1);
123 in production adapters it is a cosine/edit-distance threshold.
124 confirmation_threshold: Minimum ``confidence`` from the LLM for a
125 link to be written to the graph. Defaults to ``0.75``.
126 max_candidates_per_entity: Hard cap on LLM calls per entity.
127 Guards against runaway cost when the graph is large.
128 """
130 def __init__(
131 self,
132 *,
133 similarity_threshold: float = 0.8,
134 confirmation_threshold: float = 0.75,
135 max_candidates_per_entity: int = 10,
136 autolink_threshold: float = 0.95,
137 skip_threshold: float = 0.5,
138 embedding_autolink_threshold: float = 0.92,
139 autolink_confidence: float = 0.95,
140 trigram_threshold: float = 0.15,
141 composite_threshold: float = 0.6,
142 composite_name_weight: float = 0.5,
143 composite_cooccurrence_weight: float = 0.3,
144 composite_temporal_weight: float = 0.2,
145 composite_temporal_window_days: float = 7.0,
146 mention_count_bonus_cap: float = 0.05,
147 mention_count_saturation: int = 50,
148 enable_llm_disambiguation: bool = True,
149 canonical_resolution: bool = False,
150 ) -> None:
151 """Args:
152 similarity_threshold: Passed to ``find_entity_candidates`` —
153 the adapter's first-stage filter (legacy path).
154 confirmation_threshold: Minimum LLM-reported confidence for a
155 disambiguated pair to be linked.
156 max_candidates_per_entity: Hard cap on candidates evaluated per
157 new entity. Default ``10`` — matches Hindsight's natural
158 ``pg_trgm`` candidate count. The cascade's cheap tiers
159 evaluate all of them; only the genuinely-ambiguous middle
160 band reaches the LLM.
161 autolink_threshold: Trigram name-similarity at or above which a
162 pair is auto-linked **without** calling the LLM. Default
163 ``0.95`` — effectively identical names.
164 skip_threshold: Combined-score (max of name and embedding
165 similarity) below which a candidate is dropped **without**
166 an LLM call. Default ``0.5``.
167 embedding_autolink_threshold: Cosine-similarity at or above
168 which the embedding tier autolinks the pair. Catches
169 semantic aliases (``"Bob"`` ↔ ``"Robert"``) that string
170 similarity alone misses. Default ``0.92``.
171 autolink_confidence: Confidence value persisted on alias-of
172 links created via the autolink fast paths. Default ``0.95``.
173 trigram_threshold: Adapter-side prefilter passed to
174 ``find_entity_candidates_scored``. Candidates with name
175 similarity below this are dropped before being returned.
176 Default ``0.15`` matches Hindsight's empirically-tuned
177 threshold — catches substring relationships while staying
178 fully index-based.
179 composite_threshold: Score at or above which the Hindsight-style
180 composite tier autolinks. Composite =
181 ``name_weight * name_sim + cooccurrence_weight * overlap +
182 temporal_weight * decay``. Default ``0.6`` matches Hindsight.
183 composite_name_weight: Weight applied to ``name_similarity`` in
184 the composite score. Default ``0.5``.
185 composite_cooccurrence_weight: Weight applied to the Jaccard-
186 style overlap between the new entity's nearby-entity names
187 and the candidate's stored co-occurring entity names.
188 Default ``0.3``.
189 composite_temporal_weight: Weight applied to a temporal-decay
190 signal — ``max(0, 1 - days_diff / temporal_window_days)``.
191 Default ``0.2``.
192 composite_temporal_window_days: Days over which the temporal
193 decay falls from 1.0 to 0.0. Default ``7.0``.
194 mention_count_bonus_cap: Maximum additive bonus the
195 ``mention_count`` popularity signal contributes to the
196 composite score. Hindsight-parity tiebreaker — capped at
197 ``0.05`` so a popular candidate breaks ties between
198 near-identical composite scores without ever overriding
199 the primary name/cooccurrence/temporal signals on its
200 own. Set to ``0.0`` to disable.
201 mention_count_saturation: Mention count at which the bonus
202 reaches its cap. Bonus scales as
203 ``cap * log1p(count) / log1p(saturation)``. Default
204 ``50`` — popular entities saturate quickly so the signal
205 stops growing once an entity is well-established.
206 enable_llm_disambiguation: When ``True`` (default for backward
207 compat), ambiguous middle-band pairs (composite below
208 threshold, name/embedding combined above ``skip_threshold``)
209 escalate to an LLM call for final disambiguation. Set to
210 ``False`` to match Hindsight's LLM-free design — ambiguous
211 pairs are simply not linked. For LoCoMo-scale benchmarks
212 the cheap tiers (trigram, embedding, composite) handle the
213 common cases; the LLM tier mostly fires on hard cases that
214 don't move benchmark scores and adds significant retain
215 latency.
216 canonical_resolution: When ``True`` (Hindsight-style
217 lookup-then-decide), :meth:`resolve_canonical_ids_in_place`
218 rewrites each new entity's tentative ID to its existing
219 canonical's ID before storage — so different surface forms
220 that resolve to the same canonical never produce duplicate
221 entity rows. The post-storage :meth:`resolve` aliasing
222 pass becomes redundant in this mode. Default ``False``
223 preserves the legacy two-stage flow (store-by-ID then
224 resolve aliases). Orchestrator decides which path to run
225 based on this flag.
226 """
227 if not (0.0 <= skip_threshold <= autolink_threshold <= 1.0):
228 raise ValueError(
229 "Expected 0.0 <= skip_threshold <= autolink_threshold <= 1.0, "
230 f"got skip={skip_threshold}, autolink={autolink_threshold}"
231 )
232 if not (0.0 <= embedding_autolink_threshold <= 1.0):
233 raise ValueError(f"Expected 0.0 <= embedding_autolink_threshold <= 1.0, got {embedding_autolink_threshold}")
234 if composite_temporal_window_days <= 0.0:
235 raise ValueError(f"composite_temporal_window_days must be > 0, got {composite_temporal_window_days}")
236 if mention_count_bonus_cap < 0.0:
237 raise ValueError(f"mention_count_bonus_cap must be >= 0, got {mention_count_bonus_cap}")
238 if mention_count_saturation <= 0:
239 raise ValueError(f"mention_count_saturation must be > 0, got {mention_count_saturation}")
240 self.similarity_threshold = similarity_threshold
241 self.confirmation_threshold = confirmation_threshold
242 self.max_candidates_per_entity = max_candidates_per_entity
243 self.autolink_threshold = autolink_threshold
244 self.skip_threshold = skip_threshold
245 self.embedding_autolink_threshold = embedding_autolink_threshold
246 self.autolink_confidence = autolink_confidence
247 self.trigram_threshold = trigram_threshold
248 self.composite_threshold = composite_threshold
249 self.composite_name_weight = composite_name_weight
250 self.composite_cooccurrence_weight = composite_cooccurrence_weight
251 self.composite_temporal_weight = composite_temporal_weight
252 self.composite_temporal_window_days = composite_temporal_window_days
253 self.mention_count_bonus_cap = mention_count_bonus_cap
254 self.mention_count_saturation = mention_count_saturation
255 self.enable_llm_disambiguation = enable_llm_disambiguation
256 self.canonical_resolution = canonical_resolution
257 # Optional benchmark collector — installed by the eval harness via
258 # ``resolver.metrics_collector = collector``. Hot-path code probes
259 # with ``getattr`` and silently no-ops when absent.
260 self.metrics_collector: object | None = None
262 async def resolve(
263 self,
264 new_entities: list[Entity],
265 source_text: str,
266 bank_id: str,
267 graph_store: GraphStore,
268 llm_provider: LLMProvider,
269 *,
270 event_date: datetime | None = None,
271 ) -> list[EntityLink]:
272 """Run the Hindsight-inspired entity-resolution cascade.
274 Three tiers run in order, escalating only when the cheaper signal
275 is inconclusive:
277 1. **Trigram name similarity** (``find_entity_candidates_scored``).
278 Adapter computes ``pg_trgm.similarity()`` (PostgreSQL) or
279 :class:`difflib.SequenceMatcher` (in-memory) at indexed speed.
280 2. **Embedding cosine similarity** — when the new entity carries
281 an ``embedding`` and the candidate has one stored, the cosine
282 score discriminates semantic aliases (``"Bob"`` / ``"Robert"``)
283 that trigram similarity misses.
284 3. **LLM disambiguation** — only fires for the genuinely ambiguous
285 middle band where neither cheap signal is decisive.
287 Per-candidate decision logic:
289 - ``name_similarity >= autolink_threshold`` → autolink (no LLM)
290 - ``embedding_similarity >= embedding_autolink_threshold`` → autolink
291 - ``max(name, embedding) < skip_threshold`` → drop (no LLM)
292 - otherwise → fall through to LLM disambiguation
294 Adapters that haven't implemented ``find_entity_candidates_scored``
295 yet (it's optional in the SPI) gracefully fall back to the legacy
296 ``find_entity_candidates`` + Python-level scoring path.
298 Returns:
299 All ``EntityLink`` objects that were written to the graph store.
301 Concurrency: each new entity's resolution work — candidate fetch,
302 autolink decisions, LLM disambiguation — runs in parallel via
303 :func:`asyncio.gather`. Within an entity, each candidate's tier
304 decision runs concurrently. This brings retain-time entity
305 resolution from sequential ``O(N×K)`` round-trips to a single
306 async-batched pass.
308 ``event_date`` (when supplied) feeds the composite tier's temporal
309 signal — recent candidates score higher for the same name. Defaults
310 to "now" if not provided. Pass the retain request's ``occurred_at``
311 for benchmark workloads with simulated event timestamps.
312 """
313 if not new_entities:
314 return []
316 # Pre-compute the set of nearby entity names — every other new
317 # entity in this batch is a "nearby" candidate for cooccurrence
318 # overlap with each candidate's existing co_occurs links.
319 nearby_names: set[str] = {(e.name or "").strip().lower() for e in new_entities if e.name and e.name.strip()}
320 effective_event_date = event_date or datetime.now(timezone.utc)
322 per_entity = await asyncio.gather(
323 *[
324 self._resolve_one_entity(
325 entity,
326 source_text,
327 bank_id,
328 graph_store,
329 llm_provider,
330 nearby_names=nearby_names - {(entity.name or "").strip().lower()},
331 event_date=effective_event_date,
332 )
333 for entity in new_entities
334 ]
335 )
336 return [link for sublist in per_entity for link in sublist]
338 async def resolve_canonical_ids_in_place(
339 self,
340 new_entities: list[Entity],
341 bank_id: str,
342 graph_store: GraphStore,
343 *,
344 event_date: datetime | None = None,
345 ) -> dict[str, str]:
346 """Hindsight-style lookup-then-decide canonical resolution.
348 For each new entity, runs the cascade against existing canonicals
349 and — if any tier crosses its threshold — **rewrites the entity's
350 ID to the matched canonical's ID** in place. Entities that don't
351 match keep their tentative ID and will be inserted as new
352 canonicals at storage time.
354 Returns a mapping ``{tentative_id: canonical_id}`` for callers
355 that need to update co-occurrence links or memory associations
356 that reference the original tentative IDs.
358 Designed to run BEFORE ``store_entities``. Replaces the post-store
359 :meth:`resolve` alias-creation pass when
360 ``canonical_resolution=True`` is set on the resolver.
362 No alias-of links are created — different surface forms that
363 resolve to the same canonical simply share the canonical ID.
364 """
365 if not new_entities:
366 return {}
368 nearby_names: set[str] = {(e.name or "").strip().lower() for e in new_entities if e.name and e.name.strip()}
369 effective_event_date = event_date or datetime.now(timezone.utc)
370 id_remapping: dict[str, str] = {}
372 # Process all entities in parallel — independent lookups against
373 # existing canonicals. Each task returns the resolved canonical ID
374 # (or the original tentative ID if no match).
375 resolutions = await asyncio.gather(
376 *[
377 self._best_canonical_id(
378 entity,
379 bank_id,
380 graph_store,
381 nearby_names=nearby_names - {(entity.name or "").strip().lower()},
382 event_date=effective_event_date,
383 )
384 for entity in new_entities
385 ]
386 )
387 for entity, canonical_id in zip(new_entities, resolutions, strict=True):
388 tentative_id = entity.id
389 if canonical_id != tentative_id:
390 entity.id = canonical_id
391 id_remapping[tentative_id] = canonical_id
393 # Hindsight-parity: bump mention_count on every canonical we
394 # resolved-to so the popularity signal stays current. Best-effort
395 # (older adapters lacking the method are skipped). Run AFTER the
396 # remap so we count the resolution event, not the lookup.
397 if id_remapping:
398 increment = getattr(graph_store, "increment_mention_counts", None)
399 if increment is not None:
400 try:
401 await increment(list(set(id_remapping.values())), bank_id)
402 except Exception as exc: # pragma: no cover — best-effort
403 _logger.warning(
404 "increment_mention_counts failed (%s) — composite "
405 "scoring will use stale popularity signal until next "
406 "successful resolve.",
407 exc,
408 )
409 return id_remapping
411 async def _best_canonical_id(
412 self,
413 entity: Entity,
414 bank_id: str,
415 graph_store: GraphStore,
416 *,
417 nearby_names: set[str],
418 event_date: datetime,
419 ) -> str:
420 """Return the canonical entity ID for *entity*.
422 Looks up candidates, scores each via the same tier signals as
423 :meth:`resolve`, and returns the best match's ID when the
424 strongest signal crosses the corresponding tier's threshold. Falls
425 back to the entity's tentative ID when no candidate matches.
426 """
427 scored = await self._score_candidates(entity, bank_id, graph_store)
428 if not scored:
429 self._record_cascade("no_match")
430 return entity.id
432 best_match: EntityCandidateMatch | None = None
433 best_score = 0.0
434 best_decision: str | None = None
435 for match in scored[: self.max_candidates_per_entity]:
436 if match.entity.id == entity.id:
437 # Same-name same-ID candidate (deterministic-ID path) —
438 # already canonical, no remapping needed.
439 self._record_cascade("already_canonical")
440 return entity.id
441 score, decision = self._resolution_score_with_tier(
442 match,
443 nearby_names,
444 event_date,
445 )
446 if score > best_score:
447 best_score = score
448 best_match = match
449 best_decision = decision
451 if best_match is not None and best_score > 0.0:
452 composite = self._composite_score(
453 best_match.name_similarity,
454 best_match,
455 nearby_names,
456 event_date,
457 )
458 self._record_cascade(best_decision or "composite_autolink", composite_score=composite)
459 self._record_canonical_resolution(resolved=True)
460 return best_match.entity.id
461 self._record_cascade("no_match")
462 self._record_canonical_resolution(resolved=False)
463 return entity.id
465 def _resolution_score(
466 self,
467 match: EntityCandidateMatch,
468 nearby_names: set[str],
469 event_date: datetime,
470 ) -> float:
471 """Unified score across the cascade tiers (legacy entry point)."""
472 score, _ = self._resolution_score_with_tier(match, nearby_names, event_date)
473 return score
475 def _resolution_score_with_tier(
476 self,
477 match: EntityCandidateMatch,
478 nearby_names: set[str],
479 event_date: datetime,
480 ) -> tuple[float, str | None]:
481 """Tier-aware unified score.
483 Returns ``(score, tier_label)`` where tier_label is one of
484 ``"trigram_autolink"``, ``"embedding_autolink"``,
485 ``"composite_autolink"`` (matching the names recorded by the
486 benchmark metrics collector), or ``None`` when no tier qualifies.
487 """
488 name_sim = match.name_similarity
489 emb_sim = match.embedding_similarity or 0.0
490 composite = self._composite_score(name_sim, match, nearby_names, event_date)
492 candidates: list[tuple[float, str]] = []
493 if name_sim >= self.autolink_threshold:
494 candidates.append((name_sim, "trigram_autolink"))
495 if emb_sim >= self.embedding_autolink_threshold:
496 candidates.append((emb_sim, "embedding_autolink"))
497 if composite >= self.composite_threshold:
498 candidates.append((composite, "composite_autolink"))
500 if not candidates:
501 return 0.0, None
502 candidates.sort(key=lambda x: x[0], reverse=True)
503 return candidates[0]
505 def _record_cascade(self, decision: str, *, composite_score: float | None = None) -> None:
506 """Forward decision to the optional metrics collector. No-op when
507 no collector is attached. Caller-side error suppression — metrics
508 observability must never break a real call."""
509 collector = self.metrics_collector
510 if collector is None:
511 return
512 try:
513 collector.record_cascade_decision(decision, composite_score=composite_score)
514 except Exception:
515 pass # metrics are best-effort; never let collection failures disrupt the pipeline
517 def _record_canonical_resolution(self, *, resolved: bool) -> None:
518 collector = self.metrics_collector
519 if collector is None:
520 return
521 try:
522 collector.record_canonical_resolution(resolved=resolved)
523 except Exception:
524 pass # metrics are best-effort; never let collection failures disrupt the pipeline
526 async def _resolve_one_entity(
527 self,
528 entity: Entity,
529 source_text: str,
530 bank_id: str,
531 graph_store: GraphStore,
532 llm_provider: LLMProvider,
533 *,
534 nearby_names: set[str],
535 event_date: datetime,
536 ) -> list[EntityLink]:
537 """Cascade for a single new entity. Candidates are processed concurrently."""
538 scored = await self._score_candidates(entity, bank_id, graph_store)
539 if not scored:
540 return []
542 eligible = [match for match in scored[: self.max_candidates_per_entity] if match.entity.id != entity.id]
543 if not eligible:
544 return []
546 results = await asyncio.gather(
547 *[
548 self._handle_candidate(
549 entity,
550 match,
551 source_text,
552 bank_id,
553 graph_store,
554 llm_provider,
555 nearby_names=nearby_names,
556 event_date=event_date,
557 )
558 for match in eligible
559 ]
560 )
561 return [link for link in results if link is not None]
563 async def _handle_candidate(
564 self,
565 entity: Entity,
566 match: EntityCandidateMatch,
567 source_text: str,
568 bank_id: str,
569 graph_store: GraphStore,
570 llm_provider: LLMProvider,
571 *,
572 nearby_names: set[str],
573 event_date: datetime,
574 ) -> EntityLink | None:
575 """Route a single (entity, candidate) pair through the cascade tiers."""
576 candidate = match.entity
577 name_sim = match.name_similarity
578 emb_sim = match.embedding_similarity
580 # Tier A: trigram autolink — names are effectively identical.
581 if name_sim >= self.autolink_threshold:
582 self._record_cascade("trigram_autolink")
583 return await self._autolink(
584 entity,
585 candidate,
586 name_sim,
587 "trigram",
588 bank_id,
589 graph_store,
590 )
592 # Tier B: embedding autolink — semantic alias (Bob/Robert).
593 if emb_sim is not None and emb_sim >= self.embedding_autolink_threshold:
594 self._record_cascade("embedding_autolink")
595 return await self._autolink(
596 entity,
597 candidate,
598 emb_sim,
599 "embedding",
600 bank_id,
601 graph_store,
602 )
604 # Tier C: Hindsight-style composite — combine name similarity with
605 # cooccurrence-overlap and temporal proximity. Catches cases where
606 # name alone is borderline (e.g. ``"Bob"`` vs ``"Robert"``) but the
607 # broader context (same friends mentioned, recent activity) gives
608 # the resolver enough evidence to autolink without an LLM.
609 composite = self._composite_score(
610 name_sim,
611 match,
612 nearby_names,
613 event_date,
614 )
615 if composite >= self.composite_threshold:
616 self._record_cascade("composite_autolink", composite_score=composite)
617 return await self._autolink(
618 entity,
619 candidate,
620 composite,
621 "composite",
622 bank_id,
623 graph_store,
624 )
626 # Combined-signal skip: cheap signals say "no" — embedding has been
627 # consulted (when available) and the composite didn't cross the
628 # threshold either. Drop without an LLM.
629 combined = max(name_sim, emb_sim or 0.0)
630 if combined < self.skip_threshold:
631 _logger.debug(
632 "entity resolution: skipping %r vs %r (name_sim=%.2f, emb_sim=%s, composite=%.2f, max < %.2f)",
633 entity.name,
634 candidate.name,
635 name_sim,
636 f"{emb_sim:.2f}" if emb_sim is not None else "n/a",
637 composite,
638 self.skip_threshold,
639 )
640 self._record_cascade("skipped", composite_score=composite)
641 return None
643 # Tier D: ambiguous middle band — escalate to LLM (or skip when
644 # ``enable_llm_disambiguation`` is False, matching Hindsight's
645 # LLM-free design).
646 if not self.enable_llm_disambiguation:
647 _logger.debug(
648 "entity resolution: skipping ambiguous %r vs %r (LLM tier disabled, name_sim=%.2f, composite=%.2f)",
649 entity.name,
650 candidate.name,
651 name_sim,
652 composite,
653 )
654 self._record_cascade("skipped_no_llm", composite_score=composite)
655 return None
656 self._record_cascade("llm_disambiguation", composite_score=composite)
657 return await self._confirm_and_link(
658 entity,
659 candidate,
660 source_text,
661 bank_id,
662 graph_store,
663 llm_provider,
664 )
666 def _composite_score(
667 self,
668 name_sim: float,
669 match: EntityCandidateMatch,
670 nearby_names: set[str],
671 event_date: datetime,
672 ) -> float:
673 """Weighted-sum signal blending name + cooccurrence + temporal.
675 Mirrors Hindsight's retain-time scoring (paper §4.1.3). Each
676 component is in ``[0, 1]``; weights sum to 1.0 so the composite
677 is also in ``[0, 1]``. Missing inputs (no nearby names, no
678 ``last_seen``) contribute ``0`` for that signal — the composite
679 remains well-defined even when the cooccurrence/temporal data
680 isn't populated yet.
681 """
682 # Cooccurrence overlap: Jaccard-style intersection of nearby names
683 # with the candidate's stored co-occurring entity names. Both sides
684 # are lowercased and stripped on the way in.
685 cooc_score = 0.0
686 if nearby_names and match.co_occurring_names:
687 candidate_names = {n for n in match.co_occurring_names if n}
688 if candidate_names:
689 overlap = len(nearby_names & candidate_names)
690 # Normalise by the smaller side to keep the signal in [0,1]
691 # without penalising large candidate co-occurrence sets.
692 cooc_score = overlap / min(len(nearby_names), len(candidate_names))
694 # Temporal decay: 1.0 if seen today, 0.0 once outside the window.
695 temporal_score = 0.0
696 if match.last_seen is not None:
697 last_seen = match.last_seen
698 now = event_date
699 if last_seen.tzinfo is None:
700 last_seen = last_seen.replace(tzinfo=timezone.utc)
701 if now.tzinfo is None:
702 now = now.replace(tzinfo=timezone.utc)
703 days_diff = abs((now - last_seen).total_seconds() / 86400.0)
704 if days_diff < self.composite_temporal_window_days:
705 temporal_score = 1.0 - (days_diff / self.composite_temporal_window_days)
707 base = (
708 self.composite_name_weight * name_sim
709 + self.composite_cooccurrence_weight * cooc_score
710 + self.composite_temporal_weight * temporal_score
711 )
713 # Hindsight-parity popularity bonus — log-scaled so a 10-mention
714 # entity scores roughly half as much as a 50-mention one, and a
715 # 100-mention entity is fully saturated. Hard-capped at
716 # ``mention_count_bonus_cap`` so this signal can never on its own
717 # promote a candidate past the composite threshold; it only
718 # breaks ties between near-equal name/cooccurrence/temporal blends.
719 if self.mention_count_bonus_cap > 0.0 and match.mention_count > 1:
720 import math
722 saturation = max(1, self.mention_count_saturation)
723 bonus = self.mention_count_bonus_cap * min(
724 1.0,
725 math.log1p(match.mention_count) / math.log1p(saturation),
726 )
727 base += bonus
729 return base
731 async def _score_candidates(
732 self,
733 entity: Entity,
734 bank_id: str,
735 graph_store: GraphStore,
736 ) -> list[EntityCandidateMatch]:
737 """Fetch candidates from the graph store with all signals populated.
739 Tries the new ``find_entity_candidates_scored`` first (returns
740 :class:`EntityCandidateMatch` instances with name / embedding /
741 cooccurrence / temporal signals). Falls back to the legacy
742 ``find_entity_candidates`` plus Python-level string scoring when
743 the adapter hasn't implemented the scored variant — old adapters
744 keep working, they just lose the embedding, cooccurrence, and
745 temporal tiers.
746 """
747 scored_method = getattr(graph_store, "find_entity_candidates_scored", None)
748 if scored_method is not None:
749 try:
750 matches = await scored_method(
751 entity.name,
752 bank_id,
753 name_embedding=entity.embedding,
754 trigram_threshold=self.trigram_threshold,
755 limit=self.max_candidates_per_entity + 5,
756 )
757 except Exception as exc:
758 _logger.warning(
759 "find_entity_candidates_scored failed for %r: %s — falling back to legacy path",
760 entity.name,
761 exc,
762 )
763 matches = None
764 if matches is not None:
765 return list(matches)
767 # Legacy fallback: adapter only implements find_entity_candidates.
768 try:
769 candidates = await graph_store.find_entity_candidates(
770 entity.name,
771 bank_id,
772 threshold=self.similarity_threshold,
773 limit=self.max_candidates_per_entity + 5,
774 )
775 except Exception as exc:
776 _logger.warning("find_entity_candidates failed for %r: %s", entity.name, exc)
777 return []
779 return [
780 EntityCandidateMatch(
781 entity=c,
782 name_similarity=_name_similarity(entity.name, c.name),
783 embedding_similarity=None,
784 )
785 for c in candidates
786 ]
788 async def _autolink(
789 self,
790 entity_a: Entity,
791 entity_b: Entity,
792 score: float,
793 tier: str,
794 bank_id: str,
795 graph_store: GraphStore,
796 ) -> EntityLink | None:
797 """Persist an alias-of link without LLM confirmation.
799 Used when one of the cheap signals (trigram or embedding) is high
800 enough that LLM consultation would be wasted spend. The persisted
801 ``evidence`` records both the tier and the score so audits can
802 distinguish trigram autolinks, embedding autolinks, and
803 LLM-confirmed links from one another.
804 """
805 link = EntityLink(
806 entity_a=entity_a.id,
807 entity_b=entity_b.id,
808 link_type="alias_of",
809 evidence=f"{tier} similarity {score:.2f}",
810 confidence=self.autolink_confidence,
811 created_at=datetime.now(timezone.utc),
812 )
813 try:
814 await graph_store.store_entity_link(link, bank_id)
815 except Exception as exc:
816 _logger.warning(
817 "store_entity_link (autolink/%s) failed for %r <-> %r: %s",
818 tier,
819 entity_a.name,
820 entity_b.name,
821 exc,
822 )
823 return None
825 _logger.debug(
826 "entity resolution: autolinked (%s) %r (%s) alias_of %r (%s) score=%.2f",
827 tier,
828 entity_a.name,
829 entity_a.id,
830 entity_b.name,
831 entity_b.id,
832 score,
833 )
834 return link
836 async def _confirm_and_link(
837 self,
838 entity_a: Entity,
839 entity_b: Entity,
840 source_text: str,
841 bank_id: str,
842 graph_store: GraphStore,
843 llm_provider: LLMProvider,
844 ) -> EntityLink | None:
845 """Ask the LLM to confirm whether two entities are aliases, then store the link."""
846 import json
848 messages = [
849 Message(role="system", content=_SYSTEM_PROMPT),
850 Message(role="user", content=_build_user_message(entity_a, entity_b, source_text)),
851 ]
853 try:
854 completion = await llm_provider.complete(messages, max_tokens=256, temperature=0.0)
855 raw = (completion.text or "").strip()
856 except Exception as exc:
857 _logger.warning("LLM confirmation failed for %r <-> %r: %s", entity_a.name, entity_b.name, exc)
858 return None
860 # Strip markdown fences
861 if raw.startswith("```"):
862 lines = raw.splitlines()
863 raw = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
865 try:
866 data = json.loads(raw)
867 except json.JSONDecodeError:
868 _logger.warning(
869 "entity resolution LLM returned non-JSON for %r <-> %r: %r",
870 entity_a.name,
871 entity_b.name,
872 raw[:120],
873 )
874 return None
876 same = bool(data.get("same_entity", False))
877 if not same:
878 return None
880 confidence_raw = data.get("confidence", 0.0)
881 try:
882 confidence = max(0.0, min(1.0, float(confidence_raw)))
883 except (TypeError, ValueError):
884 confidence = 0.0
886 if confidence < self.confirmation_threshold:
887 _logger.debug(
888 "entity resolution: %r <-> %r confidence %.2f below threshold %.2f — skipped",
889 entity_a.name,
890 entity_b.name,
891 confidence,
892 self.confirmation_threshold,
893 )
894 return None
896 evidence = str(data.get("evidence", ""))
898 link = EntityLink(
899 entity_a=entity_a.id,
900 entity_b=entity_b.id,
901 link_type="alias_of",
902 evidence=evidence,
903 confidence=confidence,
904 created_at=datetime.now(timezone.utc),
905 )
907 try:
908 await graph_store.store_entity_link(link, bank_id)
909 except Exception as exc:
910 _logger.warning("store_entity_link failed for %r <-> %r: %s", entity_a.name, entity_b.name, exc)
911 return None
913 _logger.info(
914 "entity resolution: linked %r (%s) alias_of %r (%s) confidence=%.2f",
915 entity_a.name,
916 entity_a.id,
917 entity_b.name,
918 entity_b.id,
919 confidence,
920 )
921 return link