Skip to content

Tenant Stores

Abstract base

TenantStore

Bases: ABC, Generic[TenantT]

Abstract base class for tenant metadata storage backends.

Implementations must be:

  • Fully async — every method is a coroutine.
  • Concurrency-safe — instances are created once at startup and shared across all requests; no mutable instance state after init.
  • Raise on not-found — methods that look up a single tenant must raise TenantNotFoundError, never return None.
  • Wrap unexpected errors — unexpected storage errors should be wrapped in TenancyError with a details dict safe to log.

Type parameter

TenantT: The concrete tenant domain model (defaults to Tenant).

get_by_id abstractmethod async

Python
get_by_id(tenant_id: str) -> TenantT

Fetch a tenant by its opaque unique ID.

Parameters:

Name Type Description Default
tenant_id str

Opaque tenant primary key.

required

Returns:

Type Description
TenantT

The matching TenantT.

Raises:

Type Description
TenantNotFoundError

When no tenant with tenant_id exists.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def get_by_id(self, tenant_id: str) -> TenantT:
    """Fetch a tenant by its opaque unique ID.

    Args:
        tenant_id: Opaque tenant primary key.

    Returns:
        The matching ``TenantT``.

    Raises:
        TenantNotFoundError: When no tenant with *tenant_id* exists.
    """

get_by_identifier abstractmethod async

Python
get_by_identifier(identifier: str) -> TenantT

Fetch a tenant by its human-readable slug identifier.

Parameters:

Name Type Description Default
identifier str

The tenant slug (e.g. "acme-corp").

required

Returns:

Type Description
TenantT

The matching TenantT.

Raises:

Type Description
TenantNotFoundError

When no tenant with identifier exists.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def get_by_identifier(self, identifier: str) -> TenantT:
    """Fetch a tenant by its human-readable slug identifier.

    Args:
        identifier: The tenant slug (e.g. ``"acme-corp"``).

    Returns:
        The matching ``TenantT``.

    Raises:
        TenantNotFoundError: When no tenant with *identifier* exists.
    """

create abstractmethod async

Python
create(tenant: TenantT) -> TenantT

Persist a new tenant record.

Parameters:

Name Type Description Default
tenant TenantT

A fully-populated TenantT instance. The id and identifier must be unique.

required

Returns:

Type Description
TenantT

The stored tenant (may include server-generated timestamps).

Raises:

Type Description
ValueError

When the id or identifier already exists.

TenancyError

On unexpected storage failure.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def create(self, tenant: TenantT) -> TenantT:
    """Persist a new tenant record.

    Args:
        tenant: A fully-populated ``TenantT`` instance.  The ``id`` and
            ``identifier`` must be unique.

    Returns:
        The stored tenant (may include server-generated timestamps).

    Raises:
        ValueError: When the ``id`` or ``identifier`` already exists.
        TenancyError: On unexpected storage failure.
    """

update abstractmethod async

Python
update(tenant: TenantT) -> TenantT

Replace all mutable fields of an existing tenant.

Because Tenant is immutable, callers must first build a modified copy with model_copy::

Text Only
updated = tenant.model_copy(update={"name": "New Name"})
result  = await store.update(updated)

Parameters:

Name Type Description Default
tenant TenantT

Modified tenant object. tenant.id must already exist.

required

Returns:

Type Description
TenantT

The updated tenant as stored (timestamps refreshed).

Raises:

Type Description
TenantNotFoundError

When tenant.id does not exist.

TenancyError

On unexpected storage failure.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def update(self, tenant: TenantT) -> TenantT:
    """Replace all mutable fields of an existing tenant.

    Because ``Tenant`` is immutable, callers must first build a modified
    copy with ``model_copy``::

        updated = tenant.model_copy(update={"name": "New Name"})
        result  = await store.update(updated)

    Args:
        tenant: Modified tenant object.  ``tenant.id`` must already exist.

    Returns:
        The updated tenant as stored (timestamps refreshed).

    Raises:
        TenantNotFoundError: When ``tenant.id`` does not exist.
        TenancyError: On unexpected storage failure.
    """

delete abstractmethod async

Python
delete(tenant_id: str) -> None

Remove a tenant from the store.

Behaviour depends on TenancyConfig.enable_soft_delete: implementations should mark the tenant as DELETED rather than removing the row when soft-delete is enabled.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to delete.

required

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

TenancyError

On unexpected storage failure.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def delete(self, tenant_id: str) -> None:
    """Remove a tenant from the store.

    Behaviour depends on ``TenancyConfig.enable_soft_delete``:
    implementations should mark the tenant as ``DELETED`` rather than
    removing the row when soft-delete is enabled.

    Args:
        tenant_id: ID of the tenant to delete.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
        TenancyError: On unexpected storage failure.
    """

list abstractmethod async

Python
list(
    skip: int = 0,
    limit: int = 100,
    status: TenantStatus | None = None,
) -> Sequence[TenantT]

Return a page of tenants, ordered by creation date (newest first).

Parameters:

Name Type Description Default
skip int

Number of records to skip (offset-based pagination).

0
limit int

Maximum number of records to return.

100
status TenantStatus | None

When provided, only tenants with this status are returned.

None

Returns:

Type Description
Sequence[TenantT]

A sequence of TenantT objects.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def list(
    self,
    skip: int = 0,
    limit: int = 100,
    status: TenantStatus | None = None,
) -> Sequence[TenantT]:
    """Return a page of tenants, ordered by creation date (newest first).

    Args:
        skip: Number of records to skip (offset-based pagination).
        limit: Maximum number of records to return.
        status: When provided, only tenants with this status are returned.

    Returns:
        A sequence of ``TenantT`` objects.
    """

count abstractmethod async

Python
count(status: TenantStatus | None = None) -> int

Return the total number of tenants, optionally filtered by status.

Parameters:

Name Type Description Default
status TenantStatus | None

When provided, count only tenants with this status.

None

Returns:

Type Description
int

Non-negative integer count.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def count(self, status: TenantStatus | None = None) -> int:
    """Return the total number of tenants, optionally filtered by status.

    Args:
        status: When provided, count only tenants with this status.

    Returns:
        Non-negative integer count.
    """

exists abstractmethod async

Python
exists(tenant_id: str) -> bool

Return True if a tenant with tenant_id exists.

More efficient than get_by_id when only existence is needed.

Parameters:

Name Type Description Default
tenant_id str

Opaque tenant primary key.

required

Returns:

Type Description
bool

True when the tenant exists; False otherwise.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def exists(self, tenant_id: str) -> bool:
    """Return ``True`` if a tenant with *tenant_id* exists.

    More efficient than ``get_by_id`` when only existence is needed.

    Args:
        tenant_id: Opaque tenant primary key.

    Returns:
        ``True`` when the tenant exists; ``False`` otherwise.
    """

set_status abstractmethod async

Python
set_status(tenant_id: str, status: TenantStatus) -> TenantT

Change the lifecycle status of a tenant.

This is the preferred method for status transitions (activate, suspend, etc.) because it only touches the status field.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to update.

required
status TenantStatus

The new TenantStatus.

required

Returns:

Type Description
TenantT

The updated tenant.

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def set_status(self, tenant_id: str, status: TenantStatus) -> TenantT:
    """Change the lifecycle status of a tenant.

    This is the preferred method for status transitions (activate, suspend,
    etc.) because it only touches the status field.

    Args:
        tenant_id: ID of the tenant to update.
        status: The new ``TenantStatus``.

    Returns:
        The updated tenant.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
    """

update_metadata abstractmethod async

Python
update_metadata(
    tenant_id: str, metadata: dict[str, Any]
) -> TenantT

Atomically merge metadata into the tenant's metadata dictionary.

The merge is shallow: top-level keys in metadata overwrite existing keys; keys absent from metadata are preserved.

Implementations should perform this as an atomic operation at the database level (e.g. PostgreSQL jsonb || operator) to avoid the read-modify-write race that a naïve in-application merge introduces.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to update.

required
metadata dict[str, Any]

Key-value pairs to merge.

required

Returns:

Type Description
TenantT

The updated tenant.

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
@abstractmethod
async def update_metadata(
    self,
    tenant_id: str,
    metadata: dict[str, Any],
) -> TenantT:
    """Atomically merge *metadata* into the tenant's metadata dictionary.

    The merge is shallow: top-level keys in *metadata* overwrite existing
    keys; keys absent from *metadata* are preserved.

    Implementations should perform this as an atomic operation at the
    database level (e.g. PostgreSQL ``jsonb ||`` operator) to avoid the
    read-modify-write race that a naïve in-application merge introduces.

    Args:
        tenant_id: ID of the tenant to update.
        metadata: Key-value pairs to merge.

    Returns:
        The updated tenant.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
    """

close async

Python
close() -> None

Release any resources held by this store (connections, pools, etc.).

