Coverage for astrocyte/identity_jwt.py: 100%
55 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
"""JWT claim classification → :class:`ActorIdentity`.

This module is intentionally pure: no network I/O, no dependency on
``PyJWT`` or any specific token-signing library. It takes a decoded claim
dict and returns an :class:`ActorIdentity`. The MCP transport layer is
responsible for extracting, decoding, and signature-validating the token
before handing the claim dict here; see
``docs/_plugins/jwt-identity-middleware.md``.

The split exists because:

1. Classification rules (which claim signals a user token vs a service
   account credential) are stable and testable offline.
2. Token decoding and JWKS handling are deployment-specific (IdP, algorithm
   suite, audience, clock skew) and belong in the MCP wiring layer.
3. Keeping the core pure means the whole ruleset can be exercised by unit
   tests with fake claim dicts — no test fixtures, no expired-token
   machinery, no JWKS mocks.

References:

- ``docs/_design/astrocyte_identity_spec.md`` §3 Gap 1 (the full spec that
  this implements, modulo the MCP wiring which lives in ``astrocyte.mcp``).
- ``docs/_design/adr/adr-002-identity-model.md`` (why :class:`ActorIdentity`
  already uses ``type`` ∈ {user, agent, service} and how JWT classification
  fits into it).
"""
29from __future__ import annotations
31from typing import Any
33from astrocyte.errors import AuthorizationError
34from astrocyte.types import ActorIdentity
# ---------------------------------------------------------------------------
# Claim name constants
# ---------------------------------------------------------------------------
# Names come from the OpenID Connect spec, the OAuth 2.0 JWT profile, and the
# two largest IdP dialects (Entra ID / Azure AD and Google). Keeping them as
# constants lets deployments override them via a future config without
# touching the classification logic.

#: Claims whose presence marks a delegated user token (an agent acting on
#: behalf of a human). They are probed in order; the first one that holds a
#: non-blank string becomes the user's display identifier (email / UPN). The
#: display identifier is recorded for audit only and is never the bank key.
_USER_DISPLAY_CLAIMS: tuple[str, ...] = ("upn", "preferred_username", "email")

#: Claims that carry the user's immutable subject identifier, probed in
#: order; the first non-blank string wins. ``oid`` is Entra-specific; ``sub``
#: is the OIDC standard. The winner becomes :attr:`ActorIdentity.id` and
#: therefore the bank key — picking a stable identifier is load-bearing (see
#: spec §3 Gap 1 "Why OID / sub — not email or username — as the bank key").
_USER_SUBJECT_CLAIMS: tuple[str, ...] = ("oid", "sub")

#: Claims that carry a service account's application id, probed in order;
#: the first non-blank string wins. ``appid`` and ``azp`` are Entra / OIDC
#: conventions; ``client_id`` appears in some IdPs. Without one of these a
#: service bank cannot be keyed.
_SERVICE_APP_CLAIMS: tuple[str, ...] = ("appid", "azp", "client_id")

#: Value of the ``idtyp`` claim that explicitly declares an app-only token in
#: Entra ID v2. When ``idtyp`` equals this value the token is treated as a
#: service account credential regardless of any user-looking claims that may
#: also be present.
_IDTYP_APP_SIGNAL: str = "app"
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def classify_jwt_claims(
    claims: dict[str, Any],
) -> ActorIdentity:
    """Classify a decoded JWT claim dict into an :class:`ActorIdentity`.

    The classification rules mirror the spec's §3 Gap 1 decision tree and
    are evaluated in this order:

    1. **Service account override**: ``idtyp == "app"`` is an explicit
       Entra ID v2 signal — respected even if user-looking claims are
       present, because a service account can legitimately carry a ``sub``.
    2. **Delegated user token**: any of :data:`_USER_DISPLAY_CLAIMS` is
       present AND ``idtyp != "app"``. The subject identifier comes from
       the first resolving :data:`_USER_SUBJECT_CLAIMS` — never from
       ``email``/``upn`` (those mutate; subject identifiers are stable).
    3. **Service account (fallback)**: any of :data:`_SERVICE_APP_CLAIMS`
       is present.
    4. **Unclassifiable**: raise :class:`AuthorizationError`. Fail closed
       — we never mint an anonymous identity for a token we successfully
       decoded but cannot classify.

    Only a small audit subset is handed to :class:`ActorIdentity` via its
    ``claims`` argument: the display identifier (users) or app id
    (services), plus ``idtyp`` and the tenant id when present. The raw
    claim dict itself is not attached to the identity.

    Args:
        claims: A decoded and signature-validated claim dict. Caller is
            responsible for signature verification, expiry, audience, and
            issuer — those are deployment-specific.

    Returns:
        :class:`ActorIdentity` whose ``type`` is ``"user"`` or ``"service"``
        and whose ``id`` is the stable identifier for that principal. Use
        :func:`derive_bank_id` to turn it into a bank id.

    Raises:
        AuthorizationError: If ``claims`` is not a dict, if a user token
            lacks a stable subject claim, if an ``idtyp=app`` token lacks
            an app id claim, or if the claim dict cannot be classified.
            Never returns a fallback identity — silent fallthrough would
            route authenticated data into a shared default bank.
    """
    if not isinstance(claims, dict):
        raise AuthorizationError(
            f"Token claims must be a dict (got {type(claims).__name__}). Decoded token is malformed."
        )

    # Blank or non-string values are treated exactly like absent claims.
    tenant_id = _nonempty_str(claims.get("tid"))
    idtyp = _nonempty_str(claims.get("idtyp"))

    # 1. Service account override via explicit idtyp signal.
    if idtyp == _IDTYP_APP_SIGNAL:
        return _build_service_identity(claims, tenant_id, idtyp)

    # 2. Delegated user token — user display claim present and idtyp != app.
    user_display = _first_nonempty(claims, _USER_DISPLAY_CLAIMS)
    if user_display is not None:
        subject = _first_nonempty(claims, _USER_SUBJECT_CLAIMS)
        if subject is None:
            raise AuthorizationError(
                "Delegated user token has a display claim "
                f"({user_display!r}) but no stable subject claim "
                f"({list(_USER_SUBJECT_CLAIMS)}). Refusing to route — bank "
                "key must be stable across email/UPN changes."
            )
        # The display identifier is always stored under "upn", whichever of
        # the display claims actually supplied it.
        stashed = {"upn": user_display}
        if idtyp is not None:
            stashed["idtyp"] = idtyp
        return ActorIdentity(
            type="user",
            id=subject,
            claims=_with_tenant(stashed, tenant_id),
        )

    # 3. Service account fallback — app id claim present.
    if _first_nonempty(claims, _SERVICE_APP_CLAIMS) is not None:
        return _build_service_identity(claims, tenant_id, idtyp)

    # 4. Unclassifiable — fail closed.
    # Keys are stringified before sorting so that a claim dict with
    # mixed-type keys still produces AuthorizationError here rather than a
    # TypeError from sorted(). For all-string keys (and for an empty dict,
    # which renders as "[]") the message is unchanged.
    claim_keys = sorted(str(key) for key in claims)
    raise AuthorizationError(
        "Token decoded but identity type could not be determined. "
        "Expected a delegated user token (one of "
        f"{list(_USER_DISPLAY_CLAIMS)}) or a service account credential "
        f"(one of {list(_SERVICE_APP_CLAIMS)}, or idtyp=app). "
        "Available claim keys: "
        f"{claim_keys}."
    )
