Coverage for astrocyte/ingest/bank_resolve.py: 100%
13 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Resolve target bank for configured ingest sources (M4)."""
3from __future__ import annotations
5from astrocyte.config import SourceConfig
6from astrocyte.errors import IngestError
def resolve_ingest_bank_id(config: SourceConfig, *, principal: str | None = None) -> str:
    """Resolve the bank ID that a configured ingest source writes to.

    A non-blank ``target_bank`` takes precedence. Otherwise a non-blank
    ``target_bank_template`` is expanded by replacing every ``{principal}``
    placeholder with the stripped *principal*.

    Args:
        config: Source configuration; ``target_bank`` and
            ``target_bank_template`` are read from it.
        principal: Value substituted for ``{principal}``. It must be
            non-blank whenever the template branch is taken, even if the
            template contains no placeholder.

    Returns:
        The resolved bank ID with surrounding whitespace removed.

    Raises:
        IngestError: If the template is used without a non-blank principal,
            or if neither ``target_bank`` nor ``target_bank_template`` is set
            to a non-blank value.
    """
    fixed_bank = str(config.target_bank).strip() if config.target_bank else ""
    if fixed_bank:
        return fixed_bank

    raw_template = config.target_bank_template
    template = str(raw_template).strip() if raw_template else ""
    if template:
        who = (principal or "").strip()
        if not who:
            raise IngestError("target_bank_template requires principal (in payload or argument)")
        # Expand the *stripped* template so the result is trimmed exactly like
        # a literal ``target_bank``; padding in the config must not leak into
        # the bank ID.
        return template.replace("{principal}", who)

    raise IngestError("source must set target_bank or target_bank_template")