The base implementation is a no-op — subclasses that hold external resources (database engines, Redis connections) must override this method and dispose them properly.

Called automatically by TenancyManager.close() on application shutdown. If your store holds connection pools, failing to override this will leak file descriptors and connections on process exit.

Example override::

Text Only
async def close(self) -> None:
    await self._engine.dispose()
Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
async def close(self) -> None:
    """Release any resources held by this store (connections, pools, etc.).

    The base implementation is a no-op — subclasses that hold external
    resources (database engines, Redis connections) **must** override this
    method and dispose them properly.

    Called automatically by ``TenancyManager.close()`` on application
    shutdown.  If your store holds connection pools, failing to override
    this will leak file descriptors and connections on process exit.

    Example override::

        async def close(self) -> None:
            await self._engine.dispose()
    """

get_by_ids async

Python
get_by_ids(tenant_ids: Iterable[str]) -> Sequence[TenantT]

Fetch multiple tenants by their IDs in one logical call.

The base implementation issues one get_by_id call per ID. Override this for production backends to issue a single query.

Parameters:

Name Type Description Default
tenant_ids Iterable[str]

Iterable of opaque tenant IDs.

required

Returns:

Type Description
Sequence[TenantT]

Tenants that were found; IDs with no matching tenant are silently

Sequence[TenantT]

skipped.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
async def get_by_ids(self, tenant_ids: Iterable[str]) -> Sequence[TenantT]:
    """Fetch multiple tenants by their IDs in one logical call.

    The base implementation issues one ``get_by_id`` call per ID.
    **Override this for production backends** to issue a single query.

    Args:
        tenant_ids: Iterable of opaque tenant IDs.

    Returns:
        Tenants that were found; IDs with no matching tenant are silently
        skipped.
    """
    result: list[TenantT] = []
    for tid in tenant_ids:
        try:
            result.append(await self.get_by_id(tid))
        except TenantNotFoundError:
            continue
    return result

search async

Python
search(
    query: str, limit: int = 10, _scan_limit: int = 100
) -> Sequence[TenantT]

Search for tenants whose name or identifier contains query.

The base implementation loads up to _scan_limit records and filters them in Python. Override for production backends with a database-level ILIKE/LIKE query.

Result ordering is backend-defined. Concrete implementations may apply relevance ranking (e.g. InMemoryTenantStore sorts by exact match → prefix match → substring match), or may return results in arbitrary order (e.g. the SQLAlchemyTenantStore override sorts alphabetically by identifier). Do not rely on a specific ordering unless the concrete implementation documents it.

Parameters:

Name Type Description Default
query str

Case-insensitive substring to search for.

required
limit int

Maximum number of results to return.

10
_scan_limit int

Maximum records fetched for in-memory filtering.

100

Returns:

Type Description
Sequence[TenantT]

Matching tenants, up to limit results.

Warning

The base implementation is O(n) in the number of tenants. For deployments with thousands of tenants, override with a DB-level search in the concrete subclass.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
async def search(
    self,
    query: str,
    limit: int = 10,
    _scan_limit: int = 100,
) -> Sequence[TenantT]:
    """Search for tenants whose name or identifier contains *query*.

    The base implementation loads up to *_scan_limit* records and filters
    them in Python.  **Override for production backends** with a
    database-level ``ILIKE``/``LIKE`` query.

    **Result ordering is backend-defined.**  Concrete implementations may
    apply relevance ranking (e.g. ``InMemoryTenantStore`` sorts by exact
    match → prefix match → substring match), or may return results in
    arbitrary order (e.g. the ``SQLAlchemyTenantStore`` override sorts
    alphabetically by identifier).  Do not rely on a specific ordering
    unless the concrete implementation documents it.

    Args:
        query: Case-insensitive substring to search for.
        limit: Maximum number of results to return.
        _scan_limit: Maximum records fetched for in-memory filtering.

    Returns:
        Matching tenants, up to *limit* results.

    Warning:
        The base implementation is O(n) in the number of tenants.  For
        deployments with thousands of tenants, override with a DB-level
        search in the concrete subclass.
    """
    query_lower = query.lower()
    candidates = await self.list(skip=0, limit=_scan_limit)
    # Warn only when both the scan limit AND the result limit are saturated,
    # which is the scenario where results are most likely to be silently
    # truncated.  A store with exactly _scan_limit tenants where all match
    # would hit this; a store with _scan_limit tenants where none match
    # would not — avoiding a spurious warning in the latter case.
    matches: list[TenantT] = [
        t
        for t in candidates
        if query_lower in t.identifier.lower() or query_lower in t.name.lower()
    ]
    if len(candidates) == _scan_limit and len(matches) >= limit:
        logger.warning(
            "%s.search() reached both the scan limit (%d) and the result limit (%d). "
            "Results may be incomplete.  Override search() in your concrete store with "
            "a database-level query (e.g. ILIKE) to avoid missing results for large "
            "tenant fleets.",
            type(self).__name__,
            _scan_limit,
            limit,
        )
    return matches[:limit]

bulk_update_status async

Python
bulk_update_status(
    tenant_ids: Iterable[str], status: TenantStatus
) -> Sequence[TenantT]

Update the status of multiple tenants in one logical operation.

The base implementation calls set_status once per ID. Override for production backends to issue a single SQL UPDATE with an IN clause.

Parameters:

Name Type Description Default
tenant_ids Iterable[str]

IDs of the tenants to update.

required
status TenantStatus

New TenantStatus applied to all matched tenants.

required

Returns:

Type Description
Sequence[TenantT]

Updated tenants; IDs with no matching tenant are silently skipped.

Source code in src/fastapi_tenancy/storage/tenant_store.py
Python
async def bulk_update_status(
    self,
    tenant_ids: Iterable[str],
    status: TenantStatus,
) -> Sequence[TenantT]:
    """Update the status of multiple tenants in one logical operation.

    The base implementation calls ``set_status`` once per ID.
    **Override for production backends** to issue a single SQL ``UPDATE``
    with an ``IN`` clause.

    Args:
        tenant_ids: IDs of the tenants to update.
        status: New ``TenantStatus`` applied to all matched tenants.

    Returns:
        Updated tenants; IDs with no matching tenant are silently skipped.
    """
    updated: list[TenantT] = []
    for tid in tenant_ids:
        try:
            updated.append(await self.set_status(tid, status))
        except TenantNotFoundError:
            continue
    return updated

SQLAlchemy

SQLAlchemyTenantStore

Python
SQLAlchemyTenantStore(
    database_url: str,
    pool_size: int = 10,
    max_overflow: int = 20,
    pool_pre_ping: bool = True,
    pool_recycle: int = 3600,
    echo: bool = False,
)

Bases: TenantStore[Tenant]

Async SQLAlchemy-backed tenant store compatible with all major databases.

This is the recommended production store for fastapi-tenancy.

Lifecycle::

Text Only
store = SQLAlchemyTenantStore(
    database_url="postgresql+asyncpg://user:pass@localhost/myapp",
)
await store.initialize()   # create table if not exists

# ... serve requests ...

await store.close()        # dispose pool on shutdown

Parameters:

Name Type Description Default
database_url str

Async SQLAlchemy connection URL.

required
pool_size int

Number of persistent connections in the pool.

10
max_overflow int

Extra connections allowed under burst load.

20
pool_pre_ping bool

Verify connections before checkout (recommended).

True
pool_recycle int

Seconds after which a connection is proactively replaced.

3600
echo bool

Log every SQL statement (development only).

False
Source code in src/fastapi_tenancy/storage/database.py
Python
def __init__(
    self,
    database_url: str,
    pool_size: int = 10,
    max_overflow: int = 20,
    pool_pre_ping: bool = True,
    pool_recycle: int = 3600,
    echo: bool = False,
) -> None:
    self._dialect = detect_dialect(database_url)
    kw: dict[str, Any] = {"echo": echo}

    if requires_static_pool(self._dialect):
        kw["poolclass"] = StaticPool
        kw["connect_args"] = {"check_same_thread": False}
    else:
        kw["pool_size"] = pool_size
        kw["max_overflow"] = max_overflow
        kw["pool_pre_ping"] = pool_pre_ping
        kw["pool_recycle"] = pool_recycle

    self._engine: AsyncEngine = create_async_engine(database_url, **kw)
    # autobegin=False — explicit transaction control; no implicit BEGIN on
    # session creation.  expire_on_commit=False — prevent lazy loads after
    # commit on the now-closed session.
    self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
        bind=self._engine,
        class_=AsyncSession,
        expire_on_commit=False,
        autoflush=False,
        autobegin=False,
    )
    logger.info(
        "SQLAlchemyTenantStore ready dialect=%s pool_size=%d",
        self._dialect.value,
        pool_size,
    )

initialize async

Python
initialize() -> None

