# Source code for litestar_queues.config

from collections.abc import AsyncIterator, Awaitable, Callable, Mapping
from contextlib import suppress
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, ClassVar, Protocol

from litestar.di import Provide

from litestar_queues.events import QueueEventConfig

if TYPE_CHECKING:
    from types import TracebackType

    from litestar.datastructures import State

    from litestar_queues.backends import BaseQueueBackend
    from litestar_queues.events import QueueEventPublisher, TaskExecutionContext
    from litestar_queues.execution import BaseExecutionBackend
    from litestar_queues.models import QueuedTaskRecord
    from litestar_queues.service import QueueService
    from litestar_queues.task import Task

# Public API of this module: the names exported by ``from litestar_queues.config import *``.
# ``QueueEventConfig`` is imported from ``litestar_queues.events`` and re-exported here.
__all__ = (
    "AsyncServiceProvider",
    "ExecutionBackendConfig",
    "ExecutionBackendConfigProtocol",
    "QueueBackendConfig",
    "QueueBackendConfigProtocol",
    "QueueConfig",
    "QueueEventConfig",
    "TaskDependencyResolver",
    "execution_backend_name",
    "queue_backend_name",
)


class QueueBackendConfigProtocol(Protocol):
    """Structural type for typed queue backend configuration objects.

    A class satisfies this protocol by declaring a string ``backend_name``
    class attribute.
    """

    # Class-level backend name; ``queue_backend_name`` returns this value
    # when it is handed a configuration object instead of a plain string.
    backend_name: "ClassVar[str]"
class ExecutionBackendConfigProtocol(Protocol):
    """Structural type for typed execution backend configuration objects.

    A class satisfies this protocol by declaring a string ``backend_name``
    class attribute.
    """

    # Class-level backend name; ``execution_backend_name`` returns this value
    # when it is handed a configuration object instead of a plain string.
    backend_name: "ClassVar[str]"
# A selector is either a backend name given directly as a string or a typed
# configuration object carrying that name in its ``backend_name`` attribute.
QueueBackendConfig = str | QueueBackendConfigProtocol
"""Type alias for queue backend selectors."""

ExecutionBackendConfig = str | ExecutionBackendConfigProtocol
"""Type alias for execution backend selectors."""
def queue_backend_name(backend: "QueueBackendConfig") -> "str":
    """Resolve a queue backend selector to its backend name.

    Args:
        backend: Either the backend name itself or a configuration object
            exposing ``backend_name``.

    Returns:
        ``backend`` unchanged when it is a string, otherwise its
        ``backend_name`` attribute.
    """
    if isinstance(backend, str):
        return backend
    return backend.backend_name
def execution_backend_name(backend: "ExecutionBackendConfig") -> "str":
    """Resolve an execution backend selector to its backend name.

    Args:
        backend: Either the backend name itself or a configuration object
            exposing ``backend_name``.

    Returns:
        ``backend`` unchanged when it is a string, otherwise its
        ``backend_name`` attribute.
    """
    if isinstance(backend, str):
        return backend
    return backend.backend_name