def derive_bank_id(
    identity: ActorIdentity,
    *,
    service_bank_prefix: str = "service-",
    user_bank_prefix: str = "user-",
) -> str:
    """Return the bank id for a classified identity.

    The bank id is the prefix chosen for the identity's ``type`` followed
    by ``identity.id``. Prefix selection lives here rather than in
    :func:`classify_jwt_claims` because the naming convention (``svc-`` vs
    ``service-``) is a deployment policy, whereas the identity itself is
    stable and should not carry that policy.

    Args:
        identity: Identity whose ``type`` and ``id`` attributes are read.
        service_bank_prefix: Prefix used when ``identity.type`` is
            ``"service"``.
        user_bank_prefix: Prefix used when ``identity.type`` is ``"user"``.

    Returns:
        The prefix concatenated with ``identity.id``.

    Raises:
        ValueError: If ``identity.type`` is neither ``"user"`` nor
            ``"service"``.
    """
    kind = identity.type

    # Guard first: anything other than the two supported types is rejected.
    if kind not in ("user", "service"):
        raise ValueError(
            f"Cannot derive bank id for identity.type={kind!r}; "
            "only 'user' and 'service' identities are supported by the JWT "
            "classifier."
        )

    prefix = user_bank_prefix if kind == "user" else service_bank_prefix
    return f"{prefix}{identity.id}"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _build_service_identity(
    claims: dict[str, Any],
    tenant_id: str | None,
    idtyp: str | None,
) -> ActorIdentity:
    """Build the ``"service"`` identity keyed by the token's application id.

    The application id is the first non-blank value among
    :data:`_SERVICE_APP_CLAIMS`. It becomes the identity's ``id`` and is
    also recorded under ``"app_id"`` in the audit claims, together with
    ``idtyp`` and the tenant id when those are not ``None``.

    Args:
        claims: Decoded claim dict to search for an application id.
        tenant_id: Tenant id to record, or ``None`` to omit it.
        idtyp: ``idtyp`` claim value to record, or ``None`` to omit it.

    Returns:
        An :class:`ActorIdentity` with ``type="service"``.

    Raises:
        AuthorizationError: If none of :data:`_SERVICE_APP_CLAIMS` holds a
            non-blank string.
    """
    application_id = _first_nonempty(claims, _SERVICE_APP_CLAIMS)
    if application_id is None:
        raise AuthorizationError(
            "Token classified as service account (idtyp=app) but no app id "
            f"claim found (expected one of {list(_SERVICE_APP_CLAIMS)})."
        )

    # Key order is app_id, then idtyp (if any), then tenant_id (if any).
    idtyp_entry = {} if idtyp is None else {"idtyp": idtyp}
    audit_claims = _with_tenant({"app_id": application_id, **idtyp_entry}, tenant_id)
    return ActorIdentity(type="service", id=application_id, claims=audit_claims)
208def _with_tenant(base: dict[str, str], tenant_id: str | None) -> dict[str, str]:
209 if tenant_id is not None:
210 base = {**base, "tenant_id": tenant_id}
211 return base
def _first_nonempty(claims: dict[str, Any], keys: tuple[str, ...]) -> str | None:
    """Return the first non-blank string found in ``claims`` under ``keys``.

    Keys are probed in the order given. A key that is missing, or whose
    value is rejected by :func:`_nonempty_str`, is skipped. Returns ``None``
    when no key yields a usable value.
    """
    # Lazy generator: lookup stops at the first key that resolves.
    resolved = (_nonempty_str(claims.get(name)) for name in keys)
    return next((candidate for candidate in resolved if candidate is not None), None)
222def _nonempty_str(value: Any) -> str | None:
223 if isinstance(value, str) and value.strip():
224 return value
225 return None