Create the tenants table if it does not already exist (idempotent).

Source code in src/fastapi_tenancy/storage/database.py
Python
async def initialize(self) -> None:
    """Create the ``tenants`` table if it does not already exist (idempotent)."""
    async with self._engine.begin() as conn:
        await conn.run_sync(_Base.metadata.create_all)
    logger.info("tenants table ready")

close async

Python
close() -> None

Dispose the engine and release all pooled connections.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def close(self) -> None:
    """Dispose the engine and release all pooled connections."""
    await self._engine.dispose()
    logger.info("SQLAlchemyTenantStore closed")

get_by_id async

Python
get_by_id(tenant_id: str) -> Tenant

Fetch a tenant by its opaque unique ID.

Parameters:

Name Type Description Default
tenant_id str

Opaque tenant primary key.

required

Returns:

Type Description
Tenant

The matching Tenant.

Raises:

Type Description
TenantNotFoundError

When no tenant with tenant_id exists.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def get_by_id(self, tenant_id: str) -> Tenant:
    """Fetch a tenant by its opaque unique ID.

    Args:
        tenant_id: Opaque tenant primary key.

    Returns:
        The matching ``Tenant``.

    Raises:
        TenantNotFoundError: When no tenant with *tenant_id* exists.
    """
    async with self._session_factory() as session, session.begin():
        row = await session.execute(select(TenantModel).where(TenantModel.id == tenant_id))
        model = row.scalar_one_or_none()
    if model is None:
        raise TenantNotFoundError(identifier=tenant_id)
    return model.to_domain()

get_by_identifier async

Python
get_by_identifier(identifier: str) -> Tenant

Fetch a tenant by its human-readable slug identifier.

Parameters:

Name Type Description Default
identifier str

The tenant slug (e.g. "acme-corp").

required

Returns:

Type Description
Tenant

The matching Tenant.

Raises:

Type Description
TenantNotFoundError

When no tenant with identifier exists.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def get_by_identifier(self, identifier: str) -> Tenant:
    """Fetch a tenant by its human-readable slug identifier.

    Args:
        identifier: The tenant slug (e.g. ``"acme-corp"``).

    Returns:
        The matching ``Tenant``.

    Raises:
        TenantNotFoundError: When no tenant with *identifier* exists.
    """
    async with self._session_factory() as session, session.begin():
        row = await session.execute(
            select(TenantModel).where(TenantModel.identifier == identifier)
        )
        model = row.scalar_one_or_none()
    if model is None:
        raise TenantNotFoundError(identifier=identifier)
    return model.to_domain()

list async

Python
list(
    skip: int = 0,
    limit: int = 100,
    status: TenantStatus | None = None,
) -> list[Tenant]

Return a page of tenants ordered by creation date (newest first).

Parameters:

Name Type Description Default
skip int

Number of records to skip (offset-based pagination).

0
limit int

Maximum number of records to return.

100
status TenantStatus | None

Optional status filter.

None

Returns:

Type Description
list[Tenant]

List of Tenant objects.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def list(
    self,
    skip: int = 0,
    limit: int = 100,
    status: TenantStatus | None = None,
) -> list[Tenant]:
    """Return a page of tenants ordered by creation date (newest first).

    Args:
        skip: Number of records to skip (offset-based pagination).
        limit: Maximum number of records to return.
        status: Optional status filter.

    Returns:
        List of ``Tenant`` objects.
    """
    async with self._session_factory() as session, session.begin():
        query = select(TenantModel)
        if status is not None:
            query = query.where(TenantModel.status == status.value)
        query = query.order_by(TenantModel.created_at.desc()).offset(skip).limit(limit)
        result = await session.execute(query)
        return [m.to_domain() for m in result.scalars().all()]

count async

Python
count(status: TenantStatus | None = None) -> int

Return the total number of tenants, optionally filtered by status.

Parameters:

Name Type Description Default
status TenantStatus | None

Optional status filter.

None

Returns:

Type Description
int

Non-negative integer count.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def count(self, status: TenantStatus | None = None) -> int:
    """Return the total number of tenants, optionally filtered by status.

    Args:
        status: Optional status filter.

    Returns:
        Non-negative integer count.
    """
    async with self._session_factory() as session, session.begin():
        query = select(func.count(TenantModel.id))
        if status is not None:
            query = query.where(TenantModel.status == status.value)
        result = await session.execute(query)
        return result.scalar() or 0

exists async

Python
exists(tenant_id: str) -> bool

Return True when a tenant with tenant_id exists.

Parameters:

Name Type Description Default
tenant_id str

Opaque tenant primary key.

required

Returns:

Type Description
bool

Existence flag.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def exists(self, tenant_id: str) -> bool:
    """Return ``True`` when a tenant with *tenant_id* exists.

    Args:
        tenant_id: Opaque tenant primary key.

    Returns:
        Existence flag.
    """
    async with self._session_factory() as session, session.begin():
        result = await session.execute(
            select(TenantModel.id).where(TenantModel.id == tenant_id)
        )
        return result.scalar_one_or_none() is not None

create async

Python
create(tenant: Tenant) -> Tenant

Persist a new tenant record.

Parameters:

Name Type Description Default
tenant Tenant

Fully-populated tenant object.

required

Returns:

Type Description
Tenant

The stored tenant with server-generated timestamps.

Raises:

Type Description
ValueError

When the id or identifier already exists.

TenancyError

On unexpected storage failure.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def create(self, tenant: Tenant) -> Tenant:
    """Persist a new tenant record.

    Args:
        tenant: Fully-populated tenant object.

    Returns:
        The stored tenant with server-generated timestamps.

    Raises:
        ValueError: When the ``id`` or ``identifier`` already exists.
        TenancyError: On unexpected storage failure.
    """
    try:
        async with self._session_factory() as session:
            model = TenantModel(
                id=tenant.id,
                identifier=tenant.identifier,
                name=tenant.name,
                status=tenant.status.value,
                isolation_strategy=(
                    tenant.isolation_strategy.value if tenant.isolation_strategy else None
                ),
                database_url=tenant.database_url,
                schema_name=tenant.schema_name,
                tenant_metadata=json.dumps(tenant.metadata),
                created_at=tenant.created_at,
                updated_at=tenant.updated_at,
            )

            try:
                async with session.begin():
                    session.add(model)
                    await session.flush()
            except IntegrityError:
                msg = (
                    f"Tenant id={tenant.id!r} or identifier={tenant.identifier!r} "
                    "already exists."
                )
                raise ValueError(msg) from None

            logger.info(
                "Created tenant id=%s identifier=%s",
                tenant.id,
                tenant.identifier,
            )
            return model.to_domain()

    except ValueError:
        raise
    except Exception as exc:
        raise TenancyError(f"Failed to create tenant: {exc}") from exc

update async

Python
update(tenant: Tenant) -> Tenant

Replace all mutable fields of an existing tenant.

Parameters:

Name Type Description Default
tenant Tenant

Updated tenant object.

required

Returns:

Type Description
Tenant

The updated tenant with a refreshed updated_at timestamp.

Raises:

Type Description
TenantNotFoundError

When tenant.id does not exist.

TenancyError

On unexpected storage failure.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def update(self, tenant: Tenant) -> Tenant:
    """Replace all mutable fields of an existing tenant.

    Args:
        tenant: Updated tenant object.

    Returns:
        The updated tenant with a refreshed ``updated_at`` timestamp.

    Raises:
        TenantNotFoundError: When ``tenant.id`` does not exist.
        TenancyError: On unexpected storage failure.
    """
    async with self._session_factory() as session:
        try:
            async with session.begin():
                result = await session.execute(
                    select(TenantModel).where(TenantModel.id == tenant.id)
                )
                model = result.scalar_one_or_none()
                if model is None:
                    raise TenantNotFoundError(identifier=tenant.id)  # noqa: TRY301
                model.identifier = tenant.identifier
                model.name = tenant.name
                model.status = tenant.status.value
                model.isolation_strategy = (
                    tenant.isolation_strategy.value if tenant.isolation_strategy else None
                )
                model.database_url = tenant.database_url
                model.schema_name = tenant.schema_name
                model.tenant_metadata = json.dumps(tenant.metadata)
                model.updated_at = datetime.now(UTC)
                # Convert to domain INSIDE the transaction while the ORM
                # model is still attached and all attributes are loaded.
                # Calling to_domain() after the begin() block exits risks
                # DetachedInstanceError when expire_on_commit=True is active.
                domain = model.to_domain()
        except TenantNotFoundError:
            raise
        except IntegrityError:
            msg = f"Tenant identifier={tenant.identifier!r} already exists."
            raise ValueError(msg) from None
        except Exception as exc:
            raise TenancyError(f"Failed to update tenant: {exc}") from exc

        logger.info("Updated tenant id=%s", tenant.id)
        return domain

