Coverage for astrocyte/identity.py: 91%

65 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Identity resolution, OBO permission intersection, and bank resolution (M1 / v0.5.0 with M2). 

2 

3See ``docs/_design/adr/adr-002-identity-model.md``. 

4""" 

5 

6from __future__ import annotations 

7 

8from astrocyte.types import AccessGrant, ActorIdentity, AstrocyteContext 

9 

10 

def format_principal(actor: ActorIdentity) -> str:
    """Serialize ``actor`` to the grant-matching principal form ``{type}:{id}``.

    The result is built from ``actor.type`` and ``actor.id`` joined by a single
    colon, e.g. ``agent:bot-1``.
    """
    return f"{actor.type}:{actor.id}"

14 

15 

def parse_principal(principal: str) -> ActorIdentity:
    """Parse a ``principal`` string into an :class:`ActorIdentity` (ADR-002).

    - ``agent:X`` / ``user:X`` / ``service:X`` → typed id ``X``
    - no recognized prefix → treated as ``user:{principal}``

    Only the leading prefix is stripped; everything after it (including any
    further colons) is kept verbatim as the id.
    """
    for typ in ("agent", "user", "service"):
        prefix = f"{typ}:"
        if principal.startswith(prefix):
            return ActorIdentity(type=typ, id=principal[len(prefix):])
    # Unprefixed (or unknown-prefix) principals default to the "user" type.
    return ActorIdentity(type="user", id=principal)

26 

27 

def resolve_actor(context: AstrocyteContext) -> ActorIdentity | None:
    """Resolved actor: explicit ``context.actor``, else parse ``context.principal``.

    An explicit ``context.actor`` always wins. Otherwise a non-empty
    ``context.principal`` is parsed with :func:`parse_principal`. Returns
    ``None`` when neither is available.
    """
    if context.actor is not None:
        return context.actor
    if context.principal:
        return parse_principal(context.principal)
    return None

35 

36 

def context_principal_label(context: AstrocyteContext) -> str:
    """Short label for logs and :class:`~astrocyte.errors.AccessDenied` messages.

    The label is the resolved actor in ``{type}:{id}`` form, or ``anonymous``
    when no actor resolves. When ``context.on_behalf_of`` is set, the suffix
    `` obo {type}:{id}`` for that identity is appended.
    """
    actor = resolve_actor(context)
    label = "anonymous" if actor is None else format_principal(actor)
    if context.on_behalf_of is None:
        return label
    return f"{label} obo {format_principal(context.on_behalf_of)}"

44 

45 

def _permissions_for_principal_on_bank(
    grants: list[AccessGrant],
    bank_id: str,
    principal_str: str,
) -> set[str]:
    """Union of permissions from every grant matching the bank and principal.

    A grant matches when its ``bank_id`` is ``"*"`` or equals ``bank_id`` and
    its ``principal`` is ``"*"`` or equals ``principal_str``. Returns an empty
    set when nothing matches.
    """
    matched: set[str] = set()
    for grant in grants:
        if grant.bank_id not in ("*", bank_id):
            continue
        if grant.principal not in ("*", principal_str):
            continue
        matched.update(grant.permissions)
    return matched

58 

59 

def effective_permissions(
    context: AstrocyteContext,
    grants: list[AccessGrant],
    bank_id: str,
) -> set[str]:
    """Effective permission set for ``bank_id``, including OBO intersection.

    Returns an empty set when no actor resolves from ``context``. Without
    ``context.on_behalf_of`` the actor's own granted permissions are returned.
    With it, only permissions granted to both the actor and the on-behalf-of
    identity are kept.
    """
    actor = resolve_actor(context)
    if actor is None:
        return set()
    actor_perms = _permissions_for_principal_on_bank(
        grants, bank_id, format_principal(actor)
    )
    obo = context.on_behalf_of
    if obo is None:
        return actor_perms
    obo_perms = _permissions_for_principal_on_bank(
        grants, bank_id, format_principal(obo)
    )
    # Intersection: the actor can never exceed what the OBO identity may do.
    return actor_perms & obo_perms

74 

75 

def accessible_read_banks(
    context: AstrocyteContext,
    grants: list[AccessGrant],
    *,
    known_bank_ids: list[str] | None = None,
    resolver: BankResolver | None = None,
) -> list[str]:
    """Bank IDs where ``context`` has effective ``read`` after OBO intersection.

    Candidates are: non-wildcard ``bank_id`` values from ``grants``, optional
    ``known_bank_ids``, and (when no other candidates) a convention bank from
    ``resolver`` for the resolved actor so wildcard-only ACL rows can still resolve.

    The result is sorted by bank id and contains only candidates for which
    :func:`effective_permissions` includes ``"read"``.
    """
    candidates: set[str] = {g.bank_id for g in grants if g.bank_id != "*"}
    if known_bank_ids:
        candidates.update(known_bank_ids)

    # The resolver is a last resort, so the actor is resolved only when the
    # fallback can actually fire.
    if not candidates and resolver is not None:
        actor = resolve_actor(context)
        if actor is not None:
            suggested = resolver.default_bank_id(actor)
            if suggested is not None:
                candidates.add(suggested)

    return [
        bid
        for bid in sorted(candidates)
        if "read" in effective_permissions(context, grants, bid)
    ]

104 

105 

class BankResolver:
    """Convention-based default bank id from a resolved :class:`ActorIdentity`.

    The default bank id is a per-type prefix followed by the actor id, e.g.
    ``user-alice`` with the default prefixes.
    """

    def __init__(
        self,
        *,
        user_prefix: str = "user-",
        agent_prefix: str = "agent-",
        service_prefix: str = "service-",
    ) -> None:
        """Store the prefix used for each supported actor type."""
        self._user_prefix = user_prefix
        self._agent_prefix = agent_prefix
        self._service_prefix = service_prefix

    def default_bank_id(self, actor: ActorIdentity) -> str | None:
        """Return ``{prefix}{actor.id}`` for the actor's type.

        Returns ``None`` when ``actor.type`` is not ``user``, ``agent`` or
        ``service``.
        """
        # Built per call so the current attribute values are always used.
        prefix_by_type = {
            "user": self._user_prefix,
            "agent": self._agent_prefix,
            "service": self._service_prefix,
        }
        if actor.type not in prefix_by_type:
            return None
        return f"{prefix_by_type[actor.type]}{actor.id}"

127 return None