Queue Backends¶
Registry¶
Queue backends are registered through litestar_queues.backends.
- litestar_queues.backends.get_queue_backend(backend: QueueBackendConfig = 'memory', config: QueueConfig | None = None) BaseQueueBackend[source]¶
Get an instantiated queue backend.
- Returns:
A configured queue backend instance.
- Raises:
TypeError – If a typed backend config selects a backend class that does not accept backend_config.
- litestar_queues.backends.get_queue_backend_class(backend_path: str) type[BaseQueueBackend][source]¶
Get a queue backend class by short name or import path.
Optional backends are imported lazily on first lookup so unused adapters do not require their driver extras to be installed.
- Returns:
The resolved queue backend class.
- Raises:
ValueError – If a short backend name is unknown.
- litestar_queues.backends.queue_backend(name: str) Callable[[type[BaseQueueBackend]], type[BaseQueueBackend]][source]¶
Decorator to register a queue backend class with a short name.
- Returns:
A decorator that registers the backend class.
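The registry functions above follow a familiar decorator-registry pattern. The block below is a minimal, self-contained sketch of that pattern, not the library's implementation: `register_backend`, `resolve_backend_class`, `_REGISTRY`, and `DemoBackend` are hypothetical names, and treating any dotted name as an import path is an assumption.

```python
import importlib
from typing import Callable, Dict

# Hypothetical module-level registry; how the library stores entries is not shown here.
_REGISTRY: Dict[str, type] = {}


def register_backend(name: str) -> Callable[[type], type]:
    """Return a decorator that registers a backend class under a short name."""

    def decorator(cls: type) -> type:
        _REGISTRY[name] = cls
        return cls

    return decorator


def resolve_backend_class(backend_path: str) -> type:
    """Resolve a backend class by short name or dotted import path."""
    if backend_path in _REGISTRY:
        return _REGISTRY[backend_path]
    if "." in backend_path:
        # Assumption: dotted names are "package.module.ClassName" import paths,
        # imported only when first requested.
        module_name, _, class_name = backend_path.rpartition(".")
        return getattr(importlib.import_module(module_name), class_name)
    raise ValueError(f"Unknown queue backend: {backend_path!r}")


@register_backend("demo")
class DemoBackend:
    """Stand-in backend used only for this example."""
```

Importing on lookup rather than at registration time is what lets optional adapters stay uninstalled until they are actually selected.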
Base Backend¶
- class litestar_queues.backends.base.BaseQueueBackend(config: QueueConfig | None = None)[source]¶
Bases: object
Base class for queue persistence backends.
- __init__(config: QueueConfig | None = None) None[source]¶
Initialize the queue backend.
- config¶
- property capabilities: QueueBackendCapabilities¶
Backend behavior capabilities.
- async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]¶
Persist a queued task.
- async enqueue_many(specs: Sequence[EnqueueSpec]) list[QueuedTaskRecord][source]¶
Persist multiple queued tasks, returning records in input order.
The default implementation issues one enqueue() per spec, which preserves per-key deduplication and ordering. Backends with a native bulk path (e.g. SQLSpec COPY/Arrow/execute_many) override this for throughput while keeping the same semantics.
- Returns:
Queue task records in the same order as specs.
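The default enqueue_many() behavior described above can be pictured as a plain loop over enqueue(). The sketch below uses stand-ins only: `DemoSpec` and `DemoBackend` are hypothetical, records are plain dicts, and the real EnqueueSpec fields are not shown in this reference.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence


@dataclass(frozen=True)
class DemoSpec:
    """Hypothetical stand-in for EnqueueSpec."""

    task_name: str
    key: Optional[str] = None


class DemoBackend:
    """Stand-in backend that deduplicates by key."""

    def __init__(self) -> None:
        self.records: List[dict] = []
        self._by_key: Dict[str, dict] = {}

    async def enqueue(self, task_name: str, *, key: Optional[str] = None) -> dict:
        # A keyed task that already exists is returned instead of inserted again.
        if key is not None and key in self._by_key:
            return self._by_key[key]
        record = {"id": len(self.records), "task_name": task_name, "key": key}
        self.records.append(record)
        if key is not None:
            self._by_key[key] = record
        return record

    async def enqueue_many(self, specs: Sequence[DemoSpec]) -> List[dict]:
        # One awaited enqueue() per spec, in order, so results line up with specs.
        return [await self.enqueue(spec.task_name, key=spec.key) for spec in specs]
```

Because each spec goes through enqueue(), a repeated key inside one batch resolves to the first record created for that key.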
- async get_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Return a queued task by ID.
- async get_task_by_key(key: str) QueuedTaskRecord | None[source]¶
Return a queued task by deduplication key.
- async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]¶
Return due pending or scheduled tasks ordered for execution.
- async claim_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Atomically claim a pending task.
- async claim_next(*, queue: str | None = None, execution_backend: str | None = None) QueuedTaskRecord | None[source]¶
Claim the next due task.
- Returns:
The claimed task record, if one was available.
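As a rough illustration of what "claim the next due task" involves, the sketch below selects and marks a task under a single lock. `DemoQueue` is hypothetical, and the ordering rule (larger priority values first, ties in insertion order) is an assumption; this reference does not state the real ordering.

```python
import asyncio
from datetime import datetime, timezone
from typing import List, Optional


class DemoQueue:
    """In-memory stand-in; records are plain dicts."""

    def __init__(self) -> None:
        self.tasks: List[dict] = []
        self._lock = asyncio.Lock()

    async def claim_next(self, *, queue: Optional[str] = None) -> Optional[dict]:
        now = datetime.now(timezone.utc)
        # Holding the lock makes select-and-mark one step, so two workers
        # cannot claim the same record.
        async with self._lock:
            due = [
                task
                for task in self.tasks
                if task["status"] == "pending"
                and (queue is None or task["queue"] == queue)
                and (task["scheduled_at"] is None or task["scheduled_at"] <= now)
            ]
            if not due:
                return None
            # Assumption: larger priority values run first; max() keeps the
            # first of several equal-priority tasks.
            task = max(due, key=lambda item: item["priority"])
            task["status"] = "running"
            task["started_at"] = now
            return task
```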
- async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as completed.
- Parameters:
task_id – Queue record identifier.
result – Task result payload.
expected_retry_count – When provided, update only if the record is still running with this retry count.
- async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as failed or retry it.
- Parameters:
task_id – Queue record identifier.
error – Error message to persist.
retry – Whether retry policy may requeue the task.
expected_retry_count – When provided, update only if the record is still running with this retry count.
- async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]¶
Update the heartbeat timestamp for a running task.
- Returns:
True when a running record matched the optional retry-count fence.
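The expected_retry_count parameter on complete_task(), fail_task(), and touch_heartbeat() acts as a fence against stale workers. A minimal sketch of the idea, using a plain dict as the record; `complete_if_current` is a hypothetical helper, and updating unconditionally when no fence is passed is an assumption.

```python
from typing import Optional


def complete_if_current(
    record: dict, result: object, expected_retry_count: Optional[int] = None
) -> bool:
    """Mark a record completed unless the fence shows another attempt owns it."""
    if expected_retry_count is not None:
        still_owned = (
            record["status"] == "running"
            and record["retry_count"] == expected_retry_count
        )
        if not still_owned:
            # Stale worker: the task was requeued or retried after it was claimed.
            return False
    record.update(status="completed", result=result)
    return True
```

A worker would remember the retry count it saw when it claimed the task and pass that value back, so a late result from an abandoned attempt cannot overwrite a newer one.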
- async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]¶
Clear heartbeat timestamps for task IDs.
- Parameters:
task_ids – Queue record identifiers.
expected_retry_count – When provided, clear only records that still match this retry count.
- async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]¶
Recover running tasks with stale heartbeats.
- Returns:
Summary of requeued, failed, skipped, and handler-needed records.
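Stale-heartbeat recovery can be sketched as a scan over running records. The policy below is an assumption made for illustration: recovery consumes one retry, exhausted tasks are failed, and records without a heartbeat are skipped. `requeue_stale` and the dict-based summary are hypothetical and far simpler than StaleTaskRecoveryResult.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def requeue_stale(
    tasks: List[dict], *, stale_after: timedelta, now: datetime
) -> Dict[str, int]:
    """Requeue or fail running tasks whose heartbeat is older than the cutoff."""
    cutoff = now - stale_after
    summary = {"requeued": 0, "failed": 0}
    for task in tasks:
        heartbeat = task["heartbeat_at"]
        if task["status"] != "running" or heartbeat is None or heartbeat >= cutoff:
            continue  # not running, no heartbeat recorded, or still fresh
        if task["retry_count"] < task["max_retries"]:
            # Assumption: bumping retry_count also invalidates old retry-count fences.
            task.update(
                status="pending",
                retry_count=task["retry_count"] + 1,
                heartbeat_at=None,
            )
            summary["requeued"] += 1
        else:
            task.update(status="failed", error="stale heartbeat")
            summary["failed"] += 1
    return summary
```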
- async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an external execution reference for a running task.
- Returns:
The updated queued task record, if one exists.
- async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an execution backend/profile change for a queued task.
- Returns:
The updated queued task record, if one exists.
- async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]¶
Return externally dispatched tasks with references to reconcile.
- async get_statistics() QueueStatistics[source]¶
Return queue status counts.
- async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]¶
Return recent completed records for a task name.
- async cleanup_terminal(before: datetime) int[source]¶
Delete terminal records completed before a cutoff.
- Returns:
The number of deleted records.
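A cutoff-based cleanup can be sketched in a few lines. The set of terminal statuses below is an assumption (the status names are not listed in this reference), and `cleanup_terminal_demo` is a hypothetical helper operating on plain dicts.

```python
from datetime import datetime
from typing import List

# Assumption: which statuses count as terminal.
TERMINAL_STATUSES = frozenset({"completed", "failed", "cancelled"})


def cleanup_terminal_demo(tasks: List[dict], before: datetime) -> int:
    """Delete terminal records completed before the cutoff; return how many."""
    kept = [
        task
        for task in tasks
        if not (
            task["status"] in TERMINAL_STATUSES
            and task["completed_at"] is not None
            and task["completed_at"] < before
        )
    ]
    deleted = len(tasks) - len(kept)
    tasks[:] = kept  # mutate in place so callers holding the list see the result
    return deleted
```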
- async notify_new_task(record: QueuedTaskRecord) None[source]¶
Notify waiters that a new task is available.
Memory¶
- class litestar_queues.backends.memory.backend.InMemoryQueueBackend(config: QueueConfig | None = None)[source]¶
Bases: BaseQueueBackend
In-process queue backend for tests, local development, and examples.
- __init__(config: QueueConfig | None = None) None[source]¶
Initialize the queue backend.
- property capabilities: QueueBackendCapabilities¶
Backend behavior capabilities.
- async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]¶
Persist a queued task.
- async get_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Return a queued task by ID.
- async get_task_by_key(key: str) QueuedTaskRecord | None[source]¶
Return a queued task by deduplication key.
- async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]¶
Return due pending or scheduled tasks ordered for execution.
- async claim_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Atomically claim a pending task.
- async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as completed.
- Parameters:
task_id – Queue record identifier.
result – Task result payload.
expected_retry_count – When provided, update only if the record is still running with this retry count.
- async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as failed or retry it.
- Parameters:
task_id – Queue record identifier.
error – Error message to persist.
retry – Whether retry policy may requeue the task.
expected_retry_count – When provided, update only if the record is still running with this retry count.
- async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]¶
Update the heartbeat timestamp for a running task.
- Returns:
True when a running record matched the optional retry-count fence.
- async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]¶
Clear heartbeat timestamps for task IDs.
- Parameters:
task_ids – Queue record identifiers.
expected_retry_count – When provided, clear only records that still match this retry count.
- async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]¶
Recover running tasks with stale heartbeats.
- Returns:
Summary of requeued, failed, skipped, and handler-needed records.
- async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an external execution reference for a running task.
- Returns:
The updated queued task record, if one exists.
- async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an execution backend/profile change for a queued task.
- Returns:
The updated queued task record, if one exists.
- async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]¶
Return externally dispatched tasks with references to reconcile.
- async get_statistics() QueueStatistics[source]¶
Return queue status counts.
- async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]¶
Return recent completed records for a task name.
- async cleanup_terminal(before: datetime) int[source]¶
Delete terminal records completed before a cutoff.
- Returns:
The number of deleted records.
- async notify_new_task(record: QueuedTaskRecord) None[source]¶
Notify waiters that a new task is available.
SQLSpec¶
SQLSpec queue backend.
- class litestar_queues.backends.sqlspec.backend.SQLSpecQueueBackend(config: QueueConfig | None = None, *, backend_config: SQLSpecBackendConfig | None = None)[source]¶
Bases: BaseQueueBackend
SQLSpec-backed queue backend.
- __init__(config: QueueConfig | None = None, *, backend_config: SQLSpecBackendConfig | None = None) None[source]¶
Initialize the queue backend.
- property capabilities: QueueBackendCapabilities¶
Backend behavior capabilities.
- async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]¶
Persist a queued task.
- async enqueue_many(specs: Sequence[EnqueueSpec]) list[QueuedTaskRecord][source]¶
Persist many tasks via the adapter’s fastest bulk path.
Resolves existing deduplication keys in one round trip, then inserts the remaining rows through the native Arrow ingest path (load_from_records()) when the adapter supports it, otherwise via a batched execute_many. Returns records in input order, with existing non-terminal keyed tasks returned as-is (no duplicate insert) to match the semantics of enqueue().
- Returns:
Queue task records in the same order as specs.
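The two-phase flow described above (resolve existing keys once, then bulk-insert the rest) can be sketched without a database. In this illustration `existing_by_key` stands in for the result of the single key lookup and `insert_rows` for the bulk path; both names, and the `(task_name, key)` tuple shape, are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Sequence, Tuple


def bulk_enqueue(
    specs: Sequence[Tuple[str, Optional[str]]],
    existing_by_key: Dict[str, dict],
    insert_rows: Callable[[List[dict]], None],
) -> List[dict]:
    """Return one record per spec, in input order, inserting only new rows."""
    results: List[dict] = []
    new_rows: List[dict] = []
    seen = dict(existing_by_key)  # copy so the caller's lookup result is untouched
    for task_name, key in specs:
        if key is not None and key in seen:
            results.append(seen[key])  # existing keyed task: no duplicate insert
            continue
        row = {"task_name": task_name, "key": key}
        new_rows.append(row)
        results.append(row)
        if key is not None:
            seen[key] = row  # later duplicates in the same batch reuse this row
    insert_rows(new_rows)  # a single bulk call for everything that is new
    return results
```

Tracking keys seen earlier in the same batch keeps the result identical to calling enqueue() once per spec.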
- async get_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Return a queued task by ID.
- async get_task_by_key(key: str) QueuedTaskRecord | None[source]¶
Return a queued task by deduplication key.
- async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]¶
Return due pending or scheduled tasks ordered for execution.
- async claim_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Atomically claim a pending task.
- async claim_next(*, queue: str | None = None, execution_backend: str | None = None) QueuedTaskRecord | None[source]¶
Claim the next due task.
- Returns:
The claimed task record, if one was available.
- async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as completed.
- Parameters:
task_id – Queue record identifier.
result – Task result payload.
expected_retry_count – When provided, update only if the record is still running with this retry count.
- async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as failed or retry it.
- Parameters:
task_id – Queue record identifier.
error – Error message to persist.
retry – Whether retry policy may requeue the task.
expected_retry_count – When provided, update only if the record is still running with this retry count.
- async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]¶
Update the heartbeat timestamp for a running task.
- Returns:
True when a running record matched the optional retry-count fence.
- async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]¶
Clear heartbeat timestamps for task IDs.
- Parameters:
task_ids – Queue record identifiers.
expected_retry_count – When provided, clear only records that still match this retry count.
- async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]¶
Recover running tasks with stale heartbeats.
- Returns:
Summary of requeued, failed, skipped, and handler-needed records.
- async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an external execution reference for a running task.
- Returns:
The updated queued task record, if one exists.
- async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an execution backend/profile change for a queued task.
- Returns:
The updated queued task record, if one exists.
- async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]¶
Return externally dispatched tasks with references to reconcile.
- async get_statistics() QueueStatistics[source]¶
Return queue status counts.
- async iter_all(*, chunk_size: int = 1000) AsyncIterator[QueuedTaskRecord][source]¶
Stream every queue record without materializing the full table.
Uses SQLSpec select_stream so large administrative scans and exports consume rows in chunks of chunk_size rather than loading the entire result set into memory. The backend session stays open for the duration of iteration, so callers should consume the iterator promptly.
- Yields:
Queue task records from the backing SQLSpec table.
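The memory behavior of chunked iteration can be illustrated with an async generator. This sketch pages through a hypothetical `fetch_page(offset, limit)` coroutine, which merely stands in for select_stream (its real interface is not shown here); only one chunk is held in memory at a time.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List


async def iter_in_chunks(
    fetch_page: Callable[[int, int], Awaitable[List[Any]]],
    *,
    chunk_size: int = 1000,
) -> AsyncIterator[Any]:
    """Yield rows one at a time while fetching them chunk_size at a time."""
    offset = 0
    while True:
        rows = await fetch_page(offset, chunk_size)
        for row in rows:
            yield row
        if len(rows) < chunk_size:
            return  # a short page means the source is exhausted
        offset += len(rows)
```

Callers consume it with `async for`, which is also why a slow consumer keeps the underlying session open longer.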
- async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]¶
Return recent completed records for a task name.
- async cleanup_terminal(before: datetime) int[source]¶
Delete terminal records completed before a cutoff.
- Returns:
The number of deleted records.
- async notify_new_task(record: QueuedTaskRecord) None[source]¶
Publish a SQLSpec event when configured queue work becomes available.
SQLSpec backend configuration.
- litestar_queues.backends.sqlspec.config.NOTIFY_TRANSPORTS: frozenset[str] = frozenset({'aq', 'listen_notify', 'listen_notify_durable', 'polling', 'table_queue', 'txeventq'})¶
Valid worker-wakeup transports for SQLSpecBackendConfig.notify_transport. listen_notify / listen_notify_durable push wakeups through native LISTEN/NOTIFY, table_queue uses the durable events table, aq and txeventq use Oracle Advanced Queuing backends, and polling disables push wakeups so workers fall back to interval polling.
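A configuration check against this set might look like the sketch below. The frozenset is copied from the value documented above; `check_notify_transport` and `uses_push_wakeups` are hypothetical helpers, and raising ValueError is an assumption (the library may raise its own configuration error instead).

```python
from typing import Optional

# Same members as the documented NOTIFY_TRANSPORTS constant.
NOTIFY_TRANSPORTS = frozenset(
    {"aq", "listen_notify", "listen_notify_durable", "polling", "table_queue", "txeventq"}
)


def check_notify_transport(value: Optional[str]) -> Optional[str]:
    """Return the transport unchanged, rejecting names outside the valid set."""
    if value is not None and value not in NOTIFY_TRANSPORTS:
        valid = ", ".join(sorted(NOTIFY_TRANSPORTS))
        raise ValueError(f"Unknown notify_transport {value!r}; expected one of: {valid}")
    return value


def uses_push_wakeups(value: str) -> bool:
    """Every documented transport except 'polling' pushes wakeups to workers."""
    return check_notify_transport(value) != "polling"
```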
- class litestar_queues.backends.sqlspec.config.SQLSpecBackendConfig(sqlspec: SQLSpec | None = None, config: Any | None = None, heartbeat_pool_config: Any | None = None, table_name: str | None = None, create_schema: bool | None = None, run_migrations: bool | None = None, event_channel: AsyncEventChannel | None = None, notifications: bool | None = None, notification_channel: str | None = None, notify_transport: str | None = None, event_backend: str | None = None, event_queue_table: str | None = None, event_poll_interval: float | None = None, event_settings: dict[str, Any]=<factory>, queue_observability: bool = True, column_map: Mapping[str, str]=<factory>, native_json_columns: frozenset[str] = <factory>, manage_schema: bool = True)[source]¶
Bases: object
Configuration values for the SQLSpec queue backend.
- __init__(sqlspec: SQLSpec | None = None, config: Any | None = None, heartbeat_pool_config: Any | None = None, table_name: str | None = None, create_schema: bool | None = None, run_migrations: bool | None = None, event_channel: AsyncEventChannel | None = None, notifications: bool | None = None, notification_channel: str | None = None, notify_transport: str | None = None, event_backend: str | None = None, event_queue_table: str | None = None, event_poll_interval: float | None = None, event_settings: dict[str, Any]=<factory>, queue_observability: bool = True, column_map: Mapping[str, str]=<factory>, native_json_columns: frozenset[str] = <factory>, manage_schema: bool = True) None¶
Schema and migration helpers for the SQLSpec queue backend.
- litestar_queues.backends.sqlspec.schema.migration_directory() Path[source]¶
Return the packaged SQLSpec queue extension migration directory.
- litestar_queues.backends.sqlspec.schema.migration_paths() tuple[str, ...][source]¶
Return packaged SQLSpec migration file paths.
- litestar_queues.backends.sqlspec.schema.validate_column_map(column_map: Mapping[str, str]) dict[str, str][source]¶
Validate a canonical-to-adopter column map.
- Returns:
A defensive copy of the validated map.
- Raises:
QueueConfigurationError – If a canonical name is unknown or a mapped name is not a valid SQL identifier.
- litestar_queues.backends.sqlspec.schema.validate_native_json_columns(columns: frozenset[str]) frozenset[str][source]¶
Validate native JSON passthrough columns.
- Returns:
The validated column set.
- Raises:
QueueConfigurationError – If any column is not a canonical JSON column.
- litestar_queues.backends.sqlspec.schema.validate_table_name(table_name: str) str[source]¶
Validate a SQL identifier used for the queue table name.
- Returns:
The validated table name, normalized to unquoted SQLSpec identifier parts.
- Raises:
QueueConfigurationError – If the table name is not a valid SQL identifier.
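The validation helpers above guard identifiers that end up inside SQL text. The following is a rough sketch of that kind of check, not the library's rules: the identifier pattern, the acceptance of dot-separated schema-qualified names, the small `CANONICAL_COLUMNS` set, and the use of ValueError in place of QueueConfigurationError are all assumptions.

```python
import re
from typing import Dict, Mapping

# Assumption: a conservative unquoted-identifier pattern.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

# Hypothetical subset of canonical column names, for illustration only.
CANONICAL_COLUMNS = frozenset({"task_name", "queue", "status", "priority"})


def check_table_name(table_name: str) -> str:
    """Accept 'table' or 'schema.table' where every part is a plain identifier."""
    parts = table_name.split(".")
    if not all(_IDENTIFIER.fullmatch(part) for part in parts):
        raise ValueError(f"Invalid table name: {table_name!r}")
    return ".".join(parts)


def check_column_map(column_map: Mapping[str, str]) -> Dict[str, str]:
    """Validate a canonical-to-adopter map and return a defensive copy."""
    for canonical, mapped in column_map.items():
        if canonical not in CANONICAL_COLUMNS:
            raise ValueError(f"Unknown canonical column: {canonical!r}")
        if not _IDENTIFIER.fullmatch(mapped):
            raise ValueError(f"Invalid mapped column name: {mapped!r}")
    return dict(column_map)
```

Returning a copy means later changes to the caller's mapping cannot alter the validated configuration.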
SQLSpec queue store factory.
- litestar_queues.backends.sqlspec.stores.factory.create_queue_store(config: Any, *, table_name: str | None = None, column_map: Mapping[str, str] | None = None, native_json_columns: frozenset[str] | None = None, manage_schema: bool = True) SQLSpecQueueStore[source]¶
Create a queue store for a SQLSpec adapter configuration.
- Returns:
The queue store implementation for the SQLSpec adapter.
Advanced Alchemy¶
Advanced Alchemy queue backend.
- class litestar_queues.backends.advanced_alchemy.backend.AdvancedAlchemyQueueBackend(config: QueueConfig | None = None, *, backend_config: AdvancedAlchemyBackendConfig | None = None)[source]¶
Bases: BaseQueueBackend
Advanced Alchemy-backed queue backend.
- __init__(config: QueueConfig | None = None, *, backend_config: AdvancedAlchemyBackendConfig | None = None) None[source]¶
Initialize the queue backend.
- property capabilities: QueueBackendCapabilities¶
Backend behavior capabilities.
- async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]¶
Persist a queued task.
- async get_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Return a queued task by ID.
- async get_task_by_key(key: str) QueuedTaskRecord | None[source]¶
Return a queued task by deduplication key.
- async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]¶
Return due pending or scheduled tasks ordered for execution.
- async claim_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Atomically claim a pending task.
- async claim_next(*, queue: str | None = None, execution_backend: str | None = None) QueuedTaskRecord | None[source]¶
Claim the next due task.
- Returns:
The claimed task record, if one was available.
- async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as completed.
- Parameters:
task_id – Queue record identifier.
result – Task result payload.
expected_retry_count – When provided, update only if the record is still running with this retry count.
- async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as failed or retry it.
- Parameters:
task_id – Queue record identifier.
error – Error message to persist.
retry – Whether retry policy may requeue the task.
expected_retry_count – When provided, update only if the record is still running with this retry count.
- async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]¶
Update the heartbeat timestamp for a running task.
- Returns:
True when a running record matched the optional retry-count fence.
- async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]¶
Clear heartbeat timestamps for task IDs.
- Parameters:
task_ids – Queue record identifiers.
expected_retry_count – When provided, clear only records that still match this retry count.
- async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]¶
Recover running tasks with stale heartbeats.
- Returns:
Summary of requeued, failed, skipped, and handler-needed records.
- async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an external execution reference for a running task.
- Returns:
The updated queued task record, if one exists.
- async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an execution backend/profile change for a queued task.
- Returns:
The updated queued task record, if one exists.
- async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]¶
Return externally dispatched tasks with references to reconcile.
- async get_statistics() QueueStatistics[source]¶
Return queue status counts.
Advanced Alchemy backend configuration.
- class litestar_queues.backends.advanced_alchemy.config.AdvancedAlchemyBackendConfig(sqlalchemy_config: SQLAlchemyAsyncConfig | None = None, heartbeat_session_maker: async_sessionmaker[AsyncSession] | None = None, model_class: type[Any] | None = <factory>, create_schema: bool = False)[source]¶
Bases: object
Configuration values for the Advanced Alchemy queue backend.
Advanced Alchemy queue task mixins.
- class litestar_queues.backends.advanced_alchemy.mixins.QueueTaskModelMixin[source]¶
Bases: object
Declarative mixin carrying queue task columns and indexes.
Compose this with an application-owned Advanced Alchemy base that provides compatible id and created_at columns.
- task_name = <sqlalchemy.orm.properties.MappedColumn object>¶
- args_json = <sqlalchemy.orm.properties.MappedColumn object>¶
- kwargs_json = <sqlalchemy.orm.properties.MappedColumn object>¶
- queue = <sqlalchemy.orm.properties.MappedColumn object>¶
- execution_backend = <sqlalchemy.orm.properties.MappedColumn object>¶
- execution_profile = <sqlalchemy.orm.properties.MappedColumn object>¶
- execution_ref = <sqlalchemy.orm.properties.MappedColumn object>¶
- status = <sqlalchemy.orm.properties.MappedColumn object>¶
- priority = <sqlalchemy.orm.properties.MappedColumn object>¶
- max_retries = <sqlalchemy.orm.properties.MappedColumn object>¶
- retry_count = <sqlalchemy.orm.properties.MappedColumn object>¶
- scheduled_at = <sqlalchemy.orm.properties.MappedColumn object>¶
- started_at = <sqlalchemy.orm.properties.MappedColumn object>¶
- completed_at = <sqlalchemy.orm.properties.MappedColumn object>¶
- heartbeat_at = <sqlalchemy.orm.properties.MappedColumn object>¶
- result_json = <sqlalchemy.orm.properties.MappedColumn object>¶
- error = <sqlalchemy.orm.properties.MappedColumn object>¶
- task_key = <sqlalchemy.orm.properties.MappedColumn object>¶
- metadata_json = <sqlalchemy.orm.properties.MappedColumn object>¶
Advanced Alchemy queue task repository.
- class litestar_queues.backends.advanced_alchemy.repository.QueueTaskRepository(*, statement: Select | None = None, session: AsyncSession | async_scoped_session[AsyncSession], auto_expunge: bool = False, auto_refresh: bool = True, auto_commit: bool = False, order_by: List[tuple[str | InstrumentedAttribute[Any], bool] | UnaryExpression[Any]] | tuple[str | InstrumentedAttribute[Any], bool] | UnaryExpression[Any] | None = None, error_messages: ErrorMessages | type[Empty] | None = <class 'advanced_alchemy.utils.dataclass.Empty'>, load: Sequence[_AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any] | Sequence[_AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any]]] | _AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any] | ExecutableOption | Sequence[ExecutableOption] | None = None, execution_options: dict[str, Any] | None = None, wrap_exceptions: bool = True, uniquify: bool | None = None, count_with_window_function: bool | None = None, cache_manager: CacheManager | None = None, bind_group: str | None = None, **kwargs: Any)[source]¶
Bases: SQLAlchemyAsyncRepository[Any]
Repository for queue task records.
Advanced Alchemy queue persistence service.
- class litestar_queues.backends.advanced_alchemy.service.QueueTaskService(session: AsyncSession | async_scoped_session[AsyncSession], *, statement: Select | None = None, auto_expunge: bool = False, auto_refresh: bool = True, auto_commit: bool = False, order_by: List[tuple[str | InstrumentedAttribute[Any], bool] | UnaryExpression[Any]] | tuple[str | InstrumentedAttribute[Any], bool] | UnaryExpression[Any] | None = None, error_messages: ErrorMessages | type[Empty] | None = <class 'advanced_alchemy.utils.dataclass.Empty'>, wrap_exceptions: bool = True, load: Sequence[_AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any] | Sequence[_AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any]]] | _AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any] | ExecutableOption | Sequence[ExecutableOption] | None = None, execution_options: dict[str, Any] | None = None, uniquify: bool | None = None, count_with_window_function: bool | None = None, **repo_kwargs: Any)[source]¶
Bases: SQLAlchemyAsyncRepositoryService[Any, Any]
Persistence operations for Advanced Alchemy queue records.
- classmethod for_model(model_class: type[Any]) type[QueueTaskService][source]¶
Return a service subclass bound to model_class.
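A classmethod that returns a subclass bound to a model is typically a small class factory. The sketch below shows one way to build such a subclass with `type()`; `DemoService`, the `model_type` attribute, and the generated class name are hypothetical and not taken from QueueTaskService.

```python
class DemoService:
    """Stand-in service whose subclasses are bound to a model class."""

    model_type: type = object  # hypothetical attribute holding the bound model

    @classmethod
    def for_model(cls, model_class: type) -> type:
        # Build a new subclass so the base class itself is never mutated.
        name = f"{cls.__name__}For{model_class.__name__}"
        return type(name, (cls,), {"model_type": model_class})
```

Creating a fresh subclass per model lets several applications-owned models coexist without sharing class-level state.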
- async enqueue(task_name: str, *, args: tuple[Any, ...], kwargs: dict[str, Any], queue: str, priority: int, max_retries: int, scheduled_at: datetime | None, key: str | None, execution_backend: str, execution_profile: str | None, metadata: dict[str, Any]) QueuedTaskRecord[source]¶
- async get_task(task_id: UUID) QueuedTaskRecord | None[source]¶
- async get_task_by_key(key: str) QueuedTaskRecord | None[source]¶
- async list_pending(*, limit: int, queue: str | None, execution_backend: str | None) list[QueuedTaskRecord][source]¶
- async claim_task(task_id: UUID) QueuedTaskRecord | None[source]¶
- async claim_next(*, queue: str | None, execution_backend: str | None) QueuedTaskRecord | None[source]¶
- async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
- async fail_task(task_id: UUID, error: str, *, retry: bool, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
- async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]¶
- async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]¶
- async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None) QueuedTaskRecord | None[source]¶
- async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None) QueuedTaskRecord | None[source]¶
- async get_statistics() QueueStatistics[source]¶
- async list_completed_by_task(task_name: str, *, since: datetime | None, limit: int) list[QueuedTaskRecord][source]¶
- model_from_record(record: QueuedTaskRecord) Any[source]¶
Convert a backend-neutral record into an Advanced Alchemy model.
- Returns:
The Advanced Alchemy queue task model.
- static record_from_model(model: Any) QueuedTaskRecord[source]¶
Convert an Advanced Alchemy model into a backend-neutral record.
- Returns:
The backend-neutral queued task record.
Redis¶
Redis queue backend.
Stores queued task records in a Redis-protocol key-value server. The
implementation lives directly on RedisQueueBackend; the Valkey
backend inherits from this class and only swaps the client factory and
_backend_name ClassVar.
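The inheritance arrangement described above, where a subclass swaps only the client factory and a class-level name, can be sketched as follows. `DemoRedisBackend`, `DemoValkeyBackend`, the `_create_client` hook, and the dict used as a client are hypothetical; only the `_backend_name` ClassVar comes from the description.

```python
from typing import Any, ClassVar


class DemoRedisBackend:
    """Holds all shared behavior; subclasses override two small pieces."""

    _backend_name: ClassVar[str] = "redis"

    def _create_client(self) -> Any:
        # Hypothetical factory hook; a real backend would build a driver client here.
        return {"driver": "redis"}

    def describe(self) -> str:
        client = self._create_client()
        return f"{self._backend_name} backend using the {client['driver']} client"


class DemoValkeyBackend(DemoRedisBackend):
    """Reuses every method and swaps only the name and the client factory."""

    _backend_name: ClassVar[str] = "valkey"

    def _create_client(self) -> Any:
        return {"driver": "valkey"}
```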
- class litestar_queues.backends.redis.backend.RedisQueueBackend(config: QueueConfig | None = None, *, backend_config: _RedisBackendConfig | None = None)[source]¶
Bases: BaseQueueBackend
Queue backend that stores records in a Redis-protocol key-value server.
- __init__(config: QueueConfig | None = None, *, backend_config: _RedisBackendConfig | None = None) None[source]¶
Initialize the queue backend.
- property capabilities: QueueBackendCapabilities¶
Backend behavior capabilities.
- async open() bool[source]¶
Open Redis-protocol client resources.
- Returns:
True when the client is ready.
- async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]¶
Persist a queued task.
- Returns:
The created or deduplicated queued task record.
- async get_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Return a queued task by ID.
- async get_task_by_key(key: str) QueuedTaskRecord | None[source]¶
Return a queued task by deduplication key.
- async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]¶
Return due pending or scheduled tasks ordered for execution.
- async claim_task(task_id: UUID) QueuedTaskRecord | None[source]¶
Atomically claim a pending task.
- Returns:
The claimed record, if it was still due and claimable.
- async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as completed.
- Returns:
The completed record, if it exists.
- async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]¶
Mark a task as failed or retry it.
- Returns:
The updated record, if it exists.
- async cancel_task(task_id: UUID) bool[source]¶
Cancel a task if it has not started.
- Returns:
True when the task was cancelled.
- async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]¶
Update the heartbeat timestamp for a running task.
- Returns:
True when the heartbeat was updated.
- async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]¶
Clear heartbeat timestamps for task IDs.
- async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]¶
Requeue running tasks with stale heartbeats.
- Returns:
Summary of recovered records.
- async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an external execution reference for a running task.
- Returns:
The updated record, if it exists.
- async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]¶
Persist an execution backend/profile change for a queued task.
- Returns:
The updated record, if it exists.
- async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]¶
Return externally dispatched tasks with references to reconcile.
- async get_statistics() QueueStatistics[source]¶
Return queue status counts.
- async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]¶
Return recent completed records for a task name.
- async cleanup_terminal(before: datetime) int[source]¶
Delete terminal records completed before a cutoff.
- Returns:
Number of deleted records.
- async notify_new_task(record: QueuedTaskRecord) None[source]¶
Publish a Redis-protocol pub/sub message when work is available.
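The methods above suggest a poll, claim, run, then complete-or-fail cycle for a worker. The following is a minimal sketch of that cycle, not the library's worker. `SketchBackend` and `SketchRecord` are hypothetical in-memory stand-ins that reuse only the documented method names (`list_pending`, `claim_task`, `complete_task`, `fail_task`), the record fields are assumptions, and retry handling is left out.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable
from uuid import UUID, uuid4


@dataclass
class SketchRecord:
    """Hypothetical stand-in for QueuedTaskRecord; the real fields may differ."""

    id: UUID
    task_name: str
    status: str = "pending"
    result: Any = None
    error: str | None = None


class SketchBackend:
    """In-memory stand-in that mimics a subset of the documented method names."""

    def __init__(self) -> None:
        self._records: dict[UUID, SketchRecord] = {}

    async def enqueue(self, task_name: str) -> SketchRecord:
        record = SketchRecord(id=uuid4(), task_name=task_name)
        self._records[record.id] = record
        return record

    async def list_pending(self, *, limit: int = 1) -> list[SketchRecord]:
        pending = [r for r in self._records.values() if r.status == "pending"]
        return pending[:limit]

    async def claim_task(self, task_id: UUID) -> SketchRecord | None:
        record = self._records.get(task_id)
        if record is None or record.status != "pending":
            return None  # already claimed elsewhere, or no longer present
        record.status = "running"
        return record

    async def complete_task(self, task_id: UUID, *, result: Any = None) -> SketchRecord | None:
        record = self._records.get(task_id)
        if record is not None:
            record.status, record.result = "completed", result
        return record

    async def fail_task(self, task_id: UUID, error: str) -> SketchRecord | None:
        record = self._records.get(task_id)
        if record is not None:
            record.status, record.error = "failed", error
        return record


Handler = Callable[[], Awaitable[Any]]


async def run_once(backend: SketchBackend, handlers: dict[str, Handler], *, limit: int = 10) -> int:
    """Claim and run up to `limit` pending tasks; return how many were processed."""
    processed = 0
    for candidate in await backend.list_pending(limit=limit):
        claimed = await backend.claim_task(candidate.id)
        if claimed is None:
            continue  # lost the race to another worker
        try:
            result = await handlers[claimed.task_name]()
        except Exception as exc:
            await backend.fail_task(claimed.id, str(exc))
        else:
            await backend.complete_task(claimed.id, result=result)
        processed += 1
    return processed


async def demo() -> dict[str, str]:
    backend = SketchBackend()
    await backend.enqueue("send_email")
    await backend.enqueue("explode")

    async def send_email() -> str:
        return "sent"

    async def explode() -> None:
        raise RuntimeError("boom")

    await run_once(backend, {"send_email": send_email, "explode": explode})
    return {r.task_name: r.status for r in backend._records.values()}
```

Listing and claiming are separate steps because `list_pending` only returns candidates. A `None` result from `claim_task` means another worker took the task first, so the loop skips it instead of treating it as an error.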
Redis queue backend configuration.
- class litestar_queues.backends.redis.config.RedisBackendConfig(url: str = 'redis://localhost:6379/0', key_prefix: str = 'litestar_queues', notifications: bool = True, notification_channel: str = 'litestar_queues:queue_notifications', lock_timeout: float = 5.0, poll_interval: float = 0.1, client: Any | None = None)[source]¶
Bases: object
Configuration for the Redis queue backend.
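In the signature above, the default `notification_channel` starts with the default `key_prefix`, but the reference does not say whether one is derived from the other. The sketch below therefore sets both explicitly when namespacing a deployment. `SketchRedisConfig` is a hypothetical frozen dataclass that only mirrors the documented field names and defaults; it is not the library class, and `for_environment` is an illustrative helper.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class SketchRedisConfig:
    """Stand-in mirroring the documented RedisBackendConfig fields and defaults."""

    url: str = "redis://localhost:6379/0"
    key_prefix: str = "litestar_queues"
    notifications: bool = True
    notification_channel: str = "litestar_queues:queue_notifications"
    lock_timeout: float = 5.0
    poll_interval: float = 0.1
    client: Any | None = None


def for_environment(base: SketchRedisConfig, env: str) -> SketchRedisConfig:
    """Return a copy whose key prefix and pub/sub channel are namespaced by `env`."""
    prefix = f"{base.key_prefix}:{env}"
    return replace(
        base,
        key_prefix=prefix,
        # Set explicitly: this sketch does not assume the channel follows the prefix.
        notification_channel=f"{prefix}:queue_notifications",
    )


staging = for_environment(SketchRedisConfig(), "staging")
```

All other fields keep their defaults, so two environments that share one server differ only in their key namespace and notification channel.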
Valkey¶
Valkey queue backend.
The Valkey wire protocol is API-compatible with Redis, so this backend
inherits the full RedisQueueBackend implementation and only overrides
the client factory (uses valkey.asyncio instead of redis.asyncio)
plus the _backend_name ClassVar that drives lock-name error messages
and the valkey-pubsub notification capability label.
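The inheritance arrangement described above can be illustrated with a small sketch. The classes below are stand-ins, not the library's implementation: `_create_client` is a hypothetical name for the client factory hook, and the returned dictionaries stand in for real `redis.asyncio` or `valkey.asyncio` clients so that the example runs without either driver installed.

```python
from __future__ import annotations

from typing import Any, ClassVar


class SketchRedisBackend:
    """Stand-in for a Redis-protocol backend that holds all shared logic."""

    _backend_name: ClassVar[str] = "redis"

    def __init__(self, url: str = "redis://localhost:6379/0") -> None:
        self.url = url

    def _create_client(self) -> Any:
        # Hypothetical hook: a real backend would build a redis.asyncio client here.
        return {"driver": "redis.asyncio", "url": self.url}

    def notification_label(self) -> str:
        # Shared code reads the ClassVar, so subclasses get the right label for free.
        return f"{self._backend_name}-pubsub"


class SketchValkeyBackend(SketchRedisBackend):
    """Overrides only the backend name and the client factory."""

    _backend_name: ClassVar[str] = "valkey"

    def _create_client(self) -> Any:
        # Hypothetical hook: a real backend would build a valkey.asyncio client here.
        return {"driver": "valkey.asyncio", "url": self.url}


redis_backend = SketchRedisBackend()
valkey_backend = SketchValkeyBackend()
```

Because every queue operation lives on the parent class, a fix made there applies to both backends, and the subclass stays limited to the two overrides.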
- class litestar_queues.backends.valkey.backend.ValkeyQueueBackend(config: QueueConfig | None = None, *, backend_config: ValkeyBackendConfig | None = None)[source]¶
Bases: RedisQueueBackend
Valkey-backed queue backend.
- __init__(config: QueueConfig | None = None, *, backend_config: ValkeyBackendConfig | None = None) None[source]¶
Initialize the queue backend.
Valkey queue backend configuration.
- class litestar_queues.backends.valkey.config.ValkeyBackendConfig(url: str = 'redis://localhost:6379/0', key_prefix: str = 'litestar_queues', notifications: bool = True, notification_channel: str = 'litestar_queues:queue_notifications', lock_timeout: float = 5.0, poll_interval: float = 0.1, client: Any | None = None)[source]¶
Bases: object
Configuration for the Valkey queue backend.