delete async

Python
delete(tenant_id: str) -> None

Remove a tenant from the store.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to delete.

required

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

TenancyError

On unexpected storage failure.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def delete(self, tenant_id: str) -> None:
    """Remove a tenant from the store.

    Args:
        tenant_id: ID of the tenant to delete.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
        TenancyError: On unexpected storage failure.
    """
    try:
        async with self._session_factory() as session:
            try:
                async with session.begin():
                    result = await session.execute(
                        select(TenantModel).where(TenantModel.id == tenant_id)
                    )
                    model = result.scalar_one_or_none()
                    if not model:
                        raise TenantNotFoundError(identifier=tenant_id)  # noqa: TRY301
                    await session.delete(model)

            except TenantNotFoundError:  # noqa: TRY203
                raise

            logger.info("Deleted tenant id=%s", tenant_id)

    except TenantNotFoundError:
        raise
    except Exception as exc:
        raise TenancyError(f"Failed to delete tenant: {exc}") from exc

set_status async

Python
set_status(tenant_id: str, status: TenantStatus) -> Tenant

Change the lifecycle status of a tenant.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to update.

required
status TenantStatus

The new TenantStatus.

required

Returns:

Type Description
Tenant

The updated tenant.

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

TenancyError

On unexpected storage failure.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def set_status(self, tenant_id: str, status: TenantStatus) -> Tenant:
    """Change the lifecycle status of a tenant.

    Args:
        tenant_id: ID of the tenant to update.
        status: The new ``TenantStatus``.

    Returns:
        The updated tenant.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
        TenancyError: On unexpected storage failure.
    """
    async with self._session_factory() as session:
        try:
            async with session.begin():
                result = await session.execute(
                    select(TenantModel).where(TenantModel.id == tenant_id)
                )
                model = result.scalar_one_or_none()
                if model is None:
                    raise TenantNotFoundError(identifier=tenant_id)  # noqa: TRY301
                model.status = status.value
                model.updated_at = datetime.now(UTC)
                # Convert to domain INSIDE the transaction (same reason as update()).
                domain = model.to_domain()
        except TenantNotFoundError:
            raise
        except Exception as exc:
            raise TenancyError(f"Failed to update status: {exc}") from exc
        logger.info("Set tenant %s status → %s", tenant_id, status.value)
        return domain

update_metadata async

Python
update_metadata(
    tenant_id: str, metadata: dict[str, Any]
) -> Tenant

Atomically merge metadata into the tenant's metadata blob.

Uses a database-level atomic JSON merge on PostgreSQL (|| operator) and a serialisable read-modify-write transaction on all other dialects.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to update.

required
metadata dict[str, Any]

Key-value pairs to shallow-merge into existing metadata.

required

Returns:

Type Description
Tenant

The updated tenant.

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

TenancyError

On unexpected storage failure.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def update_metadata(
    self,
    tenant_id: str,
    metadata: dict[str, Any],
) -> Tenant:
    """Atomically merge *metadata* into the tenant's metadata blob.

    Uses a database-level atomic JSON merge on PostgreSQL (``||`` operator)
    and a serialisable read-modify-write transaction on all other dialects.

    Args:
        tenant_id: ID of the tenant to update.
        metadata: Key-value pairs to shallow-merge into existing metadata.

    Returns:
        The updated tenant.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
        TenancyError: On unexpected storage failure.
    """
    import asyncio as _asyncio  # noqa: PLC0415

    if self._dialect != DbDialect.POSTGRESQL:
        async with self._session_factory() as session:
            return await self._update_metadata_generic(session, tenant_id, metadata)

    _MAX_RETRIES = 5
    _BASE_BACKOFF = 0.005

    for attempt in range(_MAX_RETRIES):
        async with self._session_factory() as session:
            try:
                return await self._update_metadata_pg(session, tenant_id, metadata)

            except TenantNotFoundError:
                raise

            except Exception as exc:
                is_serialization = (
                    "SerializationError" in type(exc).__name__
                    or "SerializationError" in type(getattr(exc, "orig", None)).__name__
                    or "could not serialize" in str(exc).lower()
                )

                if is_serialization and attempt < _MAX_RETRIES - 1:
                    backoff = _BASE_BACKOFF * (2**attempt)
                    await _asyncio.sleep(backoff)
                    continue

                raise TenancyError(f"Failed to update metadata: {exc}") from exc

    raise TenancyError(f"update_metadata exhausted retries for tenant {tenant_id!r}")

get_by_ids async

Python
get_by_ids(tenant_ids: Any) -> Sequence[Tenant]

Fetch multiple tenants in a single query using IN clause.

Overrides the base implementation to avoid N+1 queries.

Parameters:

Name Type Description Default
tenant_ids Any

Iterable of opaque tenant IDs.

required

Returns:

Type Description
Sequence[Tenant]

Found tenants (order is not guaranteed).

Source code in src/fastapi_tenancy/storage/database.py
Python
async def get_by_ids(self, tenant_ids: Any) -> Sequence[Tenant]:
    """Fetch multiple tenants in a single query using ``IN`` clause.

    Overrides the base implementation to avoid N+1 queries.

    Args:
        tenant_ids: Iterable of opaque tenant IDs.

    Returns:
        Found tenants (order is not guaranteed).
    """
    ids = list(tenant_ids)
    if not ids:
        return []
    async with self._session_factory() as session, session.begin():
        result = await session.execute(select(TenantModel).where(TenantModel.id.in_(ids)))
        return [m.to_domain() for m in result.scalars().all()]

search async

Python
search(
    query: str, limit: int = 10, _scan_limit: int = 100
) -> Sequence[Tenant]

Search tenants by name or identifier using a database-level LIKE query.

Overrides the O(n) base implementation with a proper DB query.

Parameters:

Name Type Description Default
query str

Case-insensitive substring to search for.

required
limit int

Maximum number of results to return.

10
_scan_limit int

Ignored (present for interface compatibility).

100

Returns:

Type Description
Sequence[Tenant]

Matching tenants, up to limit results.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def search(
    self,
    query: str,
    limit: int = 10,
    _scan_limit: int = 100,
) -> Sequence[Tenant]:
    """Search tenants by name or identifier using a database-level LIKE query.

    Overrides the O(n) base implementation with a proper DB query.

    Args:
        query: Case-insensitive substring to search for.
        limit: Maximum number of results to return.
        _scan_limit: Ignored (present for interface compatibility).

    Returns:
        Matching tenants, up to *limit* results.
    """
    # Escape LIKE metacharacters before embedding in the pattern so that
    # a query containing '%' or '_' cannot enumerate all tenants or expand
    # to match unintended identifiers.
    escaped = query.lower().replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    pattern = f"%{escaped}%"
    async with self._session_factory() as session, session.begin():
        # MSSQL does not support ILIKE — use LIKE instead.  MSSQL's LIKE is
        # case-insensitive by default when the column collation is CI (the
        # common default), which gives the same practical behaviour.
        # All other dialects use ILIKE for explicit case-insensitive matching.
        if self._dialect == DbDialect.MSSQL:
            where_clause = TenantModel.identifier.like(
                pattern, escape="\\"
            ) | TenantModel.name.like(pattern, escape="\\")
        else:
            where_clause = TenantModel.identifier.ilike(
                pattern, escape="\\"
            ) | TenantModel.name.ilike(pattern, escape="\\")
        result = await session.execute(
            select(TenantModel)
            .where(where_clause)
            .order_by(TenantModel.identifier)
            .limit(limit)
        )
        return [m.to_domain() for m in result.scalars().all()]

bulk_update_status async

Python
bulk_update_status(
    tenant_ids: Any, status: TenantStatus
) -> Sequence[Tenant]

Update status for multiple tenants in a single UPDATE ... WHERE IN query.

Overrides the N+1 base implementation.

Parameters:

Name Type Description Default
tenant_ids Any

IDs of the tenants to update.

required
status TenantStatus

New status applied to every matched tenant.

required

Returns:

Type Description
Sequence[Tenant]

Updated tenants.

