Isolation Providers

Base

BaseIsolationProvider

Python
BaseIsolationProvider(config: TenancyConfig)

Bases: ABC

Abstract base class for data isolation strategies.

Isolation providers are responsible for:

  1. Scoping sessions: returning a database session configured so that queries automatically target the correct tenant namespace (get_session).

  2. Filtering queries: adding WHERE clauses or other predicates that enforce per-tenant visibility (apply_filters).

  3. Provisioning: creating the necessary database structures (schemas, tables, RLS policies) when a new tenant is onboarded (initialize_tenant).

  4. Deprovisioning: removing all tenant data and structures when a tenant is deleted (destroy_tenant).

Type safety

apply_filters is generic over SelectT (fastapi_tenancy.core.types.SelectT, bound to sqlalchemy.sql.Select), so the concrete query type is preserved through the call:

Python
q: Select[tuple[User]] = select(User)
filtered: Select[tuple[User]] = await provider.apply_filters(q, tenant)
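
To make the effect of the bound type variable concrete, here is a minimal stand-alone sketch. It does not import SQLAlchemy or fastapi_tenancy: `Select`, `UserSelect`, and `filter_for_tenant` are hypothetical stand-ins that only mimic the shape of the real API.

```python
import asyncio
from typing import TypeVar

# Bound type variable: any subclass of Select is accepted and returned as-is.
SelectT = TypeVar("SelectT", bound="Select")


class Select:
    """Hypothetical stand-in for sqlalchemy.sql.Select."""

    def __init__(self, predicates: tuple = ()) -> None:
        self.predicates = predicates

    def where(self: SelectT, column: str, value: str) -> SelectT:
        # Build a new object of the same concrete class, leaving self untouched.
        return type(self)(self.predicates + ((column, value),))


class UserSelect(Select):
    """A more specific query type whose identity should survive filtering."""


async def filter_for_tenant(query: SelectT, tenant_id: str) -> SelectT:
    # Returning SelectT (not Select) lets a type checker infer UserSelect
    # for the result when a UserSelect is passed in.
    return query.where("tenant_id", tenant_id)


async def demo() -> UserSelect:
    return await filter_for_tenant(UserSelect(), "acme")
```

If the return annotation were plain `Select`, the call would still work at runtime, but a type checker would widen the result and lose the row type.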

Parameters:

Name Type Description Default
config TenancyConfig

Application-wide tenancy configuration.

required
Source code in src/fastapi_tenancy/isolation/base.py
Python
def __init__(self, config: TenancyConfig) -> None:
    self.config = config
    logger.debug("Initialised %s", type(self).__name__)

get_session abstractmethod

Python
get_session(
    tenant: Tenant,
) -> AbstractAsyncContextManager[AsyncSession]

Yield a database session scoped to tenant's namespace.

The session must be fully configured before yielding — i.e. the search_path, session variable, or connection must already be set so that subsequent queries are automatically isolated.

Parameters:

Name Type Description Default
tenant Tenant

The tenant whose data namespace should be active.

required

Returns:

Type Description
AbstractAsyncContextManager[AsyncSession]

An async context manager that yields a configured sqlalchemy.ext.asyncio.AsyncSession.

Raises:

Type Description
IsolationError

When the session cannot be opened or configured.

Example:

Python
async with provider.get_session(tenant) as session:
    result = await session.execute(select(User))
    await session.commit()
Source code in src/fastapi_tenancy/isolation/base.py
Python
@abstractmethod
def get_session(self, tenant: Tenant) -> AbstractAsyncContextManager[AsyncSession]:
    """Yield a database session scoped to *tenant*'s namespace.

    The session must be fully configured before yielding — i.e. the
    ``search_path``, session variable, or connection must already be
    set so that subsequent queries are automatically isolated.

    Args:
        tenant: The tenant whose data namespace should be active.

    Returns:
        An async context manager that yields a configured
        :class:`~sqlalchemy.ext.asyncio.AsyncSession`.

    Raises:
        IsolationError: When the session cannot be opened or configured.

    Example::

        async with provider.get_session(tenant) as session:
            result = await session.execute(select(User))
            await session.commit()
    """
    ...
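
The "configure before yielding" requirement can be sketched with a plain async context manager. This is an illustration only: `FakeSession` and `scoped_session` are hypothetical names, the session merely records statements instead of talking to a database, and the schema name is assumed to have been validated already.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records statements."""

    def __init__(self) -> None:
        self.statements = []
        self.closed = False

    async def execute(self, statement: str) -> None:
        self.statements.append(statement)

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def scoped_session(schema: str) -> AsyncIterator[FakeSession]:
    session = FakeSession()
    try:
        # Scope the session first, so the caller's very first query
        # already runs inside the tenant's namespace.
        await session.execute(f'SET LOCAL search_path TO "{schema}"')
        yield session
    finally:
        # Release the session even if the caller's block raised.
        await session.close()


async def demo() -> FakeSession:
    async with scoped_session("tenant_acme") as session:
        await session.execute("SELECT * FROM users")
    return session
```

The ordering is the point of the sketch: the scoping statement is issued before `yield`, never lazily on first use.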

apply_filters abstractmethod async

Python
apply_filters(query: SelectT, tenant: Tenant) -> SelectT

Return query filtered to only expose tenant's data.

The implementation should use SQLAlchemy's .where() method with a properly bound parameter — never string interpolation.

For strategies that already enforce isolation at the session level (e.g. DATABASE isolation with separate databases per tenant), this method may return query unchanged.

The generic bound SelectT ensures the concrete Select subtype is preserved through the call, so mypy can type-check the result.

Parameters:

Name Type Description Default
query SelectT

A SQLAlchemy Select query construct.

required
tenant Tenant

The currently active tenant.

required

Returns:

Type Description
SelectT

The (potentially) filtered query.

Source code in src/fastapi_tenancy/isolation/base.py
Python
@abstractmethod
async def apply_filters(self, query: SelectT, tenant: Tenant) -> SelectT:
    """Return *query* filtered to only expose *tenant*'s data.

    The implementation should use SQLAlchemy's ``.where()`` method with
    a properly bound parameter — never string interpolation.

    For strategies that already enforce isolation at the session level
    (e.g. ``DATABASE`` isolation with separate databases per tenant),
    this method may return *query* unchanged.

    The generic bound ``SelectT`` ensures the concrete ``Select`` subtype
    is preserved through the call, so mypy can type-check the result.

    Args:
        query: A SQLAlchemy ``Select`` query construct.
        tenant: The currently active tenant.

    Returns:
        The (potentially) filtered query.
    """
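
The reason for insisting on bound parameters can be shown without SQLAlchemy. The sketch below swaps in the standard-library `sqlite3` module (an assumption made purely for illustration; the table and values are invented) and compares string interpolation with parameter binding for a hostile tenant identifier.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, tenant_id TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "acme"), ("bob", "globex")],
)

# A hostile value that tries to widen the predicate.
tenant_id = "acme' OR '1'='1"

# Unsafe: the value becomes part of the SQL text, so the OR clause executes
# and rows from every tenant are returned.
unsafe_rows = conn.execute(
    f"SELECT name FROM users WHERE tenant_id = '{tenant_id}'"
).fetchall()

# Safe: the value travels as a bound parameter and is compared literally,
# so it matches no tenant at all.
safe_rows = conn.execute(
    "SELECT name FROM users WHERE tenant_id = ?", (tenant_id,)
).fetchall()
```

SQLAlchemy's `.where(column == value)` takes the second route: the value is sent as a bound parameter rather than spliced into the statement text.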

initialize_tenant abstractmethod async

Python
initialize_tenant(
    tenant: Tenant, metadata: MetaData | None = None
) -> None

Provision database structures for a newly created tenant.

Called once when a tenant is registered. The implementation should be idempotent so that repeated calls do not cause errors.

Parameters:

Name Type Description Default
tenant Tenant

The newly created tenant.

required
metadata MetaData | None

Optional SQLAlchemy MetaData — when provided, the provider creates all mapped tables in the tenant's namespace.

None

Raises:

Type Description
IsolationError

When provisioning fails.

Source code in src/fastapi_tenancy/isolation/base.py
Python
@abstractmethod
async def initialize_tenant(
    self,
    tenant: Tenant,
    metadata: MetaData | None = None,
) -> None:
    """Provision database structures for a newly created *tenant*.

    Called once when a tenant is registered.  The implementation should
    be idempotent so that repeated calls do not cause errors.

    Args:
        tenant: The newly created tenant.
        metadata: Optional SQLAlchemy ``MetaData`` — when provided, the
            provider creates all mapped tables in the tenant's namespace.

    Raises:
        IsolationError: When provisioning fails.
    """
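
As a rough illustration of the idempotency requirement, the following sketch provisions a prefixed table twice using the standard-library `sqlite3` module. The `provision` function and the `t_acme_` prefix are hypothetical, and the prefix is assumed to have been validated before it reaches the DDL.

```python
import sqlite3


def provision(conn: sqlite3.Connection, prefix: str) -> None:
    # IF NOT EXISTS turns the DDL into a no-op on the second and later calls.
    conn.execute(
        f'CREATE TABLE IF NOT EXISTS "{prefix}users" '
        "(id INTEGER PRIMARY KEY, name TEXT)"
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
provision(conn, "t_acme_")
provision(conn, "t_acme_")  # repeated call: no error, no duplicate table

tables = [
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
]
```

A provider that issues plain `CREATE TABLE` would raise on the second call, which is exactly what the contract asks implementations to avoid.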

destroy_tenant abstractmethod async

Python
destroy_tenant(tenant: Tenant, **kwargs: Any) -> None

Deprovision and permanently delete all data for tenant.

Warning: This is a destructive, irreversible operation. Ensure the caller has appropriate authorisation and a confirmed audit trail before invoking.

Parameters:

Name Type Description Default
tenant Tenant

The tenant to destroy.

required
**kwargs Any

Provider-specific options.

{}

Raises:

Type Description
IsolationError

When deprovisioning fails.

Source code in src/fastapi_tenancy/isolation/base.py
Python
@abstractmethod
async def destroy_tenant(self, tenant: Tenant, **kwargs: Any) -> None:
    """Deprovision and permanently delete all data for *tenant*.

    .. warning::
        This is a **destructive, irreversible** operation.  Ensure the
        caller has appropriate authorisation and a confirmed audit trail
        before invoking.

    Args:
        tenant: The tenant to destroy.
        **kwargs: Provider-specific options.

    Raises:
        IsolationError: When deprovisioning fails.
    """

verify_isolation async

Python
verify_isolation(tenant: Tenant) -> bool

Return True if the tenant's isolation is functioning correctly.

The base implementation logs a warning and returns True. Subclasses should override this with a meaningful check (e.g. verify that the schema exists, the database is reachable, RLS policies are active).

Parameters:

Name Type Description Default
tenant Tenant

The tenant to verify.

required

Returns:

Type Description
bool

True when isolation is verified; False when a problem is detected.

Source code in src/fastapi_tenancy/isolation/base.py
Python
async def verify_isolation(self, tenant: Tenant) -> bool:
    """Return ``True`` if the tenant's isolation is functioning correctly.

    The base implementation logs a warning and returns ``True``.
    Subclasses should override this with a meaningful check (e.g.
    verify that the schema exists, the database is reachable, RLS
    policies are active).

    Args:
        tenant: The tenant to verify.

    Returns:
        ``True`` when isolation is verified; ``False`` when a problem is
        detected.
    """
    logger.warning(
        "%s does not implement verify_isolation() — returning True",
        type(self).__name__,
    )
    return True
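
Because the default always returns True, a health check built on it proves nothing until a subclass overrides it. The sketch below shows one possible override under stated assumptions: `SketchBaseProvider` and `SqlitePrefixProvider` are hypothetical stand-ins, tenants are identified by a plain string, and the blocking `sqlite3` call inside an async method is a simplification that a real provider would avoid.

```python
import asyncio
import logging
import sqlite3

logger = logging.getLogger(__name__)


class SketchBaseProvider:
    """Hypothetical stand-in mirroring the base-class default."""

    async def verify_isolation(self, tenant_id: str) -> bool:
        logger.warning("%s does not implement verify_isolation()", type(self).__name__)
        return True


class SqlitePrefixProvider(SketchBaseProvider):
    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    async def verify_isolation(self, tenant_id: str) -> bool:
        prefix = f"t_{tenant_id}_"
        try:
            rows = self.conn.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            ).fetchall()
        except sqlite3.Error:
            # Treat an unreachable database as a failed check, not a crash.
            return False
        # Isolation counts as present when at least one prefixed table exists.
        return any(name.startswith(prefix) for (name,) in rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t_acme_users (id INTEGER)")
provider = SqlitePrefixProvider(conn)
```

Here `verify_isolation("acme")` finds the prefixed table, while an unprovisioned tenant such as `"globex"` is reported as failing.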

get_schema_name

Python
get_schema_name(tenant: Tenant) -> str

Compute the schema name for tenant using the global config.

Delegates to TenancyConfig.get_schema_name (fastapi_tenancy.core.config) so that the naming convention is always consistent.

Parameters:

Name Type Description Default
tenant Tenant

The tenant to compute the schema name for.

required

Returns:

Type Description
str

A safe, lowercased schema name string.

Source code in src/fastapi_tenancy/isolation/base.py
Python
def get_schema_name(self, tenant: Tenant) -> str:
    """Compute the schema name for *tenant* using the global config.

    Delegates to :meth:`~fastapi_tenancy.core.config.TenancyConfig.get_schema_name`
    so the naming convention is always consistent.

    Args:
        tenant: The tenant to compute the schema name for.

    Returns:
        A safe, lowercased schema name string.
    """
    return self.config.get_schema_name(tenant.identifier)

get_database_url

Python
get_database_url(tenant: Tenant) -> str

Return the database connection URL for tenant.

If the tenant has a database_url override set, that is returned. Otherwise the URL is derived from the global config template.

Parameters:

Name Type Description Default
tenant Tenant

The tenant to get the URL for.

required

Returns:

Type Description
str

A fully-qualified async database URL string.

Source code in src/fastapi_tenancy/isolation/base.py
Python
def get_database_url(self, tenant: Tenant) -> str:
    """Return the database connection URL for *tenant*.

    If the tenant has a ``database_url`` override set, that is returned.
    Otherwise the URL is derived from the global config template.

    Args:
        tenant: The tenant to get the URL for.

    Returns:
        A fully-qualified async database URL string.
    """
    if tenant.database_url:
        return tenant.database_url
    return self.config.get_database_url_for_tenant(tenant.id)
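
The override-then-template lookup can be sketched in isolation. Everything here is an assumption for illustration: `SketchTenant`, `URL_TEMPLATE`, and `resolve_database_url` are hypothetical, and the real template is whatever TenancyConfig derives.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchTenant:
    id: str
    database_url: Optional[str] = None


# Hypothetical template; the real derivation lives in TenancyConfig.
URL_TEMPLATE = "postgresql+asyncpg://app@db.internal/tenant_{tenant_id}"


def resolve_database_url(tenant: SketchTenant) -> str:
    # A per-tenant override wins; otherwise fall back to the shared template.
    if tenant.database_url:
        return tenant.database_url
    return URL_TEMPLATE.format(tenant_id=tenant.id)


shared = resolve_database_url(SketchTenant("acme"))
dedicated = resolve_database_url(
    SketchTenant("big", "postgresql+asyncpg://app@dedicated/big")
)
```

The override makes it possible to move a single large tenant to its own server without changing the global configuration.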

Schema

SchemaIsolationProvider

Python
SchemaIsolationProvider(
    config: TenancyConfig, engine: AsyncEngine | None = None
)

Bases: BaseIsolationProvider

Schema-per-tenant isolation with automatic dialect-based fallback.

PostgreSQL / MSSQL (native schema isolation): creates a dedicated schema per tenant and sets search_path (with SET LOCAL) on every session connection so that unqualified table references resolve correctly.

SQLite / unknown dialects (table-name prefix): copies the application MetaData with a tenant-specific prefix applied to every table name (e.g. t_acme_corp_users).

MySQL / MariaDB (database-per-tenant delegation): in MySQL, SCHEMA is a synonym for DATABASE, so the provider transparently delegates to DatabaseIsolationProvider (fastapi_tenancy.isolation.database).

Parameters:

Name Type Description Default
config TenancyConfig

Tenancy configuration.

required
engine AsyncEngine | None

Optional pre-built engine to reuse (avoids a duplicate pool when this provider is used inside HybridIsolationProvider, fastapi_tenancy.isolation.hybrid).

None

Example:

Python
provider = SchemaIsolationProvider(config)
await provider.initialize_tenant(tenant, metadata=Base.metadata)

async with provider.get_session(tenant) as session:
    result = await session.execute(select(User))
Source code in src/fastapi_tenancy/isolation/schema.py
Python
def __init__(self, config: TenancyConfig, engine: AsyncEngine | None = None) -> None:
    super().__init__(config)
    self.dialect = detect_dialect(str(config.database_url))

    if engine is not None:
        self.engine = engine
    else:
        kw: dict[str, Any] = {
            "echo": config.database_echo,
            "pool_pre_ping": config.database_pool_pre_ping,
        }
        if requires_static_pool(self.dialect):
            kw["poolclass"] = StaticPool
            kw["connect_args"] = {"check_same_thread": False}
            del kw["pool_pre_ping"]
        else:
            kw["pool_size"] = config.database_pool_size
            kw["max_overflow"] = config.database_max_overflow
            kw["pool_timeout"] = config.database_pool_timeout
            kw["pool_recycle"] = config.database_pool_recycle
        self.engine = create_async_engine(str(config.database_url), **kw)

    logger.info(
        "SchemaIsolationProvider dialect=%s native_schemas=%s",
        self.dialect.value,
        supports_native_schemas(self.dialect),
    )

    # For MySQL, delegate all operations to a single cached
    # DatabaseIsolationProvider that shares this provider's engine.
    self._mysql_delegate: DatabaseIsolationProvider | None = None
    if self.dialect == DbDialect.MYSQL:
        from fastapi_tenancy.isolation.database import (  # noqa: PLC0415
            DatabaseIsolationProvider,
        )

        self._mysql_delegate = DatabaseIsolationProvider(self.config, master_engine=self.engine)

get_table_prefix

Python
get_table_prefix(tenant: Tenant) -> str

Return the table-name prefix for non-schema dialects.

Parameters:

Name Type Description Default
tenant Tenant

Target tenant.

required

Returns:

Type Description
str

Table-name prefix string ending with _.

Source code in src/fastapi_tenancy/isolation/schema.py
Python
def get_table_prefix(self, tenant: Tenant) -> str:
    """Return the table-name prefix for non-schema dialects.

    Args:
        tenant: Target tenant.

    Returns:
        Table-name prefix string ending with ``_``.
    """
    return make_table_prefix(tenant.identifier)
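
One plausible way to derive such a prefix is shown below. This is a guess at the general technique, not the implementation of `make_table_prefix`: `sketch_table_prefix` is a hypothetical helper chosen so that the identifier `acme-corp` yields the `t_acme_corp_users` table name used as an example elsewhere on this page.

```python
import re


def sketch_table_prefix(identifier: str) -> str:
    # Lowercase, then collapse every run of non-alphanumerics into "_".
    slug = re.sub(r"[^a-z0-9]+", "_", identifier.lower()).strip("_")
    if not slug:
        raise ValueError(f"cannot derive a table prefix from {identifier!r}")
    return f"t_{slug}_"


prefix = sketch_table_prefix("acme-corp")
table_name = f"{prefix}users"
```

Note that a normalisation like this is lossy: `acme-corp` and `acme_corp` would map to the same prefix, so tenant identifiers need to be unique after normalisation as well as before it.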

get_session async

Python
get_session(tenant: Tenant) -> AsyncIterator[AsyncSession]

Yield a tenant-scoped AsyncSession.

Dispatches to the correct session strategy based on the detected database dialect.

Parameters:

Name Type Description Default
tenant Tenant

Currently active tenant.

required

Yields:

Type Description
AsyncIterator[AsyncSession]

A configured sqlalchemy.ext.asyncio.AsyncSession.

Raises:

Type Description
IsolationError

When the session cannot be opened.

Source code in src/fastapi_tenancy/isolation/schema.py
Python
@asynccontextmanager
async def get_session(self, tenant: Tenant) -> AsyncIterator[AsyncSession]:
    """Yield a tenant-scoped ``AsyncSession``.

    Dispatches to the correct session strategy based on the detected
    database dialect.

    Args:
        tenant: Currently active tenant.

    Yields:
        Configured :class:`~sqlalchemy.ext.asyncio.AsyncSession`.

    Raises:
        IsolationError: When the session cannot be opened.
    """
    if self.dialect == DbDialect.MYSQL:
        assert self._mysql_delegate is not None
        async with self._mysql_delegate.get_session(tenant) as session:
            yield session
    elif self.dialect == DbDialect.MSSQL:
        async with self._mssql_schema_session(tenant) as session:
            yield session
    elif supports_native_schemas(self.dialect):
        async with self._schema_session(tenant) as session:
            yield session
    else:
        async with self._prefix_session(tenant) as session:
            yield session

apply_filters async

Python
apply_filters(query: SelectT, tenant: Tenant) -> SelectT

Apply WHERE tenant_id = :id as a defence-in-depth filter.

For native-schema dialects the search_path already enforces isolation; this filter is an additional safety net. For prefix-mode dialects it is the primary isolation mechanism.

Parameters:

Name Type Description Default
query SelectT

SQLAlchemy Select query.

required
tenant Tenant

Currently active tenant.

required

Returns:

Type Description
SelectT

Filtered query.

Source code in src/fastapi_tenancy/isolation/schema.py
Python
async def apply_filters(self, query: SelectT, tenant: Tenant) -> SelectT:
    """Apply ``WHERE tenant_id = :id`` as a defence-in-depth filter.

    For native-schema dialects the ``search_path`` already enforces
    isolation; this filter is an additional safety net.  For prefix-mode
    dialects it is the primary isolation mechanism.

    Args:
        query: SQLAlchemy ``Select`` query.
        tenant: Currently active tenant.

    Returns:
        Filtered query.
    """
    from sqlalchemy import column  # noqa: PLC0415

    return query.where(column("tenant_id") == tenant.id)

initialize_tenant async

Python
initialize_tenant(
    tenant: Tenant, metadata: MetaData | None = None
) -> None

Create the tenant's isolation namespace.

  • Native schemas → CREATE SCHEMA IF NOT EXISTS
  • Prefix mode → copy+rename tables in metadata with tenant prefix
  • MySQL → delegates to database provider

Parameters:

Name Type Description Default
tenant Tenant

Target tenant.

required
metadata MetaData | None

Application MetaData (sqlalchemy.MetaData). When supplied, create_all is executed in the tenant namespace.

None

Raises:

Type Description
IsolationError

When provisioning fails.

Source code in src/fastapi_tenancy/isolation/schema.py
Python
async def initialize_tenant(
    self,
    tenant: Tenant,
    metadata: MetaData | None = None,
) -> None:
    """Create the tenant's isolation namespace.

    * Native schemas → ``CREATE SCHEMA IF NOT EXISTS``
    * Prefix mode → copy+rename tables in *metadata* with tenant prefix
    * MySQL → delegates to database provider

    Args:
        tenant: Target tenant.
        metadata: Application :class:`~sqlalchemy.MetaData`.  When supplied,
            ``create_all`` is executed in the tenant namespace.

    Raises:
        IsolationError: When provisioning fails.
    """
    if self.dialect == DbDialect.MYSQL:
        assert self._mysql_delegate is not None
        await self._mysql_delegate.initialize_tenant(tenant, metadata=metadata)
        return

    if self.dialect == DbDialect.MSSQL:
        await self._initialize_mssql_schema(tenant, metadata)
    elif supports_native_schemas(self.dialect):
        await self._initialize_schema(tenant, metadata)
    else:
        await self._initialize_prefix(tenant, metadata)

destroy_tenant async

Python
destroy_tenant(tenant: Tenant, **kwargs: Any) -> None

Drop the tenant's isolation namespace.

Warning: Permanently destroys all tenant data.

Parameters:

Name Type Description Default
tenant Tenant

The tenant to deprovision.

required

Raises:

Type Description
IsolationError

When the drop operation fails.

Source code in src/fastapi_tenancy/isolation/schema.py
Python
async def destroy_tenant(self, tenant: Tenant, **kwargs: Any) -> None:  # noqa: D417
    """Drop the tenant's isolation namespace.

    .. warning::
        Permanently destroys all tenant data.

    Args:
        tenant: The tenant to deprovision.

    Raises:
        IsolationError: When the drop operation fails.
    """
    if self.dialect == DbDialect.MYSQL:
        assert self._mysql_delegate is not None
        await self._mysql_delegate.destroy_tenant(tenant)
        return

    if self.dialect == DbDialect.MSSQL:
        schema = self._validated_schema_name(tenant)
        assert_safe_schema_name(schema, context=f"destroy_tenant tenant={tenant.id!r}")
        async with self.engine.begin() as conn:
            try:
                # Drop all tables in the schema before dropping the schema
                # itself.  MSSQL has no CASCADE on DROP SCHEMA.
                #
                # QUOTENAME() wraps each identifier in [brackets], providing
                # defence-in-depth against injection even though
                # assert_safe_schema_name already validates the schema name.
                # :schema is passed as a bound parameter to the outer query
                # (WHERE TABLE_SCHEMA = :schema) — the only place the value
                # crosses the parameterisation boundary.
                await conn.execute(
                    text(
                        "DECLARE @sql NVARCHAR(MAX) = N'';"
                        "SELECT @sql = @sql"
                        "  + N'DROP TABLE '"
                        "  + QUOTENAME(TABLE_SCHEMA)"
                        "  + N'.'"
                        "  + QUOTENAME(TABLE_NAME)"
                        "  + N';' "
                        "FROM INFORMATION_SCHEMA.TABLES "
                        "WHERE TABLE_SCHEMA = :schema "
                        "  AND TABLE_TYPE = N'BASE TABLE';"
                        "IF LEN(@sql) > 0 EXEC sp_executesql @sql;"
                    ),
                    {"schema": schema},
                )
                await conn.execute(text(f"DROP SCHEMA [{schema}]"))
                logger.warning("Destroyed MSSQL schema %r for tenant %s", schema, tenant.id)
            except Exception as exc:
                raise IsolationError(
                    operation="destroy_tenant",
                    tenant_id=tenant.id,
                    details={"schema": schema, "dialect": "mssql", "error": str(exc)},
                ) from exc
    elif supports_native_schemas(self.dialect):
        schema = self._validated_schema_name(tenant)
        assert_safe_schema_name(schema, context=f"destroy_tenant tenant={tenant.id!r}")
        async with self.engine.begin() as conn:
            try:
                await conn.execute(text(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE'))
                logger.warning("Destroyed schema %r for tenant %s", schema, tenant.id)
            except Exception as exc:
                raise IsolationError(
                    operation="destroy_tenant",
                    tenant_id=tenant.id,
                    details={"schema": schema, "error": str(exc)},
                ) from exc
    else:
        await self._destroy_prefix(tenant)

verify_isolation async

Python
verify_isolation(tenant: Tenant) -> bool

Verify that the tenant's schema / tables exist and are reachable.

Parameters:

Name Type Description Default
tenant Tenant

Tenant to verify.

required

Returns:

Type Description
bool

True when isolation structures exist; False on failure.

Source code in src/fastapi_tenancy/isolation/schema.py
Python
async def verify_isolation(self, tenant: Tenant) -> bool:
    """Verify that the tenant's schema / tables exist and are reachable.

    Args:
        tenant: Tenant to verify.

    Returns:
        ``True`` when isolation structures exist; ``False`` on failure.
    """
    if supports_native_schemas(self.dialect):
        schema = self._schema_name(tenant)
        try:
            async with self.engine.connect() as conn:
                result = await conn.execute(
                    text(
                        "SELECT schema_name FROM information_schema.schemata "
                        "WHERE schema_name = :name"
                    ),
                    {"name": schema},
                )
                return result.scalar() is not None
        except Exception:
            return False

    prefix = self.get_table_prefix(tenant)
    try:
        async with self.engine.connect() as conn:

            def _check_tables(sync_conn: Any) -> bool:
                from sqlalchemy import inspect as sa_inspect  # noqa: PLC0415

                insp = sa_inspect(sync_conn)
                tables = insp.get_table_names()
                return any(t.startswith(prefix) for t in tables)

            return await conn.run_sync(_check_tables)
    except Exception:
        return False

close async

Python
close() -> None

Dispose the engine and release pooled connections.

Source code in src/fastapi_tenancy/isolation/schema.py
Python
async def close(self) -> None:
    """Dispose the engine and release pooled connections."""
    # Close the MySQL delegate first (it holds its own engine reference).
    if self._mysql_delegate is not None:
        await self._mysql_delegate.close()
    await self.engine.dispose()
    logger.info("SchemaIsolationProvider closed")

Database

DatabaseIsolationProvider

Python
DatabaseIsolationProvider(
    config: TenancyConfig,
    master_engine: AsyncEngine | None = None,
)

Bases: BaseIsolationProvider

Separate database per tenant with automatic dialect-based provisioning.

A single master engine connects to the admin/default database for DDL. Per-tenant engines are created lazily on first use and held in a bounded LRU cache (sized by config.max_cached_engines), so the least recently used engine is evicted once the cache is full.
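
A bounded LRU cache of engines can be sketched with `collections.OrderedDict`. This is not the library's `_LRUEngineCache`: `FakeEngine` and `SketchLRUEngineCache` are hypothetical, and disposing an engine on eviction is an assumption about what a sensible cache would do.

```python
import asyncio
from collections import OrderedDict


class FakeEngine:
    """Hypothetical stand-in for AsyncEngine."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.disposed = False

    async def dispose(self) -> None:
        self.disposed = True


class SketchLRUEngineCache:
    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._engines = OrderedDict()

    def get(self, tenant_id: str):
        engine = self._engines.get(tenant_id)
        if engine is not None:
            # Mark as most recently used.
            self._engines.move_to_end(tenant_id)
        return engine

    async def put(self, tenant_id: str, engine: FakeEngine) -> None:
        self._engines[tenant_id] = engine
        self._engines.move_to_end(tenant_id)
        while len(self._engines) > self._max_size:
            # Evict the least recently used entry and free its pool.
            _, evicted = self._engines.popitem(last=False)
            await evicted.dispose()


async def demo():
    cache = SketchLRUEngineCache(max_size=2)
    a, b, c = FakeEngine("a"), FakeEngine("b"), FakeEngine("c")
    await cache.put("a", a)
    await cache.put("b", b)
    cache.get("a")           # "a" becomes the most recently used entry
    await cache.put("c", c)  # evicts "b", the least recently used
    return a, b, c, cache
```

Bounding the cache keeps the number of open connection pools proportional to the number of recently active tenants rather than to the total tenant count.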

Parameters:

Name Type Description Default
config TenancyConfig

Tenancy configuration.

required
master_engine AsyncEngine | None

Optional pre-built master engine (a shared engine injected by HybridIsolationProvider, fastapi_tenancy.isolation.hybrid, to avoid a duplicate connection pool).

None

Example:

Python
provider = DatabaseIsolationProvider(config)
await provider.initialize_tenant(tenant, metadata=Base.metadata)

async with provider.get_session(tenant) as session:
    result = await session.execute(select(Order))

await provider.close()  # disposes all per-tenant engines
Source code in src/fastapi_tenancy/isolation/database.py
Python
def __init__(
    self,
    config: TenancyConfig,
    master_engine: AsyncEngine | None = None,
) -> None:
    super().__init__(config)
    self.dialect = detect_dialect(str(config.database_url))
    self._engine_cache = _LRUEngineCache(max_size=config.max_cached_engines)
    # Per-tenant asyncio.Lock objects prevent two concurrent coroutines from
    # racing through _get_engine() and creating duplicate engines for the same
    # tenant.  Without this guard a second engine is created, never added to
    # the cache, and its connection pool leaks until the process exits.
    #
    # WeakValueDictionary instead of plain dict:
    # In high-churn environments (many tenants provisioned/deprovisioned
    # rapidly), a plain dict can accumulate Lock entries indefinitely.
    # WeakValueDictionary solves this structurally: when no coroutine holds
    # a live reference to the Lock (no waiter, no creator), the GC removes
    # the entry automatically.  The dict never exceeds the number of
    # *actively contested* tenants at any given moment.
    self._creation_locks: weakref.WeakValueDictionary[str, asyncio.Lock] = (
        weakref.WeakValueDictionary()
    )
    self._creation_locks_lock: asyncio.Lock = asyncio.Lock()

    if master_engine is not None:
        self._master = master_engine
    else:
        kw: dict[str, Any] = {
            "echo": config.database_echo,
            "isolation_level": "AUTOCOMMIT",
        }
        if requires_static_pool(self.dialect):
            kw["poolclass"] = StaticPool
            kw["connect_args"] = {"check_same_thread": False}
            del kw["isolation_level"]
        else:
            kw["pool_size"] = max(config.database_pool_size, 5)
            kw["max_overflow"] = config.database_max_overflow
            kw["pool_pre_ping"] = config.database_pool_pre_ping
        self._master = create_async_engine(str(config.database_url), **kw)

    logger.info(
        "DatabaseIsolationProvider dialect=%s max_cached_engines=%d",
        self.dialect.value,
        config.max_cached_engines,
    )
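
The per-tenant creation locks described in the constructor comments follow a double-checked pattern. The sketch below is a simplified illustration, not the provider's `_get_engine`: `SketchEngineFactory` is hypothetical, engines are plain objects, and a dict stands in for the LRU cache.

```python
import asyncio
import weakref


class SketchEngineFactory:
    def __init__(self) -> None:
        self._engines = {}
        # Entries vanish once no coroutine still references the lock.
        self._locks = weakref.WeakValueDictionary()
        self._locks_lock = asyncio.Lock()
        self.created = 0

    async def _lock_for(self, tenant_id: str) -> asyncio.Lock:
        async with self._locks_lock:
            lock = self._locks.get(tenant_id)
            if lock is None:
                lock = asyncio.Lock()
                self._locks[tenant_id] = lock
            # The caller's local variable keeps the lock alive while in use.
            return lock

    async def get_engine(self, tenant_id: str) -> object:
        if tenant_id in self._engines:
            return self._engines[tenant_id]
        lock = await self._lock_for(tenant_id)
        async with lock:
            # Re-check: another coroutine may have created it while we waited.
            if tenant_id in self._engines:
                return self._engines[tenant_id]
            await asyncio.sleep(0)  # simulate slow engine creation
            self.created += 1
            self._engines[tenant_id] = object()
            return self._engines[tenant_id]


async def demo():
    factory = SketchEngineFactory()
    engines = await asyncio.gather(*(factory.get_engine("acme") for _ in range(10)))
    return factory, engines
```

Ten concurrent requests for the same tenant end up sharing one engine; without the lock, each coroutine that passed the first cache check before creation finished would build its own.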

get_session async

Python
get_session(tenant: Tenant) -> AsyncIterator[AsyncSession]

Yield a session connected to tenant's dedicated database.

Parameters:

Name Type Description Default
tenant Tenant

Currently active tenant.

required

Yields:

Type Description
AsyncIterator[AsyncSession]

An sqlalchemy.ext.asyncio.AsyncSession.

Raises:

Type Description
IsolationError

When the session cannot be opened.

Source code in src/fastapi_tenancy/isolation/database.py
Python
@asynccontextmanager
async def get_session(self, tenant: Tenant) -> AsyncIterator[AsyncSession]:
    """Yield a session connected to *tenant*'s dedicated database.

    Args:
        tenant: Currently active tenant.

    Yields:
        An :class:`~sqlalchemy.ext.asyncio.AsyncSession`.

    Raises:
        IsolationError: When the session cannot be opened.
    """
    engine = await self._get_engine(tenant)
    async with AsyncSession(engine, expire_on_commit=False) as session:
        try:
            yield session
        except Exception as exc:
            await session.rollback()
            raise IsolationError(
                operation="get_session",
                tenant_id=tenant.id,
                details={"error": str(exc)},
            ) from exc

apply_filters async

Python
apply_filters(query: SelectT, tenant: Tenant) -> SelectT

No filtering required — each tenant has a dedicated database.

Parameters:

Name Type Description Default
query SelectT

SQLAlchemy Select query (returned unchanged).

required
tenant Tenant

Currently active tenant (ignored).

required

Returns:

Type Description
SelectT

query unchanged.

Source code in src/fastapi_tenancy/isolation/database.py
Python
async def apply_filters(self, query: SelectT, tenant: Tenant) -> SelectT:
    """No filtering required — each tenant has a dedicated database.

    Args:
        query: SQLAlchemy ``Select`` query (returned unchanged).
        tenant: Currently active tenant (ignored).

    Returns:
        *query* unchanged.
    """
    return query

initialize_tenant async

Python
initialize_tenant(
    tenant: Tenant, metadata: MetaData | None = None
) -> None

Create tenant's dedicated database and optionally create tables.

Parameters:

Name Type Description Default
tenant Tenant

Target tenant.

required
metadata MetaData | None

Application MetaData (sqlalchemy.MetaData). When supplied, create_all is executed in the newly created database.

None

Raises:

Type Description
IsolationError

When database creation or table creation fails.

Source code in src/fastapi_tenancy/isolation/database.py
Python
async def initialize_tenant(
    self,
    tenant: Tenant,
    metadata: MetaData | None = None,
) -> None:
    """Create *tenant*'s dedicated database and optionally create tables.

    Args:
        tenant: Target tenant.
        metadata: Application :class:`~sqlalchemy.MetaData`.  When supplied,
            ``create_all`` is executed in the newly created database.

    Raises:
        IsolationError: When database creation or table creation fails.
    """
    if self.dialect == DbDialect.SQLITE:
        engine = await self._get_engine(tenant)
        if metadata is not None:
            async with engine.begin() as conn:
                await conn.run_sync(metadata.create_all)
        logger.info("SQLite tenant %s initialised", tenant.id)
        return

    if self.dialect == DbDialect.MSSQL:
        raise IsolationError(
            operation="initialize_tenant",
            tenant_id=tenant.id,
            details={
                "reason": (
                    "DATABASE isolation on MSSQL requires manual database creation. "
                    "Use SCHEMA isolation or create the database manually."
                )
            },
        )

    db_name = self._database_name(tenant)
    try:
        assert_safe_database_name(db_name, context=f"tenant id={tenant.id!r}")
    except ValueError as exc:
        raise IsolationError(
            operation="initialize_tenant",
            tenant_id=tenant.id,
            details={"database": db_name, "error": str(exc)},
        ) from exc

    try:
        async with self._master.connect() as conn:
            if self.dialect == DbDialect.POSTGRESQL:
                result = await conn.execute(
                    text("SELECT 1 FROM pg_database WHERE datname = :name"),
                    {"name": db_name},
                )
                if result.scalar() is not None:
                    logger.warning("Database %r already exists — skipping CREATE", db_name)
                else:
                    await conn.execute(text(f'CREATE DATABASE "{db_name}"'))
                    logger.info("Created database %r for tenant %s", db_name, tenant.id)
            elif self.dialect == DbDialect.MYSQL:
                await conn.execute(
                    text(
                        f"CREATE DATABASE IF NOT EXISTS `{db_name}` "
                        "CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
                    )
                )
                logger.info("Created database %r for tenant %s", db_name, tenant.id)

        if metadata is not None:
            engine = await self._get_engine(tenant)
            async with engine.begin() as conn:
                await conn.run_sync(metadata.create_all)
            logger.info("Created tables in database %r", db_name)

    except IsolationError:
        raise
    except Exception as exc:
        raise IsolationError(
            operation="initialize_tenant",
            tenant_id=tenant.id,
            details={"database": db_name, "error": str(exc)},
        ) from exc
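
Database identifiers cannot be sent as bind parameters, so the CREATE DATABASE statements above interpolate db_name into the SQL text; that is why the name is validated first. The following is a minimal sketch of such a check, assuming a conservative allow-list. `is_safe_database_name` is a hypothetical helper written for illustration; the library's own `assert_safe_database_name` is not shown on this page and may apply different rules.

```python
import re

# Hypothetical allow-list: a leading lowercase letter or underscore, then
# lowercase letters, digits, or underscores, 63 characters at most
# (PostgreSQL truncates identifiers longer than 63 bytes).
_SAFE_NAME = re.compile(r"[a-z_][a-z0-9_]{0,62}")


def is_safe_database_name(name: str) -> bool:
    """Return True when *name* can be quoted into DDL without escaping."""
    return _SAFE_NAME.fullmatch(name) is not None


print(is_safe_database_name("tenant_acme_db"))
print(is_safe_database_name('x"; DROP DATABASE app; --'))
```

Rejecting quotes, backticks, and whitespace outright is simpler to reason about than trying to escape them per dialect.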

destroy_tenant async

Python
destroy_tenant(tenant: Tenant, **kwargs: Any) -> None

Drop tenant's dedicated database.

Warning: Permanently destroys all tenant data.

Parameters:

Name Type Description Default
tenant Tenant

The tenant to destroy.

required
**kwargs Any

Accepted for interface compatibility; not used by this provider.

{}

Raises:

Type Description
IsolationError

When the database cannot be dropped.

Source code in src/fastapi_tenancy/isolation/database.py
Python
async def destroy_tenant(self, tenant: Tenant, **kwargs: Any) -> None:
    """Drop *tenant*'s dedicated database.

    .. warning::
        Permanently destroys all tenant data.

    Args:
        tenant: The tenant to destroy.
        **kwargs: Accepted for interface compatibility; not used by this provider.

    Raises:
        IsolationError: When the database cannot be dropped.
    """
    if self.dialect == DbDialect.SQLITE:
        from pathlib import Path  # noqa: PLC0415
        from urllib.parse import urlparse  # noqa: PLC0415

        engine = await self._engine_cache.remove(tenant.id)
        if engine:
            await engine.dispose()
        url = self._tenant_url(tenant)
        # Use urlparse to correctly extract the file path from the URL,
        # handling both absolute (sqlite:////abs/path) and relative
        # (sqlite:///./rel/path) forms without fragile string splitting.
        parsed = urlparse(url)
        raw_path = parsed.netloc + parsed.path  # netloc is empty for sqlite
        db_path = Path(raw_path) if raw_path else None
        if db_path and db_path.exists():
            try:
                db_path.unlink()
                logger.warning("Deleted SQLite file %s for tenant %s", db_path, tenant.id)
            except OSError as exc:
                raise IsolationError(
                    operation="destroy_tenant",
                    tenant_id=tenant.id,
                    details={"path": str(db_path), "error": str(exc)},
                ) from exc
        # SQLite handling is complete — return early so we don't attempt
        # pg_database / DROP DATABASE logic below, which would fail against
        # a SQLite connection and raise a misleading IsolationError.
        return

    db_name = self._database_name(tenant)
    try:
        assert_safe_database_name(db_name, context=f"tenant id={tenant.id!r}")
    except ValueError as exc:
        raise IsolationError(
            operation="destroy_tenant",
            tenant_id=tenant.id,
            details={"database": db_name, "error": str(exc)},
        ) from exc

    evicted = await self._engine_cache.remove(tenant.id)
    if evicted:
        await evicted.dispose()

    try:
        async with self._master.connect() as conn:
            if self.dialect == DbDialect.POSTGRESQL:
                # Terminate active connections before dropping.
                await conn.execute(
                    text(
                        "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                        "WHERE datname = :name AND pid <> pg_backend_pid()"
                    ),
                    {"name": db_name},
                )
                await conn.execute(text(f'DROP DATABASE IF EXISTS "{db_name}"'))
            elif self.dialect == DbDialect.MYSQL:
                await conn.execute(text(f"DROP DATABASE IF EXISTS `{db_name}`"))
        logger.warning("Destroyed database %r for tenant %s", db_name, tenant.id)
    except IsolationError:
        raise
    except Exception as exc:
        raise IsolationError(
            operation="destroy_tenant",
            tenant_id=tenant.id,
            details={"database": db_name, "error": str(exc)},
        ) from exc
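
When reading the SQLite branch, it helps to know what `urlparse` returns for SQLAlchemy-style SQLite URLs: the path keeps the slash that separates the empty host from the database portion. The absolute form therefore parses to `//var/data/acme.db` and the relative form to `/./data/acme.db`. The sketch below shows one way to recover the path as written. `sqlite_database_part` is a hypothetical helper, and whether such an adjustment is needed depends on how `_tenant_url` builds its URLs, which is not shown here.

```python
from urllib.parse import urlparse


def sqlite_database_part(url: str) -> str:
    """Return the database portion of a SQLAlchemy-style SQLite URL.

    Illustrative helper; not part of fastapi_tenancy.
    """
    parsed = urlparse(url)
    raw = parsed.netloc + parsed.path  # netloc is empty for file URLs
    # The first slash only separates the (empty) host from the database
    # portion, so drop it to recover the path as written.
    return raw[1:] if raw.startswith("/") else raw


print(sqlite_database_part("sqlite+aiosqlite:////var/data/acme.db"))
print(sqlite_database_part("sqlite+aiosqlite:///./data/acme.db"))
```

With the extra slash removed, the relative URL stays relative rather than resolving against the filesystem root.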

verify_isolation async

Python
verify_isolation(tenant: Tenant) -> bool

Return True if tenant's database exists and is reachable.

Parameters:

Name Type Description Default
tenant Tenant

Tenant to verify.

required

Returns:

Type Description
bool

True when the database exists; False otherwise.

Source code in src/fastapi_tenancy/isolation/database.py
Python
async def verify_isolation(self, tenant: Tenant) -> bool:
    """Return ``True`` if *tenant*'s database exists and is reachable.

    Args:
        tenant: Tenant to verify.

    Returns:
        ``True`` when the database exists; ``False`` otherwise.
    """
    if self.dialect == DbDialect.SQLITE:
        from pathlib import Path  # noqa: PLC0415
        from urllib.parse import urlparse  # noqa: PLC0415

        url = self._tenant_url(tenant)
        parsed = urlparse(url)
        raw_path = parsed.netloc + parsed.path
        db_path = Path(raw_path) if raw_path else None
        return db_path is not None and db_path.exists()

    db_name = self._database_name(tenant)
    try:
        assert_safe_database_name(db_name)
    except ValueError:
        return False

    try:
        async with self._master.connect() as conn:
            if self.dialect == DbDialect.POSTGRESQL:
                r = await conn.execute(
                    text("SELECT 1 FROM pg_database WHERE datname = :name"),
                    {"name": db_name},
                )
            elif self.dialect == DbDialect.MYSQL:
                r = await conn.execute(
                    text(
                        "SELECT SCHEMA_NAME FROM information_schema.SCHEMATA "
                        "WHERE SCHEMA_NAME = :name"
                    ),
                    {"name": db_name},
                )
            else:
                return False
            return r.scalar() is not None
    except Exception:
        return False

close async

Python
close() -> None

Dispose all per-tenant engines from the LRU cache and the master engine.

Source code in src/fastapi_tenancy/isolation/database.py
Python
async def close(self) -> None:
    """Dispose all per-tenant engines from the LRU cache and the master engine."""
    disposed = await self._engine_cache.dispose_all()
    logger.debug("Disposed %d per-tenant engines", disposed)
    await self._master.dispose()
    logger.info("DatabaseIsolationProvider closed")
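
The source above relies on two cache operations: `remove`, which hands an engine back to the caller, and `dispose_all`, which returns the number of engines disposed. A minimal sketch of a cache with those semantics follows, assuming least-recently-used eviction. `EngineCache` and `FakeEngine` are hypothetical stand-ins; the library's real cache class is not shown on this page.

```python
import asyncio
from collections import OrderedDict
from typing import Optional


class FakeEngine:
    """Stand-in for AsyncEngine; only records that dispose() was awaited."""

    def __init__(self) -> None:
        self.disposed = False

    async def dispose(self) -> None:
        self.disposed = True


class EngineCache:
    """Hypothetical LRU cache of engines keyed by tenant ID."""

    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._engines: "OrderedDict[str, FakeEngine]" = OrderedDict()

    async def put(self, tenant_id: str, engine: FakeEngine) -> None:
        self._engines[tenant_id] = engine
        self._engines.move_to_end(tenant_id)  # mark as most recently used
        while len(self._engines) > self._max_size:
            _, evicted = self._engines.popitem(last=False)  # least recently used
            await evicted.dispose()

    async def remove(self, tenant_id: str) -> Optional[FakeEngine]:
        # Hand the engine back so the caller decides when to dispose it.
        return self._engines.pop(tenant_id, None)

    async def dispose_all(self) -> int:
        count = 0
        while self._engines:
            _, engine = self._engines.popitem()
            await engine.dispose()
            count += 1
        return count


async def demo() -> int:
    cache = EngineCache(max_size=2)
    for tenant_id in ("a", "b", "c"):  # "a" is evicted when "c" arrives
        await cache.put(tenant_id, FakeEngine())
    return await cache.dispose_all()


print(asyncio.run(demo()))
```

Bounding the cache keeps the number of open connection pools proportional to the active tenants rather than to all tenants ever seen.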

RLS

RLSIsolationProvider

Python
RLSIsolationProvider(
    config: TenancyConfig, engine: AsyncEngine | None = None
)

Bases: BaseIsolationProvider

PostgreSQL Row-Level Security data isolation.

Activates per-request tenant filtering by setting the app.current_tenant session variable at the start of every transaction. PostgreSQL RLS policies reference this variable to restrict row visibility transparently.

Raises ConfigurationError (fastapi_tenancy.core.exceptions) at construction if the configured database is not PostgreSQL, so misconfigured deployments fail immediately at startup.

Parameters:

Name Type Description Default
config TenancyConfig

Application-wide tenancy configuration.

required
engine AsyncEngine | None

Optional pre-built engine to reuse (share with the store to avoid a duplicate connection pool).

None

Example:

Python
provider = RLSIsolationProvider(config)
await provider.initialize_tenant(tenant, metadata=Base.metadata)

async with provider.get_session(tenant) as session:
    # RLS policy silently adds: WHERE tenant_id = '<current tenant>'
    result = await session.execute(select(User))

Schema hint:

SQL
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE users FORCE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON users
    USING (tenant_id = current_setting('app.current_tenant', TRUE));

Source code in src/fastapi_tenancy/isolation/rls.py
Python
def __init__(
    self,
    config: TenancyConfig,
    engine: AsyncEngine | None = None,
) -> None:
    super().__init__(config)
    dialect = detect_dialect(str(config.database_url))
    if not supports_native_rls(dialect):
        raise ConfigurationError(
            parameter="isolation_strategy",
            reason=(
                f"RLS isolation requires PostgreSQL but dialect is {dialect.value!r}. "
                "Use SCHEMA or DATABASE isolation for this database."
            ),
        )
    self.dialect = dialect

    if engine is not None:
        self.engine = engine
    else:
        kw: dict[str, Any] = {
            "echo": config.database_echo,
            "pool_pre_ping": config.database_pool_pre_ping,
            "pool_size": config.database_pool_size,
            "max_overflow": config.database_max_overflow,
            "pool_timeout": config.database_pool_timeout,
            "pool_recycle": config.database_pool_recycle,
        }
        self.engine = create_async_engine(str(config.database_url), **kw)

    logger.info("RLSIsolationProvider ready (PostgreSQL RLS)")

get_session async

Python
get_session(tenant: Tenant) -> AsyncIterator[AsyncSession]

Yield a session with app.current_tenant set to tenant's ID.

Uses SELECT set_config('app.current_tenant', :id, TRUE) — a fully parameterised call — to avoid injection via crafted tenant IDs.

Connection-level event listener design

set_config(..., TRUE) (equivalent to SET LOCAL) is transaction-scoped: the GUC reverts when the current transaction ends. If we simply call it once before yielding, any subsequent session.begin() block opened by the route handler would operate without the GUC, silently bypassing all RLS policies.

The fix is to install a begin event listener on the underlying synchronous SQLAlchemy connection (conn.sync_connection) so that set_config is re-executed at the start of every transaction opened on this connection during the request lifetime.

Critical: the listener must be removed before the connection returns to the pool. Without removal, when the physical connection is reused by a future request for a different tenant, the old tenant's GUC listener fires on every new transaction — a silent cross-tenant data-read breach.

We save the listener function as a local variable and call event.remove(sync_conn, "begin", _set_rls_guc) in a finally block that wraps the entire session lifetime. The finally is positioned outside the AsyncSession context manager so it always runs — even if AsyncSession construction raises, or if an uncaught exception propagates from the route handler.
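
The listener lifecycle described above can be illustrated without a database. The sketch below is a toy simulation, not SQLAlchemy code: `FakeConnection` and `tenant_scope` are hypothetical names, and `begin()` clears the setting to imitate a transaction-scoped GUC. It shows the hook re-applying the tenant on every transaction and being removed in a finally block before the connection is reused.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, List, Optional


class FakeConnection:
    """Toy stand-in for a pooled connection (not a SQLAlchemy class)."""

    def __init__(self) -> None:
        self.listeners: List[Callable[["FakeConnection"], None]] = []
        self.current_tenant: Optional[str] = None

    def begin(self) -> None:
        # A new transaction starts with the setting cleared, like SET LOCAL.
        self.current_tenant = None
        for listener in list(self.listeners):
            listener(self)


@contextmanager
def tenant_scope(conn: FakeConnection, tenant_id: str) -> Iterator[FakeConnection]:
    def set_tenant(c: FakeConnection) -> None:
        c.current_tenant = tenant_id

    conn.listeners.append(set_tenant)
    try:
        yield conn
    finally:
        # Remove the hook before the connection is reused by another caller.
        conn.listeners.remove(set_tenant)


conn = FakeConnection()
with tenant_scope(conn, "tenant-a"):
    conn.begin()
    first = conn.current_tenant
    conn.begin()  # a second transaction inside the same request
    second = conn.current_tenant

conn.begin()  # a later, unrelated use of the same connection
after = conn.current_tenant
print(first, second, after)
```

Both transactions inside the scope see `tenant-a`, while the transaction opened afterwards sees no tenant, which is the behaviour the finally block is meant to guarantee.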

Parameterisation note

Two different parameterisation syntaxes are intentionally used:

  1. exec_driver_sql (in the begin listener): uses the native asyncpg $1 placeholder because exec_driver_sql bypasses SQLAlchemy's parameter rendering layer entirely.
  2. session.execute + text() (initial GUC set): uses SQLAlchemy's :name syntax, which SQLAlchemy's asyncpg dialect renders as $1 when the statement is compiled.

Both are correct for their respective call sites.

Parameters:

Name Type Description Default
tenant Tenant

Currently active tenant.

required

Yields:

Type Description
AsyncIterator[AsyncSession]

Configured AsyncSession.

Raises:

Type Description
IsolationError

When the session variable cannot be set.

Source code in src/fastapi_tenancy/isolation/rls.py
Python
@asynccontextmanager
async def get_session(self, tenant: Tenant) -> AsyncIterator[AsyncSession]:
    """Yield a session with ``app.current_tenant`` set to *tenant*'s ID.

    Uses ``SELECT set_config('app.current_tenant', :id, TRUE)`` — a
    fully parameterised call — to avoid injection via crafted tenant IDs.

    Connection-level event listener design
    ---------------------------------------
    ``set_config(..., TRUE)`` (equivalent to ``SET LOCAL``) is
    **transaction-scoped**: the GUC reverts when the current transaction
    ends.  If we simply call it once before yielding, any subsequent
    ``session.begin()`` block opened by the route handler would operate
    without the GUC, silently bypassing all RLS policies.

    The fix is to install a ``begin`` event listener on the underlying
    synchronous SQLAlchemy connection (``conn.sync_connection``) so that
    ``set_config`` is re-executed at the start of **every** transaction
    opened on this connection during the request lifetime.

    Critical: the listener must be removed before the connection returns
    to the pool. Without removal, when the physical connection is reused
    by a future request for a *different* tenant, the old tenant's GUC
    listener fires on every new transaction — a silent cross-tenant
    data-read breach.

    We save the listener function as a local variable and call
    ``event.remove(sync_conn, "begin", _set_rls_guc)`` in a ``finally``
    block that wraps the entire session lifetime.  The ``finally`` is
    positioned *outside* the ``AsyncSession`` context manager so it always
    runs — even if ``AsyncSession`` construction raises, or if an uncaught
    exception propagates from the route handler.

    Parameterisation note
    ----------------------
    Two different parameterisation syntaxes are intentionally used:

    1. ``exec_driver_sql`` (in the ``begin`` listener): uses the native
       asyncpg ``$1`` placeholder because ``exec_driver_sql`` bypasses
       SQLAlchemy's parameter rendering layer entirely.
    2. ``session.execute + text()`` (initial GUC set): uses SQLAlchemy's
       ``:name`` syntax, which SQLAlchemy's asyncpg dialect renders as ``$1``.

    Both are correct for their respective call sites.

    Args:
        tenant: Currently active tenant.

    Yields:
        Configured :class:`~sqlalchemy.ext.asyncio.AsyncSession`.

    Raises:
        IsolationError: When the session variable cannot be set.
    """
    from sqlalchemy import event  # noqa: PLC0415

    # sync_conn is obtained inside the session context; initialise to None
    # so the finally block can safely guard against the case where the
    # session never opened (e.g. pool exhausted before AsyncSession init).
    sync_conn: Any = None

    def _set_rls_guc(sync_connection: Any) -> None:
        """Re-apply the RLS GUC at the start of every transaction.

        Called by the SQLAlchemy ``begin`` event on the individual DBAPI
        connection.  ``_RLS_GUC`` is a module-level constant — it is never
        user-supplied and cannot be influenced by tenant data.  The f-string
        interpolation of ``_RLS_GUC`` is therefore safe.  ``tenant.id`` is
        passed as a positional bind parameter (``$1``) and is never
        interpolated into the SQL string.
        """
        sync_connection.exec_driver_sql(
            f"SELECT set_config('{_RLS_GUC}', $1, TRUE)",
            (tenant.id,),
        )
        logger.debug("RLS GUC %s = %r (tenant %s)", _RLS_GUC, tenant.id, tenant.id)

    try:
        async with AsyncSession(self.engine, expire_on_commit=False) as session:
            try:
                conn = await session.connection()
                sync_conn = conn.sync_connection

                # Set the GUC for the *current* transaction immediately so
                # the session is isolated from the moment it is yielded.
                await session.execute(
                    text("SELECT set_config(:guc, :val, TRUE)"),
                    {"guc": _RLS_GUC, "val": tenant.id},
                )

                # Install the per-connection begin listener *after* we have
                # a reference to sync_conn so the finally block can remove it.
                event.listen(sync_conn, "begin", _set_rls_guc)

                yield session
            except IsolationError:
                raise
            except Exception as exc:
                await session.rollback()
                raise IsolationError(
                    operation="get_session",
                    tenant_id=tenant.id,
                    details={"guc": _RLS_GUC, "error": str(exc)},
                ) from exc
    finally:
        # Always remove the begin listener so the pooled connection is
        # returned to the pool in a clean state — no stale GUC callbacks
        # that would set the wrong tenant on the next request.
        if sync_conn is not None and event.contains(sync_conn, "begin", _set_rls_guc):
            event.remove(sync_conn, "begin", _set_rls_guc)

apply_filters async

Python
apply_filters(query: SelectT, tenant: Tenant) -> SelectT

Add defence-in-depth WHERE tenant_id = :id to query.

Although the RLS policy already restricts rows at the database level, this explicit application-layer filter prevents data leakage if the RLS policy is ever accidentally disabled on a table.

Parameters:

Name Type Description Default
query SelectT

SQLAlchemy Select query.

required
tenant Tenant

Currently active tenant.

required

Returns:

Type Description
SelectT

Query with an additional tenant_id equality predicate.
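
The effect of the added predicate can be seen by compiling a query to a string. This is a minimal sketch using SQLAlchemy Core directly rather than the provider; the `users` table is hypothetical, and the `where` call mirrors the unqualified `tenant_id` comparison shown in the source below.

```python
from sqlalchemy import column, select, table

# Hypothetical lightweight table; a real application would use its ORM models.
users = table("users", column("id"), column("tenant_id"))

query = select(users.c.id)
# Mirrors the predicate apply_filters adds: an unqualified tenant_id column
# compared against a bound parameter.
filtered = query.where(column("tenant_id") == "tenant-a")

print(filtered)
```

Because the column is not qualified with a table name, a query that joins two tables which both carry `tenant_id` may be rejected by the database as ambiguous; adding a table-qualified filter by hand is one option in that case.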

Source code in src/fastapi_tenancy/isolation/rls.py
Python
async def apply_filters(self, query: SelectT, tenant: Tenant) -> SelectT:
    """Add defence-in-depth ``WHERE tenant_id = :id`` to *query*.

    Although the RLS policy already restricts rows at the database level,
    this explicit application-layer filter prevents data leakage if the
    RLS policy is ever accidentally disabled on a table.

    Args:
        query: SQLAlchemy ``Select`` query.
        tenant: Currently active tenant.

    Returns:
        Query with an additional ``tenant_id`` equality predicate.
    """
    tenant_col: Column[String] = column(_TENANT_COLUMN)  # type: ignore[assignment]
    return query.where(tenant_col == tenant.id)

initialize_tenant async

Python
initialize_tenant(
    tenant: Tenant, metadata: MetaData | None = None
) -> None

Log an RLS policy reminder; optionally create the shared tables.

Unlike schema/database isolation, RLS uses shared tables — there is nothing to physically create per-tenant. This method logs a reminder about the required RLS policy and creates tables if metadata is supplied and they do not already exist.

When metadata is supplied, create_all is called with checkfirst=True so repeated calls are idempotent.

Parameters:

Name Type Description Default
tenant Tenant

The newly onboarded tenant.

required
metadata MetaData | None

Application MetaData. When supplied, the shared tables are created if they do not already exist.

None

Raises:

Type Description
IsolationError

When table creation fails.

Source code in src/fastapi_tenancy/isolation/rls.py
Python
async def initialize_tenant(
    self,
    tenant: Tenant,
    metadata: MetaData | None = None,
) -> None:
    """Log an RLS policy reminder; optionally create the shared tables.

    Unlike schema/database isolation, RLS uses shared tables — there
    is nothing to physically create per-tenant.  This method logs a
    reminder about the required RLS policy and creates tables if
    *metadata* is supplied and they do not already exist.

    When *metadata* is supplied, ``create_all`` is called with
    ``checkfirst=True`` so repeated calls are idempotent.

    Args:
        tenant: The newly onboarded tenant.
        metadata: Application ``MetaData``.  When supplied, the shared
            tables are created if they do not already exist.

    Raises:
        IsolationError: When table creation fails.
    """
    logger.info(
        "RLS: initialising tenant %s — shared tables; no schema created.",
        tenant.id,
    )
    logger.info(
        "RLS policy hint:\n%s",
        _SAMPLE_POLICY_SQL.format(table_name="<your_table>"),
    )

    if metadata is not None:
        try:
            async with self.engine.begin() as conn:
                await conn.run_sync(metadata.create_all, checkfirst=True)
            logger.info("RLS: created / verified shared tables for tenant %s", tenant.id)
        except Exception as exc:
            raise IsolationError(
                operation="initialize_tenant",
                tenant_id=tenant.id,
                details={"mode": "rls", "error": str(exc)},
            ) from exc

destroy_tenant async

Python
destroy_tenant(tenant: Tenant, **kwargs: Any) -> None

Delete all rows belonging to tenant from every tenant-scoped table.

Executes a DELETE FROM <table> WHERE tenant_id = :id for every table in metadata (passed via keyword argument metadata).

Warning: This is a destructive, irreversible operation. Tables that hold rows for several tenants keep the other tenants' data; only rows where tenant_id = tenant.id are deleted.

Parameters:

Name Type Description Default
tenant Tenant

Tenant to destroy.

required
**kwargs Any

metadata (sqlalchemy.MetaData): Application metadata. Required; raises IsolationError if not supplied.

{}

Raises:

Type Description
IsolationError

When metadata is not supplied or deletion fails.
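
If tenant-scoped tables reference one another through foreign keys without ON DELETE CASCADE, the order of the per-table DELETE statements matters: rows in referencing tables have to go before the rows they point at. The sketch below illustrates that ordering with the standard-library sqlite3 module. The `users` and `orders` tables are hypothetical and this is not the provider's implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY, tenant_id TEXT NOT NULL);
    CREATE TABLE orders (
        id INTEGER PRIMARY KEY,
        user_id INTEGER NOT NULL REFERENCES users(id),
        tenant_id TEXT NOT NULL
    );
    INSERT INTO users VALUES (1, 'tenant-a'), (2, 'tenant-b');
    INSERT INTO orders VALUES (10, 1, 'tenant-a'), (20, 2, 'tenant-b');
    """
)

# Child table first, so no order row is left pointing at a deleted user.
with conn:  # commits on success, rolls back on error
    for table_name in ("orders", "users"):
        conn.execute(f"DELETE FROM {table_name} WHERE tenant_id = ?", ("tenant-a",))

remaining = conn.execute(
    "SELECT tenant_id FROM users UNION ALL SELECT tenant_id FROM orders"
).fetchall()
print(remaining)
```

Running both deletes in one transaction means a failure on any table leaves the tenant's data untouched rather than half-removed.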

Source code in src/fastapi_tenancy/isolation/rls.py
Python
async def destroy_tenant(self, tenant: Tenant, **kwargs: Any) -> None:
    """Delete all rows belonging to *tenant* from every tenant-scoped table.

    Executes a ``DELETE FROM <table> WHERE tenant_id = :id`` for every
    table in *metadata* (passed via keyword argument ``metadata``).

    .. warning::
        This is a **destructive, irreversible** operation.  Tables that
        share rows with other tenants will retain their other data;
        only rows where ``tenant_id = tenant.id`` are deleted.

    Args:
        tenant: Tenant to destroy.
        **kwargs:
            - ``metadata`` (:class:`~sqlalchemy.MetaData`): Application
              metadata.  **Required** — raises :exc:`IsolationError` if
              not supplied.

    Raises:
        IsolationError: When metadata is not supplied or deletion fails.
    """
    metadata: MetaData | None = kwargs.get("metadata")
    if metadata is None:
        raise IsolationError(
            operation="destroy_tenant",
            tenant_id=tenant.id,
            details={
                "reason": (
                    "RLS destroy_tenant requires metadata=Base.metadata to identify "
                    "which tables to delete rows from. "
                    "Call destroy_tenant(tenant, metadata=Base.metadata)."
                )
            },
        )

    async with AsyncSession(self.engine) as session:
        try:
            async with session.begin():
                for table in metadata.sorted_tables:
                    if _TENANT_COLUMN in table.c:
                        result = await session.execute(
                            table.delete().where(table.c[_TENANT_COLUMN] == tenant.id)
                        )
                        logger.warning(
                            "RLS destroy: deleted %d rows from %s for tenant %s",
                            result.rowcount,  # type: ignore[attr-defined]
                            table.name,
                            tenant.id,
                        )
        except Exception as exc:
            await session.rollback()
            raise IsolationError(
                operation="destroy_tenant",
                tenant_id=tenant.id,
                details={"mode": "rls", "error": str(exc)},
            ) from exc

verify_isolation async

Python
verify_isolation(tenant: Tenant) -> bool

Verify that the RLS GUC can be set and read back correctly.

Parameters:

Name Type Description Default
tenant Tenant

Tenant to verify.

required

Returns:

Type Description
bool

True when the session variable round-trips correctly.

Source code in src/fastapi_tenancy/isolation/rls.py
Python
async def verify_isolation(self, tenant: Tenant) -> bool:
    """Verify that the RLS GUC can be set and read back correctly.

    Args:
        tenant: Tenant to verify.

    Returns:
        ``True`` when the session variable round-trips correctly.
    """
    try:
        async with self.engine.connect() as conn:
            result = await conn.execute(
                text("SELECT set_config(:guc, :val, TRUE), current_setting(:guc, TRUE)"),
                {"guc": _RLS_GUC, "val": tenant.id},
            )
            row = result.fetchone()
            if row is None:
                return False
            # Both set_config return value and current_setting should
            # match the tenant ID.
            return row[0] == tenant.id and row[1] == tenant.id
    except Exception:
        return False

close async

Python
close() -> None

Dispose the engine and release all pooled connections.

Source code in src/fastapi_tenancy/isolation/rls.py
Python
async def close(self) -> None:
    """Dispose the engine and release all pooled connections."""
    await self.engine.dispose()
    logger.info("RLSIsolationProvider closed")

Hybrid

HybridIsolationProvider

Python
HybridIsolationProvider(
    config: TenancyConfig,
    premium_engine: AsyncEngine | None = None,
    standard_engine: AsyncEngine | None = None,
)

Bases: BaseIsolationProvider

Two-tier hybrid isolation: premium → strong; standard → economical.

Premium and standard isolation strategies are set in TenancyConfig:

Python
config = TenancyConfig(
    ...
    isolation_strategy="hybrid",
    premium_tenants=["tenant-enterprise-1", "tenant-enterprise-2"],
    premium_isolation_strategy="schema",
    standard_isolation_strategy="rls",
)

Then:

Python
provider = HybridIsolationProvider(config)

# Premium tenant → SchemaIsolationProvider
async with provider.get_session(premium_tenant) as session: ...

# Standard tenant → RLSIsolationProvider
async with provider.get_session(standard_tenant) as session: ...

Parameters:

Name Type Description Default
config TenancyConfig

Application-wide tenancy configuration.

required
premium_engine AsyncEngine | None

Pre-built engine to inject into the premium provider. When None (default), the premium provider uses the shared engine built from config.

None
standard_engine AsyncEngine | None

Pre-built engine to inject into the standard provider. When None, the shared engine is used.

None

Raises:

Type Description
ConfigurationError

When config.isolation_strategy is not HYBRID, or when the inner strategies are invalid.
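
The tier decision can be sketched as a pure function. This assumes routing depends only on membership in premium_tenants, as the configuration example above suggests; the library's internal `_provider_for` is not shown on this page and may consider other tenant attributes. `HybridSettings` and `strategy_for` are hypothetical names used for illustration.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class HybridSettings:
    """Hypothetical stand-in for the hybrid-related fields of TenancyConfig."""

    premium_tenants: FrozenSet[str]
    premium_isolation_strategy: str = "schema"
    standard_isolation_strategy: str = "rls"


def strategy_for(tenant_id: str, settings: HybridSettings) -> str:
    """Pick the isolation strategy for a tenant without doing any I/O."""
    if tenant_id in settings.premium_tenants:
        return settings.premium_isolation_strategy
    return settings.standard_isolation_strategy


settings = HybridSettings(
    premium_tenants=frozenset({"tenant-enterprise-1", "tenant-enterprise-2"})
)
print(strategy_for("tenant-enterprise-1", settings))
print(strategy_for("tenant-small", settings))
```

Keeping the decision free of I/O makes it cheap to call on every request and easy to unit test.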

Source code in src/fastapi_tenancy/isolation/hybrid.py
Python
def __init__(
    self,
    config: TenancyConfig,
    premium_engine: AsyncEngine | None = None,
    standard_engine: AsyncEngine | None = None,
) -> None:
    super().__init__(config)

    if config.isolation_strategy != IsolationStrategy.HYBRID:
        raise ConfigurationError(
            parameter="isolation_strategy",
            reason=(
                "HybridIsolationProvider requires isolation_strategy='hybrid' "
                f"but got {config.isolation_strategy.value!r}."
            ),
        )

    # Build the shared engine; an inner provider falls back to it when no
    # explicit engine is supplied for that tier.
    dialect = detect_dialect(str(config.database_url))
    kw: dict[str, Any] = {"echo": config.database_echo}
    if requires_static_pool(dialect):
        kw["poolclass"] = StaticPool
        kw["connect_args"] = {"check_same_thread": False}
    else:
        kw["pool_size"] = config.database_pool_size
        kw["max_overflow"] = config.database_max_overflow
        kw["pool_pre_ping"] = config.database_pool_pre_ping
        kw["pool_recycle"] = config.database_pool_recycle

    self._shared_engine: AsyncEngine = create_async_engine(str(config.database_url), **kw)

    # Inner providers share the same physical engine unless one was injected.
    effective_premium_engine = premium_engine or self._shared_engine
    effective_standard_engine = standard_engine or self._shared_engine

    self._premium_provider: BaseIsolationProvider = _build_provider(
        config.premium_isolation_strategy,
        config,
        effective_premium_engine,
    )
    self._standard_provider: BaseIsolationProvider = _build_provider(
        config.standard_isolation_strategy,
        config,
        effective_standard_engine,
    )

    logger.info(
        "HybridIsolationProvider ready: premium=%s standard=%s shared_engine=%s",
        config.premium_isolation_strategy.value,
        config.standard_isolation_strategy.value,
        id(self._shared_engine),
    )

premium_provider property

Python
premium_provider: BaseIsolationProvider

The inner provider used for premium tenants.

standard_provider property

Python
standard_provider: BaseIsolationProvider

The inner provider used for standard tenants.

get_session async

Python
get_session(tenant: Tenant) -> AsyncIterator[AsyncSession]

Yield a session from the tier-appropriate inner provider.

Parameters:

Name Type Description Default
tenant Tenant

Currently active tenant.

required

Yields:

Type Description
AsyncIterator[AsyncSession]

Configured AsyncSession.

Raises:

Type Description
IsolationError

When the session cannot be obtained.

Source code in src/fastapi_tenancy/isolation/hybrid.py
Python
@asynccontextmanager
async def get_session(self, tenant: Tenant) -> AsyncIterator[AsyncSession]:
    """Yield a session from the tier-appropriate inner provider.

    Args:
        tenant: Currently active tenant.

    Yields:
        Configured :class:`~sqlalchemy.ext.asyncio.AsyncSession`.

    Raises:
        IsolationError: When the session cannot be obtained.
    """
    provider = self._provider_for(tenant)
    async with provider.get_session(tenant) as session:
        yield session

apply_filters async

Python
apply_filters(query: SelectT, tenant: Tenant) -> SelectT

Delegate query filtering to the tier-appropriate inner provider.

Parameters:

Name Type Description Default
query SelectT

SQLAlchemy Select query.

required
tenant Tenant

Currently active tenant.

required

Returns:

Type Description
SelectT

Filtered query.

Source code in src/fastapi_tenancy/isolation/hybrid.py
Python
async def apply_filters(self, query: SelectT, tenant: Tenant) -> SelectT:
    """Delegate query filtering to the tier-appropriate inner provider.

    Args:
        query: SQLAlchemy ``Select`` query.
        tenant: Currently active tenant.

    Returns:
        Filtered query.
    """
    return await self._provider_for(tenant).apply_filters(query, tenant)

initialize_tenant async

Python
initialize_tenant(
    tenant: Tenant, metadata: MetaData | None = None
) -> None

Provision tenant's isolation namespace using the correct inner provider.

Parameters:

Name Type Description Default
tenant Tenant

Tenant to provision.

required
metadata MetaData | None

Application MetaData.

None

Raises:

Type Description
IsolationError

When provisioning fails.

Source code in src/fastapi_tenancy/isolation/hybrid.py
Python
async def initialize_tenant(
    self,
    tenant: Tenant,
    metadata: MetaData | None = None,
) -> None:
    """Provision *tenant*'s isolation namespace using the correct inner provider.

    Args:
        tenant: Tenant to provision.
        metadata: Application :class:`~sqlalchemy.MetaData`.

    Raises:
        IsolationError: When provisioning fails.
    """
    provider = self._provider_for(tenant)
    logger.info(
        "HybridIsolationProvider: initialising tenant %s via %s",
        tenant.id,
        type(provider).__name__,
    )
    await provider.initialize_tenant(tenant, metadata=metadata)

destroy_tenant async

Python
destroy_tenant(tenant: Tenant, **kwargs: Any) -> None

Deprovision tenant using the correct inner provider.

Warning: Permanently destroys all tenant data.

Parameters:

Name Type Description Default
tenant Tenant

Tenant to destroy.

required
**kwargs Any

Forwarded to the inner provider's destroy_tenant.

{}

Raises:

Type Description
IsolationError

When deprovisioning fails.

Source code in src/fastapi_tenancy/isolation/hybrid.py
Python
async def destroy_tenant(self, tenant: Tenant, **kwargs: Any) -> None:
    """Deprovision *tenant* using the correct inner provider.

    .. warning::
        Permanently destroys all tenant data.

    Args:
        tenant: Tenant to destroy.
        **kwargs: Forwarded to the inner provider's ``destroy_tenant``.

    Raises:
        IsolationError: When deprovisioning fails.
    """
    provider = self._provider_for(tenant)
    logger.warning(
        "HybridIsolationProvider: destroying tenant %s via %s",
        tenant.id,
        type(provider).__name__,
    )
    await provider.destroy_tenant(tenant, **kwargs)
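
The `**kwargs` pass-through can be illustrated with a small stand-in. This sketch assumes nothing about which keyword arguments the real inner providers accept: `RecordingProvider`, `HybridDestroySketch`, and the `archive_first` option are all made up for the example, and the tenant is represented by a plain string id.

```python
import asyncio
from typing import Any


class RecordingProvider:
    """Stand-in inner provider that records what destroy_tenant received."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any]]] = []

    async def destroy_tenant(self, tenant_id: str, **kwargs: Any) -> None:
        self.calls.append((tenant_id, kwargs))


class HybridDestroySketch:
    def __init__(self, inner: RecordingProvider) -> None:
        self._inner = inner

    async def destroy_tenant(self, tenant_id: str, **kwargs: Any) -> None:
        # Extra keyword arguments reach the inner provider unchanged.
        await self._inner.destroy_tenant(tenant_id, **kwargs)


inner = RecordingProvider()
# `archive_first` is a hypothetical option used only to show the pass-through.
asyncio.run(HybridDestroySketch(inner).destroy_tenant("acme", archive_first=True))
recorded_calls = inner.calls
```

Because the hybrid layer does not inspect the keyword arguments, an option that only one inner provider understands would presumably fail for tenants routed to the other one, so callers may want to check the target provider first.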

verify_isolation async

Python
verify_isolation(tenant: Tenant) -> bool

Delegate isolation verification to the correct inner provider.

Parameters:

Name Type Description Default
tenant Tenant

Tenant to verify.

required

Returns:

Type Description
bool

True when isolation is verified.

Source code in src/fastapi_tenancy/isolation/hybrid.py
Python
async def verify_isolation(self, tenant: Tenant) -> bool:
    """Delegate isolation verification to the correct inner provider.

    Args:
        tenant: Tenant to verify.

    Returns:
        ``True`` when isolation is verified.
    """
    return await self._provider_for(tenant).verify_isolation(tenant)

get_provider_for_tenant

Python
get_provider_for_tenant(
    tenant: Tenant,
) -> BaseIsolationProvider

Return the inner provider that would handle tenant (no I/O).

Useful for introspection and testing.

Parameters:

Name Type Description Default
tenant Tenant

Target tenant.

required

Returns:

Type Description
BaseIsolationProvider

The inner BaseIsolationProvider.

Source code in src/fastapi_tenancy/isolation/hybrid.py
Python
def get_provider_for_tenant(self, tenant: Tenant) -> BaseIsolationProvider:
    """Return the inner provider that would handle *tenant* (no I/O).

    Useful for introspection and testing.

    Args:
        tenant: Target tenant.

    Returns:
        The inner :class:`BaseIsolationProvider`.
    """
    return self._provider_for(tenant)
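
Since the lookup performs no I/O, a routing check can run as an ordinary synchronous test with no event loop or database. The sketch below assumes a two-tier setup keyed on a hypothetical `premium` flag; `PremiumStub`, `StandardStub`, and `HybridRoutingSketch` are stand-ins, not library classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tenant:
    """Hypothetical stand-in for the library's Tenant model."""

    id: str
    premium: bool = False


class PremiumStub:
    """Stand-in for the inner provider serving premium tenants."""


class StandardStub:
    """Stand-in for the inner provider serving standard tenants."""


class HybridRoutingSketch:
    def __init__(self) -> None:
        self._premium_provider = PremiumStub()
        self._standard_provider = StandardStub()

    def _provider_for(self, tenant: Tenant) -> object:
        # Assumed routing rule; the real tier lookup is not shown here.
        return self._premium_provider if tenant.premium else self._standard_provider

    def get_provider_for_tenant(self, tenant: Tenant) -> object:
        return self._provider_for(tenant)


def test_routing() -> None:
    hybrid = HybridRoutingSketch()
    premium = hybrid.get_provider_for_tenant(Tenant("acme", premium=True))
    standard = hybrid.get_provider_for_tenant(Tenant("beta"))
    assert isinstance(premium, PremiumStub)
    assert isinstance(standard, StandardStub)


test_routing()
```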

close async

Python
close() -> None

Dispose all engines and close inner providers.

Closes both inner providers first (which disposes any engines they hold that were passed via premium_engine / standard_engine at construction time), then disposes the shared engine created by this provider. AsyncEngine.dispose() is idempotent, so disposing an engine that was already disposed by an inner provider is safe and produces no error.

Source code in src/fastapi_tenancy/isolation/hybrid.py
Python
async def close(self) -> None:
    """Dispose all engines and close inner providers.

    Closes both inner providers first (which disposes any engines they hold
    that were passed via ``premium_engine`` / ``standard_engine`` at
    construction time), then disposes the shared engine created by this
    provider.  ``AsyncEngine.dispose()`` is idempotent, so disposing an
    engine that was already disposed by an inner provider is safe and
    produces no error.
    """
    # Close inner providers first so any custom engines they hold are
    # released before we dispose the shared engine.
    await self._premium_provider.close()  # type: ignore[attr-defined]
    await self._standard_provider.close()  # type: ignore[attr-defined]
    # Dispose the shared engine last.  If both inner providers share this
    # engine, dispose() on it is idempotent — safe to call multiple times.
    await self._shared_engine.dispose()
    logger.info("HybridIsolationProvider closed (all engines disposed)")
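
The shutdown order can be traced with fake engines that log each `dispose()` call. This is a sketch under stated assumptions: `FakeEngine`, `FakeProvider`, and `HybridCloseSketch` are stand-ins, and the scenario (premium provider with its own engine, standard provider reusing the shared one) is chosen only to show a repeated dispose.

```python
import asyncio


class FakeEngine:
    """Stand-in for an async engine whose dispose() may be called repeatedly."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self._log = log

    async def dispose(self) -> None:
        # Record every call; a repeated call is harmless in this model.
        self._log.append(self.name)


class FakeProvider:
    """Stand-in inner provider that disposes its engine on close()."""

    def __init__(self, engine: FakeEngine) -> None:
        self._engine = engine

    async def close(self) -> None:
        await self._engine.dispose()


class HybridCloseSketch:
    def __init__(
        self, premium: FakeProvider, standard: FakeProvider, shared_engine: FakeEngine
    ) -> None:
        self._premium_provider = premium
        self._standard_provider = standard
        self._shared_engine = shared_engine

    async def close(self) -> None:
        # Inner providers first, shared engine last, mirroring close() above.
        await self._premium_provider.close()
        await self._standard_provider.close()
        await self._shared_engine.dispose()


dispose_log: list[str] = []
shared = FakeEngine("shared", dispose_log)
custom = FakeEngine("premium", dispose_log)
hybrid = HybridCloseSketch(FakeProvider(custom), FakeProvider(shared), shared)
asyncio.run(hybrid.close())
```

In this scenario the shared engine is disposed twice, once through the standard provider and once by the hybrid provider itself, which is the case the idempotency note in the docstring covers.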