Coverage for astrocyte/identity.py: 91%
65 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Identity resolution, OBO permission intersection, and bank resolution (M1 / v0.5.0 with M2).
3See ``docs/_design/adr/adr-002-identity-model.md``.
4"""
6from __future__ import annotations
8from astrocyte.types import AccessGrant, ActorIdentity, AstrocyteContext
def format_principal(actor: ActorIdentity) -> str:
    """Render ``actor`` as the ``{type}:{id}`` principal string used to match grants."""
    kind, ident = actor.type, actor.id
    return f"{kind}:{ident}"
def parse_principal(principal: str) -> ActorIdentity:
    """Turn a ``principal`` string into an :class:`ActorIdentity` (ADR-002).

    - A leading ``agent:``, ``user:`` or ``service:`` selects the actor type;
      everything after that first colon becomes the id.
    - Any other string is taken whole as the id of a ``user`` actor.
    """
    # Splitting on the first colon is equivalent to testing each known prefix:
    # the text before it either is one of the recognised types or it is not.
    kind, colon, remainder = principal.partition(":")
    if colon and kind in ("agent", "user", "service"):
        return ActorIdentity(type=kind, id=remainder)
    return ActorIdentity(type="user", id=principal)
def resolve_actor(context: AstrocyteContext) -> ActorIdentity | None:
    """Return the actor for ``context``.

    An explicit ``context.actor`` wins. Otherwise a non-empty
    ``context.principal`` is parsed with :func:`parse_principal`. When neither
    is available the result is ``None``.
    """
    explicit = context.actor
    if explicit is not None:
        return explicit
    raw_principal = context.principal
    return parse_principal(raw_principal) if raw_principal else None
def context_principal_label(context: AstrocyteContext) -> str:
    """Build a short label for logs and :class:`~astrocyte.errors.AccessDenied` messages.

    The label is the resolved actor in ``{type}:{id}`` form, or ``anonymous``
    when no actor resolves. If ``context.on_behalf_of`` is set, `` obo `` and
    that identity's principal form are appended.
    """
    actor = resolve_actor(context)
    label = "anonymous" if not actor else format_principal(actor)
    obo = context.on_behalf_of
    if obo is None:
        return label
    return f"{label} obo {format_principal(obo)}"
46def _permissions_for_principal_on_bank(
47 grants: list[AccessGrant],
48 bank_id: str,
49 principal_str: str,
50) -> set[str]:
51 out: set[str] = set()
52 for g in grants:
53 bank_match = g.bank_id == "*" or g.bank_id == bank_id
54 principal_match = g.principal == "*" or g.principal == principal_str
55 if bank_match and principal_match:
56 out.update(g.permissions)
57 return out
def effective_permissions(
    context: AstrocyteContext,
    grants: list[AccessGrant],
    bank_id: str,
) -> set[str]:
    """Compute the permissions ``context`` holds on ``bank_id``.

    With no resolvable actor the result is an empty set. Otherwise it is the
    set of permissions granted to the actor, narrowed (when
    ``context.on_behalf_of`` is set) to those also granted to the
    on-behalf-of identity.
    """
    actor = resolve_actor(context)
    if actor is None:
        return set()

    permitted = _permissions_for_principal_on_bank(grants, bank_id, format_principal(actor))
    obo = context.on_behalf_of
    if obo is not None:
        # OBO: keep only what both the actor and the represented identity hold.
        permitted = permitted & _permissions_for_principal_on_bank(grants, bank_id, format_principal(obo))
    return permitted
def accessible_read_banks(
    context: AstrocyteContext,
    grants: list[AccessGrant],
    *,
    known_bank_ids: list[str] | None = None,
    resolver: BankResolver | None = None,
) -> list[str]:
    """List, in sorted order, the bank IDs on which ``context`` effectively has ``read``.

    Candidate banks come from the non-wildcard ``bank_id`` values in ``grants``
    plus any ``known_bank_ids``. Only when that yields nothing, and both a
    ``resolver`` and a resolved actor are available, the resolver's default
    bank for the actor is used as the sole candidate, so that ACLs made up
    purely of wildcard rows can still resolve to a bank. Each candidate is
    then checked with :func:`effective_permissions` (OBO intersection included).
    """
    candidates: set[str] = set()
    for grant in grants:
        if grant.bank_id != "*":
            candidates.add(grant.bank_id)
    candidates.update(known_bank_ids or ())

    actor = resolve_actor(context)
    if not candidates and actor is not None and resolver is not None:
        fallback = resolver.default_bank_id(actor)
        if fallback is not None:
            candidates.add(fallback)

    return [
        bank
        for bank in sorted(candidates)
        if "read" in effective_permissions(context, grants, bank)
    ]
class BankResolver:
    """Derive a conventional default bank id from a resolved :class:`ActorIdentity`.

    The id is the prefix configured for the actor's type followed by the
    actor's id.
    """

    def __init__(
        self,
        *,
        user_prefix: str = "user-",
        agent_prefix: str = "agent-",
        service_prefix: str = "service-",
    ) -> None:
        """Store the bank-id prefix to use for each supported actor type."""
        self._user_prefix = user_prefix
        self._agent_prefix = agent_prefix
        self._service_prefix = service_prefix

    def default_bank_id(self, actor: ActorIdentity) -> str | None:
        """Return ``{prefix}{actor.id}`` for user/agent/service actors, else ``None``."""
        prefix_by_type = (
            ("user", self._user_prefix),
            ("agent", self._agent_prefix),
            ("service", self._service_prefix),
        )
        for kind, prefix in prefix_by_type:
            if actor.type == kind:
                return f"{prefix}{actor.id}"
        return None