Core API¶
Configuration¶
- class litestar_queues.config.AsyncServiceProvider(config: QueueConfig)[source]¶
Bases:
objectProvides QueueService as an async context manager.
- __init__(config: QueueConfig) None[source]¶
Initialize the service provider.
- Parameters:
config – Queue configuration.
- litestar_queues.config.ExecutionBackendConfig = str | litestar_queues.config.ExecutionBackendConfigProtocol¶
Type alias for execution backend selectors.
- class litestar_queues.config.ExecutionBackendConfigProtocol(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for typed execution backend configuration objects.
- __init__(*args, **kwargs)¶
- litestar_queues.config.QueueBackendConfig = str | litestar_queues.config.QueueBackendConfigProtocol¶
Type alias for queue backend selectors.
- class litestar_queues.config.QueueBackendConfigProtocol(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for typed queue backend configuration objects.
- __init__(*args, **kwargs)¶
- class litestar_queues.config.QueueConfig(queue_backend: QueueBackendConfig = 'memory', execution_backend: ExecutionBackendConfig = 'local', task_dependency_resolver: TaskDependencyResolver | None = None, in_app_worker: bool = True, queue_service_dependency_key: str = 'queue_service', queue_service_state_key: str = 'queue_service', queue_worker_state_key: str = 'queue_worker', queue_event_publisher_state_key: str = 'queue_event_publisher', queue_event_channels_backend_state_key: str = 'queue_event_channels_backend', event_config: QueueEventConfig = <factory>, task_modules: tuple[str, ...]=(), initialize_schedules: bool = True, worker_batch_size: int = 10, worker_poll_interval: float = 0.1, worker_max_concurrency: int = 1, worker_heartbeat_interval: float = 30, worker_reconcile_interval: float = 30, worker_stale_after: float | None = None, worker_stale_check_interval: float = 60.0, worker_graceful_shutdown_timeout: float = 30, worker_final_cancel_timeout: float = 5, worker_queues: tuple[str, ...]=(), sync_executor_max_workers: int | None = None, sync_executor_thread_name_prefix: str = 'litestar-queues', scheduler_canary_task: str = 'scheduler.heartbeat')[source]¶
Bases:
objectConfiguration for QueuePlugin.
- queue_backend: QueueBackendConfig¶
- execution_backend: ExecutionBackendConfig¶
- task_dependency_resolver: TaskDependencyResolver | None¶
- event_config: QueueEventConfig¶
- property signature_namespace: dict[str, Any]¶
Names added to Litestar’s signature namespace.
Optional backends (advanced_alchemy, sqlspec, redis, valkey) are added only when their driver extra is installed; missing extras silently drop the corresponding entries.
- get_service(state: State | None = None) QueueService[source]¶
Return a QueueService for this configuration.
- get_queue_backend() BaseQueueBackend[source]¶
Return a configured queue backend instance.
- get_execution_backend() BaseExecutionBackend[source]¶
Return a configured execution backend instance.
- __init__(queue_backend: QueueBackendConfig = 'memory', execution_backend: ExecutionBackendConfig = 'local', task_dependency_resolver: TaskDependencyResolver | None = None, in_app_worker: bool = True, queue_service_dependency_key: str = 'queue_service', queue_service_state_key: str = 'queue_service', queue_worker_state_key: str = 'queue_worker', queue_event_publisher_state_key: str = 'queue_event_publisher', queue_event_channels_backend_state_key: str = 'queue_event_channels_backend', event_config: QueueEventConfig = <factory>, task_modules: tuple[str, ...]=(), initialize_schedules: bool = True, worker_batch_size: int = 10, worker_poll_interval: float = 0.1, worker_max_concurrency: int = 1, worker_heartbeat_interval: float = 30, worker_reconcile_interval: float = 30, worker_stale_after: float | None = None, worker_stale_check_interval: float = 60.0, worker_graceful_shutdown_timeout: float = 30, worker_final_cancel_timeout: float = 5, worker_queues: tuple[str, ...]=(), sync_executor_max_workers: int | None = None, sync_executor_thread_name_prefix: str = 'litestar-queues', scheduler_canary_task: str = 'scheduler.heartbeat') None¶
- get_event_publisher() QueueEventPublisher[source]¶
Return a configured queue event publisher.
- provide_service() AsyncServiceProvider[source]¶
Provide a QueueService instance as an async context manager.
- Returns:
An async service provider.
- class litestar_queues.config.QueueEventConfig(enabled: bool = False, sink: QueueEventSink | None = None, channels_backend: object | None = None, strict: bool = False, publish_task_channel: bool = True, publish_queue_channel: bool = True, publish_global_lifecycle: bool = False)[source]¶
Bases:
objectConfiguration for queue event publishing.
- sink: QueueEventSink | None¶
- litestar_queues.config.TaskDependencyResolver¶
User-supplied callable that resolves extra kwargs for a task before execution.
alias of
Callable[[Task[…, object], QueuedTaskRecord, TaskExecutionContext],Awaitable[Mapping[str,object]]]
- litestar_queues.config.execution_backend_name(backend: str | ExecutionBackendConfigProtocol) str[source]¶
Return the registered execution backend name for a selector.
- litestar_queues.config.queue_backend_name(backend: str | QueueBackendConfigProtocol) str[source]¶
Return the registered queue backend name for a selector.
Service¶
- class litestar_queues.service.QueueService(config: QueueConfig, *, queue_backend: BaseQueueBackend | None = None, execution_backend: BaseExecutionBackend | None = None, event_publisher: QueueEventPublisher | None = None)[source]¶
Bases:
objectHigh-level facade for queue and execution backends.
- __init__(config: QueueConfig, *, queue_backend: BaseQueueBackend | None = None, execution_backend: BaseExecutionBackend | None = None, event_publisher: QueueEventPublisher | None = None) None[source]¶
Initialize the queue service.
- property config: QueueConfig¶
Queue configuration.
- get_queue_backend() BaseQueueBackend[source]¶
Return the configured queue backend.
- get_execution_backend() BaseExecutionBackend[source]¶
Return the configured execution backend.
- get_event_publisher() QueueEventPublisher[source]¶
Return the configured event publisher.
- async enqueue(task: str | Task[Any, Any], *args: Any, scheduled_at: datetime | None = None, run_after: float | timedelta | None = None, key: str | None = None, queue: str | None = None, priority: int | None = None, retries: int | None = None, timeout: float | None = None, execution_backend: str | None = None, execution_profile: str | None = None, description: str | None = None, log_level: str | None = None, quiet_success: bool | None = None, requeue_on_stale: bool | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) TaskResult[source]¶
Enqueue a registered task.
- Returns:
A result handle for the queued record.
- resolve_task(task: str | Task[Any, Any]) Task[Any, Any][source]¶
Resolve a task name or wrapper to a registered task.
- Returns:
The registered task wrapper.
- Raises:
KeyError – If a task name is not registered.
- async get_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Return a queued task record by ID.
- async claim_next(*, queue: str | None = None, execution_backend: str | None = None) QueuedTaskRecord | None[source]¶
Claim the next due queued task.
- Returns:
The claimed task record, if one was available.
- async execute_record(record: QueuedTaskRecord, *, worker_id: str | None = None) QueuedTaskRecord[source]¶
Execute a claimed queue record and persist the lifecycle result.
- Parameters:
record – The claimed queue record to execute.
worker_id – Identity of the worker driving execution, if any. The value is forwarded to
TaskExecutionContext.worker_idso published events carry stable worker provenance. Service-driven executions (no worker) leave this asNone.
- Returns:
The updated queue record.
- Raises:
asyncio.CancelledError – If task execution is cancelled.
- async recover_stale_tasks(*, stale_after: timedelta, worker_id: str | None = None) StaleTaskRecoveryResult[source]¶
Recover stale running tasks and publish a worker summary event.
- Returns:
Summary of recovered, failed, skipped, and handler-needed tasks.
- async initialize_schedules() list[QueuedTaskRecord][source]¶
Create queue records for registered recurring schedules.
- Returns:
The created or reused schedule records.
- async publish_claim_lost(record: QueuedTaskRecord, *, phase: str, task_context: TaskExecutionContext | None = None, worker_id: str | None = None, expected_retry_count: int | None = None) QueuedTaskRecord[source]¶
Publish an ownership-loss event and return the current record state.
- Returns:
Current queue task record state.
Tasks¶
- class litestar_queues.task.ScheduleConfig(task_name: str, cron: str | None = None, interval: timedelta | int | float | None = None, timezone: str = 'UTC', initial_delay: timedelta | int | float = 0, jitter: timedelta | int | float = 0, max_instances: int = 1, timeout: float | None = None)[source]¶
Bases:
objectConfiguration for a recurring task schedule.
- class litestar_queues.task.Task(func: Callable[[P], T | Awaitable[T]], *, name: str, queue: str = 'default', priority: int = 0, retries: int = 0, timeout: float | None = None, execution_backend: str | None = None, execution_profile: str | None = None, key: str | None = None, run_after: float | timedelta | None = None, description: str | None = None, log_level: str | None = None, quiet_success: bool | None = None, requeue_on_stale: bool | None = None)[source]¶
Bases:
Generic[P,T]Registered task wrapper with direct call and enqueue APIs.
- __init__(func: Callable[[P], T | Awaitable[T]], *, name: str, queue: str = 'default', priority: int = 0, retries: int = 0, timeout: float | None = None, execution_backend: str | None = None, execution_profile: str | None = None, key: str | None = None, run_after: float | timedelta | None = None, description: str | None = None, log_level: str | None = None, quiet_success: bool | None = None, requeue_on_stale: bool | None = None) None[source]¶
- property requeue_on_stale: bool¶
Whether stale running records should be requeued when retries remain.
- async execute_record(record: QueuedTaskRecord, *, task_context: TaskExecutionContext | None = None, extra_kwargs: Mapping[str, object] | None = None, sync_executor: Executor | None = None) T[source]¶
Execute this task for a queued record in worker context.
- Returns:
The wrapped callable result.
- metadata(values: dict[str, Any] | None = None) dict[str, Any][source]¶
Return enqueue metadata for this task.
- using(*, queue: str | None = None, priority: int | None = None, retries: int | None = None, timeout: float | None = None, execution_backend: str | None = None, execution_profile: str | None = None, key: str | None = None, run_after: float | timedelta | None = None, description: str | None = None, log_level: str | None = None, quiet_success: bool | None = None, requeue_on_stale: bool | None = None) Task[P, T][source]¶
Return a configured copy with enqueue overrides.
- async enqueue(*args: ~P, **kwargs: ~P) TaskResult[source]¶
Enqueue this task using the configured default service or fall back to an immediate service.
- Returns:
A result handle for the queued record.
- class litestar_queues.task.TaskResult(task_id: UUID, task_name: str, *, service: QueueService | None = None, record: QueuedTaskRecord | None = None)[source]¶
Bases:
objectHandle to a queued task result.
- __init__(task_id: UUID, task_name: str, *, service: QueueService | None = None, record: QueuedTaskRecord | None = None) None[source]¶
- property id: UUID¶
Queue record ID.
- property status: TaskStatus | None¶
Cached task status.
- property record: QueuedTaskRecord | None¶
Cached queue record.
- async refresh() Self[source]¶
Refresh this handle from its queue service.
- Returns:
The refreshed result handle.
- Raises:
RuntimeError – If the result has no associated service.
- litestar_queues.task.discover_tasks(package: str, subpackage: str = 'jobs', *, force_reload: bool = False) tuple[str, ...][source]¶
Walk
packageand import every<package>.<...>.<subpackage>.<...>module.Adopters with
app.domain.<x>.jobs/layouts can call this once at startup so@task-decorated callables register without having to enumerateQueueConfig.task_modulesby hand.- Parameters:
package – Dotted package name to walk (e.g.
"app.domain").subpackage – Path segment that marks task modules. Any module whose dotted path (excluding the root) contains this segment is imported. Defaults to
"jobs".force_reload – Re-import modules already in
sys.modules.
- Returns:
Sorted, deduplicated tuple of task names registered after the walk.
- Raises:
ModuleNotFoundError – If
packagecannot be imported, or if it resolves to a plain module rather than a package.
- litestar_queues.task.get_default_service() QueueService | None[source]¶
Return the global default QueueService instance.
- litestar_queues.task.get_scheduled_tasks() dict[str, ScheduleConfig][source]¶
Return the global scheduled task registry.
- litestar_queues.task.get_task_registry() dict[str, Task[Any, Any]][source]¶
Return the global task registry.
- litestar_queues.task.load_task_modules(modules: tuple[str, ...] | list[str], *, force_reload: bool = False) int[source]¶
Import configured task modules so decorators register tasks.
- Returns:
Number of imported modules.
- litestar_queues.task.set_default_service(service: QueueService | None) None[source]¶
Set the global default QueueService instance.
- litestar_queues.task.task(func: Callable[[P], Awaitable[T]], /) Task[P, T][source]¶
- litestar_queues.task.task(func: Callable[[P], T], /) Task[P, T]
- litestar_queues.task.task(name: str | None = None, /, *, queue: str = 'default', priority: int = 0, retries: int = 0, timeout: float | None = None, execution_backend: str | None = None, execution_profile: str | None = None, key: str | None = None, run_after: float | timedelta | None = None, description: str | None = None, log_level: str | None = None, quiet_success: bool | None = None, requeue_on_stale: bool | None = None, cron: str | None = None, interval: float | timedelta | None = None, timezone: str = 'UTC', initial_delay: float | timedelta = 0, jitter: float | timedelta = 0, max_instances: int = 1) Callable[[Callable[[...], Any]], Task[Any, Any]]
Register a callable as a queue task.
- Returns:
A task wrapper when used bare, otherwise a decorator.
- Raises:
ValueError – If both cron and interval are configured.
Models¶
- litestar_queues.models.TERMINAL_STATUSES: frozenset[Literal['pending', 'scheduled', 'running', 'completed', 'failed', 'cancelled']] = frozenset({'cancelled', 'completed', 'failed'})¶
Statuses that represent finished queue records.
- class litestar_queues.models.EnqueueSpec(task_name: str, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None)[source]¶
Bases:
objectA single task specification for bulk enqueue via
enqueue_many.Mirrors the keyword arguments of
BaseQueueBackend.enqueue()so a batch of tasks can be described declaratively and submitted in one call.- __init__(task_name: str, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) None¶
- class litestar_queues.models.QueueBackendCapabilities(supports_notifications: bool = False, notification_backend: str | None = None, notifications_durable: bool = False, supports_heartbeats: bool = True, supports_atomic_claim: bool = True, supports_atomic_delayed_promotion: bool = True, supports_external_refs: bool = True, supports_terminal_cleanup: bool = True)[source]¶
Bases:
objectBehavior advertised by a queue backend.
- __init__(supports_notifications: bool = False, notification_backend: str | None = None, notifications_durable: bool = False, supports_heartbeats: bool = True, supports_atomic_claim: bool = True, supports_atomic_delayed_promotion: bool = True, supports_external_refs: bool = True, supports_terminal_cleanup: bool = True) None¶
- class litestar_queues.models.QueueStatistics(pending: int = 0, scheduled: int = 0, running: int = 0, completed: int = 0, failed: int = 0, cancelled: int = 0)[source]¶
Bases:
objectOperational status counts for a queue backend.
- class litestar_queues.models.QueuedTaskRecord(task_name: str, id: UUID = <factory>, args: tuple[~typing.Any, ...]=(), kwargs: dict[str, ~typing.Any]=<factory>, queue: str = 'default', execution_backend: str = 'local', execution_profile: str | None = None, execution_ref: str | None = None, status: Literal['pending', 'scheduled', 'running', 'completed', 'failed', 'cancelled']='pending', priority: int = 0, max_retries: int = 0, retry_count: int = 0, scheduled_at: datetime | None = None, created_at: datetime = <factory>, started_at: datetime | None = None, completed_at: datetime | None = None, heartbeat_at: datetime | None = None, result: Any = None, error: str | None = None, key: str | None = None, metadata: dict[str, ~typing.Any]=<factory>)[source]¶
Bases:
objectBackend-neutral representation of a queued task.
- __init__(task_name: str, id: UUID = <factory>, args: tuple[~typing.Any, ...]=(), kwargs: dict[str, ~typing.Any]=<factory>, queue: str = 'default', execution_backend: str = 'local', execution_profile: str | None = None, execution_ref: str | None = None, status: Literal['pending', 'scheduled', 'running', 'completed', 'failed', 'cancelled']='pending', priority: int = 0, max_retries: int = 0, retry_count: int = 0, scheduled_at: datetime | None = None, created_at: datetime = <factory>, started_at: datetime | None = None, completed_at: datetime | None = None, heartbeat_at: datetime | None = None, result: Any = None, error: str | None = None, key: str | None = None, metadata: dict[str, ~typing.Any]=<factory>) None¶
Worker¶
- class litestar_queues.worker.Worker(service: QueueService, *, batch_size: int = 10, poll_interval: float = 0.1, max_concurrency: int = 1, heartbeat_interval: float = 30, reconcile_interval: float = 30, stale_after: timedelta | None = None, stale_check_interval: float = 60.0, graceful_shutdown_timeout: float = 30, final_cancel_timeout: float = 5, worker_id: str | None = None, queues: tuple[str, ...] = ())[source]¶
Bases:
objectLocal in-process queue worker.
- __init__(service: QueueService, *, batch_size: int = 10, poll_interval: float = 0.1, max_concurrency: int = 1, heartbeat_interval: float = 30, reconcile_interval: float = 30, stale_after: timedelta | None = None, stale_check_interval: float = 60.0, graceful_shutdown_timeout: float = 30, final_cancel_timeout: float = 5, worker_id: str | None = None, queues: tuple[str, ...] = ()) None[source]¶
Initialize the worker.
- async stop(*, force: bool = False) None[source]¶
Stop the worker loop and drain or cancel in-flight work.
Plugin¶
- class litestar_queues.plugin.QueuePlugin(config: QueueConfig | None = None)[source]¶
Bases:
InitPluginLitestar plugin for queue service dependency registration and lifecycle.
- __init__(config: QueueConfig | None = None) None[source]¶
Initialize the queue plugin.
- property config: QueueConfig¶
Plugin configuration.
- get_service(state: State | None = None) QueueService[source]¶
Return a QueueService for this plugin.
Exceptions¶
- exception litestar_queues.exceptions.MissingDependencyError(package: str, install_package: str | None = None)[source]¶
Bases:
QueueError,ImportErrorRaised when a required optional dependency is not installed.
- exception litestar_queues.exceptions.NonRetryableError[source]¶
Bases:
QueueErrorRaised by a task to mark the current failure as permanent.
- exception litestar_queues.exceptions.QueueConfigurationError[source]¶
Bases:
QueueErrorRaised when queue backend configuration is invalid.
- exception litestar_queues.exceptions.QueueError[source]¶
Bases:
ExceptionBase exception for litestar-queues errors.
- litestar_queues.exceptions.non_retryable(message: str) None[source]¶
Raise a non-retryable task failure.
- Raises:
NonRetryableError – Always raised with the provided message.