Coverage for astrocyte/identity_jwt.py: 100%

55 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

"""JWT claim classification → :class:`ActorIdentity`.

This module is intentionally pure: no network I/O, no dependency on
``PyJWT`` or any specific token-signing library. It takes a decoded claim
dict and returns an :class:`ActorIdentity`. The MCP transport layer is
responsible for extracting, decoding, and signature-validating the token
before handing the claim dict here; see
``docs/_plugins/jwt-identity-middleware.md``.

The split exists because:

1. Classification rules (which claim signals a user token vs a service
   account credential) are stable and testable offline.
2. Token decoding and JWKS handling are deployment-specific (IdP, algorithm
   suite, audience, clock skew) and belong in the MCP wiring layer.
3. Keeping the core pure means the whole ruleset can be exercised by unit
   tests with fake claim dicts — no test fixtures, no expired-token
   machinery, no JWKS mocks.

References:

- ``docs/_design/astrocyte_identity_spec.md`` §3 Gap 1 (the full spec that
  this implements, modulo the MCP wiring which lives in ``astrocyte.mcp``).
- ``docs/_design/adr/adr-002-identity-model.md`` (why :class:`ActorIdentity`
  already uses ``type`` ∈ {user, agent, service} and how JWT classification
  fits into it).
"""

28 

29from __future__ import annotations 

30 

31from typing import Any 

32 

33from astrocyte.errors import AuthorizationError 

34from astrocyte.types import ActorIdentity 

35 

# ---------------------------------------------------------------------------
# Claim name constants
# ---------------------------------------------------------------------------
# Names come from the OpenID Connect spec, OAuth 2.0 JWT profile, and the two
# largest IdP dialects (Entra ID / Azure AD and Google). Keeping them as
# constants lets deployments override via a future config without touching
# classification logic.

#: Claims that prove a delegated user token (agent acting on behalf of a human).
#: The first that resolves to a non-empty string is taken as the user's display
#: identifier (email / UPN). This is used for audit logs, never as a bank key.
_USER_DISPLAY_CLAIMS: tuple[str, ...] = ("upn", "preferred_username", "email")

#: Claims that carry the user's immutable subject identifier. The first that
#: resolves wins. ``oid`` is Entra-specific; ``sub`` is the OIDC standard.
#: Whichever is used becomes :attr:`ActorIdentity.id` and therefore the bank
#: key — picking a stable identifier is load-bearing (see spec §3 Gap 1
#: "Why OID / sub — not email or username — as the bank key").
_USER_SUBJECT_CLAIMS: tuple[str, ...] = ("oid", "sub")

#: Claims that carry the service account's application ID. First wins.
#: ``appid`` and ``azp`` are Entra / OIDC conventions; ``client_id`` appears
#: in some IdPs. Without one of these we cannot key a service bank.
_SERVICE_APP_CLAIMS: tuple[str, ...] = ("appid", "azp", "client_id")

#: Claim that explicitly declares token type in Entra ID v2. When set to
#: ``"app"`` the token is a service account credential regardless of any
#: user-looking claims that may also be present.
_IDTYP_APP_SIGNAL: str = "app"

65 

66 

# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

70 

71 

