Workers¶
The local worker claims due records from the configured queue backend and executes them with the configured execution backend.
Plugin Worker¶
Set in_app_worker=True to run an in-app worker with the Litestar
application:
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig
config = QueueConfig(
queue_backend=SQLSpecBackendConfig(config=...),
execution_backend="local",
in_app_worker=True,
worker_batch_size=20,
worker_max_concurrency=4,
)
This is useful for tests, local development, and lightweight deployments. For heavier production workloads, consider running workers as separate processes so web and background capacity can be scaled independently.
Manual Worker¶
Use Worker directly for scripts, process managers, or custom entry points:
from litestar_queues import QueueConfig, QueueService, Worker
async with QueueService(QueueConfig(queue_backend="memory", execution_backend="local")) as service:
worker = Worker(service, batch_size=10, max_concurrency=2)
await worker.start()
run_once() processes one batch and is useful in tests:
processed = await worker.run_once()
Heartbeats and Stale Records¶
Workers update heartbeats while a local task is running and clear heartbeat
values after execution finishes. Backends that support stale recovery can
requeue running records whose heartbeat is older than worker_stale_after.
The worker re-checks stale records every worker_stale_check_interval
seconds (default 60) from inside the poll loop, so a worker that survives
a peer crash will rescue orphaned records without operator intervention. Set
worker_stale_after to None (the default) to disable stale recovery
entirely; the periodic check is skipped in that case.
SQL-backed backends can optionally route heartbeat writes through a dedicated connection so they do not contend with task fetch and lifecycle UPDATEs under high concurrency. See Heartbeat Pool Isolation (SQLSpec) and Heartbeat Session Maker Isolation (Advanced Alchemy).
External Execution¶
External execution backends dispatch work outside the local process and store an
execution reference on the queue record. The worker periodically calls
reconcile_external() so external backends can move records into terminal
queue states after the remote execution completes.
Cloud Run uses this flow: local workers dispatch records to Cloud Run Jobs, and the Cloud Run worker entry point claims the persisted record inside the remote container.
Worker Wakeups¶
Queue backends can implement notifications to wake sleeping workers. These
notifications are only hints. Workers always fall back to polling via
worker_poll_interval when notifications are unavailable or missed.
Worker Identity¶
Each Worker carries a string worker_id used to
tag published QueueEvent envelopes (the workerId field on the wire).
The default is "worker-{os.getpid()}"; operators that run multiple
workers per host, or that need stable identities across hosts where PIDs
may collide, should pass an explicit worker_id to Worker(...):
worker = Worker(service, worker_id="orders-worker-3")
The QueuePlugin startup path uses the PID-based
default. Standalone worker entry points should pass an explicit worker_id
per process.