Source code in src/fastapi_tenancy/storage/database.py
Python
async def bulk_update_status(
    self,
    tenant_ids: Any,
    status: TenantStatus,
) -> Sequence[Tenant]:
    """Update status for multiple tenants in a single ``UPDATE ... WHERE IN`` query.

    Overrides the N+1 base implementation.

    Args:
        tenant_ids: IDs of the tenants to update.
        status: New status applied to every matched tenant.

    Returns:
        Updated tenants.
    """
    ids = list(tenant_ids)
    if not ids:
        return []
    now = datetime.now(UTC)
    async with self._session_factory() as session, session.begin():
        # ``UPDATE … RETURNING`` is only supported on PostgreSQL (and
        # MariaDB 10.5+).  For other dialects we fall back to a SELECT
        # after the UPDATE so the method stays dialect-agnostic.
        # Use self._dialect (stored at __init__) — session.bind is
        # deprecated in SQLAlchemy 2.0 and returns None with AsyncSession.
        if self._dialect == DbDialect.POSTGRESQL:
            result = await session.execute(
                update(TenantModel)
                .where(TenantModel.id.in_(ids))
                .values(status=status.value, updated_at=now)
                .returning(TenantModel)
            )
            return [m.to_domain() for m in result.scalars().all()]
        # Fallback path: plain UPDATE then SELECT.
        await session.execute(
            update(TenantModel)
            .where(TenantModel.id.in_(ids))
            .values(status=status.value, updated_at=now)
        )
        fetch = await session.execute(select(TenantModel).where(TenantModel.id.in_(ids)))
        return [m.to_domain() for m in fetch.scalars().all()]

In-memory

InMemoryTenantStore

Python
InMemoryTenantStore()

Bases: TenantStore[Tenant]

In-memory tenant store for testing and local development.

All state lives in two Python dictionaries:

  • _tenants — maps tenant_id → Tenant
  • _identifier_map — maps identifier → tenant_id

Both indices are kept in sync by every mutating method so that get_by_id and get_by_identifier are always O(1).

Example — pytest fixture::

Text Only
import pytest
from fastapi_tenancy.storage.memory import InMemoryTenantStore

@pytest.fixture
async def tenant_store():
    store = InMemoryTenantStore()
    yield store
    store.clear()  # reset between tests

Example — seeded store::

Text Only
store = InMemoryTenantStore()
await store.create(Tenant(id="t1", identifier="acme", name="Acme"))
await store.create(Tenant(id="t2", identifier="globex", name="Globex"))

tenants = await store.list()
assert len(tenants) == 2

Initialise an empty in-memory store.

Source code in src/fastapi_tenancy/storage/memory.py
Python
def __init__(self) -> None:
    """Initialise an empty in-memory store."""
    self._tenants: dict[str, Tenant] = {}
    self._identifier_map: dict[str, str] = {}  # identifier → tenant_id
    # Protects all mutating operations.  Read-only methods (get_by_id,
    # list, count, etc.) do not acquire the lock — they are pure dict
    # reads, which are already atomic in CPython.  Under Trio or any
    # backend that allows concurrent async tasks to interleave at await
    # points, write methods MUST hold the lock for their full
    # read-check-mutate sequence to avoid lost updates or index corruption.
    self._lock: asyncio.Lock = asyncio.Lock()
    logger.debug("InMemoryTenantStore initialised")

get_by_id async

Python
get_by_id(tenant_id: str) -> Tenant

Return the tenant with tenant_id or raise TenantNotFoundError.

Parameters:

Name Type Description Default
tenant_id str

Opaque tenant primary key.

required

Returns:

Type Description
Tenant

Matching Tenant.

Raises:

Type Description
TenantNotFoundError

When tenant_id is not in the store.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def get_by_id(self, tenant_id: str) -> Tenant:
    """Return the tenant with *tenant_id* or raise ``TenantNotFoundError``.

    Args:
        tenant_id: Opaque tenant primary key.

    Returns:
        Matching ``Tenant``.

    Raises:
        TenantNotFoundError: When *tenant_id* is not in the store.
    """
    tenant = self._tenants.get(tenant_id)
    if tenant is None:
        raise TenantNotFoundError(identifier=tenant_id)
    return tenant

get_by_identifier async

Python
get_by_identifier(identifier: str) -> Tenant

Return the tenant with identifier or raise TenantNotFoundError.

Parameters:

Name Type Description Default
identifier str

Human-readable tenant slug.

required

Returns:

Type Description
Tenant

Matching Tenant.

Raises:

Type Description
TenantNotFoundError

When no tenant with identifier exists.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def get_by_identifier(self, identifier: str) -> Tenant:
    """Return the tenant with *identifier* or raise ``TenantNotFoundError``.

    Args:
        identifier: Human-readable tenant slug.

    Returns:
        Matching ``Tenant``.

    Raises:
        TenantNotFoundError: When no tenant with *identifier* exists.
    """
    tenant_id = self._identifier_map.get(identifier)
    if tenant_id is None:
        raise TenantNotFoundError(identifier=identifier)
    return self._tenants[tenant_id]

list async

Python
list(
    skip: int = 0,
    limit: int = 100,
    status: TenantStatus | None = None,
) -> list[Tenant]

Return a page of tenants sorted by created_at descending.

Parameters:

Name Type Description Default
skip int

Number of records to skip.

0
limit int

Maximum number of records to return.

100
status TenantStatus | None

Optional status filter.

None

Returns:

Type Description
list[Tenant]

Filtered and paginated list of tenants.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def list(
    self,
    skip: int = 0,
    limit: int = 100,
    status: TenantStatus | None = None,
) -> list[Tenant]:
    """Return a page of tenants sorted by ``created_at`` descending.

    Args:
        skip: Number of records to skip.
        limit: Maximum number of records to return.
        status: Optional status filter.

    Returns:
        Filtered and paginated list of tenants.
    """
    tenants = list(self._tenants.values())
    if status is not None:
        tenants = [t for t in tenants if t.status == status]
    tenants.sort(key=lambda t: t.created_at, reverse=True)
    return tenants[skip : skip + limit]

count async

Python
count(status: TenantStatus | None = None) -> int

Return the number of stored tenants, optionally filtered by status.

Parameters:

Name Type Description Default
status TenantStatus | None

Optional status filter.

None

Returns:

Type Description
int

Non-negative integer count.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def count(self, status: TenantStatus | None = None) -> int:
    """Return the number of stored tenants, optionally filtered by status.

    Args:
        status: Optional status filter.

    Returns:
        Non-negative integer count.
    """
    if status is None:
        return len(self._tenants)
    return sum(1 for t in self._tenants.values() if t.status == status)

exists async

Python
exists(tenant_id: str) -> bool

Return True when a tenant with tenant_id exists.

Parameters:

Name Type Description Default
tenant_id str

Opaque tenant primary key.

required

Returns:

Type Description
bool

Existence flag.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def exists(self, tenant_id: str) -> bool:
    """Return ``True`` when a tenant with *tenant_id* exists.

    Args:
        tenant_id: Opaque tenant primary key.

    Returns:
        Existence flag.
    """
    return tenant_id in self._tenants

get_by_ids async

Python
get_by_ids(tenant_ids: Any) -> Sequence[Tenant]

Return all tenants whose IDs appear in tenant_ids.

Silently skips IDs with no matching tenant.

Parameters:

Name Type Description Default
tenant_ids Any

Iterable of opaque tenant IDs.

required

Returns:

Type Description
Sequence[Tenant]

Found tenants in the order their IDs appeared.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def get_by_ids(self, tenant_ids: Any) -> Sequence[Tenant]:
    """Return all tenants whose IDs appear in *tenant_ids*.

    Silently skips IDs with no matching tenant.

    Args:
        tenant_ids: Iterable of opaque tenant IDs.

    Returns:
        Found tenants in the order their IDs appeared.
    """
    return [self._tenants[tid] for tid in tenant_ids if tid in self._tenants]

create async

Python
create(tenant: Tenant) -> Tenant

Persist a new tenant in memory.

Parameters:

Name Type Description Default
tenant Tenant

Fully-populated tenant object.

required

Returns:

Type Description
Tenant

The stored tenant (identical to input; no server-side transforms).

Raises:

Type Description
ValueError

When id or identifier already exists.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def create(self, tenant: Tenant) -> Tenant:
    """Persist a new tenant in memory.

    Args:
        tenant: Fully-populated tenant object.

    Returns:
        The stored tenant (identical to input; no server-side transforms).

    Raises:
        ValueError: When ``id`` or ``identifier`` already exists.
    """
    async with self._lock:
        if tenant.id in self._tenants:
            msg = f"Tenant id={tenant.id!r} already exists."
            raise ValueError(msg)
        if tenant.identifier in self._identifier_map:
            msg = f"Tenant identifier={tenant.identifier!r} already exists."
            raise ValueError(msg)

        self._tenants[tenant.id] = tenant
        self._identifier_map[tenant.identifier] = tenant.id
    logger.debug("Created tenant id=%s identifier=%s", tenant.id, tenant.identifier)
    return tenant

update async

Python
update(tenant: Tenant) -> Tenant

Replace all mutable fields of an existing tenant.

If the identifier changes, the old identifier key is removed from the index and the new one is added atomically.

Parameters:

Name Type Description Default
tenant Tenant

Updated tenant object. tenant.id must exist.

required

Returns:

Type Description
Tenant

Updated tenant with a refreshed updated_at timestamp.

Raises:

Type Description
TenantNotFoundError