def classify_jwt_claims(
    claims: dict[str, Any],
) -> ActorIdentity:
    """Classify a decoded JWT claim dict into an :class:`ActorIdentity`.

    The classification rules mirror the spec's §3 Gap 1 decision tree:

    1. **Service account override**: ``idtyp == "app"`` is an explicit
       Entra ID v2 signal — respected even if user-looking claims are
       present, because a service account can legitimately carry a ``sub``.
    2. **Delegated user token**: any of :data:`_USER_DISPLAY_CLAIMS` is
       present AND ``idtyp != "app"``. The subject identifier comes from
       the first resolving :data:`_USER_SUBJECT_CLAIMS` — never from
       ``email``/``upn`` (those mutate; subject identifiers are stable).
    3. **Service account (fallback)**: any of :data:`_SERVICE_APP_CLAIMS`
       is present.
    4. **Unclassifiable**: raise :class:`AuthorizationError`. Fail closed
       — we never mint an anonymous identity for a token we successfully
       decoded but cannot classify.

    The returned :class:`ActorIdentity` stashes the display identifier,
    app id, tenant id, and ``idtyp`` into ``claims`` for audit trails;
    the raw claim bag is not persisted.

    Args:
        claims: A decoded and signature-validated claim dict. Caller is
            responsible for signature verification, expiry, audience, and
            issuer — those are deployment-specific.

    Returns:
        :class:`ActorIdentity` whose ``type`` is ``"user"`` or ``"service"``
        and whose ``id`` is the stable identifier for that principal. Use
        :func:`derive_bank_id` to turn it into a bank id.

    Raises:
        AuthorizationError: If ``claims`` is not a dict, if a user or
            service token lacks its stable identifier claim, or if the
            claim dict cannot be classified at all. Never returns a
            fallback identity — silent fallthrough would route
            authenticated data into a shared default bank.
    """
    if not isinstance(claims, dict):
        raise AuthorizationError(
            f"Token claims must be a dict (got {type(claims).__name__}). Decoded token is malformed."
        )

    tenant_id = _nonempty_str(claims.get("tid"))
    idtyp = _nonempty_str(claims.get("idtyp"))

    # 1. Service account override via explicit idtyp signal.
    if idtyp == _IDTYP_APP_SIGNAL:
        return _build_service_identity(claims, tenant_id, idtyp)

    # 2. Delegated user token — user display claim present and idtyp != app.
    user_display = _first_nonempty(claims, _USER_DISPLAY_CLAIMS)
    if user_display is not None:
        subject = _first_nonempty(claims, _USER_SUBJECT_CLAIMS)
        if subject is None:
            raise AuthorizationError(
                "Delegated user token has a display claim "
                f"({user_display!r}) but no stable subject claim "
                f"({list(_USER_SUBJECT_CLAIMS)}). Refusing to route — bank "
                "key must be stable across email/UPN changes."
            )
        # The display value is stored under "upn" regardless of which of the
        # display claims actually supplied it.
        stashed = {"upn": user_display}
        if idtyp is not None:
            stashed["idtyp"] = idtyp
        return ActorIdentity(
            type="user",
            id=subject,
            claims=_with_tenant(stashed, tenant_id),
        )

    # 3. Service account fallback — app id claim present.
    if _first_nonempty(claims, _SERVICE_APP_CLAIMS) is not None:
        return _build_service_identity(claims, tenant_id, idtyp)

    # 4. Unclassifiable — fail closed.
    # Keys are stringified before sorting so that a claim dict with
    # mixed-type keys still yields AuthorizationError rather than a
    # TypeError from sorted(); output is unchanged for all-string keys.
    available_keys = sorted(str(key) for key in claims)
    raise AuthorizationError(
        "Token decoded but identity type could not be determined. "
        "Expected a delegated user token (one of "
        f"{list(_USER_DISPLAY_CLAIMS)}) or a service account credential "
        f"(one of {list(_SERVICE_APP_CLAIMS)}, or idtyp=app). "
        "Available claim keys: "
        f"{available_keys}."
    )

156 

157 

