Source code for litestar_queues.execution.factory

"""Execution backend registry and factory functions."""

from importlib import import_module
from inspect import signature
from typing import TYPE_CHECKING, Any, cast

from litestar_queues.config import ExecutionBackendConfig, QueueConfig, execution_backend_name

if TYPE_CHECKING:
    from collections.abc import Callable

    from litestar_queues.execution.base import BaseExecutionBackend

__all__ = ("execution_backend", "get_execution_backend", "get_execution_backend_class", "list_execution_backends")

_execution_backend_registry: "dict[str, type[BaseExecutionBackend]]" = {}

_BUILTIN_BACKENDS: "dict[str, str]" = {
    "cloudrun": "litestar_queues.execution.cloudrun:CloudRunExecutionBackend",
    "immediate": "litestar_queues.execution.immediate:ImmediateExecutionBackend",
    "local": "litestar_queues.execution.local:LocalExecutionBackend",
}


[docs] def execution_backend(name: "str") -> "Callable[[type[BaseExecutionBackend]], type[BaseExecutionBackend]]": """Decorator to register an execution backend class with a short name. Returns: A decorator that registers the backend class. """ def decorator(cls: "type[BaseExecutionBackend]") -> "type[BaseExecutionBackend]": _execution_backend_registry[name] = cls return cls return decorator
[docs] def get_execution_backend_class(backend_path: "str") -> "type[BaseExecutionBackend]": """Get an execution backend class by short name or import path. Returns: The resolved execution backend class. Raises: ValueError: If a short backend name is unknown. """ if backend_path in _execution_backend_registry: return _execution_backend_registry[backend_path] if backend_path in _BUILTIN_BACKENDS: module_path, class_name = _BUILTIN_BACKENDS[backend_path].split(":", 1) module = import_module(module_path) backend_class = _backend_class(getattr(module, class_name)) _execution_backend_registry[backend_path] = backend_class return backend_class if "." not in backend_path: available = sorted({*_execution_backend_registry, *_BUILTIN_BACKENDS}) msg = f"Unknown execution backend: {backend_path!r}. Available: {available}" raise ValueError(msg) module_path, class_name = backend_path.rsplit(".", 1) module = import_module(module_path) return _backend_class(getattr(module, class_name))
[docs] def get_execution_backend( backend: "ExecutionBackendConfig" = "immediate", config: "QueueConfig | None" = None ) -> "BaseExecutionBackend": """Get an instantiated execution backend. Returns: A configured execution backend instance. Raises: TypeError: If a typed execution config selects a backend class that does not accept ``execution_config``. """ execution_config = None if isinstance(backend, str) else backend backend_class = get_execution_backend_class(execution_backend_name(backend)) backend_kwargs: "dict[str, Any]" = {"config": config} if execution_config is not None: backend_kwargs["execution_config"] = execution_config init_signature = signature(backend_class.__init__) accepts_kwargs = any(param.kind == param.VAR_KEYWORD for param in init_signature.parameters.values()) if execution_config is not None and not accepts_kwargs and "execution_config" not in init_signature.parameters: msg = f"{backend_class.__name__} must accept execution_config when selected by a typed execution config." raise TypeError(msg) if not accepts_kwargs: backend_kwargs = {key: value for key, value in backend_kwargs.items() if key in init_signature.parameters} return backend_class(**backend_kwargs)
[docs] def list_execution_backends() -> "list[str]": """Return registered execution backend names.""" return sorted({*_execution_backend_registry, *_BUILTIN_BACKENDS})
def _backend_class(value: "Any") -> "type[BaseExecutionBackend]": return cast("type[BaseExecutionBackend]", value)