Coverage for astrocyte/tenancy.py: 100%
58 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Schema-per-tenant primitives.
3Astrocyte supports tenant isolation at the **PostgreSQL schema** level. Each
4tenant gets a dedicated schema (e.g. ``tenant_acme``); every adapter SQL string
5that references a table goes through :func:`fq_table` so the table reference
6gets prefixed with the active tenant's schema name.
8The active schema for a request is set via ``contextvars.ContextVar``, which
9gives us automatic per-async-task isolation: concurrent requests for different
10tenants never see each other's schema, and no thread-locals or manual passing
11through every function are required.
13## Public surface
15- :class:`TenantContext`, :class:`Tenant`, :class:`TenantExtension`,
16 :class:`DefaultTenantExtension` — the pluggable auth contract.
17 Implement :class:`TenantExtension` to map an inbound request to a schema.
18- :func:`get_current_schema` — the active schema (defaults to ``"public"``).
19- :func:`fq_table` — table-name helper used by every adapter SQL string.
20- :func:`use_schema` — context manager / decorator helper for binding the
21 schema for a block of code (used by gateway middleware and workers).
23## Design rationale
25- **ContextVar over thread-local**: we run async; ContextVar is the correct
26 primitive and propagates across ``asyncio.create_task`` automatically.
27- **Default schema = ``"public"``**: existing single-schema deployments keep
28 working without code changes. Schema-per-tenant is opt-in via a custom
29 :class:`TenantExtension`.
30- **No global mutable state besides the ContextVar**: tests can call
31 ``use_schema("test_xyz")`` to scope a block.
32- **Identifier validation in :func:`fq_table`**: schema and table names are
33 validated against a strict regex (alphanumeric + underscore). Anything
34 else raises ``ValueError``. This is the *only* defense against SQL injection
35 through the schema name — there is no way to safely parameterize an
36 identifier in PostgreSQL.
37"""
39from __future__ import annotations
41import contextvars
42import re
43from abc import ABC, abstractmethod
44from contextlib import contextmanager
45from dataclasses import dataclass
46from typing import Any
48# ---------------------------------------------------------------------------
49# Identifier validation
50# ---------------------------------------------------------------------------
#: Pattern every schema, table, and function name must satisfy. PostgreSQL
#: accepts a wider alphabet for unquoted identifiers, but names are restricted
#: to ``[a-zA-Z_][a-zA-Z0-9_]*`` so an accepted identifier never needs quoting
#: and can never carry a quote character into generated SQL.
_IDENT_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")

#: Schema used when no tenant context is active. Single-schema deployments
#: never need to set anything; multi-tenant deployments override it per
#: request via :func:`use_schema` or :func:`set_current_schema`.
DEFAULT_SCHEMA = "public"


def _validate_identifier(value: str, *, label: str) -> str:
    """Return ``value`` unchanged if it is a safe SQL identifier.

    This is the SQL-injection guard for every identifier interpolated into a
    query string — never relax it.

    :param value: Candidate identifier (schema, table, or function name).
    :param label: Human-readable kind of identifier, used in the error message.
    :returns: ``value`` itself, so the call can be used inline.
    :raises ValueError: If ``value`` is not a ``str`` or does not consist
        entirely of characters allowed by :data:`_IDENT_RE`.
    """
    # ``fullmatch`` rather than ``match``: with ``match`` the pattern's trailing
    # ``$`` also succeeds just before a final newline, so a value such as
    # "public" followed by a newline would slip through the guard.
    if not isinstance(value, str) or _IDENT_RE.fullmatch(value) is None:
        raise ValueError(f"{label} {value!r} is not a valid PostgreSQL identifier (must match {_IDENT_RE.pattern})")
    return value
75# ---------------------------------------------------------------------------
76# Schema context (per-request, propagates across asyncio.create_task)
77# ---------------------------------------------------------------------------
#: Schema bound to the current execution context. The variable's default is
#: ``None``, which readers interpret as "fall back to :data:`DEFAULT_SCHEMA`".
_current_schema: contextvars.ContextVar[str | None]
_current_schema = contextvars.ContextVar("astrocyte.tenancy.current_schema", default=None)
def get_current_schema() -> str:
    """Return the schema bound to the current context.

    Falls back to :data:`DEFAULT_SCHEMA` when the context variable holds
    ``None`` (nothing bound) or any other falsy value.
    """
    bound = _current_schema.get()
    if bound:
        return bound
    return DEFAULT_SCHEMA
def set_current_schema(schema: str) -> contextvars.Token:
    """Make ``schema`` the active schema and hand back a reset token.

    The name is validated before it is bound, so an unsafe identifier never
    reaches the context variable. Intended for middleware that has to manage
    the binding by hand inside ``try``/``finally``; :func:`use_schema` is the
    safer choice elsewhere because it always resets.

    :param schema: Schema name to bind.
    :returns: Token to pass to :func:`reset_current_schema`.
    :raises ValueError: If ``schema`` is not a valid identifier.
    """
    checked = _validate_identifier(schema, label="schema")
    return _current_schema.set(checked)
def reset_current_schema(token: contextvars.Token) -> None:
    """Undo a :func:`set_current_schema` call.

    :param token: The token returned by the matching
        :func:`set_current_schema` call; resetting with it restores whatever
        schema was active before that call.
    """
    _current_schema.reset(token)
@contextmanager
def use_schema(schema: str):
    """Run the enclosed block with ``schema`` as the active schema.

    The previous binding is restored when the block exits, whether it
    finishes normally or raises.

    Example::

        with use_schema("tenant_acme"):
            await store.search_similar(...)

    :param schema: Schema name to bind for the duration of the block.
    :raises ValueError: If ``schema`` is not a valid identifier.
    """
    restore_token = set_current_schema(schema)
    try:
        yield
    finally:
        # Always restore, even if the block raised.
        reset_current_schema(restore_token)
123# ---------------------------------------------------------------------------
124# Fully-qualified table-name helper
125# ---------------------------------------------------------------------------
def fq_table(table_name: str, *, schema: str | None = None) -> str:
    """Build the schema-qualified, double-quoted reference for a table.

    Adapter SQL that touches a tenant-scoped table should obtain the table
    reference from here. Pass ``schema`` explicitly only from workers/jobs
    that run outside a request context; otherwise the schema comes from
    :func:`get_current_schema`.

    Both halves are double-quoted in the output so the result stays valid
    even when a name collides with a PostgreSQL reserved word.

    :param table_name: Unqualified table name.
    :param schema: Schema to use instead of the active one.
    :returns: A string of the form ``"<schema>"."<table>"``.
    :raises ValueError: If the schema or the table name is not a valid
        identifier (the schema is checked first).
    """
    if schema is None:
        schema = get_current_schema()
    _validate_identifier(schema, label="schema")
    _validate_identifier(table_name, label="table")
    quoted_schema = f'"{schema}"'
    quoted_table = f'"{table_name}"'
    return f"{quoted_schema}.{quoted_table}"
def fq_function(function_name: str, *, schema: str | None = None) -> str:
    """Build the schema-qualified, double-quoted reference for a function.

    Counterpart of :func:`fq_table` for SQL functions (trigger functions,
    etc.).

    :param function_name: Unqualified function name.
    :param schema: Schema to use instead of the active one returned by
        :func:`get_current_schema`.
    :returns: A string of the form ``"<schema>"."<function>"``.
    :raises ValueError: If the schema or the function name is not a valid
        identifier (the schema is checked first).
    """
    if schema is None:
        schema = get_current_schema()
    _validate_identifier(schema, label="schema")
    _validate_identifier(function_name, label="function")
    quoted_schema = f'"{schema}"'
    quoted_function = f'"{function_name}"'
    return f"{quoted_schema}.{quoted_function}"
155# ---------------------------------------------------------------------------
156# Tenant extension contract
157# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class TenantContext:
    """Outcome of authenticating a request: the schema to run it in.

    Deliberately minimal — the storage layer only needs a schema name for
    isolation. Richer concerns (entitlements, quotas, feature flags) belong
    in higher-level auth/policy layers, not here.
    """

    # Name of the PostgreSQL schema the authenticated request is bound to.
    schema_name: str
@dataclass(frozen=True)
class Tenant:
    """One tenant whose schema a background worker should poll for tasks."""

    # Name of the PostgreSQL schema that belongs to this tenant.
    schema: str
class AuthenticationError(Exception):
    """Signals that a :class:`TenantExtension` could not authenticate a request."""
class TenantExtension(ABC):
    """Abstract contract for resolving a request to a PostgreSQL schema.

    How a request is tied to a tenant is up to the implementation: API-key
    lookup, JWT claim, mTLS subject, environment, and so on. The storage
    layer only needs the resulting schema name to bind for that request.
    """

    @abstractmethod
    async def authenticate(self, context: Any) -> TenantContext:
        """Check ``context`` and report which schema the request runs in.

        ``context`` is typed :class:`Any` on purpose: its shape differs
        between HTTP, MCP, gRPC, in-process calls, etc. An implementation
        takes whatever its transport layer supplies.

        :param context: Transport-specific request information.
        :returns: The :class:`TenantContext` for the request.
        :raises AuthenticationError: If the request cannot be authenticated.
        """

    @abstractmethod
    async def list_tenants(self) -> list[Tenant]:
        """Enumerate the tenants whose schemas background workers should poll.

        A single-tenant deployment returns ``[Tenant(schema=DEFAULT_SCHEMA)]``.

        :returns: Every tenant to poll.
        """
class DefaultTenantExtension(TenantExtension):
    """Single-tenant implementation: no authentication, one fixed schema.

    Every request is mapped to the same schema. Because ``schema`` defaults
    to :data:`DEFAULT_SCHEMA` (``"public"``), existing single-schema
    deployments work with no configuration.

    Multi-tenant deployments should instead provide their own
    :class:`TenantExtension` that resolves the schema per request (for
    example from an API-key table or a JWT claim) and register it through
    the gateway's tenant-extension hook.
    """

    def __init__(self, schema: str = DEFAULT_SCHEMA) -> None:
        """Store the fixed schema after validating it.

        :param schema: Schema returned for every request and tenant listing.
        :raises ValueError: If ``schema`` is not a valid identifier.
        """
        self._schema = _validate_identifier(schema, label="schema")

    async def authenticate(self, context: Any) -> TenantContext:  # noqa: ARG002 — context unused
        """Return the fixed schema; ``context`` is ignored."""
        return TenantContext(schema_name=self._schema)

    async def list_tenants(self) -> list[Tenant]:
        """Return a one-element list holding the fixed schema's tenant."""
        return [Tenant(schema=self._schema)]
# Names exported by ``from astrocyte.tenancy import *``; underscore-prefixed
# helpers are intentionally left out.
__all__ = [
    "AuthenticationError",
    "DEFAULT_SCHEMA",
    "DefaultTenantExtension",
    "Tenant",
    "TenantContext",
    "TenantExtension",
    "fq_function",
    "fq_table",
    "get_current_schema",
    "reset_current_schema",
    "set_current_schema",
    "use_schema",
]