Coverage for astrocyte/pipeline/entity_resolution.py: 82%

254 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""M11: Entity resolution pipeline stage. 

2 

3``EntityResolver.resolve()`` takes newly-extracted entities from a retain 

4call and, for each one, looks for existing candidates in the graph store 

5that might be the same entity under a different surface form. Resolution 

6runs as a **tiered cascade** so the LLM is called only for genuinely 

7ambiguous pairs: 

8 

91. ``find_entity_candidates_scored`` (or, on older adapters, the legacy ``find_entity_candidates``) produces a list of candidates whose names

10 pass the adapter's first-stage similarity filter. 

112. **Name-similarity tier** (this module): each (new, candidate) pair is

12 scored by name similarity — ``pg_trgm`` in PostgreSQL adapters, :class:`difflib.SequenceMatcher`

13 on lowercase-stripped names in-memory and on the legacy path.

14 - ``ratio >= autolink_threshold`` (default 0.95) — names are effectively 

15 identical (case differences, trailing whitespace, minor variants). 

16 Link is written **without** an LLM call. This is the common case for 

17 conversational benchmarks where the same person is mentioned by the 

18 same name across many sessions. 

19 - ``ratio < skip_threshold`` (default 0.5; compared against the higher of the name and embedding similarities) — names are clearly distinct

20 despite the candidate filter matching them. Skip without an LLM call. 

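21 - Pairs not autolinked by name may still be autolinked by the embedding or composite (name + co-occurrence + recency) tiers, which run before the skip check; whatever remains ambiguous falls through to tier 3.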

223. **LLM disambiguation tier**: only the pairs in the ambiguous middle 

23 band are sent to the LLM with a verbatim-evidence prompt — or left unlinked when ``enable_llm_disambiguation`` is false.

24 

25This cuts LLM call volume by 95-99% on workloads with consistent naming 

26(LoCoMo, conversational memory benchmarks) while preserving full LLM 

27disambiguation on the genuinely hard cases. 

28 

29Design goals 

30------------ 

31- **Opt-in**: resolver is instantiated only when 

32 ``entity_resolution: enabled: true`` is in config. A ``None`` resolver 

33 in the orchestrator means the stage is a no-op. 

34- **Bounded cost**: at most ``max_candidates_per_entity`` LLM calls per 

35 newly-extracted entity, controlled by the caller. 

36- **Testable**: all I/O goes through the ``GraphStore`` and ``LLMProvider`` 

37 SPIs; ``InMemoryGraphStore`` + ``MockLLMProvider`` cover the full path. 

38""" 

39 

40from __future__ import annotations 

41 

42import asyncio 

43import logging 

44from datetime import datetime, timezone 

45from difflib import SequenceMatcher 

46from typing import TYPE_CHECKING 

47 

48from astrocyte.types import Entity, EntityCandidateMatch, EntityLink, Message 

49 

50if TYPE_CHECKING: 

51 from astrocyte.provider import GraphStore, LLMProvider 

52 

# Module-level logger used by the resolver's warning and debug messages.
_logger = logging.getLogger("astrocyte.entity_resolution")

# ---------------------------------------------------------------------------
# Prompt
# ---------------------------------------------------------------------------

# System prompt for the LLM disambiguation tier. It asks the model for a bare
# JSON object with ``same_entity`` (bool), ``confidence`` (0.0–1.0) and
# ``evidence`` (a verbatim quote from the source text, or an empty string when
# the entities are judged different). The backslash right after the opening
# quotes and the one at the end of the final rule are line continuations, so
# the string has neither a leading nor a trailing newline.
_SYSTEM_PROMPT = """\
You are an entity resolution assistant.

You will be given:
- ENTITY A: a named entity extracted from a new piece of text.
- ENTITY B: a candidate entity already in the knowledge graph.
- SOURCE TEXT: the text that contained Entity A.

Your task: decide whether Entity A and Entity B refer to the same real-world
person, place, organisation, or concept.

Respond with ONLY valid JSON (no markdown, no preamble):

{
  "same_entity": true | false,
  "confidence": <float 0.0–1.0>,
  "evidence": "<verbatim quote from SOURCE TEXT that supports your decision>"
}

Rules:
- Set same_entity=true only when you are confident they refer to the same thing.
- confidence should reflect how certain you are (1.0 = certain, 0.5 = plausible).
- evidence must be a direct quote from SOURCE TEXT, not a paraphrase.
- If same_entity=false, set evidence to an empty string.\
"""

84 

85 

86def _name_similarity(a: str, b: str) -> float: 

87 """Case-insensitive string-similarity ratio in ``[0.0, 1.0]``. 

88 

89 Uses :class:`difflib.SequenceMatcher` over lowercase-stripped strings. 

90 Pure stdlib, microseconds per call. Used as the gate that decides 

91 whether a pair needs LLM disambiguation: 

92 

93 - ``1.0`` — identical after normalisation 

94 - ``> 0.95`` — effectively the same name (e.g. ``"Alice"`` vs ``"Alice "``) 

95 - ``0.6 – 0.95`` — ambiguous (e.g. ``"Bob"`` vs ``"Robert"``) 

96 - ``< 0.6`` — clearly different (e.g. ``"Alice"`` vs ``"Charlie"``) 

97 """ 

98 a_norm = (a or "").strip().lower() 

99 b_norm = (b or "").strip().lower() 

100 if not a_norm or not b_norm: 

101 return 0.0 

102 return SequenceMatcher(None, a_norm, b_norm).ratio() 

103 

104 

105def _build_user_message(entity_a: Entity, entity_b: Entity, source_text: str) -> str: 

106 aliases_a = f" (also known as: {', '.join(entity_a.aliases)})" if entity_a.aliases else "" 

107 aliases_b = f" (also known as: {', '.join(entity_b.aliases)})" if entity_b.aliases else "" 

108 return f"ENTITY A: {entity_a.name}{aliases_a}\nENTITY B: {entity_b.name}{aliases_b}\n\nSOURCE TEXT:\n{source_text}" 

109 

110 

111# --------------------------------------------------------------------------- 

112# EntityResolver 

113# --------------------------------------------------------------------------- 

114 

115 

116class EntityResolver: 

117 """Identifies and persists alias-of links between entities (M11). 

118 

119 Args: 

120 similarity_threshold: First-stage filter threshold passed to the adapter's

121 legacy ``find_entity_candidates`` (used only when the scored lookup is unavailable).

122 In the in-memory store this is a substring-match gate (0 or 1); 

123 in production adapters it is a cosine/edit-distance threshold. 

124 confirmation_threshold: Minimum ``confidence`` from the LLM for a 

125 link to be written to the graph. Defaults to ``0.75``. 

126 max_candidates_per_entity: Hard cap on candidates evaluated (and hence on LLM calls) per entity.

127 Guards against runaway cost when the graph is large. 

128 """ 

129 

130 def __init__( 

131 self, 

132 *, 

133 similarity_threshold: float = 0.8, 

134 confirmation_threshold: float = 0.75, 

135 max_candidates_per_entity: int = 10, 

136 autolink_threshold: float = 0.95, 

137 skip_threshold: float = 0.5, 

138 embedding_autolink_threshold: float = 0.92, 

139 autolink_confidence: float = 0.95, 

140 trigram_threshold: float = 0.15, 

141 composite_threshold: float = 0.6, 

142 composite_name_weight: float = 0.5, 

143 composite_cooccurrence_weight: float = 0.3, 

144 composite_temporal_weight: float = 0.2, 

145 composite_temporal_window_days: float = 7.0, 

146 mention_count_bonus_cap: float = 0.05, 

147 mention_count_saturation: int = 50, 

148 enable_llm_disambiguation: bool = True, 

149 canonical_resolution: bool = False, 

150 ) -> None: 

151 """Args: 

152 similarity_threshold: Passed to ``find_entity_candidates`` — 

153 the adapter's first-stage filter (legacy path). 

154 confirmation_threshold: Minimum LLM-reported confidence for a 

155 disambiguated pair to be linked. 

156 max_candidates_per_entity: Hard cap on candidates evaluated per 

157 new entity. Default ``10`` — matches Hindsight's natural 

158 ``pg_trgm`` candidate count. The cascade's cheap tiers 

159 evaluate all of them; only the genuinely-ambiguous middle 

160 band reaches the LLM. 

161 autolink_threshold: Trigram name-similarity at or above which a 

162 pair is auto-linked **without** calling the LLM. Default 

163 ``0.95`` — effectively identical names. 

164 skip_threshold: Combined-score (max of name and embedding 

165 similarity) below which a candidate is dropped **without** 

166 an LLM call. Default ``0.5``. 

167 embedding_autolink_threshold: Cosine-similarity at or above 

168 which the embedding tier autolinks the pair. Catches 

169 semantic aliases (``"Bob"`` ↔ ``"Robert"``) that string 

170 similarity alone misses. Default ``0.92``. 

171 autolink_confidence: Confidence value persisted on alias-of 

172 links created via the autolink fast paths. Default ``0.95``. 

173 trigram_threshold: Adapter-side prefilter passed to 

174 ``find_entity_candidates_scored``. Candidates with name 

175 similarity below this are dropped before being returned. 

176 Default ``0.15`` matches Hindsight's empirically-tuned 

177 threshold — catches substring relationships while staying 

178 fully index-based. 

179 composite_threshold: Score at or above which the Hindsight-style 

180 composite tier autolinks. Composite = 

181 ``name_weight * name_sim + cooccurrence_weight * overlap + 

182 temporal_weight * decay``. Default ``0.6`` matches Hindsight. 

183 composite_name_weight: Weight applied to ``name_similarity`` in 

184 the composite score. Default ``0.5``. 

185 composite_cooccurrence_weight: Weight applied to the Jaccard- 

186 style overlap between the new entity's nearby-entity names 

187 and the candidate's stored co-occurring entity names. 

188 Default ``0.3``. 

189 composite_temporal_weight: Weight applied to a temporal-decay 

190 signal — ``max(0, 1 - days_diff / temporal_window_days)``. 

191 Default ``0.2``. 

192 composite_temporal_window_days: Days over which the temporal 

193 decay falls from 1.0 to 0.0. Default ``7.0``. 

194 mention_count_bonus_cap: Maximum additive bonus the 

195 ``mention_count`` popularity signal contributes to the 

196 composite score. Hindsight-parity tiebreaker — capped at 

197 ``0.05`` so a popular candidate breaks ties between 

198 near-identical composite scores without ever overriding 

199 the primary name/cooccurrence/temporal signals on its 

200 own. Set to ``0.0`` to disable. 

201 mention_count_saturation: Mention count at which the bonus 

202 reaches its cap. Bonus scales as 

203 ``cap * log1p(count) / log1p(saturation)``. Default 

204 ``50`` — popular entities saturate quickly so the signal 

205 stops growing once an entity is well-established. 

206 enable_llm_disambiguation: When ``True`` (default for backward 

207 compat), ambiguous middle-band pairs (composite below 

208 threshold, name/embedding combined above ``skip_threshold``) 

209 escalate to an LLM call for final disambiguation. Set to 

210 ``False`` to match Hindsight's LLM-free design — ambiguous 

211 pairs are simply not linked. For LoCoMo-scale benchmarks 

212 the cheap tiers (trigram, embedding, composite) handle the 

213 common cases; the LLM tier mostly fires on hard cases that 

214 don't move benchmark scores and adds significant retain 

215 latency. 

216 canonical_resolution: When ``True`` (Hindsight-style 

217 lookup-then-decide), :meth:`resolve_canonical_ids_in_place` 

218 rewrites each new entity's tentative ID to its existing 

219 canonical's ID before storage — so different surface forms 

220 that resolve to the same canonical never produce duplicate 

221 entity rows. The post-storage :meth:`resolve` aliasing 

222 pass becomes redundant in this mode. Default ``False`` 

223 preserves the legacy two-stage flow (store-by-ID then 

224 resolve aliases). Orchestrator decides which path to run 

225 based on this flag. 

226 """ 

227 if not (0.0 <= skip_threshold <= autolink_threshold <= 1.0): 

228 raise ValueError( 

229 "Expected 0.0 <= skip_threshold <= autolink_threshold <= 1.0, " 

230 f"got skip={skip_threshold}, autolink={autolink_threshold}" 

231 ) 

232 if not (0.0 <= embedding_autolink_threshold <= 1.0): 

233 raise ValueError(f"Expected 0.0 <= embedding_autolink_threshold <= 1.0, got {embedding_autolink_threshold}") 

234 if composite_temporal_window_days <= 0.0: 

235 raise ValueError(f"composite_temporal_window_days must be > 0, got {composite_temporal_window_days}") 

236 if mention_count_bonus_cap < 0.0: 

237 raise ValueError(f"mention_count_bonus_cap must be >= 0, got {mention_count_bonus_cap}") 

238 if mention_count_saturation <= 0: 

239 raise ValueError(f"mention_count_saturation must be > 0, got {mention_count_saturation}") 

240 self.similarity_threshold = similarity_threshold 

241 self.confirmation_threshold = confirmation_threshold 

242 self.max_candidates_per_entity = max_candidates_per_entity 

243 self.autolink_threshold = autolink_threshold 

244 self.skip_threshold = skip_threshold 

245 self.embedding_autolink_threshold = embedding_autolink_threshold 

246 self.autolink_confidence = autolink_confidence 

247 self.trigram_threshold = trigram_threshold 

248 self.composite_threshold = composite_threshold 

249 self.composite_name_weight = composite_name_weight 

250 self.composite_cooccurrence_weight = composite_cooccurrence_weight 

251 self.composite_temporal_weight = composite_temporal_weight 

252 self.composite_temporal_window_days = composite_temporal_window_days 

253 self.mention_count_bonus_cap = mention_count_bonus_cap 

254 self.mention_count_saturation = mention_count_saturation 

255 self.enable_llm_disambiguation = enable_llm_disambiguation 

256 self.canonical_resolution = canonical_resolution 

257 # Optional benchmark collector — installed by the eval harness via 

258 # ``resolver.metrics_collector = collector``. Hot-path code probes 

259 # with ``getattr`` and silently no-ops when absent. 

260 self.metrics_collector: object | None = None 

261 

262 async def resolve( 

263 self, 

264 new_entities: list[Entity], 

265 source_text: str, 

266 bank_id: str, 

267 graph_store: GraphStore, 

268 llm_provider: LLMProvider, 

269 *, 

270 event_date: datetime | None = None, 

271 ) -> list[EntityLink]: 

272 """Run the Hindsight-inspired entity-resolution cascade. 

273 

274 Three tiers run in order, escalating only when the cheaper signal 

275 is inconclusive: 

276 

277 1. **Trigram name similarity** (``find_entity_candidates_scored``). 

278 Adapter computes ``pg_trgm.similarity()`` (PostgreSQL) or 

279 :class:`difflib.SequenceMatcher` (in-memory) at indexed speed. 

280 2. **Embedding cosine similarity** — when the new entity carries 

281 an ``embedding`` and the candidate has one stored, the cosine 

282 score discriminates semantic aliases (``"Bob"`` / ``"Robert"``) 

283 that trigram similarity misses. 

284 3. **LLM disambiguation** — only fires for the genuinely ambiguous 

285 middle band where neither cheap signal is decisive. 

286 

287 Per-candidate decision logic: 

288 

289 - ``name_similarity >= autolink_threshold`` → autolink (no LLM) 

290 - ``embedding_similarity >= embedding_autolink_threshold`` → autolink 

291 - ``max(name, embedding) < skip_threshold`` → drop (no LLM) 

292 - otherwise → fall through to LLM disambiguation 

293 

294 Adapters that haven't implemented ``find_entity_candidates_scored`` 

295 yet (it's optional in the SPI) gracefully fall back to the legacy 

296 ``find_entity_candidates`` + Python-level scoring path. 

297 

298 Returns: 

299 All ``EntityLink`` objects that were written to the graph store. 

300 

301 Concurrency: each new entity's resolution work — candidate fetch, 

302 autolink decisions, LLM disambiguation — runs in parallel via 

303 :func:`asyncio.gather`. Within an entity, each candidate's tier 

304 decision runs concurrently. This brings retain-time entity 

305 resolution from sequential ``O(N×K)`` round-trips to a single 

306 async-batched pass. 

307 

308 ``event_date`` (when supplied) feeds the composite tier's temporal 

309 signal — recent candidates score higher for the same name. Defaults 

310 to "now" if not provided. Pass the retain request's ``occurred_at`` 

311 for benchmark workloads with simulated event timestamps. 

312 """ 

313 if not new_entities: 

314 return [] 

315 

316 # Pre-compute the set of nearby entity names — every other new 

317 # entity in this batch is a "nearby" candidate for cooccurrence 

318 # overlap with each candidate's existing co_occurs links. 

319 nearby_names: set[str] = {(e.name or "").strip().lower() for e in new_entities if e.name and e.name.strip()} 

320 effective_event_date = event_date or datetime.now(timezone.utc) 

321 

322 per_entity = await asyncio.gather( 

323 *[ 

324 self._resolve_one_entity( 

325 entity, 

326 source_text, 

327 bank_id, 

328 graph_store, 

329 llm_provider, 

330 nearby_names=nearby_names - {(entity.name or "").strip().lower()}, 

331 event_date=effective_event_date, 

332 ) 

333 for entity in new_entities 

334 ] 

335 ) 

336 return [link for sublist in per_entity for link in sublist] 

337 

338 async def resolve_canonical_ids_in_place( 

339 self, 

340 new_entities: list[Entity], 

341 bank_id: str, 

342 graph_store: GraphStore, 

343 *, 

344 event_date: datetime | None = None, 

345 ) -> dict[str, str]: 

346 """Hindsight-style lookup-then-decide canonical resolution. 

347 

348 For each new entity, runs the cascade against existing canonicals 

349 and — if any tier crosses its threshold — **rewrites the entity's 

350 ID to the matched canonical's ID** in place. Entities that don't 

351 match keep their tentative ID and will be inserted as new 

352 canonicals at storage time. 

353 

354 Returns a mapping ``{tentative_id: canonical_id}`` for callers 

355 that need to update co-occurrence links or memory associations 

356 that reference the original tentative IDs. 

357 

358 Designed to run BEFORE ``store_entities``. Replaces the post-store 

359 :meth:`resolve` alias-creation pass when 

360 ``canonical_resolution=True`` is set on the resolver. 

361 

362 No alias-of links are created — different surface forms that 

363 resolve to the same canonical simply share the canonical ID. 

364 """ 

365 if not new_entities: 

366 return {} 

367 

368 nearby_names: set[str] = {(e.name or "").strip().lower() for e in new_entities if e.name and e.name.strip()} 

369 effective_event_date = event_date or datetime.now(timezone.utc) 

370 id_remapping: dict[str, str] = {} 

371 

372 # Process all entities in parallel — independent lookups against 

373 # existing canonicals. Each task returns the resolved canonical ID 

374 # (or the original tentative ID if no match). 

375 resolutions = await asyncio.gather( 

376 *[ 

377 self._best_canonical_id( 

378 entity, 

379 bank_id, 

380 graph_store, 

381 nearby_names=nearby_names - {(entity.name or "").strip().lower()}, 

382 event_date=effective_event_date, 

383 ) 

384 for entity in new_entities 

385 ] 

386 ) 

387 for entity, canonical_id in zip(new_entities, resolutions, strict=True): 

388 tentative_id = entity.id 

389 if canonical_id != tentative_id: 

390 entity.id = canonical_id 

391 id_remapping[tentative_id] = canonical_id 

392 

393 # Hindsight-parity: bump mention_count on every canonical we 

394 # resolved-to so the popularity signal stays current. Best-effort 

395 # (older adapters lacking the method are skipped). Run AFTER the 

396 # remap so we count the resolution event, not the lookup. 

397 if id_remapping: 

398 increment = getattr(graph_store, "increment_mention_counts", None) 

399 if increment is not None: 

400 try: 

401 await increment(list(set(id_remapping.values())), bank_id) 

402 except Exception as exc: # pragma: no cover — best-effort 

403 _logger.warning( 

404 "increment_mention_counts failed (%s) — composite " 

405 "scoring will use stale popularity signal until next " 

406 "successful resolve.", 

407 exc, 

408 ) 

409 return id_remapping 

410 

411 async def _best_canonical_id( 

412 self, 

413 entity: Entity, 

414 bank_id: str, 

415 graph_store: GraphStore, 

416 *, 

417 nearby_names: set[str], 

418 event_date: datetime, 

419 ) -> str: 

420 """Return the canonical entity ID for *entity*. 

421 

422 Looks up candidates, scores each via the same tier signals as 

423 :meth:`resolve`, and returns the best match's ID when the 

424 strongest signal crosses the corresponding tier's threshold. Falls 

425 back to the entity's tentative ID when no candidate matches. 

426 """ 

427 scored = await self._score_candidates(entity, bank_id, graph_store) 

428 if not scored: 

429 self._record_cascade("no_match") 

430 return entity.id 

431 

432 best_match: EntityCandidateMatch | None = None 

433 best_score = 0.0 

434 best_decision: str | None = None 

435 for match in scored[: self.max_candidates_per_entity]: 

436 if match.entity.id == entity.id: 

437 # Same-name same-ID candidate (deterministic-ID path) — 

438 # already canonical, no remapping needed. 

439 self._record_cascade("already_canonical") 

440 return entity.id 

441 score, decision = self._resolution_score_with_tier( 

442 match, 

443 nearby_names, 

444 event_date, 

445 ) 

446 if score > best_score: 

447 best_score = score 

448 best_match = match 

449 best_decision = decision 

450 

451 if best_match is not None and best_score > 0.0: 

452 composite = self._composite_score( 

453 best_match.name_similarity, 

454 best_match, 

455 nearby_names, 

456 event_date, 

457 ) 

458 self._record_cascade(best_decision or "composite_autolink", composite_score=composite) 

459 self._record_canonical_resolution(resolved=True) 

460 return best_match.entity.id 

461 self._record_cascade("no_match") 

462 self._record_canonical_resolution(resolved=False) 

463 return entity.id 

464 

465 def _resolution_score( 

466 self, 

467 match: EntityCandidateMatch, 

468 nearby_names: set[str], 

469 event_date: datetime, 

470 ) -> float: 

471 """Unified score across the cascade tiers (legacy entry point).""" 

472 score, _ = self._resolution_score_with_tier(match, nearby_names, event_date) 

473 return score 

474 

475 def _resolution_score_with_tier( 

476 self, 

477 match: EntityCandidateMatch, 

478 nearby_names: set[str], 

479 event_date: datetime, 

480 ) -> tuple[float, str | None]: 

481 """Tier-aware unified score. 

482 

483 Returns ``(score, tier_label)`` where tier_label is one of 

484 ``"trigram_autolink"``, ``"embedding_autolink"``, 

485 ``"composite_autolink"`` (matching the names recorded by the 

486 benchmark metrics collector), or ``None`` when no tier qualifies. 

487 """ 

488 name_sim = match.name_similarity 

489 emb_sim = match.embedding_similarity or 0.0 

490 composite = self._composite_score(name_sim, match, nearby_names, event_date) 

491 

492 candidates: list[tuple[float, str]] = [] 

493 if name_sim >= self.autolink_threshold: 

494 candidates.append((name_sim, "trigram_autolink")) 

495 if emb_sim >= self.embedding_autolink_threshold: 

496 candidates.append((emb_sim, "embedding_autolink")) 

497 if composite >= self.composite_threshold: 

498 candidates.append((composite, "composite_autolink")) 

499 

500 if not candidates: 

501 return 0.0, None 

502 candidates.sort(key=lambda x: x[0], reverse=True) 

503 return candidates[0] 

504 

505 def _record_cascade(self, decision: str, *, composite_score: float | None = None) -> None: 

506 """Forward decision to the optional metrics collector. No-op when 

507 no collector is attached. Caller-side error suppression — metrics 

508 observability must never break a real call.""" 

509 collector = self.metrics_collector 

510 if collector is None: 

511 return 

512 try: 

513 collector.record_cascade_decision(decision, composite_score=composite_score) 

514 except Exception: 

515 pass # metrics are best-effort; never let collection failures disrupt the pipeline 

516 

517 def _record_canonical_resolution(self, *, resolved: bool) -> None: 

518 collector = self.metrics_collector 

519 if collector is None: 

520 return 

521 try: 

522 collector.record_canonical_resolution(resolved=resolved) 

523 except Exception: 

524 pass # metrics are best-effort; never let collection failures disrupt the pipeline 

525 

526 async def _resolve_one_entity( 

527 self, 

528 entity: Entity, 

529 source_text: str, 

530 bank_id: str, 

531 graph_store: GraphStore, 

532 llm_provider: LLMProvider, 

533 *, 

534 nearby_names: set[str], 

535 event_date: datetime, 

536 ) -> list[EntityLink]: 

537 """Cascade for a single new entity. Candidates are processed concurrently.""" 

538 scored = await self._score_candidates(entity, bank_id, graph_store) 

539 if not scored: 

540 return [] 

541 

542 eligible = [match for match in scored[: self.max_candidates_per_entity] if match.entity.id != entity.id] 

543 if not eligible: 

544 return [] 

545 

546 results = await asyncio.gather( 

547 *[ 

548 self._handle_candidate( 

549 entity, 

550 match, 

551 source_text, 

552 bank_id, 

553 graph_store, 

554 llm_provider, 

555 nearby_names=nearby_names, 

556 event_date=event_date, 

557 ) 

558 for match in eligible 

559 ] 

560 ) 

561 return [link for link in results if link is not None] 

562 

563 async def _handle_candidate( 

564 self, 

565 entity: Entity, 

566 match: EntityCandidateMatch, 

567 source_text: str, 

568 bank_id: str, 

569 graph_store: GraphStore, 

570 llm_provider: LLMProvider, 

571 *, 

572 nearby_names: set[str], 

573 event_date: datetime, 

574 ) -> EntityLink | None: 

575 """Route a single (entity, candidate) pair through the cascade tiers.""" 

576 candidate = match.entity 

577 name_sim = match.name_similarity 

578 emb_sim = match.embedding_similarity 

579 

580 # Tier A: trigram autolink — names are effectively identical. 

581 if name_sim >= self.autolink_threshold: 

582 self._record_cascade("trigram_autolink") 

583 return await self._autolink( 

584 entity, 

585 candidate, 

586 name_sim, 

587 "trigram", 

588 bank_id, 

589 graph_store, 

590 ) 

591 

592 # Tier B: embedding autolink — semantic alias (Bob/Robert). 

593 if emb_sim is not None and emb_sim >= self.embedding_autolink_threshold: 

594 self._record_cascade("embedding_autolink") 

595 return await self._autolink( 

596 entity, 

597 candidate, 

598 emb_sim, 

599 "embedding", 

600 bank_id, 

601 graph_store, 

602 ) 

603 

604 # Tier C: Hindsight-style composite — combine name similarity with 

605 # cooccurrence-overlap and temporal proximity. Catches cases where 

606 # name alone is borderline (e.g. ``"Bob"`` vs ``"Robert"``) but the 

607 # broader context (same friends mentioned, recent activity) gives 

608 # the resolver enough evidence to autolink without an LLM. 

609 composite = self._composite_score( 

610 name_sim, 

611 match, 

612 nearby_names, 

613 event_date, 

614 ) 

615 if composite >= self.composite_threshold: 

616 self._record_cascade("composite_autolink", composite_score=composite) 

617 return await self._autolink( 

618 entity, 

619 candidate, 

620 composite, 

621 "composite", 

622 bank_id, 

623 graph_store, 

624 ) 

625 

626 # Combined-signal skip: cheap signals say "no" — embedding has been 

627 # consulted (when available) and the composite didn't cross the 

628 # threshold either. Drop without an LLM. 

629 combined = max(name_sim, emb_sim or 0.0) 

630 if combined < self.skip_threshold: 

631 _logger.debug( 

632 "entity resolution: skipping %r vs %r (name_sim=%.2f, emb_sim=%s, composite=%.2f, max < %.2f)", 

633 entity.name, 

634 candidate.name, 

635 name_sim, 

636 f"{emb_sim:.2f}" if emb_sim is not None else "n/a", 

637 composite, 

638 self.skip_threshold, 

639 ) 

640 self._record_cascade("skipped", composite_score=composite) 

641 return None 

642 

643 # Tier D: ambiguous middle band — escalate to LLM (or skip when 

644 # ``enable_llm_disambiguation`` is False, matching Hindsight's 

645 # LLM-free design). 

646 if not self.enable_llm_disambiguation: 

647 _logger.debug( 

648 "entity resolution: skipping ambiguous %r vs %r (LLM tier disabled, name_sim=%.2f, composite=%.2f)", 

649 entity.name, 

650 candidate.name, 

651 name_sim, 

652 composite, 

653 ) 

654 self._record_cascade("skipped_no_llm", composite_score=composite) 

655 return None 

656 self._record_cascade("llm_disambiguation", composite_score=composite) 

657 return await self._confirm_and_link( 

658 entity, 

659 candidate, 

660 source_text, 

661 bank_id, 

662 graph_store, 

663 llm_provider, 

664 ) 

665 

666 def _composite_score( 

667 self, 

668 name_sim: float, 

669 match: EntityCandidateMatch, 

670 nearby_names: set[str], 

671 event_date: datetime, 

672 ) -> float: 

673 """Weighted-sum signal blending name + cooccurrence + temporal. 

674 

675 Mirrors Hindsight's retain-time scoring (paper §4.1.3). Each 

676 component is in ``[0, 1]``; weights sum to 1.0 so the composite 

677 is also in ``[0, 1]``. Missing inputs (no nearby names, no 

678 ``last_seen``) contribute ``0`` for that signal — the composite 

679 remains well-defined even when the cooccurrence/temporal data 

680 isn't populated yet. 

681 """ 

682 # Cooccurrence overlap: Jaccard-style intersection of nearby names 

683 # with the candidate's stored co-occurring entity names. Both sides 

684 # are lowercased and stripped on the way in. 

685 cooc_score = 0.0 

686 if nearby_names and match.co_occurring_names: 

687 candidate_names = {n for n in match.co_occurring_names if n} 

688 if candidate_names: 

689 overlap = len(nearby_names & candidate_names) 

690 # Normalise by the smaller side to keep the signal in [0,1] 

691 # without penalising large candidate co-occurrence sets. 

692 cooc_score = overlap / min(len(nearby_names), len(candidate_names)) 

693 

694 # Temporal decay: 1.0 if seen today, 0.0 once outside the window. 

695 temporal_score = 0.0 

696 if match.last_seen is not None: 

697 last_seen = match.last_seen 

698 now = event_date 

699 if last_seen.tzinfo is None: 

700 last_seen = last_seen.replace(tzinfo=timezone.utc) 

701 if now.tzinfo is None: 

702 now = now.replace(tzinfo=timezone.utc) 

703 days_diff = abs((now - last_seen).total_seconds() / 86400.0) 

704 if days_diff < self.composite_temporal_window_days: 

705 temporal_score = 1.0 - (days_diff / self.composite_temporal_window_days) 

706 

707 base = ( 

708 self.composite_name_weight * name_sim 

709 + self.composite_cooccurrence_weight * cooc_score 

710 + self.composite_temporal_weight * temporal_score 

711 ) 

712 

713 # Hindsight-parity popularity bonus — log-scaled so a 10-mention 

714 # entity scores roughly half as much as a 50-mention one, and a 

715 # 100-mention entity is fully saturated. Hard-capped at 

716 # ``mention_count_bonus_cap`` so this signal can never on its own 

717 # promote a candidate past the composite threshold; it only 

718 # breaks ties between near-equal name/cooccurrence/temporal blends. 

719 if self.mention_count_bonus_cap > 0.0 and match.mention_count > 1: 

720 import math 

721 

722 saturation = max(1, self.mention_count_saturation) 

723 bonus = self.mention_count_bonus_cap * min( 

724 1.0, 

725 math.log1p(match.mention_count) / math.log1p(saturation), 

726 ) 

727 base += bonus 

728 

729 return base 

730 

async def _score_candidates(
    self,
    entity: Entity,
    bank_id: str,
    graph_store: GraphStore,
) -> list[EntityCandidateMatch]:
    """Collect candidate matches for *entity*, preferring the scored adapter API.

    When *graph_store* exposes ``find_entity_candidates_scored``, its result
    (:class:`EntityCandidateMatch` objects carrying name / embedding /
    cooccurrence / temporal signals) is returned as a list. If that call
    raises, or returns ``None``, the legacy ``find_entity_candidates`` API is
    used instead and names are scored in Python via ``_name_similarity``.
    Those legacy matches have ``embedding_similarity=None``, so older adapters
    keep working but lose the richer tiers.

    Returns an empty list when the legacy lookup raises. Adapter failures are
    logged as warnings, never propagated.
    """
    query_name = entity.name

    scored_lookup = getattr(graph_store, "find_entity_candidates_scored", None)
    if scored_lookup is not None:
        try:
            # Over-fetch a few rows beyond the per-entity cap (presumably so
            # later filtering still leaves enough candidates — TODO confirm).
            scored = await scored_lookup(
                query_name,
                bank_id,
                name_embedding=entity.embedding,
                trigram_threshold=self.trigram_threshold,
                limit=self.max_candidates_per_entity + 5,
            )
        except Exception as exc:
            _logger.warning(
                "find_entity_candidates_scored failed for %r: %s — falling back to legacy path",
                query_name,
                exc,
            )
        else:
            if scored is not None:
                return list(scored)

    # Legacy path: the adapter lacks the scored variant, or it failed /
    # returned None above.
    try:
        legacy_hits = await graph_store.find_entity_candidates(
            query_name,
            bank_id,
            threshold=self.similarity_threshold,
            limit=self.max_candidates_per_entity + 5,
        )
    except Exception as exc:
        _logger.warning("find_entity_candidates failed for %r: %s", query_name, exc)
        return []

    return [
        EntityCandidateMatch(
            entity=hit,
            name_similarity=_name_similarity(query_name, hit.name),
            embedding_similarity=None,
        )
        for hit in legacy_hits
    ]

787 

async def _autolink(
    self,
    entity_a: Entity,
    entity_b: Entity,
    score: float,
    tier: str,
    bank_id: str,
    graph_store: GraphStore,
) -> EntityLink | None:
    """Store an ``alias_of`` link between two entities without LLM confirmation.

    Used when a cheap signal (trigram or embedding) is already strong enough
    that consulting the LLM would be wasted spend. The link's ``evidence`` is
    ``"<tier> similarity <score>"`` with the score at two decimals, and its
    confidence is ``self.autolink_confidence``. Audits can therefore tell
    trigram autolinks, embedding autolinks and LLM-confirmed links apart.

    Returns the stored link, or ``None`` when ``store_entity_link`` raised
    (the failure is logged as a warning).
    """
    evidence_text = f"{tier} similarity {score:.2f}"
    created = datetime.now(timezone.utc)
    link = EntityLink(
        entity_a=entity_a.id,
        entity_b=entity_b.id,
        link_type="alias_of",
        evidence=evidence_text,
        confidence=self.autolink_confidence,
        created_at=created,
    )

    try:
        await graph_store.store_entity_link(link, bank_id)
    except Exception as exc:
        _logger.warning(
            "store_entity_link (autolink/%s) failed for %r <-> %r: %s",
            tier,
            entity_a.name,
            entity_b.name,
            exc,
        )
        return None
    else:
        _logger.debug(
            "entity resolution: autolinked (%s) %r (%s) alias_of %r (%s) score=%.2f",
            tier,
            entity_a.name,
            entity_a.id,
            entity_b.name,
            entity_b.id,
            score,
        )
        return link

835 

async def _confirm_and_link(
    self,
    entity_a: Entity,
    entity_b: Entity,
    source_text: str,
    bank_id: str,
    graph_store: GraphStore,
    llm_provider: LLMProvider,
) -> EntityLink | None:
    """Ask the LLM to confirm whether two entities are aliases, then store the link.

    The LLM receives ``_SYSTEM_PROMPT`` plus the message built by
    ``_build_user_message(entity_a, entity_b, source_text)``. It is called
    with ``max_tokens=256`` and ``temperature=0.0``. The reply is parsed as a
    JSON object with the keys ``same_entity``, ``confidence`` and
    ``evidence``; an enclosing markdown code fence is stripped first.

    An ``alias_of`` link from *entity_a* to *entity_b* is stored only when
    both of these hold:

    - ``same_entity`` is affirmative (JSON true / truthy, or the strings
      ``"true"`` / ``"yes"``).
    - The confidence, clamped to ``[0.0, 1.0]``, is at least
      ``self.confirmation_threshold``.

    Returns:
        The stored :class:`EntityLink`, or ``None`` in any of these cases:
        the LLM call fails, the reply is not a JSON object, the answer is
        negative, the confidence is below threshold, or ``store_entity_link``
        raises. Failures are logged; nothing is raised to the caller.
    """
    import json
    import math

    messages = [
        Message(role="system", content=_SYSTEM_PROMPT),
        Message(role="user", content=_build_user_message(entity_a, entity_b, source_text)),
    ]

    try:
        completion = await llm_provider.complete(messages, max_tokens=256, temperature=0.0)
        raw = (completion.text or "").strip()
    except Exception as exc:
        _logger.warning("LLM confirmation failed for %r <-> %r: %s", entity_a.name, entity_b.name, exc)
        return None

    # Strip a markdown fence. Drop the opening line (``` or ```json). Also
    # drop a closing ``` whether it sits on its own line or is glued to the
    # last JSON line.
    if raw.startswith("```"):
        fenced_body = "\n".join(raw.splitlines()[1:]).strip()
        raw = fenced_body.removesuffix("```").strip()

    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        _logger.warning(
            "entity resolution LLM returned non-JSON for %r <-> %r: %r",
            entity_a.name,
            entity_b.name,
            raw[:120],
        )
        return None

    # Valid JSON that is not an object (list, string, number) has no keys to
    # read; calling ``.get`` on it would raise AttributeError out of here.
    if not isinstance(data, dict):
        _logger.warning(
            "entity resolution LLM returned non-object JSON for %r <-> %r: %r",
            entity_a.name,
            entity_b.name,
            raw[:120],
        )
        return None

    same_raw = data.get("same_entity", False)
    # ``bool("false")`` is True, so string answers are compared by value
    # instead of coerced. Other types keep plain truthiness.
    if isinstance(same_raw, str):
        same = same_raw.strip().lower() in ("true", "yes")
    else:
        same = bool(same_raw)
    if not same:
        return None

    try:
        confidence = float(data.get("confidence", 0.0))
    except (TypeError, ValueError, OverflowError):
        # Covers null, non-numeric strings, and ints too large for a float.
        confidence = 0.0
    # ``json.loads`` accepts NaN, and ``min(1.0, nan)`` returns 1.0, so NaN
    # would otherwise be clamped to full confidence. Treat it as none instead.
    if math.isnan(confidence):
        confidence = 0.0
    confidence = max(0.0, min(1.0, confidence))

    if confidence < self.confirmation_threshold:
        _logger.debug(
            "entity resolution: %r <-> %r confidence %.2f below threshold %.2f — skipped",
            entity_a.name,
            entity_b.name,
            confidence,
            self.confirmation_threshold,
        )
        return None

    # A JSON null evidence would otherwise be stored as the string "None".
    evidence_raw = data.get("evidence")
    evidence = "" if evidence_raw is None else str(evidence_raw)

    link = EntityLink(
        entity_a=entity_a.id,
        entity_b=entity_b.id,
        link_type="alias_of",
        evidence=evidence,
        confidence=confidence,
        created_at=datetime.now(timezone.utc),
    )

    try:
        await graph_store.store_entity_link(link, bank_id)
    except Exception as exc:
        _logger.warning("store_entity_link failed for %r <-> %r: %s", entity_a.name, entity_b.name, exc)
        return None

    _logger.info(
        "entity resolution: linked %r (%s) alias_of %r (%s) confidence=%.2f",
        entity_a.name,
        entity_a.id,
        entity_b.name,
        entity_b.id,
        confidence,
    )
    return link