Skip to content

Migrations

TenantMigrationManager

Python
TenantMigrationManager(
    config: TenancyConfig,
    store: TenantStore[Any],
    alembic_cfg_path: str | Path = "alembic.ini",
    executor: Any | None = None,
)

Alembic migration manager for multi-tenant databases.

Supports SCHEMA and DATABASE isolation strategies.

Parameters:

Name Type Description Default
config TenancyConfig

Tenancy configuration.

required
store TenantStore[Any]

Tenant store used by upgrade_all / downgrade_all.

required
alembic_cfg_path str | Path

Path to alembic.ini (default: "alembic.ini").

'alembic.ini'
executor Any | None

Optional concurrent.futures.Executor for running synchronous Alembic migrations. Defaults to None, which uses the event loop's default ThreadPoolExecutor.

With concurrency=20, the event loop's default ThreadPoolExecutor (max_workers = min(32, os.cpu_count() + 4)) can run out of threads on machines with few CPU cores, leaving migrations queued behind one another. Supply a larger executor to avoid starvation:

Text Only
from concurrent.futures import ThreadPoolExecutor
migrator = TenantMigrationManager(
    config, store,
    executor=ThreadPoolExecutor(max_workers=30),
)
None

Example::

Text Only
migrator = TenantMigrationManager(config, store)

# Migrate a single tenant
await migrator.upgrade_tenant(tenant, revision="head")

# Migrate all tenants with 20 concurrent workers
results = await migrator.upgrade_all(revision="head", concurrency=20)
failed = [r for r in results if not r["success"]]
Source code in src/fastapi_tenancy/migrations/manager.py
Python
def __init__(
    self,
    config: TenancyConfig,
    store: TenantStore[Any],
    alembic_cfg_path: str | Path = "alembic.ini",
    executor: Any | None = None,
) -> None:
    """Record the manager's collaborators and check for the Alembic config.

    Args:
        config: Tenancy configuration.
        store: Tenant store the manager keeps a reference to.
        alembic_cfg_path: Location of ``alembic.ini``; coerced to a ``Path``.
        executor: Optional executor stored for later migration calls;
            ``None`` is kept as-is.
    """
    self._config = config
    self._store = store

    cfg_path = Path(alembic_cfg_path)
    self._alembic_cfg_path = cfg_path

    # Caller-supplied executor (or None) is stored untouched.
    self._executor: Any = executor

    if cfg_path.exists():
        return

    # A missing file is only reported here; construction still succeeds.
    logger.warning(
        "alembic.ini not found at %s — migration calls will fail.",
        cfg_path.resolve(),
    )

upgrade_tenant async

Python
upgrade_tenant(
    tenant: Tenant, revision: str = "head"
) -> None

Run Alembic upgrade for tenant.

Executes the migration in a thread pool executor to avoid blocking the asyncio event loop with synchronous Alembic I/O.

Parameters:

Name Type Description Default
tenant Tenant

Target tenant.

required
revision str

Alembic revision target (default: "head").

'head'

Raises:

Type Description
MigrationError

When the migration fails.

Source code in src/fastapi_tenancy/migrations/manager.py
Python
async def upgrade_tenant(
    self,
    tenant: Tenant,
    revision: str = "head",
) -> None:
    """Apply Alembic ``upgrade`` migrations to a single tenant.

    The synchronous migration routine is dispatched through
    ``loop.run_in_executor`` so the asyncio event loop is not blocked
    while it runs.

    Args:
        tenant: Tenant whose database/schema should be migrated.
        revision: Alembic revision to upgrade to (``"head"`` by default).

    Raises:
        MigrationError: If the migration fails.  A ``MigrationError``
            raised by the migration itself propagates unchanged; any
            other exception is wrapped, with the original chained as
            the cause.
    """
    logger.info("Upgrading tenant %s to revision %r", tenant.id, revision)
    try:
        await asyncio.get_running_loop().run_in_executor(
            self._executor,
            self._run_migration_sync,
            tenant,
            "upgrade",
            revision,
        )
    except MigrationError:
        # Already the domain error: let it through untouched.
        raise
    except Exception as err:
        raise MigrationError(
            tenant_id=tenant.id,
            operation="upgrade",
            reason=str(err),
        ) from err
    else:
        logger.info("Tenant %s upgraded to %r successfully", tenant.id, revision)

downgrade_tenant async

Python
downgrade_tenant(
    tenant: Tenant, revision: str = "-1"
) -> None

Run Alembic downgrade for tenant.

Parameters:

