Backends¶
Litestar Queues separates queue backends from execution backends.
Queue backends persist task state. The core package registers the memory
backend for tests, local development, and in-process workers. The sqlspec
backend is available when the SQLSpec extra is installed. Additional optional
extras provide Advanced Alchemy, Redis, and Valkey integrations.
Execution backends decide where claimed tasks run. The core package registers
immediate for inline execution and local for in-process worker
execution. The optional cloudrun extra dispatches records to Cloud Run Jobs.
Install optional extras only when an application needs them:
pip install litestar-queues[sqlspec]
pip install litestar-queues[advanced-alchemy]
pip install litestar-queues[redis]
pip install litestar-queues[valkey]
pip install litestar-queues[cloudrun]
The core package import does not require optional queue or execution client libraries.
SQLSpec¶
Install the SQLSpec extra when a queue needs SQL-backed persistence:
pip install litestar-queues[sqlspec]
Configure SQLSpec queue persistence by passing a SQLSpec adapter config through
SQLSpecBackendConfig:
from sqlspec.adapters.aiosqlite import AiosqliteConfig
from litestar_queues import QueueConfig
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig
config = QueueConfig(
queue_backend=SQLSpecBackendConfig(
config=AiosqliteConfig(
connection_config={"database": "queue.db"},
),
),
execution_backend="local",
)
By default, the backend creates the queue table on startup. Set
create_schema=False in SQLSpecBackendConfig when schema management is
handled elsewhere. Applications that want SQLSpec to apply the packaged queue
migration can set run_migrations=True:
config = QueueConfig(
queue_backend=SQLSpecBackendConfig(
config=AiosqliteConfig(
connection_config={"database": "queue.db"},
),
create_schema=False,
run_migrations=True,
),
execution_backend="local",
)
Litestar applications that use SQLSpec sessions should register SQLSpec’s
first-party plugin directly and pass the same SQLSpec instance and adapter
config to the queue backend:
from litestar import Litestar
from sqlspec import SQLSpec
from sqlspec.adapters.aiosqlite import AiosqliteConfig
from sqlspec.extensions.litestar import SQLSpecPlugin
from litestar_queues import QueueConfig, QueuePlugin
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig
sqlspec = SQLSpec()
sqlspec_config = AiosqliteConfig(connection_config={"database": "queue.db"})
sqlspec.add_config(sqlspec_config)
app = Litestar(
plugins=[
SQLSpecPlugin(sqlspec),
QueuePlugin(
QueueConfig(
queue_backend=SQLSpecBackendConfig(
sqlspec=sqlspec,
config=sqlspec_config,
),
execution_backend="local",
)
)
],
)
SQLSpec persists task arguments, keyword arguments, metadata, and results using
SQLSpec’s serializer. Packaged migrations are registered with SQLSpec’s
extension runner as ext_litestar_queues_0001. Applications with their own
migration flow can set both create_schema=False and run_migrations=False.
Heartbeat Pool Isolation¶
Workers issue a heartbeat write every QueueConfig.worker_heartbeat_interval
seconds for every running task. At high worker_max_concurrency those writes
share the main pool with task fetch, claim, and lifecycle UPDATEs, and on
network databases (AsyncPG, AioMySQL) heartbeats can stall behind queue work
and miss the stale-recovery window.
Set heartbeat_pool_config to a second SQLSpec adapter config so heartbeat
writes run on a small dedicated pool:
from sqlspec.adapters.asyncpg import AsyncpgConfig
from litestar_queues import QueueConfig
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig
queue_url = "postgresql://queue@db/queues"
main_config = AsyncpgConfig(
pool_config={"dsn": queue_url, "min_size": 4, "max_size": 16},
)
heartbeat_config = AsyncpgConfig(
pool_config={"dsn": queue_url, "min_size": 1, "max_size": 2},
)
config = QueueConfig(
queue_backend=SQLSpecBackendConfig(
config=main_config,
heartbeat_pool_config=heartbeat_config,
),
execution_backend="local",
worker_max_concurrency=32,
)
The dedicated config MUST point at the same database as the main config; the
backend only uses it for touch_heartbeat and null_heartbeats. Lifecycle
writes that touch heartbeat_at alongside other columns (claim_task,
complete_task, fail_task, requeue_stale_running) stay on the main
pool. Recommended sizing: max_size=2 for AsyncPG / AioMySQL,
max_size=1 for AioSQLite. The dedicated pool’s connections add to the
application’s total database connection budget.
When heartbeat_pool_config is None (the default), heartbeat writes
share the main pool exactly as before. If the dedicated pool fails to register
or open, the backend logs a single warning and falls back to the main pool for
the lifetime of the backend.
SQLSpec Store Selection¶
The SQLSpec queue backend selects stores by SQLSpec adapter configuration, not by directly importing database drivers. This keeps driver dependencies optional and lets each store use database-specific DDL, column types, JSON behavior, and claim/update statements where the database supports them.
SQLSpec adapter |
Queue store |
Notes |
|---|---|---|
|
|
Async MySQL behavior. |
|
|
Async SQLite behavior. |
|
|
Async MySQL behavior. |
|
|
PostgreSQL behavior. |
|
|
CockroachDB behavior on the asyncpg driver. |
|
|
DuckDB-specific DDL and JSON behavior. |
|
|
Async and sync variants are selected from the SQLSpec config type. |
|
|
Sync MySQL behavior. |
|
|
Uses Oracle-specific DDL and JSON column choices. |
|
|
PostgreSQL behavior. |
|
|
Async and sync variants are selected from the SQLSpec config type. |
|
|
CockroachDB behavior on psycopg; async and sync variants are selected from the SQLSpec config type. |
|
|
Sync SQLite behavior. |
Unsupported SQLSpec adapters raise QueueConfigurationError. Applications
should install the SQLSpec adapter driver they configure; Litestar Queues does
not install every SQLSpec driver as a package dependency.
SQLSpec Capability Matrix¶
SQLSpec adapters use the strongest queue primitive their configured driver advertises, then fall back to the portable path when a capability is absent.
Adapter family |
Claim strategy |
JSON storage and codec |
Bulk insert |
Notifications |
Notes |
|---|---|---|---|---|---|
|
Optimistic compare-and-swap. |
|
Native Arrow |
Polling unless an explicit SQLSpec table queue is configured. |
SQLite serializes writes, so the portable path is the concurrency guard. |
|
Optimistic compare-and-swap. |
|
Arrow |
Polling. |
Positional Arrow ingest is contract-tested so column order matches the table definition. |
|
|
|
Arrow |
|
Stale-recovery statement batches use SQLSpec |
|
Optimistic compare-and-swap. |
|
Arrow |
Polling. |
CockroachDB keeps PostgreSQL-compatible DDL, but the queue backend
stays on the portable claim path instead of relying on |
|
|
|
Arrow |
SQLSpec durable table queue. |
PostgreSQL storage parameters tune queue-table churn where supported. |
|
|
MySQL |
Arrow |
Polling. |
Index prefixes keep InnoDB key length within portable bounds. |
|
|
MySQL |
Arrow |
Polling. |
Sync MySQL behavior uses the same InnoDB prefix guard. |
|
|
Version-aware |
Arrow |
Polling by default; explicit |
Oracle object names are kept within the adapter’s identifier limits; Oracle 23ai can pipeline stale-recovery statement batches. |
Additional SQLSpec adapters can be added by implementing a queue store and registering it with the SQLSpec store factory.
Advanced Alchemy¶
Install the Advanced Alchemy extra when a queue should persist task state using Advanced Alchemy and SQLAlchemy:
pip install litestar-queues[advanced-alchemy]
Configure the queue backend with SQLAlchemyAsyncConfig:
from advanced_alchemy.extensions.litestar import SQLAlchemyAsyncConfig
from litestar_queues import QueueConfig
from litestar_queues.backends.advanced_alchemy import AdvancedAlchemyBackendConfig
alchemy_config = SQLAlchemyAsyncConfig(
connection_string="sqlite+aiosqlite:///queue.db",
)
config = QueueConfig(
queue_backend=AdvancedAlchemyBackendConfig(
sqlalchemy_config=alchemy_config,
create_schema=True,
),
execution_backend="local",
)
Queue operations use one queue table by default: litestar_queue_task.
They run through fresh operation-scoped sessions opened from
sqlalchemy_config and commit or roll back queue mutations explicitly.
Litestar applications should register Advanced Alchemy’s first-party plugin directly and pass the same config to the queue backend:
from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.extensions.litestar import SQLAlchemyAsyncConfig, SQLAlchemyPlugin
from litestar import Litestar
from litestar_queues import QueueConfig, QueuePlugin
from litestar_queues.backends.advanced_alchemy import AdvancedAlchemyBackendConfig, QueueTaskModelMixin
class AppQueueTask(UUIDAuditBase, QueueTaskModelMixin):
__tablename__ = "app_queue_task"
alchemy_config = SQLAlchemyAsyncConfig(
connection_string="sqlite+aiosqlite:///queue.db",
)
app = Litestar(
plugins=[
SQLAlchemyPlugin(config=alchemy_config),
QueuePlugin(
QueueConfig(
queue_backend=AdvancedAlchemyBackendConfig(
sqlalchemy_config=alchemy_config,
model_class=AppQueueTask,
create_schema=True,
),
execution_backend="local",
)
),
],
)
The queue plugin does not append SQLAlchemyPlugin or consume request-scoped
db_session dependencies. Applications that manage schema with Alembic should
import the queue model they use into their Alembic environment so autogenerate
can include the queue table in the application migration stream. The default
model is QueueTaskModel and uses the litestar_queue_task table. It is
imported when AdvancedAlchemyBackendConfig() is constructed, so app startup
normally puts it on Advanced Alchemy’s base metadata before migration
autogenerate runs.
App-Owned Queue Model¶
Advanced Alchemy support includes a default queue model. Override it when the
application needs its own table name, base class, bind metadata, or Alembic
ownership. Compose QueueTaskModelMixin with an Advanced Alchemy base that
provides compatible id and created_at columns, then pass the resulting
model class to the backend:
from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.extensions.litestar import SQLAlchemyAsyncConfig
from litestar_queues import QueueConfig
from litestar_queues.backends.advanced_alchemy import AdvancedAlchemyBackendConfig, QueueTaskModelMixin
class AppQueueTask(UUIDAuditBase, QueueTaskModelMixin):
__tablename__ = "app_queue_task"
alchemy_config = SQLAlchemyAsyncConfig(
connection_string="sqlite+aiosqlite:///queue.db",
)
config = QueueConfig(
queue_backend=AdvancedAlchemyBackendConfig(
sqlalchemy_config=alchemy_config,
model_class=AppQueueTask,
create_schema=True,
),
execution_backend="local",
)
QueueTaskModelMixin carries the queue columns and derives index names from
the composed class’s __tablename__. If your application uses Alembic
autogenerate, import this model in env.py and include its metadata in
target_metadata. Use create_schema only for local bootstrap or tests;
production schema changes should be generated and reviewed in the application
migration stream.
Advanced Alchemy Capability Notes¶
The Advanced Alchemy backend uses native SQLAlchemy and Advanced Alchemy features where the dialect supports them:
Dialect family |
Claim strategy |
JSON storage |
Keyed enqueue |
|---|---|---|---|
PostgreSQL |
|
|
Native |
MySQL / MariaDB |
|
Native JSON through Advanced Alchemy’s |
Native duplicate-key upsert for keyed records. |
Oracle |
|
Oracle JSON/BLOB handling through Advanced Alchemy’s |
Native |
SQLite and other dialects |
Optimistic compare-and-swap. |
Native dialect JSON where SQLAlchemy provides it. |
Portable key-check and insert fallback. |
All paths keep the same public queue semantics: active keyed records deduplicate, terminal keyed records can be replaced, stale recovery returns the affected task IDs, and ownership-fence losses surface as normal queue lifecycle events.
Heartbeat Session Maker Isolation¶
Workers issue a heartbeat write every QueueConfig.worker_heartbeat_interval
seconds for every running task. At high worker_max_concurrency those writes
share the main async SQLAlchemy pool with task fetch, claim, and lifecycle
UPDATEs. On network databases (AsyncPG, AioMySQL) heartbeats can stall behind
queue work and miss the stale-recovery window.
Construct a dedicated async engine and async_sessionmaker, then pass it
through heartbeat_session_maker:
from advanced_alchemy.extensions.litestar import SQLAlchemyAsyncConfig
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from litestar_queues import QueueConfig
from litestar_queues.backends.advanced_alchemy import AdvancedAlchemyBackendConfig
from myapp.models import AppQueueTask
queue_url = "postgresql+asyncpg://queue@db/queues"
main_config = SQLAlchemyAsyncConfig(connection_string=queue_url)
heartbeat_engine = create_async_engine(queue_url, pool_size=1, max_overflow=1)
heartbeat_maker = async_sessionmaker(heartbeat_engine, expire_on_commit=False)
config = QueueConfig(
queue_backend=AdvancedAlchemyBackendConfig(
sqlalchemy_config=main_config,
model_class=AppQueueTask,
heartbeat_session_maker=heartbeat_maker,
),
execution_backend="local",
worker_max_concurrency=32,
)
The dedicated engine MUST point at the same database as the main config. The
backend uses it only for touch_heartbeat and null_heartbeats; lifecycle
UPDATEs that touch heartbeat_at alongside other columns (claim_task,
complete_task, fail_task, requeue_stale_running) stay on the main
session for transactional correctness. Recommended sizing:
pool_size=1, max_overflow=1 for AsyncPG / AioMySQL, pool_size=1 for
aiosqlite (SQLite serializes writes anyway). The dedicated engine’s
connections add to the application’s total database connection budget.
The adopter owns the dedicated engine’s lifecycle. backend.close() does
NOT dispose the heartbeat engine — call engine.dispose() from the
application shutdown hook that owns the engine. Unlike the SQLSpec backend
(which registers its dedicated config on open()), the Advanced Alchemy
backend never constructs or owns the heartbeat engine; any construction error
surfaces from the first touch_heartbeat() call.
When heartbeat_session_maker is None (the default), heartbeat writes
share the main session exactly as before.
Redis¶
Install the Redis extra when a queue should persist task state in Redis:
pip install litestar-queues[redis]
Configure the queue backend with a Redis URL or pass an already configured async Redis client:
from litestar_queues import QueueConfig
from litestar_queues.backends.redis import RedisBackendConfig
config = QueueConfig(
queue_backend=RedisBackendConfig(
url="redis://localhost:6379/0",
key_prefix="litestar_queues",
notifications=True,
),
execution_backend="local",
)
Redis queue records are stored in hashes under the configured key prefix. The
backend keeps an ID set for operational queries, a key hash for deduplication,
a sorted set for delayed scheduling, and short-lived SET NX locks around
claim and key-replacement mutations. Task arguments, keyword arguments,
metadata, and results must be JSON serializable.
Redis pub/sub is used only as a worker wakeup mechanism. Notifications are not durable; workers that miss a message fall back to polling.
Valkey¶
Install the Valkey extra when a queue should use Valkey’s asyncio client:
pip install litestar-queues[valkey]
Configure Valkey with the same queue backend settings:
from litestar_queues import QueueConfig
from litestar_queues.backends.valkey import ValkeyBackendConfig
config = QueueConfig(
queue_backend=ValkeyBackendConfig(
url="redis://localhost:6379/0",
key_prefix="litestar_queues",
notifications=True,
),
execution_backend="local",
)
Valkey follows the same queue lifecycle contract as Redis: active key deduplication, terminal key replacement, delayed scheduling, atomic claim via backend locks, retries, heartbeats, stale running recovery, result lookup, stats, cleanup, and optional pub/sub worker wakeups.
Cloud Run¶
Install the Cloud Run extra when tasks should execute in Cloud Run Jobs:
pip install litestar-queues[cloudrun]
Configure execution with generic package settings. Queue persistence remains app-owned and can use any queue backend that supports execution references:
from litestar_queues import QueueConfig, task
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig
from litestar_queues.execution.cloudrun import CloudRunExecutionConfig
@task("reports.render", execution_backend="cloudrun", execution_profile="heavy")
async def render_report(report_id: str) -> None:
...
config = QueueConfig(
queue_backend=SQLSpecBackendConfig(config=...),
execution_backend=CloudRunExecutionConfig(
project_id="example-project",
region="us-central1",
job_name="queue-worker",
profiles={"heavy": "queue-worker-heavy"},
),
)
The dispatch worker stores the Cloud Run execution name on the queue record and
does not claim the task locally. The Cloud Run container should run
litestar-queues-cloudrun-worker or
python -m litestar_queues.execution.cloudrun.entrypoint. The entry point
reads LITESTAR_QUEUES_TASK_ID, loads configured task modules, claims the
persisted record, updates heartbeats, executes the task through the shared queue
service, and returns deterministic process exit codes.
If the Cloud Run API call fails before a remote execution owns the task, the
backend can move the record to a fallback execution backend such as local.
Status checks that fail transiently are treated as still running so
reconciliation does not create false terminal states.
SQLSpec Event Notifications¶
The SQLSpec backend falls back to worker polling unless notifications are
configured. To wake workers through SQLSpec Events, configure the SQLSpec
events extension and enable queue notifications:
from sqlspec.adapters.aiosqlite import AiosqliteConfig
from litestar_queues import QueueConfig
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig
sqlspec_config = AiosqliteConfig(
connection_config={"database": "queue.db"},
extension_config={
"events": {
"backend": "table_queue",
"queue_table": "queue_events",
"poll_interval": 0.1,
}
},
)
config = QueueConfig(
queue_backend=SQLSpecBackendConfig(
config=sqlspec_config,
create_schema=False,
run_migrations=True,
notifications=True,
notification_channel="queue_notifications",
),
execution_backend="local",
)
PostgreSQL SQLSpec adapters can use SQLSpec’s native listen_notify backend;
other adapters can use the durable table_queue backend. Oracle adapters can
use native aq or txeventq transports when the database user has AQ
privileges and the target queues are provisioned. These Oracle transports stay
explicit because queue provisioning is DBA-owned:
from sqlspec.adapters.oracledb import OracleAsyncConfig
oracle_config = OracleAsyncConfig(
connection_config={
"host": "db.example.com",
"port": 1521,
"service_name": "FREEPDB1",
"user": "queue_app",
"password": "...",
"min": 1,
"max": 5,
},
extension_config={
"events": {
"backend": "txeventq",
"aq_queue": "LQ_EVENTS_TXQ",
}
},
)
config = QueueConfig(
queue_backend=SQLSpecBackendConfig(
config=oracle_config,
notifications=True,
notify_transport="txeventq",
event_settings={"aq_queue": "LQ_EVENTS_TXQ"},
),
execution_backend="local",
)
Queue notification channel names must be valid SQLSpec event identifiers.
SQLSpec Observability¶
The SQLSpec backend uses SQLSpec’s ObservabilityRuntime for queue-domain
counters and spans. SQL statements executed by the backend already flow through
SQLSpec driver spans and statement observers, so query spans inherit SQLSpec
correlation context automatically.
Queue counters are recorded with these reserved names:
Metric |
Meaning |
|---|---|
|
Queue records inserted by |
|
Records successfully claimed for execution. |
|
Records completed successfully. |
|
Records moved to terminal failure by |
|
Records requeued for another attempt. |
|
Stale running records handled by stale recovery. |
|
Worker wakeup notifications published through SQLSpec Events. |
|
Fenced completion or failure attempts rejected after ownership changed. |
|
Stale records moved to terminal failure. |
Counters are available from SQLSpec diagnostics:
runtime = sqlspec_config.get_observability_runtime()
queue_metrics = runtime.metrics_snapshot()
Set queue_observability=False on SQLSpecBackendConfig to disable only the
queue-domain counters and custom queue spans. SQLSpec driver query spans and
statement observers remain controlled by the SQLSpec config.
OpenTelemetry tracing and Prometheus statement metrics are enabled through SQLSpec’s optional extensions:
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.extensions.otel import enable_tracing
from sqlspec.extensions.prometheus import enable_metrics
from litestar_queues import QueueConfig
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig
observability = enable_tracing(
resource_attributes={"service.name": "queue-worker"},
)
observability = enable_metrics(base_config=observability)
sqlspec = SQLSpec(observability_config=observability)
sqlspec_config = AsyncpgConfig(
pool_config={"dsn": "postgresql://queue@db/queues"},
)
sqlspec.add_config(sqlspec_config)
config = QueueConfig(
queue_backend=SQLSpecBackendConfig(
sqlspec=sqlspec,
config=sqlspec_config,
),
execution_backend="local",
)
Pool, connection, session, and query lifecycle hooks remain SQLSpec-owned. Attach them to the same runtime when you need adapter-level lifecycle events:
def record_query_complete(context: dict[str, object]) -> None:
...
runtime = sqlspec_config.get_observability_runtime()
runtime.register_lifecycle_hook("on_query_complete", record_query_complete)
SQLSpec’s Prometheus helper records bounded statement metrics such as
sqlspec_driver_query_total and sqlspec_driver_query_duration_seconds.
Applications that want the queue-domain counters in Prometheus can expose the
metrics_snapshot() values through their own metrics bridge.