# Signature: (task, queued record, execution context) -> awaitable mapping of
# keyword-argument names to values. The three parameter types are kept as
# string forward references because they are imported under TYPE_CHECKING only.
TaskDependencyResolver = Callable[
    [
        "Task[..., object]",
        "QueuedTaskRecord",
        "TaskExecutionContext",
    ],
    Awaitable[Mapping[str, object]],
]
"""User-supplied callable that resolves extra kwargs for a task before execution."""
class AsyncServiceProvider:
    """Provides QueueService as an async context manager.

    The provider builds a ``QueueService`` from the stored configuration when
    entered and closes it when exited. It can also be consumed as an async
    generator (``__aiter__``), which wraps the same enter/exit pair around a
    single ``yield``.
    """

    __slots__ = ("_config", "_service")

    def __init__(self, config: "QueueConfig") -> "None":
        """Initialize the service provider.

        Args:
            config: Queue configuration.
        """
        self._config = config
        # Populated by ``__aenter__`` and cleared again by ``__aexit__``.
        self._service: "QueueService | None" = None

    async def __aenter__(self) -> "QueueService":
        """Enter the async context and return a QueueService.

        Returns:
            A managed QueueService instance.

        Raises:
            Exception: Whatever the service's own ``__aenter__`` raises; in
                that case no service is recorded on the provider.
        """
        # Imported lazily; at module level the name is only available under
        # TYPE_CHECKING.
        from litestar_queues.service import QueueService

        service = QueueService(self._config)
        await service.__aenter__()
        # Record the service only once it has been entered successfully, so a
        # failed start never leaves a half-initialised service to be exited.
        self._service = service
        return service

    async def __aexit__(
        self,
        exc_type: "type[BaseException] | None",  # noqa: PYI036
        exc_val: "BaseException | None",  # noqa: PYI036
        exc_tb: "TracebackType | None",  # noqa: PYI036
    ) -> "None":
        """Exit the async context and close the QueueService.

        Does nothing when no service is currently held. The held service is
        released *before* it is closed, so the provider never keeps a stale
        reference (and never closes the same service twice) if closing raises.

        Args:
            exc_type: Type of the exception leaving the context, if any.
            exc_val: The exception instance leaving the context, if any.
            exc_tb: Traceback of that exception, if any.
        """
        service, self._service = self._service, None
        if service is not None:
            await service.__aexit__(exc_type, exc_val, exc_tb)

    async def __aiter__(self) -> 'AsyncIterator["QueueService"]':
        """Yield a managed QueueService for Litestar dependency injection.

        Yields:
            Managed queue service instance.
        """
        service = await self.__aenter__()
        try:
            yield service
        except BaseException as exc:
            # BaseException (not Exception) so that cancellation and generator
            # close also run the service's exit with the real exception info.
            await self.__aexit__(type(exc), exc, exc.__traceback__)
            raise
        else:
            await self.__aexit__(None, None, None)
