Execution Backends
Registry
Execution backends are registered through litestar_queues.execution.
- litestar_queues.execution.get_execution_backend(backend: ExecutionBackendConfig = 'immediate', config: QueueConfig | None = None) -> BaseExecutionBackend
Get an instantiated execution backend.
- Returns:
A configured execution backend instance.
- Raises:
TypeError – If a typed execution config selects a backend class that does not accept execution_config.
- litestar_queues.execution.get_execution_backend_class(backend_path: str) -> type[BaseExecutionBackend]
Get an execution backend class by short name or import path.
- Returns:
The resolved execution backend class.
- Raises:
ValueError – If a short backend name is unknown.
- litestar_queues.execution.execution_backend(name: str) -> Callable[[type[BaseExecutionBackend]], type[BaseExecutionBackend]]
Decorator to register an execution backend class with a short name.
- Returns:
A decorator that registers the backend class.
Base Backend
- class litestar_queues.execution.base.BaseExecutionBackend(config: QueueConfig | None = None)
Bases: object
Base class for queue execution backends.
- __init__(config: QueueConfig | None = None) -> None
Initialize the execution backend.
- config
- async execute(service: QueueService, record: QueuedTaskRecord, *, worker_id: str | None = None) -> QueuedTaskRecord
Execute a queue record.
- async dispatch(service: QueueService, record: QueuedTaskRecord) -> str | None
Dispatch a queue record to an external executor.
- Returns:
The external execution reference, if one was created.
- async reconcile(service: QueueService, record: QueuedTaskRecord) -> QueuedTaskRecord | None
Reconcile an externally running queue record.
- Returns:
The updated record when reconciliation changes state.
- async cancel(service: QueueService, record: QueuedTaskRecord) -> bool
Cancel an externally running queue record if possible.
- Returns:
True when cancellation succeeds.
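The dispatch, reconcile, and cancel contract above can be illustrated with an in-memory stand-in. This is a sketch under stated assumptions: `Record` and `InMemoryExternalBackend` are hypothetical, the `service` argument of the real methods is omitted for brevity, and the status strings are invented for the example.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class Record:
    """Hypothetical stand-in for QueuedTaskRecord."""

    task_id: str
    status: str = "pending"
    execution_ref: str | None = None


class InMemoryExternalBackend:
    """Hypothetical backend that follows the dispatch/reconcile/cancel shape."""

    def __init__(self) -> None:
        # Maps an execution reference to the fake external executor's state.
        self.external: dict[str, str] = {}

    async def dispatch(self, record: Record) -> str | None:
        ref = f"exec-{record.task_id}"
        self.external[ref] = "running"
        record.execution_ref = ref
        record.status = "running"
        return ref  # the external execution reference

    async def reconcile(self, record: Record) -> Record | None:
        state = self.external.get(record.execution_ref or "")
        if state in {"succeeded", "failed"} and record.status != state:
            record.status = state
            return record  # state changed, so the updated record is returned
        return None  # nothing changed

    async def cancel(self, record: Record) -> bool:
        ref = record.execution_ref
        if ref is not None and self.external.get(ref) == "running":
            self.external[ref] = "cancelled"
            record.status = "cancelled"
            return True
        return False


async def demo() -> list[object]:
    backend = InMemoryExternalBackend()
    record = Record(task_id="42")
    ref = await backend.dispatch(record)
    unchanged = await backend.reconcile(record)  # still running
    backend.external[record.execution_ref or ""] = "succeeded"  # executor finishes
    updated = await backend.reconcile(record)
    cancelled = await backend.cancel(record)  # too late to cancel
    return [ref, unchanged, updated.status if updated else None, cancelled]


result = asyncio.run(demo())
```

Returning `None` from `reconcile` when nothing changed lets a caller skip persistence for records that are still running.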
Immediate
- class litestar_queues.execution.immediate.ImmediateExecutionBackend(config: QueueConfig | None = None)
Bases: BaseExecutionBackend
Execution backend that runs records inline.
- async execute(service: QueueService, record: QueuedTaskRecord, *, worker_id: str | None = None) -> QueuedTaskRecord
Execute a task immediately in the current event loop.
- Returns:
The updated queue record.
Local
- class litestar_queues.execution.local.LocalExecutionBackend(config: QueueConfig | None = None)
Bases: BaseExecutionBackend
Execution backend for in-process workers.
- async execute(service: QueueService, record: QueuedTaskRecord, *, worker_id: str | None = None) -> QueuedTaskRecord
Execute one claimed task in a local worker.
- Returns:
The updated queue record.
Cloud Run
- class litestar_queues.execution.cloudrun.config.CloudRunExecutionConfig(project_id: str, region: str = 'us-central1', job_name: str | None = None, profiles: dict[str, str] = <factory>, timeout: int = 300, poll_interval: float = 5.0, env_prefix: str = 'LITESTAR_QUEUES', extra_env: dict[str, str] = <factory>, fallback_execution_backend: str | None = 'local')
Bases: object
Configuration for Cloud Run Jobs execution.
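The `<factory>` defaults in the signature suggest dataclass-style fields built with a default factory. The sketch below mirrors the documented field names and defaults in a hypothetical `ExecutionConfigSketch` class. It is not the library's class, and it assumes nothing about how the fields are interpreted at runtime.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ExecutionConfigSketch:
    """Hypothetical mirror of the documented CloudRunExecutionConfig fields."""

    project_id: str
    region: str = "us-central1"
    job_name: str | None = None
    # default_factory gives every instance its own dict (shown as <factory>).
    profiles: dict[str, str] = field(default_factory=dict)
    timeout: int = 300
    poll_interval: float = 5.0
    env_prefix: str = "LITESTAR_QUEUES"
    extra_env: dict[str, str] = field(default_factory=dict)
    fallback_execution_backend: str | None = "local"


# "demo-project" and LOG_LEVEL are placeholder values for illustration.
a = ExecutionConfigSketch(project_id="demo-project")
b = ExecutionConfigSketch(project_id="demo-project", region="europe-west1")
a.extra_env["LOG_LEVEL"] = "debug"  # does not leak into b.extra_env
```

Only `project_id` is required. Every other field falls back to the default shown in the signature.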
- class litestar_queues.execution.cloudrun.backend.CloudRunExecutionBackend(config: QueueConfig | None = None, *, execution_config: CloudRunExecutionConfig | None = None, jobs_client: CloudRunJobsClient | None = None, executions_client: CloudRunExecutionsClient | None = None)
Bases: BaseExecutionBackend
Execution backend that dispatches queued records to Cloud Run Jobs.
- __init__(config: QueueConfig | None = None, *, execution_config: CloudRunExecutionConfig | None = None, jobs_client: CloudRunJobsClient | None = None, executions_client: CloudRunExecutionsClient | None = None) -> None
Initialize the execution backend.
- jobs_client
- executions_client
- property execution_config: CloudRunExecutionConfig
Resolved Cloud Run execution config.
- async execute(service: QueueService, record: QueuedTaskRecord, *, worker_id: str | None = None) -> QueuedTaskRecord
Dispatch a record and return its persisted state.
The worker_id argument is accepted for protocol parity but not forwarded: external dispatch does not run service.execute_record locally, so the remote runner is responsible for its own worker identity binding.
- Returns:
The persisted queue record after dispatch.
- async dispatch(service: QueueService, record: QueuedTaskRecord) -> str | None
Dispatch a queue record to Cloud Run Jobs.
- Returns:
The Cloud Run execution reference, if dispatch succeeds.
- async reconcile(service: QueueService, record: QueuedTaskRecord) -> QueuedTaskRecord | None
Reconcile a Cloud Run execution with the queue record.
- Returns:
The terminal queue record when reconciliation completed it.
- async cancel(service: QueueService, record: QueuedTaskRecord) -> bool
Cloud Run Jobs do not expose per-execution cancellation here.
- Returns:
Always False, because per-execution cancellation is not implemented.
- async check_execution_status(execution_ref: str) -> CloudRunExecutionStatus
Return Cloud Run execution status.
Transient API failures are treated as still running so reconciliation does not create false terminal queue states.
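The rule that transient failures count as "still running" can be sketched as follows. `ExecutionStatus` mirrors the documented `CloudRunExecutionStatus` fields, while `TransientAPIError`, `check_status`, the state strings, and the choice to record the error message are assumptions made for illustration, not the library's behavior.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class ExecutionStatus:
    """Stand-in mirroring the documented CloudRunExecutionStatus fields."""

    succeeded: bool = False
    failed: bool = False
    cancelled: bool = False
    running: bool = True
    error: str | None = None


class TransientAPIError(Exception):
    """Hypothetical marker for retryable API failures such as timeouts."""


def check_status(fetch: Callable[[], str]) -> ExecutionStatus:
    """Map a fetched state to a status, staying 'running' on transient errors."""
    try:
        state = fetch()
    except TransientAPIError as exc:
        # Do not guess a terminal state; a later reconcile pass can retry.
        return ExecutionStatus(running=True, error=str(exc))
    if state == "succeeded":
        return ExecutionStatus(succeeded=True, running=False)
    if state == "failed":
        return ExecutionStatus(failed=True, running=False, error="execution failed")
    if state == "cancelled":
        return ExecutionStatus(cancelled=True, running=False)
    return ExecutionStatus()  # unknown or in-progress states stay running


def flaky_fetch() -> str:
    """Simulate an API call that fails transiently."""
    raise TransientAPIError("503 from API")


status_after_outage = check_status(flaky_fetch)
```

Defaulting to `running=True` means a brief API outage delays completion of a record instead of marking it failed.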
- build_run_job_request(service: QueueService, record: QueuedTaskRecord) -> dict[str, Any]
Build the Cloud Run Jobs API request for a queue record.
- Returns:
Cloud Run Jobs API request data.
- class litestar_queues.execution.cloudrun.backend.CloudRunExecutionStatus(succeeded: bool = False, failed: bool = False, cancelled: bool = False, running: bool = True, error: str | None = None)
Bases: object
Backend-neutral status for a Cloud Run execution.
- class litestar_queues.execution.cloudrun.entrypoint.CloudRunExitCode(*values)
Bases: IntEnum
Deterministic Cloud Run task process exit codes.
- SUCCESS = 0
- FAILURE = 1
- MISSING_TASK_ID = 2
- INVALID_TASK_ID = 3
- MISSING_RECORD = 4
- UNKNOWN_TASK = 5
- CLAIM_LOST = 6
- CANCELLED = 7
- async litestar_queues.execution.cloudrun.entrypoint.execute_cloudrun_task(*, config: QueueConfig | None = None, service: QueueService | None = None, service_factory: Callable[[], Any] | None = None, env: Mapping[str, str] | None = None) -> CloudRunExitCode
Execute one persisted queue record in a Cloud Run task process.
- Returns:
A deterministic process exit code.
- litestar_queues.execution.cloudrun.entrypoint.main() -> None
Console entry point for Cloud Run task execution.
- Raises:
SystemExit – Always raised with the execution exit code.
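The "always raise SystemExit with the exit code" contract can be sketched with a stand-in enum that copies the documented values. `ExitCode`, `run_task`, `sketch_main`, the `TASK_ID` variable name, and the digits-only validation are all hypothetical and serve only to make the example runnable.

```python
from enum import IntEnum


class ExitCode(IntEnum):
    """Stand-in copying the documented CloudRunExitCode values."""

    SUCCESS = 0
    FAILURE = 1
    MISSING_TASK_ID = 2
    INVALID_TASK_ID = 3
    MISSING_RECORD = 4
    UNKNOWN_TASK = 5
    CLAIM_LOST = 6
    CANCELLED = 7


def run_task(env: dict[str, str]) -> ExitCode:
    """Hypothetical runner that validates a task id taken from the environment."""
    raw = env.get("TASK_ID")  # assumed variable name, for illustration only
    if raw is None:
        return ExitCode.MISSING_TASK_ID
    if not raw.isdigit():  # assumed id format, for illustration only
        return ExitCode.INVALID_TASK_ID
    return ExitCode.SUCCESS


def sketch_main(env: dict[str, str]) -> None:
    """Exit with the runner's code on every path, including success."""
    raise SystemExit(int(run_task(env)))
```

Exiting through `SystemExit` on every path, including success, gives the process a distinct exit status for each outcome, which a supervisor can then inspect.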