When tenant.id is not in the store.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def update(self, tenant: Tenant) -> Tenant:
    """Replace all mutable fields of an existing tenant.

    If the identifier changes, the old identifier key is removed from
    the index and the new one is added atomically.

    Args:
        tenant: Updated tenant object.  ``tenant.id`` must exist.

    Returns:
        Updated tenant with a refreshed ``updated_at`` timestamp.

    Raises:
        TenantNotFoundError: When ``tenant.id`` is not in the store.
    """
    async with self._lock:
        old = self._tenants.get(tenant.id)
        if old is None:
            raise TenantNotFoundError(identifier=tenant.id)

        if old.identifier != tenant.identifier:
            del self._identifier_map[old.identifier]
            self._identifier_map[tenant.identifier] = tenant.id

        updated = tenant.model_copy(update={"updated_at": datetime.now(UTC)})
        self._tenants[tenant.id] = updated
    logger.debug("Updated tenant id=%s", tenant.id)
    return updated

delete async

Python
delete(tenant_id: str) -> None

Remove a tenant from both internal indices.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to delete.

required

Raises:

Type Description
TenantNotFoundError

When tenant_id is not in the store.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def delete(self, tenant_id: str) -> None:
    """Remove a tenant from both internal indices.

    Args:
        tenant_id: ID of the tenant to delete.

    Raises:
        TenantNotFoundError: When *tenant_id* is not in the store.
    """
    async with self._lock:
        tenant = self._tenants.get(tenant_id)
        if tenant is None:
            raise TenantNotFoundError(identifier=tenant_id)

        del self._identifier_map[tenant.identifier]
        del self._tenants[tenant_id]
    logger.debug("Deleted tenant id=%s", tenant_id)

set_status async

Python
set_status(tenant_id: str, status: TenantStatus) -> Tenant

Update the lifecycle status of a tenant.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to update.

required
status TenantStatus

New status value.

required

Returns:

Type Description
Tenant

Updated tenant with a refreshed updated_at timestamp.

Raises:

Type Description
TenantNotFoundError

When tenant_id is not in the store.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def set_status(self, tenant_id: str, status: TenantStatus) -> Tenant:
    """Update the lifecycle status of a tenant.

    Args:
        tenant_id: ID of the tenant to update.
        status: New status value.

    Returns:
        Updated tenant with a refreshed ``updated_at`` timestamp.

    Raises:
        TenantNotFoundError: When *tenant_id* is not in the store.
    """
    async with self._lock:
        tenant = self._tenants.get(tenant_id)
        if tenant is None:
            raise TenantNotFoundError(identifier=tenant_id)
        updated = tenant.model_copy(update={"status": status, "updated_at": datetime.now(UTC)})
        self._tenants[tenant_id] = updated
    logger.debug("Set tenant %s status → %s", tenant_id, status.value)
    return updated

update_metadata async

Python
update_metadata(
    tenant_id: str, metadata: dict[str, Any]
) -> Tenant

Shallow-merge metadata into the tenant's metadata dictionary.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to update.

required
metadata dict[str, Any]

Key-value pairs to merge.

required

Returns:

Type Description
Tenant

Updated tenant.

Raises:

Type Description
TenantNotFoundError

When tenant_id is not in the store.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def update_metadata(
    self,
    tenant_id: str,
    metadata: dict[str, Any],
) -> Tenant:
    """Shallow-merge *metadata* into the tenant's metadata dictionary.

    Args:
        tenant_id: ID of the tenant to update.
        metadata: Key-value pairs to merge.

    Returns:
        Updated tenant.

    Raises:
        TenantNotFoundError: When *tenant_id* is not in the store.
    """
    async with self._lock:
        tenant = self._tenants.get(tenant_id)
        if tenant is None:
            raise TenantNotFoundError(identifier=tenant_id)
        updated = tenant.model_copy(
            update={
                "metadata": {**tenant.metadata, **metadata},
                "updated_at": datetime.now(UTC),
            }
        )
        self._tenants[tenant_id] = updated
    logger.debug("Updated metadata for tenant id=%s", tenant_id)
    return updated

bulk_update_status async

Python
bulk_update_status(
    tenant_ids: Any, status: TenantStatus
) -> Sequence[Tenant]

Update the status of multiple tenants in one pass.

Uses a single shared timestamp so all updated_at values are identical across the batch. IDs with no matching tenant are skipped.

Parameters:

Name Type Description Default
tenant_ids Any

Iterable of opaque tenant IDs.

required
status TenantStatus

New status applied to every matched tenant.

required

Returns:

Type Description
Sequence[Tenant]

Updated tenants in the order their IDs appeared.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def bulk_update_status(
    self,
    tenant_ids: Any,
    status: TenantStatus,
) -> Sequence[Tenant]:
    """Update the status of multiple tenants in one pass.

    Uses a single shared timestamp so all ``updated_at`` values are
    identical across the batch.  IDs with no matching tenant are skipped.

    Args:
        tenant_ids: Iterable of opaque tenant IDs.
        status: New status applied to every matched tenant.

    Returns:
        Updated tenants in the order their IDs appeared.
    """
    timestamp = datetime.now(UTC)
    updated: list[Tenant] = []
    async with self._lock:
        for tid in tenant_ids:
            tenant = self._tenants.get(tid)
            if tenant is not None:
                result = tenant.model_copy(update={"status": status, "updated_at": timestamp})
                self._tenants[tid] = result
                updated.append(result)
    logger.debug("Bulk updated %d tenants → %s", len(updated), status.value)
    return updated

search async

Python
search(
    query: str, limit: int = 10, _scan_limit: int = 100
) -> Sequence[Tenant]

Search tenants by name or identifier substring.

Results are ranked by relevance

  1. Exact identifier match.
  2. Identifier starts with query.
  3. Name contains query.
  4. Identifier contains query.

Parameters:

Name Type Description Default
query str

Case-insensitive search string.

required
limit int

Maximum number of results to return.

10
_scan_limit int

Accepted for interface compatibility; all records scanned.

100

Returns:

Type Description
Sequence[Tenant]

Matching tenants sorted by relevance, up to limit results.

Source code in src/fastapi_tenancy/storage/memory.py
Python
async def search(
    self,
    query: str,
    limit: int = 10,
    _scan_limit: int = 100,
) -> Sequence[Tenant]:
    """Search tenants by name or identifier substring.

    Results are ranked by relevance:
        1. Exact identifier match.
        2. Identifier starts with *query*.
        3. Name contains *query*.
        4. Identifier contains *query*.

    Args:
        query: Case-insensitive search string.
        limit: Maximum number of results to return.
        _scan_limit: Accepted for interface compatibility; all records scanned.

    Returns:
        Matching tenants sorted by relevance, up to *limit* results.
    """
    q = query.lower()
    matches = [
        t for t in self._tenants.values() if q in t.identifier.lower() or q in t.name.lower()
    ]
    matches.sort(
        key=lambda t: (
            t.identifier.lower() == q,
            t.identifier.lower().startswith(q),
            q in t.name.lower(),
        ),
        reverse=True,
    )
    return matches[:limit]

clear

Python
clear() -> None

Remove all tenants from both internal indices.

Use in test teardown / fixture cleanup to reset state between test cases::

Text Only
@pytest.fixture
async def store():
    store = InMemoryTenantStore()
    yield store
    store.clear()
Source code in src/fastapi_tenancy/storage/memory.py
Python
def clear(self) -> None:
    """Remove all tenants from both internal indices.

    Use in test teardown / fixture cleanup to reset state between test
    cases::

        @pytest.fixture
        async def store():
            store = InMemoryTenantStore()
            yield store
            store.clear()
    """
    self._tenants.clear()
    self._identifier_map.clear()
    logger.debug("InMemoryTenantStore cleared")

get_all

Python
get_all() -> dict[str, Tenant]

Return a shallow-copy snapshot of all stored tenants keyed by ID.

Returns:

Type Description
dict[str, Tenant]

{tenant_id: Tenant} dict. Mutating it does not affect

dict[str, Tenant]

the store.

Source code in src/fastapi_tenancy/storage/memory.py
Python
def get_all(self) -> dict[str, Tenant]:
    """Return a shallow-copy snapshot of all stored tenants keyed by ID.

    Returns:
        ``{tenant_id: Tenant}`` dict.  Mutating it does not affect
        the store.
    """
    return dict(self._tenants)

statistics

Python
statistics() -> dict[str, Any]

Return a summary of current store state for monitoring and debugging.

Returns:

Type Description
dict[str, Any]

Dictionary with: - total: total number of stored tenants. - by_status: count per status value. - identifier_index_size: should always equal total.

