Queue Backends

Registry

Queue backends are registered through litestar_queues.backends.

litestar_queues.backends.get_queue_backend(backend: QueueBackendConfig = 'memory', config: QueueConfig | None = None) BaseQueueBackend[source]

Get an instantiated queue backend.

Returns:

A configured queue backend instance.

Raises:

TypeError – If a typed backend config selects a backend class that does not accept backend_config.
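One way such a check could work is to inspect the constructor signature before forwarding the typed config. The following is a stand-alone sketch under that assumption, not the library's implementation; `PlainBackend`, `TypedBackend`, and `build_backend` are hypothetical names.

```python
from __future__ import annotations

import inspect
from typing import Any


class PlainBackend:
    """Hypothetical backend whose constructor takes only `config`."""

    def __init__(self, config: Any = None) -> None:
        self.config = config


class TypedBackend:
    """Hypothetical backend that also accepts a typed `backend_config`."""

    def __init__(self, config: Any = None, *, backend_config: Any = None) -> None:
        self.config = config
        self.backend_config = backend_config


def build_backend(cls: type, config: Any = None, backend_config: Any = None) -> Any:
    """Instantiate `cls`, forwarding `backend_config` only if it is accepted."""
    if backend_config is None:
        return cls(config)
    # inspect.signature() on a class reports the constructor parameters.
    if "backend_config" not in inspect.signature(cls).parameters:
        raise TypeError(f"{cls.__name__} does not accept backend_config")
    return cls(config, backend_config=backend_config)
```

With these definitions, `build_backend(PlainBackend, backend_config={...})` raises `TypeError`, while the same call with `TypedBackend` succeeds.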

litestar_queues.backends.get_queue_backend_class(backend_path: str) type[BaseQueueBackend][source]

Get a queue backend class by short name or import path.

Optional backends are imported lazily on first lookup so unused adapters do not require their driver extras to be installed.

Returns:

The resolved queue backend class.

Raises:

ValueError – If a short backend name is unknown.

litestar_queues.backends.queue_backend(name: str) Callable[[type[BaseQueueBackend]], type[BaseQueueBackend]][source]

Decorator to register a queue backend class with a short name.

Returns:

A decorator that registers the backend class.

litestar_queues.backends.list_queue_backends() list[str][source]

Return the names of all registered queue backends, both built-in and dynamically registered.
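The registry functions above follow a familiar decorator-registration pattern. The sketch below illustrates that pattern in isolation and does not import litestar_queues; `_REGISTRY`, `register`, `resolve`, `list_names`, and `EchoBackend` are hypothetical, and treating any name that contains a dot as an import path is an assumption.

```python
from __future__ import annotations

import importlib
from typing import Callable

# Hypothetical registry mapping short names to backend classes.
_REGISTRY: dict[str, type] = {}


def register(name: str) -> Callable[[type], type]:
    """Return a decorator that stores a class under a short name."""

    def decorator(cls: type) -> type:
        _REGISTRY[name] = cls
        return cls

    return decorator


def resolve(backend_path: str) -> type:
    """Resolve a short name, or import a dotted 'module.Class' path."""
    if backend_path in _REGISTRY:
        return _REGISTRY[backend_path]
    if "." in backend_path:
        module_name, _, class_name = backend_path.rpartition(".")
        # Imported only on lookup, so unused modules are never loaded.
        module = importlib.import_module(module_name)
        return getattr(module, class_name)
    raise ValueError(f"Unknown queue backend: {backend_path!r}")


def list_names() -> list[str]:
    """Return registered short names in sorted order."""
    return sorted(_REGISTRY)


@register("echo")
class EchoBackend:
    """Hypothetical backend used only to exercise the registry."""
```

Deferring the import to lookup time is what lets an optional adapter stay unloaded until something actually asks for it.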

Base Backend

class litestar_queues.backends.base.BaseQueueBackend(config: QueueConfig | None = None)[source]

Bases: object

Base class for queue persistence backends.

__init__(config: QueueConfig | None = None) None[source]

Initialize the queue backend.

config

property capabilities: QueueBackendCapabilities

Backend behavior capabilities.

async open() bool[source]

Open queue resources.

Returns:

True when resources are ready.

async close() None[source]

Close queue resources.

async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]

Persist a queued task.

async enqueue_many(specs: Sequence[EnqueueSpec]) list[QueuedTaskRecord][source]

Persist multiple queued tasks, returning records in input order.

The default implementation issues one enqueue() per spec, which preserves per-key deduplication and ordering. Backends with a native bulk path (e.g. SQLSpec COPY/Arrow/execute_many) override this for throughput while keeping the same semantics.

Returns:

Queue task records in the same order as specs.
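The default behavior described above can be pictured as a loop over enqueue(). This is a simplified stand-alone sketch, not the library's code: `Spec`, `Record`, and `SketchBackend` are hypothetical stand-ins, and deduplication here ignores task status for brevity.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class Spec:
    """Hypothetical stand-in for EnqueueSpec."""

    task_name: str
    key: str | None = None


@dataclass
class Record:
    """Hypothetical stand-in for QueuedTaskRecord."""

    task_name: str
    key: str | None
    id: UUID = field(default_factory=uuid4)


class SketchBackend:
    def __init__(self) -> None:
        self.by_key: dict[str, Record] = {}
        self.records: list[Record] = []

    async def enqueue(self, task_name: str, *, key: str | None = None) -> Record:
        # A keyed task that already exists is returned instead of duplicated.
        if key is not None and key in self.by_key:
            return self.by_key[key]
        record = Record(task_name, key)
        self.records.append(record)
        if key is not None:
            self.by_key[key] = record
        return record

    async def enqueue_many(self, specs: list[Spec]) -> list[Record]:
        # One enqueue() per spec, awaited in order, so results line up with specs.
        return [await self.enqueue(s.task_name, key=s.key) for s in specs]
```

Because each spec goes through the same enqueue() path, a repeated key in the batch resolves to the record created earlier in the same call.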

async get_task(task_id: UUID) QueuedTaskRecord | None[source]

Return a queued task by ID.

async get_task_by_key(key: str) QueuedTaskRecord | None[source]

Return a queued task by deduplication key.

async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]

Return due pending or scheduled tasks ordered for execution.

async claim_task(task_id: UUID) QueuedTaskRecord | None[source]

Atomically claim a pending task.

async claim_next(*, queue: str | None = None, execution_backend: str | None = None) QueuedTaskRecord | None[source]

Claim the next due task.

Returns:

The claimed task record, if one was available.
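As a rough illustration of what "next due task" can mean, the sketch below picks from pending rows whose schedule has arrived. The ordering rule (higher priority first, then list order), the status strings, and the `Row` type are all assumptions; the reference above does not specify them.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class Row:
    """Hypothetical in-memory task row."""

    name: str
    priority: int = 0
    scheduled_at: datetime | None = None
    status: str = "pending"


def claim_next(rows: list[Row], *, now: datetime) -> Row | None:
    """Mark the best due row as running and return it, or return None."""
    due = [
        row
        for row in rows
        if row.status == "pending"
        and (row.scheduled_at is None or row.scheduled_at <= now)
    ]
    if not due:
        return None
    # Assumed ordering: highest priority wins; max() keeps the first on ties.
    chosen = max(due, key=lambda row: row.priority)
    chosen.status = "running"
    return chosen
```

A real backend would perform the selection and the status change in one atomic step so that two workers cannot claim the same row.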

async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as completed.

Parameters:
  • task_id – Queue record identifier.

  • result – Task result payload.

  • expected_retry_count – When provided, update only if the record is still running with this retry count.
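The retry-count fence can be read as a compare-and-set guard: a worker that claimed attempt N may only write its result while the record is still running on attempt N. The following stand-alone sketch shows that idea; `Row`, `complete`, the status strings, and the unfenced behavior when no count is passed are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class Row:
    """Hypothetical in-memory row; the field names are assumptions."""

    status: str = "running"
    retry_count: int = 0
    result: Any = None


def complete(row: Row, result: Any, expected_retry_count: int | None = None) -> Row | None:
    """Complete a row, honoring the optional retry-count fence."""
    if expected_retry_count is not None:
        still_running = row.status == "running"
        same_attempt = row.retry_count == expected_retry_count
        if not (still_running and same_attempt):
            # The task was retried or finished elsewhere; drop the stale write.
            return None
    row.status = "completed"
    row.result = result
    return row
```

The fence matters when a stale worker wakes up after its task has already been recovered and handed to another worker.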

async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as failed or retry it.

Parameters:
  • task_id – Queue record identifier.

  • error – Error message to persist.

  • retry – Whether retry policy may requeue the task.

  • expected_retry_count – When provided, update only if the record is still running with this retry count.

async cancel_task(task_id: UUID) bool[source]

Cancel a task if it has not started.

async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]

Update the heartbeat timestamp for a running task.

Returns:

True when a running record matched the optional retry-count fence.

async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]

Clear heartbeat timestamps for task IDs.

Parameters:
  • task_ids – Queue record identifiers.

  • expected_retry_count – When provided, clear only records that still match this retry count.

async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]

Recover running tasks with stale heartbeats.

Returns:

Summary of requeued, failed, skipped, and handler-needed records.
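Stale recovery boils down to comparing each running record's heartbeat with a cutoff of `now - stale_after`. The sketch below shows that comparison on plain in-memory rows. It covers only the requeued and failed outcomes, and the retry rule, status strings, and `Row` type are assumptions rather than the library's behavior.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Row:
    """Hypothetical in-memory task row."""

    status: str
    heartbeat_at: datetime | None
    retry_count: int = 0
    max_retries: int = 0


def recover_stale(rows: list[Row], *, stale_after: timedelta, now: datetime) -> dict[str, int]:
    """Requeue or fail running rows whose heartbeat is older than the cutoff."""
    cutoff = now - stale_after
    summary = {"requeued": 0, "failed": 0}
    for row in rows:
        if row.status != "running" or row.heartbeat_at is None:
            continue
        if row.heartbeat_at >= cutoff:
            continue  # heartbeat is recent enough
        if row.retry_count < row.max_retries:
            # Assumed policy: retries remain, so hand the task back to the queue.
            row.status = "pending"
            row.retry_count += 1
            row.heartbeat_at = None
            summary["requeued"] += 1
        else:
            row.status = "failed"
            summary["failed"] += 1
    return summary
```

Incrementing the retry count on requeue is what makes a later fenced write from the original worker miss.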

async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an external execution reference for a running task.

Returns:

The updated queued task record, if one exists.

async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an execution backend/profile change for a queued task.

Returns:

The updated queued task record, if one exists.

async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]

Return externally dispatched tasks with references to reconcile.

async get_statistics() QueueStatistics[source]

Return queue status counts.

async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]

Return recent completed records for a task name.

async cleanup_terminal(before: datetime) int[source]

Delete terminal records completed before a cutoff.

Returns:

The number of deleted records.
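A cutoff-based cleanup can be sketched as a filter over terminal records. This stand-alone example uses plain dictionaries; the set of terminal statuses and the `completed_at` field name are assumptions.

```python
from __future__ import annotations

from datetime import datetime

# Assumed terminal statuses; the library defines the real set.
TERMINAL = frozenset({"completed", "failed", "cancelled"})


def cleanup_terminal(rows: list[dict], before: datetime) -> int:
    """Remove terminal rows completed before the cutoff; return the count."""

    def expired(row: dict) -> bool:
        done = row.get("completed_at")
        return row["status"] in TERMINAL and done is not None and done < before

    kept = [row for row in rows if not expired(row)]
    deleted = len(rows) - len(kept)
    rows[:] = kept  # mutate in place, like a DELETE against a table
    return deleted
```

Rows completed exactly at the cutoff are kept in this sketch because the comparison is strict.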

async notify_new_task(record: QueuedTaskRecord) None[source]

Notify waiters that a new task is available.

async wait_for_notifications(timeout: float | None = None) bool[source]

Wait until a backend notification arrives.

Returns:

True when a notification was observed.
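For an in-process backend, this notify/wait pair can be modeled with an asyncio.Event and a timeout. The sketch below is one such model and makes no claim about how any shipped backend does it; `Notifier` and `demo` are hypothetical.

```python
from __future__ import annotations

import asyncio


class Notifier:
    """Hypothetical wakeup channel built on asyncio.Event."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def notify(self) -> None:
        self._event.set()

    async def wait(self, timeout: float | None = None) -> bool:
        """Return True if notified, False if the timeout elapsed first."""
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
        except asyncio.TimeoutError:
            return False
        # Reset so the next wait blocks until a fresh notification.
        self._event.clear()
        return True


async def demo() -> tuple[bool, bool]:
    notifier = Notifier()  # created inside the running loop
    timed_out = await notifier.wait(timeout=0.01)
    notifier.notify()
    woke = await notifier.wait(timeout=0.01)
    return timed_out, woke
```

A worker loop can treat a False return as a cue to fall back to polling.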

Memory

class litestar_queues.backends.memory.backend.InMemoryQueueBackend(config: QueueConfig | None = None)[source]

Bases: BaseQueueBackend

In-process queue backend for tests, local development, and examples.

__init__(config: QueueConfig | None = None) None[source]

Initialize the queue backend.

property capabilities: QueueBackendCapabilities

Backend behavior capabilities.

async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]

Persist a queued task.

async get_task(task_id: UUID) QueuedTaskRecord | None[source]

Return a queued task by ID.

async get_task_by_key(key: str) QueuedTaskRecord | None[source]

Return a queued task by deduplication key.

async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]

Return due pending or scheduled tasks ordered for execution.

async claim_task(task_id: UUID) QueuedTaskRecord | None[source]

Atomically claim a pending task.

async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as completed.

Parameters:
  • task_id – Queue record identifier.

  • result – Task result payload.

  • expected_retry_count – When provided, update only if the record is still running with this retry count.

async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as failed or retry it.

Parameters:
  • task_id – Queue record identifier.

  • error – Error message to persist.

  • retry – Whether retry policy may requeue the task.

  • expected_retry_count – When provided, update only if the record is still running with this retry count.

async cancel_task(task_id: UUID) bool[source]

Cancel a task if it has not started.

async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]

Update the heartbeat timestamp for a running task.

Returns:

True when a running record matched the optional retry-count fence.

async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]

Clear heartbeat timestamps for task IDs.

Parameters:
  • task_ids – Queue record identifiers.

  • expected_retry_count – When provided, clear only records that still match this retry count.

async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]

Recover running tasks with stale heartbeats.

Returns:

Summary of requeued, failed, skipped, and handler-needed records.

async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an external execution reference for a running task.

Returns:

The updated queued task record, if one exists.

async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an execution backend/profile change for a queued task.

Returns:

The updated queued task record, if one exists.

async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]

Return externally dispatched tasks with references to reconcile.

async get_statistics() QueueStatistics[source]

Return queue status counts.

async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]

Return recent completed records for a task name.

async cleanup_terminal(before: datetime) int[source]

Delete terminal records completed before a cutoff.

Returns:

The number of deleted records.

async notify_new_task(record: QueuedTaskRecord) None[source]

Notify waiters that a new task is available.

async wait_for_notifications(timeout: float | None = None) bool[source]

Wait until a backend notification arrives.

Returns:

True when a notification was observed.

async clear() None[source]

Clear all in-memory records.

SQLSpec

SQLSpec queue backend.

class litestar_queues.backends.sqlspec.backend.SQLSpecQueueBackend(config: QueueConfig | None = None, *, backend_config: SQLSpecBackendConfig | None = None)[source]

Bases: BaseQueueBackend

SQLSpec-backed queue backend.

__init__(config: QueueConfig | None = None, *, backend_config: SQLSpecBackendConfig | None = None) None[source]

Initialize the queue backend.

async open() bool[source]

Open SQLSpec resources.

Returns:

True when SQLSpec resources are ready.

async close() None[source]

Close SQLSpec resources.

property capabilities: QueueBackendCapabilities

Backend behavior capabilities.

async create_schema() None[source]

Create the SQLSpec queue table and indexes.

async run_migrations() None[source]

Apply packaged SQLSpec migrations.

async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]

Persist a queued task.

async enqueue_many(specs: Sequence[EnqueueSpec]) list[QueuedTaskRecord][source]

Persist many tasks via the adapter’s fastest bulk path.

Resolves existing deduplication keys in one round trip, then inserts the remaining rows through the native Arrow ingest path (load_from_records()) when the adapter supports it, otherwise via a batched execute_many. Returns records in input order, with existing non-terminal keyed tasks returned as-is (no duplicate insert) to match the semantics of enqueue().

Returns:

Queue task records in the same order as specs.
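The two-phase flow described above (look up existing keys once, then insert only the remainder) can be sketched without a database. In this stand-alone example `existing` plays the role of the single key-lookup round trip and `to_insert` is what would be handed to the bulk path; the dictionary record shape, the generated ids, and the handling of a key repeated within one batch are assumptions.

```python
from __future__ import annotations

from itertools import count
from typing import Any


def plan_bulk_enqueue(
    specs: list[tuple[str, str | None]],
    existing: dict[str, dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Return (results in input order, rows that still need inserting)."""
    new_ids = count(1)
    seen = dict(existing)  # keys already present, plus keys added in this batch
    results: list[dict[str, Any]] = []
    to_insert: list[dict[str, Any]] = []
    for task_name, key in specs:
        if key is not None and key in seen:
            # Existing keyed task: return it as-is, no duplicate insert.
            results.append(seen[key])
            continue
        record = {"id": f"new-{next(new_ids)}", "task_name": task_name, "key": key}
        to_insert.append(record)
        if key is not None:
            seen[key] = record
        results.append(record)
    return results, to_insert
```

Keeping `results` separate from `to_insert` is what allows the return value to follow input order even though only some rows reach the bulk insert.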

async get_task(task_id: UUID) QueuedTaskRecord | None[source]

Return a queued task by ID.

async get_task_by_key(key: str) QueuedTaskRecord | None[source]

Return a queued task by deduplication key.

async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]

Return due pending or scheduled tasks ordered for execution.

async claim_task(task_id: UUID) QueuedTaskRecord | None[source]

Atomically claim a pending task.

async claim_next(*, queue: str | None = None, execution_backend: str | None = None) QueuedTaskRecord | None[source]

Claim the next due task.

Returns:

The claimed task record, if one was available.

async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as completed.

Parameters:
  • task_id – Queue record identifier.

  • result – Task result payload.

  • expected_retry_count – When provided, update only if the record is still running with this retry count.

async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as failed or retry it.

Parameters:
  • task_id – Queue record identifier.

  • error – Error message to persist.

  • retry – Whether retry policy may requeue the task.

  • expected_retry_count – When provided, update only if the record is still running with this retry count.

async cancel_task(task_id: UUID) bool[source]

Cancel a task if it has not started.

async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]

Update the heartbeat timestamp for a running task.

Returns:

True when a running record matched the optional retry-count fence.

async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]

Clear heartbeat timestamps for task IDs.

Parameters:
  • task_ids – Queue record identifiers.

  • expected_retry_count – When provided, clear only records that still match this retry count.

async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]

Recover running tasks with stale heartbeats.

Returns:

Summary of requeued, failed, skipped, and handler-needed records.

async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an external execution reference for a running task.

Returns:

The updated queued task record, if one exists.

async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an execution backend/profile change for a queued task.

Returns:

The updated queued task record, if one exists.

async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]

Return externally dispatched tasks with references to reconcile.

async get_statistics() QueueStatistics[source]

Return queue status counts.

async iter_all(*, chunk_size: int = 1000) AsyncIterator[QueuedTaskRecord][source]

Stream every queue record without materializing the full table.

Uses SQLSpec select_stream so large administrative scans and exports consume rows in chunks of chunk_size rather than loading the entire result set into memory. The backend session stays open for the duration of iteration, so callers should consume the iterator promptly.

Yields:

Queue task records from the backing SQLSpec table.
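The chunked streaming idea can be shown with a plain async generator. This sketch does not use SQLSpec; `iter_chunked` and `collect` are hypothetical, and the `asyncio.sleep(0)` call stands in for one database round trip per chunk.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Sequence


async def iter_chunked(rows: Sequence[int], *, chunk_size: int = 1000) -> AsyncIterator[int]:
    """Yield rows one at a time while fetching them chunk by chunk."""
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start : start + chunk_size]
        # Stand-in for one database round trip per chunk.
        await asyncio.sleep(0)
        for row in chunk:
            yield row


async def collect(rows: Sequence[int], chunk_size: int) -> list[int]:
    """Consume the stream with `async for`, as a caller of iter_all() would."""
    return [row async for row in iter_chunked(rows, chunk_size=chunk_size)]
```

Only one chunk is held at a time, which is the property that keeps memory flat during large scans.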

async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]

Return recent completed records for a task name.

async cleanup_terminal(before: datetime) int[source]

Delete terminal records completed before a cutoff.

Returns:

The number of deleted records.

async notify_new_task(record: QueuedTaskRecord) None[source]

Publish a SQLSpec event when configured queue work becomes available.

async wait_for_notifications(timeout: float | None = None) bool[source]

Wait for a SQLSpec event when queue notifications are configured.

Returns:

True when a notification was received.

SQLSpec backend configuration.

litestar_queues.backends.sqlspec.config.NOTIFY_TRANSPORTS: frozenset[str] = frozenset({'aq', 'listen_notify', 'listen_notify_durable', 'polling', 'table_queue', 'txeventq'})

Valid worker-wakeup transports for SQLSpecBackendConfig.notify_transport.

listen_notify/listen_notify_durable push wakeups through native LISTEN/NOTIFY, table_queue uses the durable events table, aq and txeventq use Oracle Advanced Queuing backends, and polling disables push wakeups so workers fall back to interval polling.
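A configuration check against this set might look like the sketch below. The frozenset is copied from the value documented above; `check_transport` and `uses_push` are hypothetical helpers, and raising `ValueError` is a stand-in for whatever error the library actually raises.

```python
from __future__ import annotations

# Copied from the documented NOTIFY_TRANSPORTS value.
NOTIFY_TRANSPORTS = frozenset(
    {"aq", "listen_notify", "listen_notify_durable", "polling", "table_queue", "txeventq"}
)


def check_transport(value: str | None) -> str | None:
    """Return the transport unchanged, or raise if it is not a known name."""
    if value is not None and value not in NOTIFY_TRANSPORTS:
        raise ValueError(f"unknown notify_transport: {value!r}")
    return value


def uses_push(value: str) -> bool:
    """Every transport except 'polling' delivers push wakeups."""
    return check_transport(value) != "polling"
```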

class litestar_queues.backends.sqlspec.config.SQLSpecBackendConfig(sqlspec: SQLSpec | None = None, config: Any | None = None, heartbeat_pool_config: Any | None = None, table_name: str | None = None, create_schema: bool | None = None, run_migrations: bool | None = None, event_channel: AsyncEventChannel | None = None, notifications: bool | None = None, notification_channel: str | None = None, notify_transport: str | None = None, event_backend: str | None = None, event_queue_table: str | None = None, event_poll_interval: float | None = None, event_settings: dict[str, Any]=<factory>, queue_observability: bool = True, column_map: Mapping[str, str]=<factory>, native_json_columns: frozenset[str] = <factory>, manage_schema: bool = True)[source]

Bases: object

Configuration values for the SQLSpec queue backend.

backend_name: ClassVar[str] = 'sqlspec'
sqlspec: SQLSpec | None
config: Any | None
heartbeat_pool_config: Any | None
table_name: str | None
create_schema: bool | None
run_migrations: bool | None
event_channel: AsyncEventChannel | None
notifications: bool | None
notification_channel: str | None
notify_transport: str | None
event_backend: str | None
event_queue_table: str | None
event_poll_interval: float | None
event_settings: dict[str, Any]
queue_observability: bool
column_map: Mapping[str, str]
native_json_columns: frozenset[str]
manage_schema: bool
__init__(sqlspec: SQLSpec | None = None, config: Any | None = None, heartbeat_pool_config: Any | None = None, table_name: str | None = None, create_schema: bool | None = None, run_migrations: bool | None = None, event_channel: AsyncEventChannel | None = None, notifications: bool | None = None, notification_channel: str | None = None, notify_transport: str | None = None, event_backend: str | None = None, event_queue_table: str | None = None, event_poll_interval: float | None = None, event_settings: dict[str, Any]=<factory>, queue_observability: bool = True, column_map: Mapping[str, str]=<factory>, native_json_columns: frozenset[str] = <factory>, manage_schema: bool = True) None

Schema and migration helpers for the SQLSpec queue backend.

litestar_queues.backends.sqlspec.schema.migration_directory() Path[source]

Return the packaged SQLSpec queue extension migration directory.

litestar_queues.backends.sqlspec.schema.migration_paths() tuple[str, ...][source]

Return packaged SQLSpec migration file paths.

litestar_queues.backends.sqlspec.schema.validate_column_map(column_map: Mapping[str, str]) dict[str, str][source]

Validate a canonical-to-adopter column map.

Returns:

A defensive copy of the validated map.

Raises:

QueueConfigurationError – If a canonical name is unknown or a mapped name is not a valid SQL identifier.
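The two checks named above (known canonical name, valid mapped identifier) can be sketched as follows. This is illustrative only: the `CANONICAL` set is a small hypothetical subset, the identifier pattern is an assumption, and `ValueError` stands in for `QueueConfigurationError`.

```python
from __future__ import annotations

import re
from collections.abc import Mapping

# Hypothetical canonical names; the real set is defined by the library.
CANONICAL = frozenset({"task_name", "status", "priority"})
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def check_column_map(column_map: Mapping[str, str]) -> dict[str, str]:
    """Validate a canonical-to-application column map and return a copy."""
    for canonical, mapped in column_map.items():
        if canonical not in CANONICAL:
            raise ValueError(f"unknown canonical column: {canonical!r}")
        if not _IDENTIFIER.fullmatch(mapped):
            raise ValueError(f"invalid SQL identifier: {mapped!r}")
    # Defensive copy so later changes to the input cannot affect the backend.
    return dict(column_map)
```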

litestar_queues.backends.sqlspec.schema.validate_native_json_columns(columns: frozenset[str]) frozenset[str][source]

Validate native JSON passthrough columns.

Returns:

The validated column set.

Raises:

QueueConfigurationError – If any column is not a canonical JSON column.

litestar_queues.backends.sqlspec.schema.validate_table_name(table_name: str) str[source]

Validate a SQL identifier used for the queue table name.

Returns:

The validated table name, normalized to unquoted SQLSpec identifier parts.

Raises:

QueueConfigurationError – If the table name is not a valid SQL identifier.
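Identifier validation of this kind usually exists to keep a configurable table name from carrying arbitrary SQL. The sketch below shows one plausible shape for it; the accepted pattern, the support for dotted schema-qualified names, the quote stripping, and the use of `ValueError` are all assumptions, and `check_table_name` is a hypothetical name.

```python
from __future__ import annotations

import re

_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def check_table_name(table_name: str) -> str:
    """Validate each dotted part and return the name without quotes."""
    parts = [part.strip('"') for part in table_name.split(".")]
    for part in parts:
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"invalid table name: {table_name!r}")
    return ".".join(parts)
```

Under these assumptions, `check_table_name('app."queue_tasks"')` returns `app.queue_tasks`, and a value such as `tasks; DROP TABLE x` is rejected.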

SQLSpec queue store factory.

litestar_queues.backends.sqlspec.stores.factory.create_queue_store(config: Any, *, table_name: str | None = None, column_map: Mapping[str, str] | None = None, native_json_columns: frozenset[str] | None = None, manage_schema: bool = True) SQLSpecQueueStore[source]

Create a queue store for a SQLSpec adapter configuration.

Returns:

The queue store implementation for the SQLSpec adapter.

Advanced Alchemy

Advanced Alchemy queue backend.

class litestar_queues.backends.advanced_alchemy.backend.AdvancedAlchemyQueueBackend(config: QueueConfig | None = None, *, backend_config: AdvancedAlchemyBackendConfig | None = None)[source]

Bases: BaseQueueBackend

Advanced Alchemy-backed queue backend.

__init__(config: QueueConfig | None = None, *, backend_config: AdvancedAlchemyBackendConfig | None = None) None[source]

Initialize the queue backend.

property capabilities: QueueBackendCapabilities

Backend behavior capabilities.

async open() bool[source]

Open Advanced Alchemy resources.

Returns:

True when resources are ready.

async close() None[source]

Close backend-owned resources.

async create_schema() None[source]

Create the queue task table and indexes.

async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]

Persist a queued task.

async get_task(task_id: UUID) QueuedTaskRecord | None[source]

Return a queued task by ID.

async get_task_by_key(key: str) QueuedTaskRecord | None[source]

Return a queued task by deduplication key.

async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]

Return due pending or scheduled tasks ordered for execution.

async claim_task(task_id: UUID) QueuedTaskRecord | None[source]

Atomically claim a pending task.

async claim_next(*, queue: str | None = None, execution_backend: str | None = None) QueuedTaskRecord | None[source]

Claim the next due task.

Returns:

The claimed task record, if one was available.

async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as completed.

Parameters:
  • task_id – Queue record identifier.

  • result – Task result payload.

  • expected_retry_count – When provided, update only if the record is still running with this retry count.

async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as failed or retry it.

Parameters:
  • task_id – Queue record identifier.

  • error – Error message to persist.

  • retry – Whether retry policy may requeue the task.

  • expected_retry_count – When provided, update only if the record is still running with this retry count.

async cancel_task(task_id: UUID) bool[source]

Cancel a task if it has not started.

async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]

Update the heartbeat timestamp for a running task.

Returns:

True when a running record matched the optional retry-count fence.

async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]

Clear heartbeat timestamps for task IDs.

Parameters:
  • task_ids – Queue record identifiers.

  • expected_retry_count – When provided, clear only records that still match this retry count.

async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]

Recover running tasks with stale heartbeats.

Returns:

Summary of requeued, failed, skipped, and handler-needed records.

async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an external execution reference for a running task.

Returns:

The updated queued task record, if one exists.

async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an execution backend/profile change for a queued task.

Returns:

The updated queued task record, if one exists.

async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]

Return externally dispatched tasks with references to reconcile.

async get_statistics() QueueStatistics[source]

Return queue status counts.

async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]

Return recent completed records for a task name.

async cleanup_terminal(before: datetime) int[source]

Delete terminal records completed before a cutoff.

Returns:

The number of deleted records.

Advanced Alchemy backend configuration.

class litestar_queues.backends.advanced_alchemy.config.AdvancedAlchemyBackendConfig(sqlalchemy_config: SQLAlchemyAsyncConfig | None = None, heartbeat_session_maker: async_sessionmaker[AsyncSession] | None = None, model_class: type[Any] | None = <factory>, create_schema: bool = False)[source]

Bases: object

Configuration values for the Advanced Alchemy queue backend.

backend_name: ClassVar[str] = 'advanced-alchemy'
sqlalchemy_config: SQLAlchemyAsyncConfig | None
heartbeat_session_maker: async_sessionmaker[AsyncSession] | None
model_class: type[Any] | None
create_schema: bool
__init__(sqlalchemy_config: SQLAlchemyAsyncConfig | None = None, heartbeat_session_maker: async_sessionmaker[AsyncSession] | None = None, model_class: type[Any] | None = <factory>, create_schema: bool = False) None

Advanced Alchemy queue task mixins.

class litestar_queues.backends.advanced_alchemy.mixins.QueueTaskModelMixin[source]

Bases: object

Declarative mixin carrying queue task columns and indexes.

Compose this with an application-owned Advanced Alchemy base that provides compatible id and created_at columns.

task_name = <sqlalchemy.orm.properties.MappedColumn object>
args_json = <sqlalchemy.orm.properties.MappedColumn object>
kwargs_json = <sqlalchemy.orm.properties.MappedColumn object>
queue = <sqlalchemy.orm.properties.MappedColumn object>
execution_backend = <sqlalchemy.orm.properties.MappedColumn object>
execution_profile = <sqlalchemy.orm.properties.MappedColumn object>
execution_ref = <sqlalchemy.orm.properties.MappedColumn object>
status = <sqlalchemy.orm.properties.MappedColumn object>
priority = <sqlalchemy.orm.properties.MappedColumn object>
max_retries = <sqlalchemy.orm.properties.MappedColumn object>
retry_count = <sqlalchemy.orm.properties.MappedColumn object>
scheduled_at = <sqlalchemy.orm.properties.MappedColumn object>
started_at = <sqlalchemy.orm.properties.MappedColumn object>
completed_at = <sqlalchemy.orm.properties.MappedColumn object>
heartbeat_at = <sqlalchemy.orm.properties.MappedColumn object>
result_json = <sqlalchemy.orm.properties.MappedColumn object>
error = <sqlalchemy.orm.properties.MappedColumn object>
task_key = <sqlalchemy.orm.properties.MappedColumn object>
metadata_json = <sqlalchemy.orm.properties.MappedColumn object>

Advanced Alchemy queue task repository.

class litestar_queues.backends.advanced_alchemy.repository.QueueTaskRepository(*, statement: Select | None = None, session: AsyncSession | async_scoped_session[AsyncSession], auto_expunge: bool = False, auto_refresh: bool = True, auto_commit: bool = False, order_by: List[tuple[str | InstrumentedAttribute[Any], bool] | UnaryExpression[Any]] | tuple[str | InstrumentedAttribute[Any], bool] | UnaryExpression[Any] | None = None, error_messages: ErrorMessages | type[Empty] | None = <class 'advanced_alchemy.utils.dataclass.Empty'>, load: Sequence[_AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any] | Sequence[_AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any]]] | _AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any] | ExecutableOption | Sequence[ExecutableOption] | None = None, execution_options: dict[str, Any] | None = None, wrap_exceptions: bool = True, uniquify: bool | None = None, count_with_window_function: bool | None = None, cache_manager: CacheManager | None = None, bind_group: str | None = None, **kwargs: Any)[source]

Bases: SQLAlchemyAsyncRepository[Any]

Repository for queue task records.

classmethod for_model(model_class: type[Any]) type[QueueTaskRepository][source]

Return a repository subclass bound to model_class.
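Binding a repository to a model class is commonly done by creating a subclass at runtime. The following stand-alone sketch shows that technique with the built-in `type()`; `SketchRepository`, `MyTask`, and the generated class name are hypothetical, and the sketch does not import Advanced Alchemy.

```python
from __future__ import annotations

from typing import Any


class SketchRepository:
    """Hypothetical base; `model_type` marks the bound model class."""

    model_type: Any = None

    @classmethod
    def for_model(cls, model_class: type) -> type[SketchRepository]:
        # Build a new subclass so each model gets its own bound repository type.
        name = f"{model_class.__name__}{cls.__name__}"
        return type(name, (cls,), {"model_type": model_class})


class MyTask:
    """Hypothetical application-owned model class."""


BoundRepository = SketchRepository.for_model(MyTask)
```

Creating a subclass rather than mutating the base keeps repositories for different models from interfering with each other.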

Advanced Alchemy queue persistence service.

class litestar_queues.backends.advanced_alchemy.service.QueueTaskService(session: AsyncSession | async_scoped_session[AsyncSession], *, statement: Select | None = None, auto_expunge: bool = False, auto_refresh: bool = True, auto_commit: bool = False, order_by: List[tuple[str | InstrumentedAttribute[Any], bool] | UnaryExpression[Any]] | tuple[str | InstrumentedAttribute[Any], bool] | UnaryExpression[Any] | None = None, error_messages: ErrorMessages | type[Empty] | None = <class 'advanced_alchemy.utils.dataclass.Empty'>, wrap_exceptions: bool = True, load: Sequence[_AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any] | Sequence[_AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any]]] | _AbstractLoad | Literal['*'] | InstrumentedAttribute[Any] | RelationshipProperty[Any] | MapperProperty[Any] | ExecutableOption | Sequence[ExecutableOption] | None = None, execution_options: dict[str, Any] | None = None, uniquify: bool | None = None, count_with_window_function: bool | None = None, **repo_kwargs: Any)[source]

Bases: SQLAlchemyAsyncRepositoryService[Any, Any]

Persistence operations for Advanced Alchemy queue records.

classmethod for_model(model_class: type[Any]) type[QueueTaskService][source]

Return a service subclass bound to model_class.

async enqueue(task_name: str, *, args: tuple[Any, ...], kwargs: dict[str, Any], queue: str, priority: int, max_retries: int, scheduled_at: datetime | None, key: str | None, execution_backend: str, execution_profile: str | None, metadata: dict[str, Any]) QueuedTaskRecord[source]
async get_task(task_id: UUID) QueuedTaskRecord | None[source]
async get_task_by_key(key: str) QueuedTaskRecord | None[source]
async list_pending(*, limit: int, queue: str | None, execution_backend: str | None) list[QueuedTaskRecord][source]
async claim_task(task_id: UUID) QueuedTaskRecord | None[source]
async claim_next(*, queue: str | None, execution_backend: str | None) QueuedTaskRecord | None[source]
async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]
async fail_task(task_id: UUID, error: str, *, retry: bool, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]
async cancel_task(task_id: UUID) bool[source]
async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]
async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]
async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]
async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None) QueuedTaskRecord | None[source]
async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None) QueuedTaskRecord | None[source]
async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]
async get_statistics() QueueStatistics[source]
async list_completed_by_task(task_name: str, *, since: datetime | None, limit: int) list[QueuedTaskRecord][source]
async cleanup_terminal(before: datetime) int[source]
model_from_record(record: QueuedTaskRecord) Any[source]

Convert a backend-neutral record into an Advanced Alchemy model.

Returns:

The Advanced Alchemy queue task model.

static record_from_model(model: Any) QueuedTaskRecord[source]

Convert an Advanced Alchemy model into a backend-neutral record.

Returns:

The backend-neutral queued task record.

Redis

Redis queue backend.

Stores queued task records in a Redis-protocol key-value server. The implementation lives directly on RedisQueueBackend; the Valkey backend inherits from this class and overrides only the client factory and the _backend_name ClassVar.

class litestar_queues.backends.redis.backend.RedisQueueBackend(config: QueueConfig | None = None, *, backend_config: _RedisBackendConfig | None = None)[source]

Bases: BaseQueueBackend

Queue backend that stores records in a Redis-protocol key-value server.

__init__(config: QueueConfig | None = None, *, backend_config: _RedisBackendConfig | None = None) None[source]

Initialize the queue backend.

property capabilities: QueueBackendCapabilities

Backend behavior capabilities.

async open() bool[source]

Open Redis-protocol client resources.

Returns:

True when the client is ready.

async close() None[source]

Close owned Redis-protocol client resources.

async enqueue(task_name: str, *, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None, queue: str = 'default', priority: int = 0, max_retries: int = 0, scheduled_at: datetime | None = None, key: str | None = None, execution_backend: str = 'local', execution_profile: str | None = None, metadata: dict[str, Any] | None = None) QueuedTaskRecord[source]

Persist a queued task.

Returns:

The created or deduplicated queued task record.
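The "created or deduplicated" return value suggests that enqueueing twice with the same key yields the existing record rather than a second one. The following is a minimal in-memory sketch of that idea, not the Redis implementation. SketchRecord and SketchStore are hypothetical stand-ins that do not exist in litestar_queues, and the real backend's rules (for example, whether a finished record still blocks a new one with the same key) may differ.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class SketchRecord:
    """Hypothetical stand-in for QueuedTaskRecord (illustration only)."""

    task_name: str
    key: str | None = None
    id: UUID = field(default_factory=uuid4)


class SketchStore:
    """Hypothetical in-memory store that deduplicates on ``key``."""

    def __init__(self) -> None:
        self._by_id: dict[UUID, SketchRecord] = {}
        self._by_key: dict[str, UUID] = {}

    async def enqueue(self, task_name: str, *, key: str | None = None) -> SketchRecord:
        # A known key returns the existing record instead of creating a new one.
        if key is not None and key in self._by_key:
            return self._by_id[self._by_key[key]]
        record = SketchRecord(task_name=task_name, key=key)
        self._by_id[record.id] = record
        if key is not None:
            self._by_key[key] = record.id
        return record

    async def get_task_by_key(self, key: str) -> SketchRecord | None:
        task_id = self._by_key.get(key)
        return self._by_id.get(task_id) if task_id is not None else None


async def demo_dedup() -> tuple[bool, int]:
    store = SketchStore()
    first = await store.enqueue("send_email", key="welcome:42")
    second = await store.enqueue("send_email", key="welcome:42")
    await store.enqueue("send_email")  # no key, so always a new record
    return first.id == second.id, len(store._by_id)
```

Calling asyncio.run(demo_dedup()) in this sketch reports that both keyed calls returned the same record and that only two records were stored.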

async get_task(task_id: UUID) QueuedTaskRecord | None[source]

Return a queued task by ID.

async get_task_by_key(key: str) QueuedTaskRecord | None[source]

Return a queued task by deduplication key.

async list_pending(*, limit: int = 1, queue: str | None = None, execution_backend: str | None = None) list[QueuedTaskRecord][source]

Return due pending or scheduled tasks ordered for execution.

async claim_task(task_id: UUID) QueuedTaskRecord | None[source]

Atomically claim a pending task.

Returns:

The claimed record, if it was still due and claimable.
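"Atomically" here means that when several workers race for the same pending task, at most one of them gets the record and the rest get None. A rough in-process sketch of that guarantee follows, using an asyncio.Lock to make the check-and-set indivisible. SketchClaims is a hypothetical stand-in, the status strings are assumptions, and how the Redis backend achieves atomicity is not shown in this reference.

```python
from __future__ import annotations

import asyncio
from uuid import UUID, uuid4


class SketchClaims:
    """Hypothetical in-memory claim table (illustration only)."""

    def __init__(self) -> None:
        self._status: dict[UUID, str] = {}
        self._lock = asyncio.Lock()

    def add_pending(self) -> UUID:
        task_id = uuid4()
        self._status[task_id] = "pending"  # assumed status name
        return task_id

    async def claim_task(self, task_id: UUID) -> UUID | None:
        # The lock keeps the "is it pending?" check and the status update
        # together, even though the coroutine yields in between.
        async with self._lock:
            if self._status.get(task_id) != "pending":
                return None
            await asyncio.sleep(0)  # simulate a round trip to the store
            self._status[task_id] = "running"
            return task_id


async def demo_claim_race(workers: int = 5) -> int:
    claims = SketchClaims()
    task_id = claims.add_pending()
    results = await asyncio.gather(*(claims.claim_task(task_id) for _ in range(workers)))
    # Count how many of the concurrent claimants actually won the task.
    return sum(1 for result in results if result is not None)
```

Without the lock, every claimant in this sketch would pass the status check before any of them wrote "running", and all of them would believe they owned the task.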

async complete_task(task_id: UUID, *, result: Any = None, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as completed.

Returns:

The completed record, if it exists.

async fail_task(task_id: UUID, error: str, *, retry: bool = True, expected_retry_count: int | None = None) QueuedTaskRecord | None[source]

Mark a task as failed or retry it.

Returns:

The updated record, if it exists.
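Taken together, list_pending, claim_task, complete_task and fail_task describe one step of a worker loop. The sketch below shows how they might compose. SketchBackend is a hypothetical in-memory stand-in that only mirrors the documented method names; the status strings, the retry bookkeeping, and the omission of expected_retry_count are all assumptions rather than the library's behavior.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
from uuid import UUID, uuid4


@dataclass
class SketchRecord:
    """Hypothetical stand-in for QueuedTaskRecord (illustration only)."""

    task_name: str
    max_retries: int = 0
    status: str = "pending"  # assumed status names
    retry_count: int = 0
    result: Any = None
    error: str | None = None
    id: UUID = field(default_factory=uuid4)


class SketchBackend:
    """Hypothetical in-memory backend mirroring the documented method names."""

    def __init__(self, records: list[SketchRecord]) -> None:
        self.records = {record.id: record for record in records}

    async def list_pending(self, *, limit: int = 1) -> list[SketchRecord]:
        pending = [r for r in self.records.values() if r.status == "pending"]
        return pending[:limit]

    async def claim_task(self, task_id: UUID) -> SketchRecord | None:
        record = self.records.get(task_id)
        if record is None or record.status != "pending":
            return None
        record.status = "running"
        return record

    async def complete_task(self, task_id: UUID, *, result: Any = None) -> SketchRecord | None:
        record = self.records.get(task_id)
        if record is not None:
            record.status, record.result = "completed", result
        return record

    async def fail_task(self, task_id: UUID, error: str, *, retry: bool = True) -> SketchRecord | None:
        record = self.records.get(task_id)
        if record is None:
            return None
        record.error = error
        if retry and record.retry_count < record.max_retries:
            record.retry_count += 1
            record.status = "pending"  # goes back into the queue
        else:
            record.status = "failed"
        return record


Handler = Callable[[], Awaitable[Any]]


async def run_once(backend: SketchBackend, handlers: dict[str, Handler]) -> SketchRecord | None:
    """Claim and execute at most one due task."""
    for candidate in await backend.list_pending(limit=10):
        record = await backend.claim_task(candidate.id)
        if record is None:
            continue  # another worker claimed it first
        try:
            result = await handlers[record.task_name]()
        except Exception as exc:
            return await backend.fail_task(record.id, str(exc), retry=True)
        return await backend.complete_task(record.id, result=result)
    return None


async def demo_worker() -> list[tuple[str, str, int]]:
    async def ok() -> int:
        return 7

    async def boom() -> None:
        raise RuntimeError("nope")

    backend = SketchBackend([SketchRecord("ok"), SketchRecord("boom", max_retries=1)])
    handlers: dict[str, Handler] = {"ok": ok, "boom": boom}
    seen = []
    for _ in range(3):
        record = await run_once(backend, handlers)
        assert record is not None
        seen.append((record.task_name, record.status, record.retry_count))
    return seen
```

In this sketch the failing task is retried once and then marked failed, because its max_retries is 1.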

async cancel_task(task_id: UUID) bool[source]

Cancel a task if it has not started.

Returns:

True when the task was cancelled.

async touch_heartbeat(task_id: UUID, *, expected_retry_count: int | None = None) bool[source]

Update the heartbeat timestamp for a running task.

Returns:

True when the heartbeat was updated.

async null_heartbeats(task_ids: list[UUID], *, expected_retry_count: int | None = None) None[source]

Clear heartbeat timestamps for task IDs.

async requeue_stale_running(*, stale_after: timedelta) StaleTaskRecoveryResult[source]

Requeue running tasks with stale heartbeats.

Returns:

Summary of recovered records.
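The heartbeat methods and requeue_stale_running imply a simple liveness rule: a running task whose last heartbeat is older than stale_after is presumed abandoned and is made claimable again. The sketch below illustrates that rule on plain objects. SketchRecord and requeue_stale are hypothetical, the status strings are assumptions, treating a missing heartbeat as stale is also an assumption, and the real method returns a StaleTaskRecoveryResult whose fields are not listed in this section.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class SketchRecord:
    """Hypothetical stand-in for a queued task record (illustration only)."""

    name: str
    status: str = "running"  # assumed status names
    heartbeat_at: datetime | None = None


def requeue_stale(
    records: list[SketchRecord], *, stale_after: timedelta, now: datetime
) -> list[str]:
    """Move running records with an old (or missing) heartbeat back to pending."""
    cutoff = now - stale_after
    requeued: list[str] = []
    for record in records:
        if record.status != "running":
            continue
        # Assumption: a running record without any heartbeat counts as stale.
        if record.heartbeat_at is None or record.heartbeat_at < cutoff:
            record.status = "pending"
            record.heartbeat_at = None
            requeued.append(record.name)
    return requeued


NOW = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
RECORDS = [
    SketchRecord("fresh", heartbeat_at=NOW - timedelta(seconds=10)),
    SketchRecord("stale", heartbeat_at=NOW - timedelta(seconds=120)),
    SketchRecord("done", status="completed", heartbeat_at=NOW - timedelta(hours=1)),
]
REQUEUED = requeue_stale(RECORDS, stale_after=timedelta(seconds=60), now=NOW)
```

Passing now explicitly keeps the sketch deterministic; a worker calling touch_heartbeat more often than stale_after would keep its task out of the requeued set.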

async set_execution_ref(task_id: UUID, execution_backend: str, execution_ref: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an external execution reference for a running task.

Returns:

The updated record, if it exists.

async set_execution_backend(task_id: UUID, execution_backend: str, *, execution_profile: str | None = None) QueuedTaskRecord | None[source]

Persist an execution backend/profile change for a queued task.

Returns:

The updated record, if it exists.

async list_running_external(*, limit: int | None = None) list[QueuedTaskRecord][source]

Return externally dispatched tasks with references to reconcile.

async get_statistics() QueueStatistics[source]

Return queue status counts.

async list_completed_by_task(task_name: str, *, since: datetime | None = None, limit: int = 10) list[QueuedTaskRecord][source]

Return recent completed records for a task name.

async cleanup_terminal(before: datetime) int[source]

Delete terminal records completed before a cutoff.

Returns:

Number of deleted records.
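As a rough illustration of the cutoff semantics, the sketch below deletes only records that are both in a terminal state and finished before the given time. The set of terminal statuses, the completed_at field name, and the dict-based store are assumptions made for the example; none of them is confirmed by this reference.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone

# Assumed terminal status names (not confirmed by this reference).
TERMINAL = {"completed", "failed", "cancelled"}


@dataclass
class SketchRecord:
    """Hypothetical stand-in for a queued task record (illustration only)."""

    status: str
    completed_at: datetime | None = None


def cleanup_terminal(records: dict[str, SketchRecord], before: datetime) -> int:
    """Delete terminal records finished before ``before``; return how many."""
    doomed = [
        key
        for key, record in records.items()
        if record.status in TERMINAL
        and record.completed_at is not None
        and record.completed_at < before
    ]
    for key in doomed:
        del records[key]
    return len(doomed)


def utc(day: int) -> datetime:
    return datetime(2024, 1, day, tzinfo=timezone.utc)


STORE = {
    "old_done": SketchRecord("completed", utc(1)),
    "old_failed": SketchRecord("failed", utc(2)),
    "new_done": SketchRecord("completed", utc(20)),
    "still_running": SketchRecord("running"),
}
DELETED = cleanup_terminal(STORE, before=utc(10))
```

Running and recently finished records survive the cleanup in this sketch, which is the property a periodic retention job would rely on.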

async notify_new_task(record: QueuedTaskRecord) None[source]

Publish a Redis-protocol pub/sub message when work is available.

async wait_for_notifications(timeout: float | None = None) bool[source]

Wait for a Redis-protocol pub/sub message when notifications are enabled.

Returns:

True when a notification was observed.
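The boolean return value lets a worker treat notifications as a wake-up hint and fall back to polling when the timeout elapses. The sketch below imitates that contract with an asyncio.Event in place of Redis pub/sub. SketchNotifier is a hypothetical stand-in, and its notify_new_task takes no record argument, unlike the documented method.

```python
from __future__ import annotations

import asyncio


class SketchNotifier:
    """Hypothetical in-process stand-in for pub/sub notifications."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    async def notify_new_task(self) -> None:
        self._event.set()

    async def wait_for_notifications(self, timeout: float | None = None) -> bool:
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
        except asyncio.TimeoutError:
            return False  # nothing arrived; the caller may poll instead
        self._event.clear()
        return True


async def demo_notifications() -> tuple[bool, bool]:
    notifier = SketchNotifier()

    # No producer yet, so the wait times out.
    timed_out = await notifier.wait_for_notifications(timeout=0.01)

    async def producer() -> None:
        await asyncio.sleep(0.01)
        await notifier.notify_new_task()

    task = asyncio.create_task(producer())
    woke = await notifier.wait_for_notifications(timeout=1.0)
    await task
    return timed_out, woke
```

A worker loop built on this contract would call list_pending after either outcome, so a missed notification costs at most one timeout interval.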

Redis queue backend configuration.

class litestar_queues.backends.redis.config.RedisBackendConfig(url: str = 'redis://localhost:6379/0', key_prefix: str = 'litestar_queues', notifications: bool = True, notification_channel: str = 'litestar_queues:queue_notifications', lock_timeout: float = 5.0, poll_interval: float = 0.1, client: Any | None = None)[source]

Bases: object

Configuration for the Redis queue backend.

backend_name: ClassVar[str] = 'redis'
url: str
key_prefix: str
notifications: bool
notification_channel: str
lock_timeout: float
poll_interval: float
client: Any | None
__init__(url: str = 'redis://localhost:6379/0', key_prefix: str = 'litestar_queues', notifications: bool = True, notification_channel: str = 'litestar_queues:queue_notifications', lock_timeout: float = 5.0, poll_interval: float = 0.1, client: Any | None = None) None

Valkey

Valkey queue backend.

The Valkey wire protocol is API-compatible with Redis, so this backend inherits the full RedisQueueBackend implementation. It overrides only the client factory (valkey.asyncio instead of redis.asyncio) and the _backend_name ClassVar, which drives lock-name error messages and the valkey-pubsub notification capability label.
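The inheritance arrangement described here can be sketched as a base class with two override points. The classes below are hypothetical stand-ins, not the library's code: the hook name _create_client and the way the label is built from _backend_name are assumptions inferred from the description, and the client is represented by a string so the example runs without either driver installed.

```python
from __future__ import annotations

from typing import ClassVar


class SketchRedisBackend:
    """Hypothetical base: all queue logic would live here."""

    _backend_name: ClassVar[str] = "redis"

    def __init__(self, url: str) -> None:
        self.url = url

    def _create_client(self) -> str:
        # Hypothetical hook; a real backend would build a redis.asyncio client.
        return f"redis.asyncio client for {self.url}"

    def notification_label(self) -> str:
        # Assumed derivation of the capability label from the backend name.
        return f"{self._backend_name}-pubsub"


class SketchValkeyBackend(SketchRedisBackend):
    """Hypothetical subclass: only the name and the client factory change."""

    _backend_name: ClassVar[str] = "valkey"

    def _create_client(self) -> str:
        # A real backend would build a valkey.asyncio client here instead.
        return f"valkey.asyncio client for {self.url}"


VALKEY = SketchValkeyBackend("redis://localhost:6379/0")
```

Keeping the shared logic on the base class means a fix to claiming, retries, or cleanup applies to both backends without duplication.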

class litestar_queues.backends.valkey.backend.ValkeyQueueBackend(config: QueueConfig | None = None, *, backend_config: ValkeyBackendConfig | None = None)[source]

Bases: RedisQueueBackend

Valkey-backed queue backend.

__init__(config: QueueConfig | None = None, *, backend_config: ValkeyBackendConfig | None = None) None[source]

Initialize the queue backend.

Valkey queue backend configuration.

class litestar_queues.backends.valkey.config.ValkeyBackendConfig(url: str = 'redis://localhost:6379/0', key_prefix: str = 'litestar_queues', notifications: bool = True, notification_channel: str = 'litestar_queues:queue_notifications', lock_timeout: float = 5.0, poll_interval: float = 0.1, client: Any | None = None)[source]

Bases: object

Configuration for the Valkey queue backend.

backend_name: ClassVar[str] = 'valkey'
url: str
key_prefix: str
notifications: bool
notification_channel: str
lock_timeout: float
poll_interval: float
client: Any | None
__init__(url: str = 'redis://localhost:6379/0', key_prefix: str = 'litestar_queues', notifications: bool = True, notification_channel: str = 'litestar_queues:queue_notifications', lock_timeout: float = 5.0, poll_interval: float = 0.1, client: Any | None = None) None