Execution Backends

Registry

Execution backends are registered through litestar_queues.execution.

litestar_queues.execution.get_execution_backend(backend: ExecutionBackendConfig = 'immediate', config: QueueConfig | None = None) -> BaseExecutionBackend

Get an instantiated execution backend.

Returns:

A configured execution backend instance.

Raises:

TypeError – If a typed execution config selects a backend class that does not accept execution_config.

litestar_queues.execution.get_execution_backend_class(backend_path: str) -> type[BaseExecutionBackend]

Get an execution backend class by short name or import path.

Returns:

The resolved execution backend class.

Raises:

ValueError – If a short backend name is unknown.

litestar_queues.execution.execution_backend(name: str) -> Callable[[type[BaseExecutionBackend]], type[BaseExecutionBackend]]

Decorator to register an execution backend class with a short name.

Returns:

A decorator that registers the backend class.

litestar_queues.execution.list_execution_backends() -> list[str]

Return registered execution backend names.
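
The registry functions above follow a common decorator-registration pattern. The sketch below is a self-contained stand-in rather than the library's implementation: it does not import litestar_queues, and the `_REGISTRY` dict, the `EchoBackend` class, and the short-name-then-import-path lookup order are assumptions made for illustration.

```python
from importlib import import_module
from typing import Callable

# Stand-in registry; the real one is internal to litestar_queues.execution.
_REGISTRY: dict[str, type] = {}


def execution_backend(name: str) -> Callable[[type], type]:
    """Return a decorator that registers a class under a short name."""

    def decorator(cls: type) -> type:
        _REGISTRY[name] = cls
        return cls

    return decorator


def list_execution_backends() -> list[str]:
    """Return the registered short names in sorted order."""
    return sorted(_REGISTRY)


def get_execution_backend_class(backend_path: str) -> type:
    """Resolve a short name first, then fall back to a dotted import path."""
    if backend_path in _REGISTRY:
        return _REGISTRY[backend_path]
    if "." not in backend_path:
        # Mirrors the documented ValueError for an unknown short name.
        raise ValueError(f"Unknown execution backend: {backend_path!r}")
    module_path, _, class_name = backend_path.rpartition(".")
    return getattr(import_module(module_path), class_name)


@execution_backend("echo")
class EchoBackend:
    """Hypothetical backend used only to exercise the registry."""
```

With this stand-in, `get_execution_backend_class("echo")` returns `EchoBackend`, while a name that is neither registered nor a dotted path raises `ValueError`.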

Base Backend

class litestar_queues.execution.base.BaseExecutionBackend(config: QueueConfig | None = None)

Bases: object

Base class for queue execution backends.

__init__(config: QueueConfig | None = None) -> None

Initialize the execution backend.

config

property is_external: bool

Whether this backend dispatches records to another process.

async open() -> bool

Open execution resources.

Returns:

True when resources are ready.

async close() -> None

Close execution resources.

async execute(service: QueueService, record: QueuedTaskRecord, *, worker_id: str | None = None) -> QueuedTaskRecord

Execute a queue record.

async dispatch(service: QueueService, record: QueuedTaskRecord) -> str | None

Dispatch a queue record to an external executor.

Returns:

The external execution reference, if one was created.

async reconcile(service: QueueService, record: QueuedTaskRecord) -> QueuedTaskRecord | None

Reconcile an externally running queue record.

Returns:

The updated record when reconciliation changes its state; otherwise None.

async cancel(service: QueueService, record: QueuedTaskRecord) -> bool

Cancel an externally running queue record if possible.

Returns:

True when cancellation succeeds.
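
To show how the dispatch and reconcile hooks could fit together for an external backend, here is a minimal sketch. It is an illustration only: `Record` and `SketchExternalBackend` are hypothetical stand-ins that copy the documented method names, the record fields and status strings are invented, and the polling loop is an assumption about how a caller might drive reconciliation.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Record:
    """Stand-in for QueuedTaskRecord; these fields are hypothetical."""

    task_id: str
    status: str = "queued"
    execution_ref: Optional[str] = None


class SketchExternalBackend:
    """Stand-in that mirrors the documented method names only."""

    is_external = True

    def __init__(self) -> None:
        self._polls_left: dict[str, int] = {}

    async def open(self) -> bool:
        return True

    async def close(self) -> None:
        self._polls_left.clear()

    async def dispatch(self, service: object, record: Record) -> Optional[str]:
        ref = f"exec-{record.task_id}"
        self._polls_left[ref] = 2  # pretend the job finishes on the second poll
        record.status = "running"
        record.execution_ref = ref
        return ref

    async def reconcile(self, service: object, record: Record) -> Optional[Record]:
        ref = record.execution_ref
        if ref is None or ref not in self._polls_left:
            return None
        self._polls_left[ref] -= 1
        if self._polls_left[ref] > 0:
            return None  # still running, so no state change to report
        del self._polls_left[ref]
        record.status = "succeeded"
        return record

    async def cancel(self, service: object, record: Record) -> bool:
        return False  # this sketch does not support cancellation


async def run_once() -> list[str]:
    """Dispatch one record and poll reconcile() until it reports a change."""
    backend = SketchExternalBackend()
    record = Record(task_id="42")
    seen: list[str] = []
    await backend.open()
    try:
        await backend.dispatch(None, record)
        while True:
            seen.append(record.status)
            if await backend.reconcile(None, record) is not None:
                break
        seen.append(record.status)
    finally:
        await backend.close()
    return seen
```

Running `asyncio.run(run_once())` records two "running" observations followed by "succeeded", because the first reconcile call returns None and the second returns the updated record.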

Immediate

class litestar_queues.execution.immediate.ImmediateExecutionBackend(config: QueueConfig | None = None)

Bases: BaseExecutionBackend

Execution backend that runs records inline.

async execute(service: QueueService, record: QueuedTaskRecord, *, worker_id: str | None = None) -> QueuedTaskRecord

Execute a task immediately in the current event loop.

Returns:

The updated queue record.

Local

class litestar_queues.execution.local.LocalExecutionBackend(config: QueueConfig | None = None)

Bases: BaseExecutionBackend

Execution backend for in-process workers.

async execute(service: QueueService, record: QueuedTaskRecord, *, worker_id: str | None = None) -> QueuedTaskRecord

Execute one claimed task in a local worker.

Returns:

The updated queue record.

Cloud Run

class litestar_queues.execution.cloudrun.config.CloudRunExecutionConfig(project_id: str, region: str = 'us-central1', job_name: str | None = None, profiles: dict[str, str] = <factory>, timeout: int = 300, poll_interval: float = 5.0, env_prefix: str = 'LITESTAR_QUEUES', extra_env: dict[str, str] = <factory>, fallback_execution_backend: str | None = 'local')

Bases: object

Configuration for Cloud Run Jobs execution.

backend_name: ClassVar[str] = 'cloudrun'
project_id: str
region: str
job_name: str | None
profiles: dict[str, str]
timeout: int
poll_interval: float
env_prefix: str
extra_env: dict[str, str]
fallback_execution_backend: str | None

resolve_job_name(profile: str | None = None) -> str

Return the Cloud Run Job name for a profile.

Returns:

The resolved Cloud Run Job name.

Raises:

QueueConfigurationError – If no job name can be resolved.

env_name(suffix: str) -> str

Return an environment variable name using the configured prefix.
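
The two helpers above can be pictured with a small stand-in dataclass. This is a sketch under stated assumptions, not the library's code: the lookup order (profile mapping first, then `job_name`), the underscore join in `env_name`, the `TASK_ID` suffix, and the locally defined `QueueConfigurationError` are all guesses made for illustration.

```python
from dataclasses import dataclass, field
from typing import Optional


class QueueConfigurationError(Exception):
    """Stand-in for the library's configuration error."""


@dataclass
class SketchCloudRunConfig:
    """Hypothetical subset of the documented config fields."""

    project_id: str
    job_name: Optional[str] = None
    profiles: dict[str, str] = field(default_factory=dict)
    env_prefix: str = "LITESTAR_QUEUES"

    def resolve_job_name(self, profile: Optional[str] = None) -> str:
        # Assumed precedence: a matching profile wins, then the default job.
        if profile is not None and profile in self.profiles:
            return self.profiles[profile]
        if self.job_name:
            return self.job_name
        raise QueueConfigurationError(
            f"No Cloud Run Job name configured for profile {profile!r}"
        )

    def env_name(self, suffix: str) -> str:
        # Assumed format: prefix and suffix joined with an underscore.
        return f"{self.env_prefix}_{suffix}"


config = SketchCloudRunConfig(
    project_id="demo-project",
    job_name="default-job",
    profiles={"gpu": "gpu-job"},
)
```

Under these assumptions, `config.resolve_job_name("gpu")` yields the profile's job, an unknown profile falls back to `default-job`, and a config with neither a matching profile nor a `job_name` raises the error.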

__init__(project_id: str, region: str = 'us-central1', job_name: str | None = None, profiles: dict[str, str] = <factory>, timeout: int = 300, poll_interval: float = 5.0, env_prefix: str = 'LITESTAR_QUEUES', extra_env: dict[str, str] = <factory>, fallback_execution_backend: str | None = 'local') -> None

class litestar_queues.execution.cloudrun.backend.CloudRunExecutionBackend(config: QueueConfig | None = None, *, execution_config: CloudRunExecutionConfig | None = None, jobs_client: CloudRunJobsClient | None = None, executions_client: CloudRunExecutionsClient | None = None)

Bases: BaseExecutionBackend

Execution backend that dispatches queued records to Cloud Run Jobs.

__init__(config: QueueConfig | None = None, *, execution_config: CloudRunExecutionConfig | None = None, jobs_client: CloudRunJobsClient | None = None, executions_client: CloudRunExecutionsClient | None = None) -> None

Initialize the execution backend.

jobs_client
executions_client

property is_external: bool

Whether this backend dispatches records to another process.

property execution_config: CloudRunExecutionConfig

Resolved Cloud Run execution config.

async execute(service: QueueService, record: QueuedTaskRecord, *, worker_id: str | None = None) -> QueuedTaskRecord

Dispatch a record and return its persisted state.

The worker_id argument is accepted for protocol parity but not forwarded: external dispatch does not run service.execute_record locally, so the remote runner is responsible for its own worker identity binding.

Returns:

The persisted queue record after dispatch.

async dispatch(service: QueueService, record: QueuedTaskRecord) -> str | None

Dispatch a queue record to Cloud Run Jobs.

Returns:

The Cloud Run execution reference, if dispatch succeeds.

async reconcile(service: QueueService, record: QueuedTaskRecord) -> QueuedTaskRecord | None

Reconcile a Cloud Run execution with the queue record.

Returns:

The terminal queue record when reconciliation completed it.

async cancel(service: QueueService, record: QueuedTaskRecord) -> bool

Cloud Run Jobs do not expose per-execution cancellation here.

Returns:

Always False, because per-execution cancellation is not implemented.

async check_execution_status(execution_ref: str) -> CloudRunExecutionStatus

Return Cloud Run execution status.

Transient API failures are treated as still running so reconciliation does not create false terminal queue states.
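
The "treat transient failures as still running" rule can be sketched as follows. This is an illustration, not the backend's code: `SketchExecutionStatus` copies the documented status fields and defaults, while `TransientApiError`, the `fetch` callable, and the state strings are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SketchExecutionStatus:
    """Stand-in with the documented fields and defaults."""

    succeeded: bool = False
    failed: bool = False
    cancelled: bool = False
    running: bool = True
    error: Optional[str] = None


class TransientApiError(Exception):
    """Hypothetical error for a temporary API failure."""


def check_status(fetch: Callable[[], str]) -> SketchExecutionStatus:
    """Map a fetched state string to a status, tolerating transient errors."""
    try:
        state = fetch()
    except TransientApiError:
        # Defaults mean "still running", so no terminal state is invented.
        return SketchExecutionStatus()
    if state == "SUCCEEDED":
        return SketchExecutionStatus(succeeded=True, running=False)
    if state == "FAILED":
        return SketchExecutionStatus(
            failed=True, running=False, error="execution failed"
        )
    if state == "CANCELLED":
        return SketchExecutionStatus(cancelled=True, running=False)
    return SketchExecutionStatus()  # unknown states are treated as running


def flaky_fetch() -> str:
    """Simulate an API call that fails temporarily."""
    raise TransientApiError("temporarily unavailable")
```

Here `check_status(flaky_fetch)` reports `running=True` with no error, so a later reconciliation pass can retry instead of marking the record failed.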

build_run_job_request(service: QueueService, record: QueuedTaskRecord) -> dict[str, Any]

Build the Cloud Run Jobs API request for a queue record.

Returns:

Cloud Run Jobs API request data.

build_environment(record: QueuedTaskRecord) -> dict[str, str]

Build generic environment variables for a Cloud Run task process.

Returns:

Environment variables for the Cloud Run task process.

class litestar_queues.execution.cloudrun.backend.CloudRunExecutionStatus(succeeded: bool = False, failed: bool = False, cancelled: bool = False, running: bool = True, error: str | None = None)

Bases: object

Backend-neutral status for a Cloud Run execution.

succeeded: bool
failed: bool
cancelled: bool
running: bool
error: str | None
__init__(succeeded: bool = False, failed: bool = False, cancelled: bool = False, running: bool = True, error: str | None = None) -> None

class litestar_queues.execution.cloudrun.entrypoint.CloudRunExitCode(*values)

Bases: IntEnum

Deterministic Cloud Run task process exit codes.

SUCCESS = 0
FAILURE = 1
MISSING_TASK_ID = 2
INVALID_TASK_ID = 3
MISSING_RECORD = 4
UNKNOWN_TASK = 5
CLAIM_LOST = 6
CANCELLED = 7

async litestar_queues.execution.cloudrun.entrypoint.execute_cloudrun_task(*, config: QueueConfig | None = None, service: QueueService | None = None, service_factory: Callable[[], Any] | None = None, env: Mapping[str, str] | None = None) -> CloudRunExitCode

Execute one persisted queue record in a Cloud Run task process.

Returns:

A deterministic process exit code.

litestar_queues.execution.cloudrun.entrypoint.main() -> None

Console entry point for Cloud Run task execution.

Raises:

SystemExit – Always raised with the execution exit code.
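
As a rough picture of how an entry point can turn validation outcomes into deterministic exit codes, consider the sketch below. Only the enum members and values come from the reference above. Everything else is assumed for illustration: the `LITESTAR_QUEUES_TASK_ID` variable name, integer task IDs, the `known_ids` set standing in for persisted records, and the `pick_exit_code` and `sketch_main` helpers.

```python
from enum import IntEnum
from typing import Mapping


class SketchExitCode(IntEnum):
    """Local copy of the documented exit code values."""

    SUCCESS = 0
    FAILURE = 1
    MISSING_TASK_ID = 2
    INVALID_TASK_ID = 3
    MISSING_RECORD = 4
    UNKNOWN_TASK = 5
    CLAIM_LOST = 6
    CANCELLED = 7


def pick_exit_code(env: Mapping[str, str], known_ids: set[int]) -> SketchExitCode:
    """Validate the task ID from the environment and choose an exit code."""
    raw = env.get("LITESTAR_QUEUES_TASK_ID")  # variable name is an assumption
    if not raw:
        return SketchExitCode.MISSING_TASK_ID
    try:
        task_id = int(raw)  # integer IDs are an assumption of this sketch
    except ValueError:
        return SketchExitCode.INVALID_TASK_ID
    if task_id not in known_ids:
        return SketchExitCode.MISSING_RECORD
    return SketchExitCode.SUCCESS


def sketch_main(env: Mapping[str, str], known_ids: set[int]) -> None:
    """Always exit via SystemExit, carrying the chosen code."""
    raise SystemExit(int(pick_exit_code(env, known_ids)))
```

Because every path ends in `SystemExit` with a fixed integer, whatever launches the process can tell a missing task ID apart from a missing record without parsing logs.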