Name Type Description Default
tenant Tenant

Target tenant.

required
revision str

Alembic revision to downgrade to (default: "-1" meaning one step back).

'-1'

Raises:

Type Description
MigrationError

When the migration fails.

Source code in src/fastapi_tenancy/migrations/manager.py
Python
async def downgrade_tenant(
    self,
    tenant: Tenant,
    revision: str = "-1",
) -> None:
    """Run Alembic ``downgrade`` for *tenant*.

    Executes the migration via ``loop.run_in_executor`` so the
    synchronous Alembic call does not block the asyncio event loop.

    Args:
        tenant: Target tenant.
        revision: Alembic revision to downgrade to (default: ``"-1"``
            meaning one step back).

    Raises:
        MigrationError: When the migration fails.  A ``MigrationError``
            raised by the migration itself is re-raised unchanged; any
            other exception is wrapped, with the original chained as
            the cause.
    """
    # Logged at WARNING on entry because a downgrade is a destructive,
    # operator-visible action.
    logger.warning("Downgrading tenant %s to revision %r", tenant.id, revision)
    try:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self._executor,
            self._run_migration_sync,
            tenant,
            "downgrade",
            revision,
        )
    except MigrationError:
        raise
    except Exception as exc:
        raise MigrationError(
            tenant_id=tenant.id,
            operation="downgrade",
            reason=str(exc),
        ) from exc
    # Mirror upgrade_tenant: record completion so logs show the downgrade
    # actually finished, not just that it started.
    logger.info("Tenant %s downgraded to %r successfully", tenant.id, revision)

get_current_revision async

Python
get_current_revision(tenant: Tenant) -> str | None

Return the current Alembic revision for tenant.

Parameters:

Name Type Description Default
tenant Tenant

Target tenant.

required

Returns:

Type Description
str | None

The current revision string (e.g. "abc123"), or None when no migrations have been applied or the revision could not be read.

Source code in src/fastapi_tenancy/migrations/manager.py
Python
async def get_current_revision(self, tenant: Tenant) -> str | None:
    """Look up the Alembic revision currently applied for *tenant*.

    The synchronous lookup is dispatched through ``loop.run_in_executor``.
    This is a best-effort read: any exception raised by the lookup is
    logged at WARNING level and reported as ``None`` rather than raised.

    Args:
        tenant: Tenant to inspect.

    Returns:
        The current revision string (e.g. ``"abc123"``), or ``None`` when
        no migrations have been applied or the revision could not be read.
    """
    try:
        current = await asyncio.get_running_loop().run_in_executor(
            self._executor,
            self._get_current_revision_sync,
            tenant,
        )
    except Exception as err:
        logger.warning("Could not read revision for tenant %s: %s", tenant.id, err)
        return None
    return current

upgrade_all async

Python
upgrade_all(
    revision: str = "head",
    concurrency: int = 10,
    page_size: int = 100,
) -> list[dict[str, Any]]

Upgrade all active tenants to revision with bounded concurrency.

Loads tenants in pages from the store to avoid loading the entire fleet into memory at once. Uses asyncio.Semaphore(concurrency) to cap parallel database connections.

Parameters:

Name Type Description Default
revision str

Alembic revision target.

'head'
concurrency int

Maximum concurrent migration workers.

10
page_size int

Number of tenants to fetch per page from the store.

100

Returns:

Type Description
list[dict[str, Any]]

List of result dicts, one per tenant:

[
    {"tenant_id": "...", "success": True, "revision": "head"},
    {"tenant_id": "...", "success": False, "error": "..."},
]

Example::

