# Source code for litestar_queues.backends.advanced_alchemy.config

"""Advanced Alchemy backend configuration."""

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, ClassVar

if TYPE_CHECKING:
    from advanced_alchemy.config.asyncio import SQLAlchemyAsyncConfig
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

__all__ = ("AdvancedAlchemyBackendConfig",)


def _default_model_class() -> "type[Any]":
    """Return the ``QueueTaskModel`` class from the backend's ``models`` module.

    The import is performed inside the function body, so the ``models``
    module is only loaded when this function is first called rather than
    when this configuration module is imported.
    """
    from litestar_queues.backends.advanced_alchemy.models import (
        QueueTaskModel as default_model,
    )

    return default_model


@dataclass(slots=True)
class AdvancedAlchemyBackendConfig:
    """Configuration values for the Advanced Alchemy queue backend.

    Attributes:
        backend_name: Class-level identifier, ``"advanced-alchemy"``. Declared
            as a ``ClassVar``, so it is not an ``__init__`` parameter.
        sqlalchemy_config: Optional ``SQLAlchemyAsyncConfig``. Defaults to
            ``None``.
        heartbeat_session_maker: Optional ``async_sessionmaker[AsyncSession]``.
            Defaults to ``None``. NOTE(review): presumably a dedicated session
            factory for heartbeat work; confirm against the backend
            implementation.
        model_class: Model class, or ``None``. When not supplied, the default
            is obtained by calling ``_default_model_class()`` each time an
            instance is created.
        create_schema: Boolean flag, defaults to ``False``. NOTE(review):
            presumably controls whether the backend creates its tables;
            confirm against the backend implementation.
    """

    # Annotations are kept as strings: two of the referenced types are only
    # imported under ``TYPE_CHECKING`` and do not exist at runtime.
    backend_name: "ClassVar[str]" = "advanced-alchemy"
    sqlalchemy_config: "SQLAlchemyAsyncConfig | None" = None
    heartbeat_session_maker: "async_sessionmaker[AsyncSession] | None" = None
    # A factory is used so the default model is looked up at instantiation
    # time instead of when this module is imported.
    model_class: "type[Any] | None" = field(default_factory=_default_model_class)
    create_schema: "bool" = False