def derive_bank_id(
    identity: ActorIdentity,
    *,
    service_bank_prefix: str = "service-",
    user_bank_prefix: str = "user-",
) -> str:
    """Bank id for a classified identity, using the caller's prefix choice.

    Kept separate from :func:`classify_jwt_claims` because the bank-prefix
    convention is a deployment decision (``svc-`` vs ``service-``) and
    should not leak into the identity object itself — the identity is
    stable, bank naming is a policy.

    Args:
        identity: The classified principal; only its ``type`` and ``id``
            attributes are read.
        service_bank_prefix: Prefix prepended to ``identity.id`` when
            ``identity.type == "service"``.
        user_bank_prefix: Prefix prepended to ``identity.id`` when
            ``identity.type == "user"``.

    Returns:
        The prefix for the identity's type concatenated with ``identity.id``.

    Raises:
        ValueError: If ``identity.type`` is neither ``"user"`` nor
            ``"service"``.
    """
    if identity.type == "user":
        return f"{user_bank_prefix}{identity.id}"
    if identity.type == "service":
        return f"{service_bank_prefix}{identity.id}"
    raise ValueError(
        f"Cannot derive bank id for identity.type={identity.type!r}; "
        "only 'user' and 'service' identities are supported by the JWT "
        "classifier."
    )

180 

181 

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

185 

186 

def _build_service_identity(
    claims: dict[str, Any],
    tenant_id: str | None,
    idtyp: str | None,
) -> ActorIdentity:
    """Build a ``"service"`` :class:`ActorIdentity` from a claim dict.

    Args:
        claims: Decoded claim dict; searched for the first non-empty
            claim in :data:`_SERVICE_APP_CLAIMS`.
        tenant_id: Tenant id to record in the stashed claims, or ``None``
            to omit it.
        idtyp: ``idtyp`` claim value to record in the stashed claims, or
            ``None`` to omit it.

    Returns:
        An identity with ``type="service"``, ``id`` set to the resolved app
        id, and ``claims`` holding ``app_id`` plus ``idtyp`` / ``tenant_id``
        when those were provided.

    Raises:
        AuthorizationError: If none of :data:`_SERVICE_APP_CLAIMS` resolves
            to a non-empty string — without an app id there is nothing to
            use as the identity's ``id``.
    """
    app_id = _first_nonempty(claims, _SERVICE_APP_CLAIMS)
    if app_id is None:
        raise AuthorizationError(
            "Token classified as service account (idtyp=app) but no app id "
            f"claim found (expected one of {list(_SERVICE_APP_CLAIMS)})."
        )
    stashed = {"app_id": app_id}
    if idtyp is not None:
        stashed["idtyp"] = idtyp
    return ActorIdentity(
        type="service",
        id=app_id,
        claims=_with_tenant(stashed, tenant_id),
    )

206 

207 

def _with_tenant(base: dict[str, str], tenant_id: str | None) -> dict[str, str]:
    """Return ``base`` with a ``tenant_id`` entry added when one is known.

    Args:
        base: Stashed claim mapping; never mutated.
        tenant_id: Tenant id to record, or ``None`` to leave ``base`` as is.

    Returns:
        A new dict containing ``base`` plus ``"tenant_id"`` when
        ``tenant_id`` is not ``None``; otherwise the very same ``base``
        object (no copy is made).
    """
    if tenant_id is not None:
        # Build a fresh dict rather than mutating the caller's mapping.
        base = {**base, "tenant_id": tenant_id}
    return base

212 

213 

def _first_nonempty(claims: dict[str, Any], keys: tuple[str, ...]) -> str | None:
    """Return the first claim value among ``keys`` that is a non-blank string.

    Args:
        claims: Decoded claim dict to look values up in.
        keys: Claim names to try, in priority order.

    Returns:
        The value of the first key whose claim passes
        :func:`_nonempty_str`, or ``None`` when no key qualifies (missing
        keys are treated the same as unusable values).
    """
    for key in keys:
        value = _nonempty_str(claims.get(key))
        if value is not None:
            return value
    return None

220 

221 

def _nonempty_str(value: Any) -> str | None:
    """Return ``value`` if it is a string with non-whitespace content.

    Args:
        value: Any claim value (or ``None`` for a missing claim).

    Returns:
        ``value`` itself — unmodified, surrounding whitespace included —
        when it is a ``str`` that is not empty or whitespace-only;
        otherwise ``None``.
    """
    # strip() is used only as a blankness test; the original string is
    # what gets returned.
    if isinstance(value, str) and value.strip():
        return value
    return None