@dataclass(slots=True)
class QueueConfig:
    """Configuration for QueuePlugin.

    Besides holding plain settings, the object acts as a factory for the
    queue service, the queue and execution backends and the event publisher
    that are built from those settings.
    """

    # --- backend selection (resolved by get_queue_backend / get_execution_backend)
    queue_backend: "QueueBackendConfig" = "memory"
    execution_backend: "ExecutionBackendConfig" = "local"
    task_dependency_resolver: "TaskDependencyResolver | None" = None
    in_app_worker: "bool" = True

    # --- lookup keys: the dependency key is used by ``dependencies`` and the
    # service state key by ``get_service``; the remaining keys are not read in
    # this class.
    queue_service_dependency_key: "str" = "queue_service"
    queue_service_state_key: "str" = "queue_service"
    queue_worker_state_key: "str" = "queue_worker"
    queue_event_publisher_state_key: "str" = "queue_event_publisher"
    queue_event_channels_backend_state_key: "str" = "queue_event_channels_backend"

    # --- events (consumed by get_event_publisher)
    event_config: "QueueEventConfig" = field(default_factory=QueueEventConfig)

    # --- task discovery, worker, executor and scheduler settings; none of
    # these are read by the methods of this class.
    task_modules: "tuple[str, ...]" = ()
    initialize_schedules: "bool" = True
    worker_batch_size: "int" = 10
    worker_poll_interval: "float" = 0.1
    worker_max_concurrency: "int" = 1
    worker_heartbeat_interval: "float" = 30
    worker_reconcile_interval: "float" = 30
    worker_stale_after: "float | None" = None
    worker_stale_check_interval: "float" = 60.0
    worker_graceful_shutdown_timeout: "float" = 30
    worker_final_cancel_timeout: "float" = 5
    worker_queues: "tuple[str, ...]" = ()
    sync_executor_max_workers: "int | None" = None
    sync_executor_thread_name_prefix: "str" = "litestar-queues"
    scheduler_canary_task: "str" = "scheduler.heartbeat"

    @property
    def signature_namespace(self) -> "dict[str, Any]":
        """Names added to Litestar's signature namespace.

        The core names are always present. Entries for the optional backends
        (advanced_alchemy, sqlspec, redis, valkey) are included only when the
        corresponding backend module can be imported; an ``ImportError`` for
        any of them is swallowed and that backend's entries are left out.

        Returns:
            A freshly built mapping from type name to the object it names.
        """
        from litestar.di import NamedDependency

        from litestar_queues.backends import BaseQueueBackend, InMemoryQueueBackend
        from litestar_queues.events import (
            InMemoryQueueEventSink,
            NoopQueueEventSink,
            QueueChannels,
            QueueEvent,
            QueueEventActor,
            QueueEventConfig,
            QueueEventEntityRef,
            QueueEventPublisher,
            TaskExecutionContext,
        )
        from litestar_queues.exceptions import NonRetryableError, non_retryable
        from litestar_queues.execution import (
            BaseExecutionBackend,
            CloudRunExecutionBackend,
            CloudRunExecutionConfig,
            CloudRunExecutionStatus,
            ImmediateExecutionBackend,
            LocalExecutionBackend,
        )
        from litestar_queues.models import (
            QueueBackendCapabilities,
            QueuedTaskRecord,
            QueueStatistics,
            StaleTaskRecoveryResult,
        )
        from litestar_queues.service import QueueService
        from litestar_queues.task import ScheduleConfig, Task, TaskResult
        from litestar_queues.worker import Worker

        exported: "dict[str, Any]" = {
            "BaseExecutionBackend": BaseExecutionBackend,
            "BaseQueueBackend": BaseQueueBackend,
            "CloudRunExecutionBackend": CloudRunExecutionBackend,
            "CloudRunExecutionConfig": CloudRunExecutionConfig,
            "CloudRunExecutionStatus": CloudRunExecutionStatus,
            "ImmediateExecutionBackend": ImmediateExecutionBackend,
            "InMemoryQueueBackend": InMemoryQueueBackend,
            "LocalExecutionBackend": LocalExecutionBackend,
            "NamedDependency": NamedDependency,
            "NonRetryableError": NonRetryableError,
            "NoopQueueEventSink": NoopQueueEventSink,
            "InMemoryQueueEventSink": InMemoryQueueEventSink,
            "ExecutionBackendConfig": ExecutionBackendConfig,
            "QueueChannels": QueueChannels,
            "QueueConfig": QueueConfig,
            "QueueBackendCapabilities": QueueBackendCapabilities,
            "QueueBackendConfig": QueueBackendConfig,
            "QueueBackendConfigProtocol": QueueBackendConfigProtocol,
            "QueueEvent": QueueEvent,
            "QueueEventActor": QueueEventActor,
            "QueueEventConfig": QueueEventConfig,
            "QueueEventEntityRef": QueueEventEntityRef,
            "QueueEventPublisher": QueueEventPublisher,
            "QueuedTaskRecord": QueuedTaskRecord,
            "QueueService": QueueService,
            "QueueStatistics": QueueStatistics,
            "ScheduleConfig": ScheduleConfig,
            "StaleTaskRecoveryResult": StaleTaskRecoveryResult,
            "Task": Task,
            "TaskDependencyResolver": TaskDependencyResolver,
            "ExecutionBackendConfigProtocol": ExecutionBackendConfigProtocol,
            "TaskExecutionContext": TaskExecutionContext,
            "TaskResult": TaskResult,
            "Worker": Worker,
            "non_retryable": non_retryable,
        }

        # Optional backends: each one is attempted independently so that a
        # missing extra only drops its own entries.
        try:
            from litestar_queues.backends.advanced_alchemy import AdvancedAlchemyQueueBackend
        except ImportError:
            pass
        else:
            exported["AdvancedAlchemyQueueBackend"] = AdvancedAlchemyQueueBackend

        try:
            from litestar_queues.backends.sqlspec import SQLSpecQueueBackend
        except ImportError:
            pass
        else:
            exported["SQLSpecQueueBackend"] = SQLSpecQueueBackend

        try:
            from litestar_queues.backends.redis import RedisBackendConfig, RedisQueueBackend
        except ImportError:
            pass
        else:
            exported["RedisBackendConfig"] = RedisBackendConfig
            exported["RedisQueueBackend"] = RedisQueueBackend

        try:
            from litestar_queues.backends.valkey import ValkeyBackendConfig, ValkeyQueueBackend
        except ImportError:
            pass
        else:
            exported["ValkeyBackendConfig"] = ValkeyBackendConfig
            exported["ValkeyQueueBackend"] = ValkeyQueueBackend

        return exported

    @property
    def dependencies(self) -> "dict[str, Any]":
        """Dependency providers for Litestar's DI system.

        Returns:
            A one-entry mapping from ``queue_service_dependency_key`` to a
            ``Provide`` wrapping ``provide_service_dependency``.
        """
        provider = Provide(self.provide_service_dependency)
        return {self.queue_service_dependency_key: provider}

    def get_service(self, state: "State | None" = None) -> "QueueService":
        """Return a QueueService for this configuration.

        Args:
            state: Optional application state to look the service up in.

        Returns:
            The ``QueueService`` stored in ``state`` under
            ``queue_service_state_key`` when there is one; a new service built
            from the stored ``QueueConfig`` when the state holds a config
            instead; otherwise a new service built from this configuration.
        """
        from litestar_queues.service import QueueService

        key = self.queue_service_state_key
        if state is None or key not in state:
            return QueueService(self)

        stored = state[key]
        if isinstance(stored, QueueService):
            return stored
        # A config stored in state takes precedence over ``self``; any other
        # stored value is ignored.
        source = stored if isinstance(stored, QueueConfig) else self
        return QueueService(source)

    def get_queue_backend(self) -> "BaseQueueBackend":
        """Return a configured queue backend instance.

        Returns:
            The result of the package-level ``get_queue_backend`` factory for
            the ``queue_backend`` selector, with this object passed as
            ``config``.
        """
        from litestar_queues.backends import get_queue_backend as build_queue_backend

        return build_queue_backend(self.queue_backend, config=self)

    def get_execution_backend(self) -> "BaseExecutionBackend":
        """Return a configured execution backend instance.

        Returns:
            The result of the package-level ``get_execution_backend`` factory
            for the ``execution_backend`` selector, with this object passed as
            ``config``.
        """
        from litestar_queues.execution import get_execution_backend as build_execution_backend

        return build_execution_backend(self.execution_backend, config=self)

    def get_event_publisher(self) -> "QueueEventPublisher":
        """Return a configured queue event publisher.

        Sink selection, in priority order: a no-op sink when events are
        disabled; the explicitly configured sink; a channels-backed sink when
        a channels backend is configured; otherwise a no-op sink.

        Returns:
            A ``QueueEventPublisher`` wrapping the selected sink and carrying
            the publish flags from ``event_config``.
        """
        from litestar_queues.events import (
            ChannelsQueueEventSink,
            NoopQueueEventSink,
            QueueEventPublisher,
            QueueEventSink,
        )

        settings = self.event_config
        events_on = bool(settings.enabled)

        sink: "QueueEventSink"
        if events_on and settings.sink is not None:
            sink = settings.sink
        elif events_on and settings.channels_backend is not None:
            sink = ChannelsQueueEventSink(settings.channels_backend)
        else:
            # Disabled, or enabled without any sink/backend configured.
            sink = NoopQueueEventSink()

        return QueueEventPublisher(
            sink,
            strict=settings.strict,
            publish_task_channel=settings.publish_task_channel,
            publish_queue_channel=settings.publish_queue_channel,
            publish_global_lifecycle=settings.publish_global_lifecycle,
        )

    def provide_service(self) -> "AsyncServiceProvider":
        """Provide a QueueService instance as an async context manager.

        Returns:
            An async service provider bound to this configuration.
        """
        return AsyncServiceProvider(self)

    async def provide_service_dependency(self, state: "State") -> 'AsyncIterator["QueueService"]':
        """Yield the application-scoped QueueService for Litestar dependency injection.

        Args:
            state: Application state passed through to ``get_service``.

        Yields:
            The service returned by ``get_service(state)``.
        """
        service = self.get_service(state)
        yield service