Text Only
results = await migrator.upgrade_all(revision="head", concurrency=20)
failed = [r for r in results if not r["success"]]
print(f"{len(failed)} of {len(results)} tenants failed migration")
Source code in src/fastapi_tenancy/migrations/manager.py
Python
async def upgrade_all(
    self,
    revision: str = "head",
    concurrency: int = 10,
    page_size: int = 100,
) -> list[dict[str, Any]]:
    """Upgrade all active tenants to *revision* with bounded concurrency.

    Loads tenants in pages from the store to avoid loading the entire
    fleet into memory at once.  An ``asyncio.Semaphore(concurrency)`` is
    shared by every per-tenant migration task to cap how many run at the
    same time.  Pages are processed one after another: all tasks of a
    page finish before the next page is fetched.

    Args:
        revision: Alembic revision target.
        concurrency: Maximum concurrent migration workers (must be >= 1).
        page_size: Number of tenants to fetch per page from the store
            (must be >= 1).

    Returns:
        List of result dicts, one per tenant::

            [
                {"tenant_id": "...", "success": True,  "revision": "head"},
                {"tenant_id": "...", "success": False, "error": "..."},
            ]

    Raises:
        ValueError: If *concurrency* or *page_size* is less than 1.

    Example::

        results = await migrator.upgrade_all(revision="head", concurrency=20)
        failed = [r for r in results if not r["success"]]
        print(f"{len(failed)} of {len(results)} tenants failed migration")
    """
    # Fail fast on unusable arguments.  A zero-permit semaphore can never
    # be acquired, so concurrency=0 would stall every worker that waits
    # on it instead of reporting a clear error.
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency!r}")
    # How the store treats a non-positive limit is not visible here;
    # reject it rather than risk silently migrating nothing.
    if page_size < 1:
        raise ValueError(f"page_size must be >= 1, got {page_size!r}")

    from fastapi_tenancy.core.types import TenantStatus  # noqa: PLC0415

    semaphore = asyncio.Semaphore(concurrency)
    results: list[dict[str, Any]] = []
    skip = 0

    while True:
        page = await self._store.list(skip=skip, limit=page_size, status=TenantStatus.ACTIVE)
        if not page:
            break
        skip += len(page)

        tasks = [self._migrate_one(tenant, "upgrade", revision, semaphore) for tenant in page]
        page_results = await asyncio.gather(*tasks, return_exceptions=False)
        results.extend(page_results)

        if len(page) < page_size:
            break  # Short page: the store has no further tenants.

    success_count = sum(1 for r in results if r["success"])
    logger.info(
        "upgrade_all complete: %d/%d tenants succeeded (revision=%r)",
        success_count,
        len(results),
        revision,
    )
    return results

downgrade_all async

Python
downgrade_all(
    revision: str = "-1",
    concurrency: int = 10,
    page_size: int = 100,
) -> list[dict[str, Any]]

Downgrade all active tenants with bounded concurrency.

Parameters:

Name Type Description Default
revision str

Alembic revision target.

'-1'
concurrency int

Maximum concurrent workers.

10
page_size int

Page size for store pagination.

100

Returns:

Type Description
list[dict[str, Any]]

List of result dicts, one per tenant, in the same format as upgrade_all.

Source code in src/fastapi_tenancy/migrations/manager.py
Python
async def downgrade_all(
    self,
    revision: str = "-1",
    concurrency: int = 10,
    page_size: int = 100,
) -> list[dict[str, Any]]:
    """Downgrade all active tenants with bounded concurrency.

    Tenants are fetched from the store page by page, and an
    ``asyncio.Semaphore(concurrency)`` is shared by every per-tenant
    migration task.  All tasks of a page finish before the next page is
    fetched.

    Args:
        revision: Alembic revision target (default ``"-1"``).
        concurrency: Maximum concurrent workers (must be >= 1).
        page_size: Page size for store pagination (must be >= 1).

    Returns:
        List of result dicts, one per tenant; each carries a
        ``"success"`` flag (same format as :meth:`upgrade_all`).

    Raises:
        ValueError: If *concurrency* or *page_size* is less than 1.
    """
    # Fail fast on unusable arguments.  A zero-permit semaphore can never
    # be acquired, so concurrency=0 would stall every worker that waits
    # on it instead of reporting a clear error.
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency!r}")
    # How the store treats a non-positive limit is not visible here;
    # reject it rather than risk silently migrating nothing.
    if page_size < 1:
        raise ValueError(f"page_size must be >= 1, got {page_size!r}")

    from fastapi_tenancy.core.types import TenantStatus  # noqa: PLC0415

    semaphore = asyncio.Semaphore(concurrency)
    results: list[dict[str, Any]] = []
    skip = 0

    while True:
        page = await self._store.list(skip=skip, limit=page_size, status=TenantStatus.ACTIVE)
        if not page:
            break
        skip += len(page)

        tasks = [self._migrate_one(tenant, "downgrade", revision, semaphore) for tenant in page]
        page_results = await asyncio.gather(*tasks, return_exceptions=False)
        results.extend(page_results)

        if len(page) < page_size:
            break  # Short page: the store has no further tenants.

    # Summarise the run the same way upgrade_all does, so bulk downgrades
    # leave an equivalent trace in the logs.
    success_count = sum(1 for r in results if r["success"])
    logger.info(
        "downgrade_all complete: %d/%d tenants succeeded (revision=%r)",
        success_count,
        len(results),
        revision,
    )
    return results