Coverage for astrocyte/ingest/bank_resolve.py: 100%

13 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Resolve target bank for configured ingest sources (M4).""" 

2 

3from __future__ import annotations 

4 

5from astrocyte.config import SourceConfig 

6from astrocyte.errors import IngestError 

7 

8 

9def resolve_ingest_bank_id(config: SourceConfig, *, principal: str | None = None) -> str: 

10 """Pick ``target_bank`` or expand ``target_bank_template`` (``{principal}``).""" 

11 if config.target_bank and str(config.target_bank).strip(): 

12 return str(config.target_bank).strip() 

13 tpl = config.target_bank_template 

14 if tpl and str(tpl).strip(): 

15 p = (principal or "").strip() 

16 if not p: 

17 raise IngestError("target_bank_template requires principal (in payload or argument)") 

18 return str(tpl).replace("{principal}", p) 

19 raise IngestError("source must set target_bank or target_bank_template")