Skip to content

TenancyManager

TenancyManager

Python
TenancyManager(
    config: TenancyConfig,
    store: TenantStore[Tenant],
    custom_resolver: TenantResolver | None = None,
    isolation_provider: BaseIsolationProvider | None = None,
    audit_writer: Any | None = None,
)

Central orchestrator wiring resolver, store, and isolation provider.

Parameters:

Name Type Description Default
config TenancyConfig

Application-wide tenancy configuration.

required
store TenantStore[Tenant]

Tenant metadata storage backend.

required
custom_resolver TenantResolver | None

Optional custom resolver (required when config.resolution_strategy == "custom").

None
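isolation_provider BaseIsolationProvider | None

Optional pre-built isolation provider. When None, one is built from config.isolation_strategy.

None
audit_writer Any | None

Optional audit writer: any object with an async write(entry) method. When None, a default writer that logs each entry is used.

None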

Attributes:

Name Type Description
config

The TenancyConfig this manager was constructed with.

store

The underlying TenantStore.

resolver TenantResolver

The active TenantResolver.

isolation_provider BaseIsolationProvider

The active BaseIsolationProvider.

Source code in src/fastapi_tenancy/manager.py
Python
def __init__(
    self,
    config: TenancyConfig,
    store: TenantStore[Tenant],
    custom_resolver: TenantResolver | None = None,
    isolation_provider: BaseIsolationProvider | None = None,
    audit_writer: Any | None = None,
) -> None:
    """Wire the store, resolver, isolation provider and supporting services.

    Args:
        config: Application-wide tenancy configuration.
        store: Tenant metadata storage backend.  When ``config.cache_enabled``
            is true it is wrapped in a ``_CachingStoreProxy`` and the proxy is
            what ends up on ``self.store``.
        custom_resolver: Optional resolver forwarded to ``_build_resolver``.
        isolation_provider: Optional pre-built provider.  When ``None`` one is
            built with ``_build_provider(config)``.
        audit_writer: Optional object exposing an async ``write(entry)``
            method.  When ``None`` a ``_DefaultAuditLogWriter`` is used.
    """
    self.config = config

    # The L1 cache must exist before the resolver is built: the resolver is
    # handed the caching proxy, so its store lookups go through the cache.
    self._l1_cache: Any = None
    wrapped_store: Any = store
    if config.cache_enabled:
        from fastapi_tenancy.cache.tenant_cache import TenantCache  # noqa: PLC0415

        l1_cache = TenantCache(
            max_size=config.l1_cache_max_size,
            ttl=config.l1_cache_ttl_seconds,
        )
        self._l1_cache = l1_cache
        wrapped_store = _CachingStoreProxy(store, l1_cache)
        # Logged from the cache's own (private) attributes rather than from
        # the config values that were passed in.
        logger.info(
            "L1 TenantCache wired max_size=%d ttl=%ds",
            l1_cache._max_size,
            l1_cache._ttl,
        )

    self.store = wrapped_store
    self.resolver: TenantResolver = _build_resolver(config, wrapped_store, custom_resolver)

    if isolation_provider is None:
        isolation_provider = _build_provider(config)
    self.isolation_provider: BaseIsolationProvider = isolation_provider

    # Any object with an async ``write(entry)`` method is accepted; the
    # default writer is used when the caller supplies none.
    if audit_writer is None:
        audit_writer = _DefaultAuditLogWriter()
    self._audit_writer: Any = audit_writer

    # The rate limiter stays None until initialize() sets it up.
    self._rate_limiter: Any = None
    self._rate_limiting_enabled: bool = config.enable_rate_limiting

    # Handle of the background L1 purge task: created by initialize() when
    # the cache is enabled and cancelled by close().
    self._purge_task: asyncio.Task[None] | None = None

    # Field-level encryption helper; from_config() may return None, in which
    # case no encryption is applied.
    from fastapi_tenancy.utils.encryption import TenancyEncryption  # noqa: PLC0415

    encryption = TenancyEncryption.from_config(config)
    self._encryption: TenancyEncryption | None = encryption
    if encryption is not None:
        logger.info("Field-level encryption enabled (Fernet/AES-128-CBC+HMAC-SHA256)")

    logger.info(
        "TenancyManager created resolver=%s isolation=%s audit_writer=%s",
        type(self.resolver).__name__,
        type(self.isolation_provider).__name__,
        type(self._audit_writer).__name__,
    )

initialize async

Python
initialize() -> None

Initialise all components.

Call this once at application startup — typically inside a FastAPI lifespan context manager. It:

  1. Initialises the store (creates tables if applicable).
  2. Warms the Redis cache when configured.
  3. Starts the L1 cache background purge task (when cache is enabled).
  4. Establishes the Redis rate-limiter connection (when rate limiting is enabled).

Safe to call multiple times — all operations are idempotent. A second call while the purge task is already running will not start a duplicate.

Source code in src/fastapi_tenancy/manager.py
Python
async def initialize(self) -> None:
    """Bring the store, caches and rate limiter online.

    Intended to run once at application startup, typically from a FastAPI
    lifespan context manager.  In order, it:

    1. Awaits ``store.initialize()`` when the store exposes it.
    2. Awaits ``store.warm_cache()`` when caching is enabled and the store
       exposes it.
    3. Starts the L1 cache purge task when an L1 cache exists and no purge
       task is currently running.
    4. Awaits ``_init_rate_limiter()`` when rate limiting is enabled and a
       ``redis_url`` is configured.

    The purge task is never duplicated by a repeated call.  Whether the
    other steps are safe to repeat depends on the store and rate-limiter
    implementations, which are not visible here.
    """
    store = self.store
    cfg = self.config

    if hasattr(store, "initialize"):
        await store.initialize()
        logger.info("Store initialised: %s", type(store).__name__)

    if cfg.cache_enabled and hasattr(store, "warm_cache"):
        await store.warm_cache()
        logger.info("Cache warmed")

    if self._l1_cache is not None:
        logger.info("L1 in-process tenant cache active")
        # Only spawn a sweeper when there is none, or the previous one has
        # already finished; a live task is left untouched.
        running = self._purge_task
        if running is None or running.done():
            self._purge_task = asyncio.create_task(
                self._run_cache_purge_loop(),
                name="fastapi-tenancy:l1-cache-purge",
            )
            # Reported interval: half the configured TTL, at least 1 second.
            interval = max(1, cfg.l1_cache_ttl_seconds // 2)
            logger.info("L1 cache purge task started (interval=%ds)", interval)

    if cfg.enable_rate_limiting and cfg.redis_url:
        await self._init_rate_limiter()

    logger.info("TenancyManager initialised")

close async

Python
close() -> None

Dispose all resources.

Call this inside a FastAPI lifespan finally block or on SIGTERM. Disposes engine pools, closes Redis connections, and cancels the background L1 cache purge task.

store.close() is called unconditionally — TenantStore now declares a concrete no-op close() that subclasses override when they hold external resources, so the old hasattr guard is no longer necessary there. isolation_provider.close() is still called only when the provider defines a close attribute.

Source code in src/fastapi_tenancy/manager.py
Python
async def close(self) -> None:
    """Dispose all resources held by the manager.

    Call this inside a FastAPI lifespan ``finally`` block or on SIGTERM.
    It cancels the background L1 cache purge task, closes the isolation
    provider, and closes the store.

    ``store.close()`` is always awaited, even when closing the isolation
    provider raises, so a failing provider cannot leak the store's
    resources; the provider's exception still propagates afterwards.
    ``isolation_provider.close()`` is awaited only when the provider
    exposes a ``close`` attribute.

    Raises:
        Exception: Whatever ``isolation_provider.close()`` or
            ``store.close()`` raises is propagated unchanged.
    """
    # Stop the purge task first so it cannot touch the cache while the
    # store is being torn down.
    purge_task = self._purge_task
    if purge_task is not None and not purge_task.done():
        purge_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await purge_task
        logger.info("L1 cache purge task cancelled")
    self._purge_task = None

    # NOTE(review): self._rate_limiter is not closed here — confirm whether
    # its Redis connection is released elsewhere.
    try:
        if hasattr(self.isolation_provider, "close"):
            await self.isolation_provider.close()
        logger.info("Isolation provider closed")
    finally:
        # Runs even when the provider failed to close.
        await self.store.close()
        logger.info("Store closed")

    logger.info("TenancyManager shut down cleanly")

create_lifespan

Python
create_lifespan() -> Any

Return an async context manager suitable for FastAPI's lifespan parameter.

Example::

Text Only
app = FastAPI(lifespan=manager.create_lifespan())

Returns:

Type Description
Any

An async context manager that calls initialize() on enter

Any

and close() on exit.

Source code in src/fastapi_tenancy/manager.py
Python
def create_lifespan(self) -> Any:
    """Build a lifespan callable for FastAPI's ``lifespan`` parameter.

    Example::

        app = FastAPI(lifespan=manager.create_lifespan())

    Returns:
        A callable taking the application and returning an async context
        manager.  Entering it awaits ``initialize()``; leaving it awaits
        ``close()``, including when the managed body raises.
    """
    from contextlib import asynccontextmanager  # noqa: PLC0415

    async def _lifespan(app: Any) -> AsyncIterator[None]:
        # ``app`` is accepted to match the lifespan calling convention but
        # is not used.
        await self.initialize()
        try:
            yield
        finally:
            await self.close()

    return asynccontextmanager(_lifespan)

register_tenant async

Python
register_tenant(
    identifier: str,
    name: str,
    metadata: dict[str, Any] | None = None,
    isolation_strategy: IsolationStrategy | None = None,
    app_metadata: MetaData | None = None,
) -> Tenant

Register a new tenant and provision its database namespace.

This is the recommended way to onboard tenants programmatically. It:

  1. Validates the identifier.
  2. Generates a cryptographically secure id.
  3. Persists the tenant in the store.
  4. Calls isolation_provider.initialize_tenant() to create the schema/database.

.. note:: This does not auto-seed demo tenants. Call this explicitly from your onboarding flow or admin CLI.

Parameters:

Name Type Description Default
identifier str

Human-readable slug (must pass :func:~fastapi_tenancy.utils.validation.validate_tenant_identifier).

required
name str

Display name.

required
metadata dict[str, Any] | None

Optional initial metadata dict.

None
isolation_strategy IsolationStrategy | None

Per-tenant isolation override.

None
app_metadata MetaData | None

SQLAlchemy MetaData to create tables in the new tenant namespace.

None

Returns:

Type Description
Tenant

The newly created, stored :class:~fastapi_tenancy.core.types.Tenant.

Raises:

Type Description
ValueError

When identifier is invalid or already taken.

TenancyError

When store or isolation provider raises.

Source code in src/fastapi_tenancy/manager.py
Python
async def register_tenant(
    self,
    identifier: str,
    name: str,
    metadata: dict[str, Any] | None = None,
    isolation_strategy: IsolationStrategy | None = None,
    app_metadata: MetaData | None = None,
) -> Tenant:
    """Create a tenant record and provision its isolation namespace.

    Steps, in order:

    1. Validate *identifier* with ``validate_tenant_identifier``.
    2. Build a ``Tenant`` with a fresh id from ``generate_tenant_id()`` and
       the status taken from ``config.default_tenant_status``.
    3. Encrypt tenant fields when encryption is configured.
    4. Persist the tenant via ``store.create()``.
    5. Call ``isolation_provider.initialize_tenant()``; if that fails, the
       stored record is deleted again (best effort) before the error is
       re-raised as ``TenancyError``.

    Args:
        identifier: Human-readable slug (must pass
            :func:`~fastapi_tenancy.utils.validation.validate_tenant_identifier`).
        name: Display name.
        metadata: Optional initial metadata dict; a falsy value becomes ``{}``.
        isolation_strategy: Per-tenant isolation override.
        app_metadata: SQLAlchemy ``MetaData`` forwarded to
            ``initialize_tenant`` as ``metadata``.

    Returns:
        The tenant returned by the store, passed through
        ``decrypt_tenant_fields`` when encryption is configured.

    Raises:
        ValueError: When *identifier* fails validation.
        TenancyError: When ``initialize_tenant`` raises.  Errors raised by
            ``store.create()`` propagate unchanged.
    """
    from fastapi_tenancy.utils.validation import validate_tenant_identifier  # noqa: PLC0415

    if not validate_tenant_identifier(identifier):
        error_text = (
            f"Invalid tenant identifier {identifier!r}. "
            "Must be 3-63 lowercase alphanumeric characters and hyphens."
        )
        raise ValueError(error_text)

    record = Tenant(
        id=generate_tenant_id(),
        identifier=identifier,
        name=name,
        status=TenantStatus(self.config.default_tenant_status),
        isolation_strategy=isolation_strategy,
        metadata=metadata or {},
    )

    # Sensitive fields are encrypted before the record reaches the store.
    if self._encryption is not None:
        record = self._encryption.encrypt_tenant_fields(record)

    created = await self.store.create(record)
    logger.info("Registered tenant id=%s identifier=%s", created.id, created.identifier)

    try:
        await self.isolation_provider.initialize_tenant(created, metadata=app_metadata)
    except Exception as exc:
        # Undo the store write so a failed provisioning does not leave the
        # identifier occupied by a half-created tenant.
        try:
            await self.store.delete(created.id)
        except Exception as rollback_exc:  # pragma: no cover
            # The rollback failure is only logged; the provisioning error
            # below is what the caller sees.
            logger.error(  # noqa: TRY400
                "Rollback failed for tenant %s after initialize_tenant error — "
                "identifier %r may be poisoned in the store: %s",
                created.id,
                identifier,
                rollback_exc,
            )
        raise TenancyError(
            f"Failed to initialise tenant {created.id!r}: {exc}",
            details={"identifier": identifier},
        ) from exc

    # Callers get plaintext values back when encryption is active.
    if self._encryption is None:
        return created
    return self._encryption.decrypt_tenant_fields(created)

suspend_tenant async

Python
suspend_tenant(tenant_id: str) -> Tenant

Suspend a tenant, blocking all future requests.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to suspend.

required

Returns:

Type Description
Tenant

The updated tenant with status=SUSPENDED.

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

Source code in src/fastapi_tenancy/manager.py
Python
async def suspend_tenant(self, tenant_id: str) -> Tenant:
    """Set a tenant's status to ``SUSPENDED`` in the store.

    Args:
        tenant_id: ID of the tenant to suspend.

    Returns:
        The tenant returned by ``store.set_status()``.

    Raises:
        TenantNotFoundError: Presumably raised by the store when
            *tenant_id* does not exist (not visible in this method).
    """
    suspended = await self.store.set_status(tenant_id, TenantStatus.SUSPENDED)
    # Logged at WARNING level, unlike activation which logs at INFO.
    logger.warning("Suspended tenant %s", tenant_id)
    return suspended

activate_tenant async

Python
activate_tenant(tenant_id: str) -> Tenant

Reinstate a suspended tenant.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to activate.

required

Returns:

Type Description
Tenant

The updated tenant with status=ACTIVE.

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

Source code in src/fastapi_tenancy/manager.py
Python
async def activate_tenant(self, tenant_id: str) -> Tenant:
    """Set a tenant's status to ``ACTIVE`` in the store.

    Args:
        tenant_id: ID of the tenant to activate.

    Returns:
        The tenant returned by ``store.set_status()``.

    Raises:
        TenantNotFoundError: Presumably raised by the store when
            *tenant_id* does not exist (not visible in this method).
    """
    activated = await self.store.set_status(tenant_id, TenantStatus.ACTIVE)
    logger.info("Activated tenant %s", tenant_id)
    return activated

delete_tenant async

Python
delete_tenant(
    tenant_id: str,
    destroy_data: bool = False,
    app_metadata: MetaData | None = None,
) -> None

Delete a tenant, optionally destroying all associated data.

When destroy_data=True and config.enable_soft_delete=False, the tenant's isolation namespace (schema/database/rows) is permanently removed. This is irreversible.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to delete.

required
destroy_data bool

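When True (and config.enable_soft_delete is False), call isolation_provider.destroy_tenant() to purge all data. Default: False (the tenant's data is left in place; the store record is still soft- or hard-deleted according to config.enable_soft_delete).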

False
app_metadata MetaData | None

SQLAlchemy MetaData passed to the isolation provider when destroy_data=True (required for RLS mode).

None

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

TenancyError

When data destruction fails.

Source code in src/fastapi_tenancy/manager.py
Python
async def delete_tenant(
    self,
    tenant_id: str,
    destroy_data: bool = False,
    app_metadata: MetaData | None = None,
) -> None:
    """Remove a tenant from the store, optionally purging its data.

    The tenant is always looked up first.  After that:

    * With ``config.enable_soft_delete`` on, the tenant's status is set to
      ``DELETED`` and nothing else happens — *destroy_data* is ignored.
    * With it off, the store record is deleted.  If *destroy_data* is also
      true, ``isolation_provider.destroy_tenant()`` is awaited first.

    Args:
        tenant_id: ID of the tenant to delete.
        destroy_data: When ``True`` (and soft delete is off), call
            ``isolation_provider.destroy_tenant()`` before removing the
            store record.  Default: ``False``.
        app_metadata: SQLAlchemy ``MetaData`` forwarded to
            ``destroy_tenant`` as ``metadata``.

    Raises:
        TenantNotFoundError: Propagated from ``store.get_by_id()`` when
            *tenant_id* does not exist.
        Exception: Errors from ``destroy_tenant`` or the store propagate
            unchanged; they are not wrapped here.
    """
    # Lookup doubles as the existence check; a not-found error propagates.
    tenant = await self.store.get_by_id(tenant_id)

    if self.config.enable_soft_delete:
        await self.store.set_status(tenant_id, TenantStatus.DELETED)
        logger.warning("Soft-deleted tenant %s", tenant_id)
        return

    if destroy_data:
        # Data is purged before the store record is removed.
        await self.isolation_provider.destroy_tenant(tenant, metadata=app_metadata)
        logger.warning("Destroyed data for tenant %s", tenant_id)

    await self.store.delete(tenant_id)
    logger.warning("Hard-deleted tenant %s", tenant_id)

check_rate_limit async

Python
check_rate_limit(tenant: Tenant) -> None

Check and atomically increment the sliding-window rate limit for tenant.

Uses a single Lua script executed server-side by Redis, replacing the previous two-pipeline approach that had an off-by-one race condition: concurrent requests at the exact boundary could all read count = limit - 1, all pass the check, and then all add their timestamps, silently breaching the limit.

The Lua script is atomic — Redis executes it without interleaving any other commands — so the check-and-increment is always consistent.

Each call generates a unique member string ("{now}:{uuid4}") so that two requests arriving within the same microsecond each add a distinct sorted-set entry rather than overwriting each other.

Parameters:

Name Type Description Default
tenant Tenant

The tenant whose rate limit to check.

required

Raises:

Type Description
RateLimitExceededError

When the tenant has exceeded its limit.

Source code in src/fastapi_tenancy/manager.py
Python
async def check_rate_limit(self, tenant: Tenant) -> None:
    """Check and increment the sliding-window rate limit for *tenant*.

    Does nothing when rate limiting is disabled or no rate limiter has been
    initialised.  Otherwise a single server-side script
    (``self._RATE_LIMIT_LUA``) is evaluated against the key
    ``tenancy:ratelimit:{tenant.id}`` with the current time, the window
    start, the limit, the window length and a unique member string.  Doing
    the check and the increment in one script avoids the race a separate
    read-then-write would have under concurrent requests.

    The member string is ``"{now}:{uuid4}"`` so that two requests with the
    same timestamp still produce distinct entries.

    When the call to Redis fails, the error is logged with its traceback.
    The request is then allowed through (fail-open) unless
    ``config.rate_limit_fail_closed`` is truthy, in which case
    ``RateLimitExceededError`` is raised with the original error as cause.

    Args:
        tenant: The tenant whose rate limit to check.

    Raises:
        RateLimitExceededError: When the script reports a count above the
            limit, or when Redis fails and fail-closed mode is on.
    """
    if not self._rate_limiting_enabled or self._rate_limiter is None:
        return

    import time  # noqa: PLC0415
    import uuid  # noqa: PLC0415

    key = f"tenancy:ratelimit:{tenant.id}"
    window = self.config.rate_limit_window_seconds
    limit = self.config.rate_limit_per_minute
    now = time.time()
    window_start = now - window
    # Timestamp prefix for readability, uuid4 suffix for per-request
    # uniqueness.
    member = f"{now}:{uuid.uuid4().hex}"

    try:
        count: int = await self._rate_limiter.eval(
            self._RATE_LIMIT_LUA,
            1,  # number of KEYS
            key,
            now,
            window_start,
            limit,
            window,
            member,
        )
        # Presumably the script returns the request count including this
        # request — confirm against _RATE_LIMIT_LUA.
        if count > limit:
            raise RateLimitExceededError(  # noqa: TRY301
                tenant_id=tenant.id,
                limit=limit,
                window_seconds=window,
            )
    except RateLimitExceededError:
        raise
    except Exception as exc:
        fail_closed = bool(getattr(self.config, "rate_limit_fail_closed", False))
        # Lazy %-style arguments: the exception repr and tenant id are never
        # run through str.format(), so braces inside them cannot raise from
        # within this handler and mask the original failure.
        logger.exception(
            "Rate limit Redis failure for tenant %r: %r. Operating in fail-%s mode.",
            tenant.id,
            exc,
            "closed" if fail_closed else "open",
        )
        if fail_closed:
            raise RateLimitExceededError(
                tenant_id=tenant.id,
                limit=limit,
                window_seconds=window,
            ) from exc

write_audit_log async

Python
write_audit_log(entry: AuditLog) -> None

Persist an audit log entry via the configured AuditLogWriter.

Delegates to the AuditLogWriter supplied at construction time. The default writer logs the entry at INFO level. Supply a custom writer to persist to a database, message queue, or external service::

Text Only
class CloudWatchAuditWriter:
    async def write(self, entry: AuditLog) -> None:
        await cw.put_log_events(...)

manager = TenancyManager(config, store, audit_writer=CloudWatchAuditWriter())

Parameters:

Name Type Description Default
entry AuditLog

The audit log entry to persist.

required
Source code in src/fastapi_tenancy/manager.py
Python
async def write_audit_log(self, entry: AuditLog) -> None:
    """Hand an audit log entry to the configured audit writer.

    The writer is whatever was stored on ``self._audit_writer`` at
    construction time; its async ``write(entry)`` method is awaited with
    *entry* unchanged.  Supply a custom writer to persist entries to a
    database, message queue, or external service::

        class CloudWatchAuditWriter:
            async def write(self, entry: AuditLog) -> None:
                await cw.put_log_events(...)

        manager = TenancyManager(config, store, audit_writer=CloudWatchAuditWriter())

    Args:
        entry: The audit log entry to persist.
    """
    writer = self._audit_writer
    await writer.write(entry)