# Source code for litestar_queues.backends.sqlspec.stores.factory

"""SQLSpec queue store factory."""

from typing import TYPE_CHECKING, Any

from litestar_queues.backends.sqlspec.stores.aiomysql import AiomysqlQueueStore
from litestar_queues.backends.sqlspec.stores.aiosqlite import AiosqliteQueueStore
from litestar_queues.backends.sqlspec.stores.asyncmy import AsyncmyQueueStore
from litestar_queues.backends.sqlspec.stores.asyncpg import AsyncpgQueueStore
from litestar_queues.backends.sqlspec.stores.base import SQLSpecQueueStore, _adapter_name
from litestar_queues.backends.sqlspec.stores.cockroach_asyncpg import CockroachAsyncpgQueueStore
from litestar_queues.backends.sqlspec.stores.cockroach_psycopg import (
    CockroachPsycopgAsyncQueueStore,
    CockroachPsycopgSyncQueueStore,
)
from litestar_queues.backends.sqlspec.stores.duckdb import DuckDBQueueStore
from litestar_queues.backends.sqlspec.stores.mysqlconnector import (
    MysqlConnectorAsyncQueueStore,
    MysqlConnectorSyncQueueStore,
)
from litestar_queues.backends.sqlspec.stores.oracledb import OracledbAsyncQueueStore, OracledbSyncQueueStore
from litestar_queues.backends.sqlspec.stores.psqlpy import PsqlpyQueueStore
from litestar_queues.backends.sqlspec.stores.psycopg import PsycopgAsyncQueueStore, PsycopgSyncQueueStore
from litestar_queues.backends.sqlspec.stores.pymysql import PymysqlQueueStore
from litestar_queues.backends.sqlspec.stores.sqlite import SqliteQueueStore
from litestar_queues.exceptions import QueueConfigurationError

if TYPE_CHECKING:
    from collections.abc import Mapping

__all__ = ("create_queue_store",)

# Adapter name -> store class, for adapters that have exactly one store
# implementation. `_adapter_store_type` looks the resolved adapter name up here
# after the async-or-sync adapters have been handled.
_ADAPTER_STORE_TYPES: "dict[str, type[SQLSpecQueueStore]]" = {
    "aiomysql": AiomysqlQueueStore,
    "aiosqlite": AiosqliteQueueStore,
    "asyncmy": AsyncmyQueueStore,
    "asyncpg": AsyncpgQueueStore,
    "cockroach_asyncpg": CockroachAsyncpgQueueStore,
    "duckdb": DuckDBQueueStore,
    "pymysql": PymysqlQueueStore,
    "psqlpy": PsqlpyQueueStore,
    "sqlite": SqliteQueueStore,
}
# Adapter names for which `_adapter_store_type` picks between an async and a
# sync store class instead of using the table above.
_ASYNC_OR_SYNC_ADAPTER_NAMES = frozenset({"cockroach_psycopg", "mysqlconnector", "oracledb", "psycopg"})
# Every adapter name this module accepts; listed (sorted) in the error message
# raised for an unsupported adapter.
_SUPPORTED_ADAPTER_NAMES = frozenset(_ADAPTER_STORE_TYPES) | _ASYNC_OR_SYNC_ADAPTER_NAMES


def create_queue_store(
    config: "Any",
    *,
    table_name: "str | None" = None,
    column_map: "Mapping[str, str] | None" = None,
    native_json_columns: "frozenset[str] | None" = None,
    manage_schema: "bool" = True,
) -> "SQLSpecQueueStore":
    """Create a queue store for a SQLSpec adapter configuration.

    The store class is selected by ``_adapter_store_type`` from the adapter
    that ``config`` belongs to, then instantiated with the given options.

    Args:
        config: SQLSpec adapter configuration used to select and construct
            the store.
        table_name: Forwarded unchanged to the store constructor.
        column_map: Forwarded unchanged to the store constructor.
        native_json_columns: Forwarded to the store constructor; an empty
            set is normalised to ``None`` first.
        manage_schema: Forwarded unchanged to the store constructor.

    Returns:
        The queue store implementation for the SQLSpec adapter.

    Raises:
        QueueConfigurationError: If the adapter is identified but is not one
            of the supported adapters.
    """
    store_type = _adapter_store_type(config)
    return store_type(
        config,
        table_name=table_name,
        column_map=column_map,
        # Falsy values (None or an empty frozenset) are both passed on as None.
        native_json_columns=native_json_columns or None,
        manage_schema=manage_schema,
    )
def _adapter_store_type(config: "Any") -> "type[SQLSpecQueueStore]":
    """Resolve the queue store class for a SQLSpec adapter configuration.

    Args:
        config: SQLSpec adapter configuration; its adapter name is obtained
            from ``_adapter_name``.

    Returns:
        The adapter-specific store class, or the generic
        ``SQLSpecQueueStore`` when ``_adapter_name`` yields a falsy name.

    Raises:
        QueueConfigurationError: If the adapter name is non-empty but is not
            one of the supported adapters.
    """
    name = _adapter_name(config)

    # Adapters that provide both flavours: name -> (async store, sync store).
    # Kept local so the store classes are only referenced when the function
    # runs; the keys mirror ``_ASYNC_OR_SYNC_ADAPTER_NAMES``.
    dual_mode_store_types: "dict[str, tuple[type[SQLSpecQueueStore], type[SQLSpecQueueStore]]]" = {
        "mysqlconnector": (MysqlConnectorAsyncQueueStore, MysqlConnectorSyncQueueStore),
        "oracledb": (OracledbAsyncQueueStore, OracledbSyncQueueStore),
        "psycopg": (PsycopgAsyncQueueStore, PsycopgSyncQueueStore),
        "cockroach_psycopg": (CockroachPsycopgAsyncQueueStore, CockroachPsycopgSyncQueueStore),
    }
    if name in dual_mode_store_types:
        async_store_type, sync_store_type = dual_mode_store_types[name]
        return _async_or_sync_store_type(
            config, async_store_type=async_store_type, sync_store_type=sync_store_type
        )

    if name in _ADAPTER_STORE_TYPES:
        return _ADAPTER_STORE_TYPES[name]

    if not name:
        # No adapter name could be determined: fall back to the generic store.
        return SQLSpecQueueStore

    supported = ", ".join(sorted(_SUPPORTED_ADAPTER_NAMES))
    msg = f"SQLSpec adapter {name!r} is not supported by this queue backend. Supported adapters: {supported}."
    raise QueueConfigurationError(msg)


def _async_or_sync_store_type(
    config: "Any", *, async_store_type: "type[SQLSpecQueueStore]", sync_store_type: "type[SQLSpecQueueStore]"
) -> "type[SQLSpecQueueStore]":
    """Choose between the async and sync store class for ``config``.

    The decision is based solely on the configuration's class name: if the
    lower-cased name contains ``"async"`` the async store class is chosen.

    Args:
        config: SQLSpec adapter configuration instance.
        async_store_type: Store class returned for async-named configurations.
        sync_store_type: Store class returned for all other configurations.

    Returns:
        ``async_store_type`` or ``sync_store_type``.
    """
    is_async_config = "async" in type(config).__name__.lower()
    return async_store_type if is_async_config else sync_store_type