Source code in src/fastapi_tenancy/storage/memory.py
Python
def statistics(self) -> dict[str, Any]:
    """Return a summary of current store state for monitoring and debugging.

    Returns:
        Dictionary with:
            - ``total``: total number of stored tenants.
            - ``by_status``: count per status value.
            - ``identifier_index_size``: should always equal ``total``.
    """
    by_status: dict[str, int] = {}
    for tenant in self._tenants.values():
        by_status[tenant.status.value] = by_status.get(tenant.status.value, 0) + 1

    return {
        "total": len(self._tenants),
        "by_status": by_status,
        "identifier_index_size": len(self._identifier_map),
    }

Redis

RedisTenantStore

Python
RedisTenantStore(
    redis_url: str,
    primary_store: TenantStore[Tenant],
    ttl: int = 3600,
    key_prefix: str = "tenant",
)

Bases: TenantStore[Tenant]

Redis write-through cache on top of a primary :class:TenantStore.

Reads are served from Redis when the key is warm. Every write goes to the primary store first and then populates (or invalidates) the cache — Redis is never the source of truth.

Parameters:

Name Type Description Default
redis_url str

Redis connection URL (e.g. redis://localhost:6379/0).

required
primary_store TenantStore[Tenant]

Backing :class:TenantStore for cold reads and writes.

required
ttl int

Cache TTL in seconds (default 3600 = 1 hour).

3600
key_prefix str

Prefix applied to all Redis keys. Use a distinct prefix per application to avoid key collisions in a shared Redis instance.

'tenant'

Example::

Text Only
from fastapi_tenancy.storage.database import SQLAlchemyTenantStore
from fastapi_tenancy.storage.redis import RedisTenantStore

primary = SQLAlchemyTenantStore(
    database_url="postgresql+asyncpg://user:pass@localhost/myapp"
)
await primary.initialize()

cache = RedisTenantStore(
    redis_url="redis://localhost:6379/0",
    primary_store=primary,
    ttl=1800,
)
tenant = await cache.get_by_identifier("acme-corp")
await cache.close()
Source code in src/fastapi_tenancy/storage/redis.py
Python
def __init__(
    self,
    redis_url: str,
    primary_store: TenantStore[Tenant],
    ttl: int = 3600,
    key_prefix: str = "tenant",
) -> None:
    aioredis = _require_redis()
    self._primary = primary_store
    self._ttl = ttl
    self._prefix = key_prefix
    self._redis: Any = aioredis.from_url(
        redis_url,
        encoding="utf-8",
        decode_responses=False,
    )
    logger.info("RedisTenantStore initialised ttl=%ds prefix=%s", ttl, key_prefix)

initialize async

Python
initialize() -> None

Initialise the primary store if it supports it.

Delegates to the primary store's initialize() so callers do not need to initialise both stores separately, satisfying LSP: a RedisTenantStore can be used anywhere a TenantStore is expected without additional lifecycle management by the caller.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def initialize(self) -> None:
    """Initialise the primary store if it supports it.

    Delegates to the primary store's ``initialize()`` so callers do not
    need to initialise both stores separately, satisfying LSP: a
    ``RedisTenantStore`` can be used anywhere a ``TenantStore`` is expected
    without additional lifecycle management by the caller.
    """
    if hasattr(self._primary, "initialize"):
        await self._primary.initialize()
        logger.info("RedisTenantStore: delegated initialize() to primary store")

close async

Python
close() -> None

Close the Redis connection pool and the primary store.

Both resources are closed so callers only need to call close() on the outermost store.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def close(self) -> None:
    """Close the Redis connection pool and the primary store.

    Both resources are closed so callers only need to call ``close()``
    on the outermost store.
    """
    await self._redis.aclose()
    if hasattr(self._primary, "close"):
        await self._primary.close()
        logger.info("RedisTenantStore: delegated close() to primary store")
    logger.info("RedisTenantStore closed")

get_by_id async

Python
get_by_id(tenant_id: str) -> Tenant

Return tenant from cache; fall back to primary on miss.

Parameters:

Name Type Description Default
tenant_id str

Opaque tenant primary key.

required

Returns:

Name Type Description
Matching Tenant

class:~fastapi_tenancy.core.types.Tenant.

Raises:

Type Description
TenantNotFoundError

Propagated from the primary store.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def get_by_id(self, tenant_id: str) -> Tenant:
    """Return tenant from cache; fall back to primary on miss.

    Args:
        tenant_id: Opaque tenant primary key.

    Returns:
        Matching :class:`~fastapi_tenancy.core.types.Tenant`.

    Raises:
        TenantNotFoundError: Propagated from the primary store.
    """
    cached = await self._redis.get(self._id_key(tenant_id))
    if cached is not None:
        logger.debug("Cache HIT id=%s", tenant_id)
        try:
            return self._deserialize(cached)
        except Exception:
            # Corrupt cache entry — treat as miss and refetch from primary.
            logger.warning("Corrupt cache entry for id=%s, treating as miss", tenant_id)
    logger.debug("Cache MISS id=%s", tenant_id)
    tenant = await self._primary.get_by_id(tenant_id)
    await self._cache_set(tenant)
    return tenant

get_by_identifier async

Python
get_by_identifier(identifier: str) -> Tenant

Return tenant from cache; fall back to primary on miss.

Parameters:

Name Type Description Default
identifier str

Human-readable tenant slug.

required

Returns:

Name Type Description
Matching Tenant

class:~fastapi_tenancy.core.types.Tenant.

Raises:

Type Description
TenantNotFoundError

Propagated from the primary store.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def get_by_identifier(self, identifier: str) -> Tenant:
    """Return tenant from cache; fall back to primary on miss.

    Args:
        identifier: Human-readable tenant slug.

    Returns:
        Matching :class:`~fastapi_tenancy.core.types.Tenant`.

    Raises:
        TenantNotFoundError: Propagated from the primary store.
    """
    cached = await self._redis.get(self._slug_key(identifier))
    if cached is not None:
        logger.debug("Cache HIT identifier=%s", identifier)
        return self._deserialize(cached)
    logger.debug("Cache MISS identifier=%s", identifier)
    tenant = await self._primary.get_by_identifier(identifier)
    await self._cache_set(tenant)
    return tenant

list async

Python
list(
    skip: int = 0,
    limit: int = 100,
    status: TenantStatus | None = None,
) -> Sequence[Tenant]

Delegate list to primary — paginated results are not cached.

Caching paginated list results adds complex invalidation overhead for negligible benefit. Always reads from the primary store.

Parameters:

Name Type Description Default
skip int

Pagination offset.

0
limit int

Maximum number of results.

100
status TenantStatus | None

Optional status filter.

None

Returns:

Type Description
Sequence[Tenant]

Tenants from the primary store.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def list(
    self,
    skip: int = 0,
    limit: int = 100,
    status: TenantStatus | None = None,
) -> Sequence[Tenant]:
    """Delegate list to primary — paginated results are not cached.

    Caching paginated list results adds complex invalidation overhead
    for negligible benefit.  Always reads from the primary store.

    Args:
        skip: Pagination offset.
        limit: Maximum number of results.
        status: Optional status filter.

    Returns:
        Tenants from the primary store.
    """
    return await self._primary.list(skip=skip, limit=limit, status=status)

count async

Python
count(status: TenantStatus | None = None) -> int

Delegate count to primary store.

Parameters:

Name Type Description Default
status TenantStatus | None

Optional status filter.

None

Returns:

Type Description
int

Total matching tenant count.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def count(self, status: TenantStatus | None = None) -> int:
    """Delegate count to primary store.

    Args:
        status: Optional status filter.

    Returns:
        Total matching tenant count.
    """
    return await self._primary.count(status=status)

exists async

Python
exists(tenant_id: str) -> bool

Return True if the tenant exists in cache or primary.

Checks Redis first (O(1)); falls back to primary on a cache miss.

Parameters:

Name Type Description Default
tenant_id str

Opaque tenant primary key.

required

Returns:

Type Description
bool

Existence flag.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def exists(self, tenant_id: str) -> bool:
    """Return ``True`` if the tenant exists in cache or primary.

    Checks Redis first (O(1)); falls back to primary on a cache miss.

    Args:
        tenant_id: Opaque tenant primary key.

    Returns:
        Existence flag.
    """
    if await self._redis.exists(self._id_key(tenant_id)):
        return True
    return await self._primary.exists(tenant_id)

create async

Python
create(tenant: Tenant) -> Tenant

Create in primary, then populate cache.

Parameters:

Name Type Description Default
tenant Tenant

Fully-populated tenant object.

required

Returns:

Type Description
Tenant

Stored tenant with server-generated timestamps.

Raises:

Type Description
ValueError

When id or identifier already exists.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def create(self, tenant: Tenant) -> Tenant:
    """Create in primary, then populate cache.

    Args:
        tenant: Fully-populated tenant object.

    Returns:
        Stored tenant with server-generated timestamps.

    Raises:
        ValueError: When ``id`` or ``identifier`` already exists.
    """
    created = await self._primary.create(tenant)
    await self._cache_set(created)
    logger.info("Created and cached tenant id=%s", created.id)
    return created

update async

Python
update(tenant: Tenant) -> Tenant

Update primary, invalidate old cache keys, repopulate with new values.

Cache-first optimisation: tries to read the old identifier from the cache before falling back to the primary store, avoiding an extra database round-trip on warm cache paths.

Parameters:

Name Type Description Default
tenant Tenant

Updated tenant object.

required

Returns:

Type Description
Tenant

Updated tenant.

Raises:

Type Description
TenantNotFoundError

When tenant.id does not exist.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def update(self, tenant: Tenant) -> Tenant:
    """Update primary, invalidate old cache keys, repopulate with new values.

    Cache-first optimisation: tries to read the old identifier from the
    cache before falling back to the primary store, avoiding an extra
    database round-trip on warm cache paths.

    Args:
        tenant: Updated tenant object.

    Returns:
        Updated tenant.

    Raises:
        TenantNotFoundError: When ``tenant.id`` does not exist.
    """
    # Try cache first to avoid extra primary store round-trip.
    old = await self._get_old_tenant(tenant.id)
    if old is None:
        old = await self._primary.get_by_id(tenant.id)

    updated = await self._primary.update(tenant)
    await self._cache_invalidate(old.id, old.identifier)
    await self._cache_set(updated)
    logger.info("Updated and re-cached tenant id=%s", updated.id)
    return updated

delete async

Python
delete(tenant_id: str) -> None

Delete from primary and invalidate cache keys.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to delete.

required

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def delete(self, tenant_id: str) -> None:
    """Delete from primary and invalidate cache keys.

    Args:
        tenant_id: ID of the tenant to delete.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
    """
    old = await self._get_old_tenant(tenant_id)
    if old is None:
        old = await self._primary.get_by_id(tenant_id)

    await self._primary.delete(tenant_id)
    await self._cache_invalidate(old.id, old.identifier)
    logger.info("Deleted and invalidated cache tenant id=%s", tenant_id)

set_status async

Python
set_status(tenant_id: str, status: TenantStatus) -> Tenant

Update status in primary and refresh cache.

Cache-first optimisation for reading the old identifier.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to update.

required
status TenantStatus

New lifecycle status.

required

Returns:

Type Description
Tenant

Updated tenant.

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def set_status(self, tenant_id: str, status: TenantStatus) -> Tenant:
    """Update status in primary and refresh cache.

    Cache-first optimisation for reading the old identifier.

    Args:
        tenant_id: ID of the tenant to update.
        status: New lifecycle status.

    Returns:
        Updated tenant.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
    """
    old = await self._get_old_tenant(tenant_id)
    if old is None:
        old = await self._primary.get_by_id(tenant_id)

    updated = await self._primary.set_status(tenant_id, status)
    await self._cache_invalidate(old.id, old.identifier)
    await self._cache_set(updated)
    return updated

update_metadata async

Python
update_metadata(
    tenant_id: str, metadata: dict[str, Any]
) -> Tenant

Merge metadata in primary and refresh cache.

Parameters:

Name Type Description Default
tenant_id str

ID of the tenant to update.

required
metadata dict[str, Any]

Key-value pairs to shallow-merge.

required

Returns:

Type Description
Tenant

Updated tenant.

Raises:

Type Description
TenantNotFoundError

When tenant_id does not exist.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def update_metadata(
    self,
    tenant_id: str,
    metadata: dict[str, Any],
) -> Tenant:
    """Merge metadata in primary and refresh cache.

    Args:
        tenant_id: ID of the tenant to update.
        metadata: Key-value pairs to shallow-merge.

    Returns:
        Updated tenant.

    Raises:
        TenantNotFoundError: When *tenant_id* does not exist.
    """
    old = await self._get_old_tenant(tenant_id)
    if old is None:
        old = await self._primary.get_by_id(tenant_id)

    updated = await self._primary.update_metadata(tenant_id, metadata)
    await self._cache_invalidate(old.id, old.identifier)
    await self._cache_set(updated)
    return updated

get_by_ids async

Python
get_by_ids(tenant_ids: Iterable[str]) -> Sequence[Tenant]

Fetch multiple tenants — cache hits served first; misses delegated.

Uses a Redis pipeline to batch all cache lookups into one round-trip. The returned list preserves the same order as tenant_ids: IDs that are not found in the cache or primary store are silently omitted.

Parameters:

Name Type Description Default
tenant_ids Iterable[str]

Iterable of opaque tenant IDs.

required

Returns:

Type Description
Sequence[Tenant]

Found tenants in the same order as tenant_ids.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def get_by_ids(self, tenant_ids: Iterable[str]) -> Sequence[Tenant]:
    """Fetch multiple tenants — cache hits served first; misses delegated.

    Uses a Redis pipeline to batch all cache lookups into one round-trip.
    The returned list preserves the **same order as** *tenant_ids*: IDs
    that are not found in the cache or primary store are silently omitted.

    Args:
        tenant_ids: Iterable of opaque tenant IDs.

    Returns:
        Found tenants in the same order as *tenant_ids*.
    """
    ids = list(tenant_ids)
    if not ids:
        return []

    # Batch cache lookup via pipeline.
    pipe = self._redis.pipeline()
    for tid in ids:
        pipe.get(self._id_key(tid))
    raw_results = await pipe.execute()

    # Build a map of id → Tenant for hits; collect miss ids for DB fetch.
    tenant_map: dict[str, Tenant] = {}
    miss_ids: list[str] = []

    for tid, raw in zip(ids, raw_results, strict=False):
        if raw is not None:
            try:
                tenant_map[tid] = self._deserialize(raw)
                continue
            except Exception:  # noqa: S110
                pass  # Treat corrupt entry as a miss.
        miss_ids.append(tid)

    if miss_ids:
        fetched = await self._primary.get_by_ids(miss_ids)
        for tenant in fetched:
            await self._cache_set(tenant)
            tenant_map[tenant.id] = tenant

    # Return in original input order, skipping IDs not found anywhere.
    return [tenant_map[tid] for tid in ids if tid in tenant_map]

bulk_update_status async

Python
bulk_update_status(
    tenant_ids: Iterable[str], status: TenantStatus
) -> Sequence[Tenant]

Update status for multiple tenants and invalidate their cache entries.

Parameters:

Name Type Description Default
tenant_ids Iterable[str]

IDs of tenants to update.

required
status TenantStatus

New status applied to every matched tenant.

required

Returns:

Type Description
Sequence[Tenant]

Updated tenants.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def bulk_update_status(
    self,
    tenant_ids: Iterable[str],
    status: TenantStatus,
) -> Sequence[Tenant]:
    """Update status for multiple tenants and invalidate their cache entries.

    Args:
        tenant_ids: IDs of tenants to update.
        status: New status applied to every matched tenant.

    Returns:
        Updated tenants.
    """
    ids = list(tenant_ids)
    if not ids:
        return []
    updated = await self._primary.bulk_update_status(ids, status)
    for tenant in updated:
        await self._cache_set(tenant)
    return updated

invalidate_all async

Python
invalidate_all() -> int

Delete every cache key owned by this store.

Uses SCAN with a count hint rather than KEYS to avoid blocking the Redis event loop on large keyspaces.

Returns:

Type Description
int

Number of Redis keys deleted.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def invalidate_all(self) -> int:
    """Delete every cache key owned by this store.

    Uses ``SCAN`` with a count hint rather than ``KEYS`` to avoid
    blocking the Redis event loop on large keyspaces.

    Returns:
        Number of Redis keys deleted.
    """
    pattern = f"{self._prefix}:*"
    keys: list[bytes] = []
    async for key in self._redis.scan_iter(match=pattern, count=100):
        keys.append(key)
    if keys:
        deleted: int = await self._redis.delete(*keys)
        logger.info("Invalidated %d cache entries (pattern=%s)", deleted, pattern)
        return deleted
    return 0

cache_stats async

Python
cache_stats() -> dict[str, Any]

Return lightweight cache statistics.

Scans the keyspace via SCAN — does not block Redis.

Returns:

Type Description
dict[str, Any]

Dictionary with total_keys, ttl_seconds, and key_prefix.

Source code in src/fastapi_tenancy/storage/redis.py
Python
async def cache_stats(self) -> dict[str, Any]:
    """Return lightweight cache statistics.

    Scans the keyspace via ``SCAN`` — does not block Redis.

    Returns:
        Dictionary with ``total_keys``, ``ttl_seconds``, and ``key_prefix``.
    """
    count = 0
    async for _ in self._redis.scan_iter(match=f"{self._prefix}:*", count=100):
        count += 1
    return {
        "total_keys": count,
        "ttl_seconds": self._ttl,
        "key_prefix": self._prefix,
    }