import json
from dataclasses import dataclass
from importlib import import_module
from typing import TYPE_CHECKING, Any, cast
from litestar_queues.exceptions import MissingDependencyError
from litestar_queues.execution.base import BaseExecutionBackend
from litestar_queues.execution.cloudrun.config import CloudRunExecutionConfig, _execution_config_from_queue_config
if TYPE_CHECKING:
from litestar_queues.config import QueueConfig
from litestar_queues.execution.cloudrun._typing import (
CloudRunExecutionLike,
CloudRunExecutionsClient,
CloudRunJobsClient,
)
from litestar_queues.models import QueuedTaskRecord
from litestar_queues.service import QueueService
# Names exported by a star-import of this module.
__all__ = ("CloudRunExecutionBackend", "CloudRunExecutionStatus")
# Arguments passed to MissingDependencyError when ``google.cloud.run_v2`` cannot
# be imported. Presumably the distribution to install and the package extra
# that pulls it in -- confirm against MissingDependencyError's signature.
_GOOGLE_CLOUD_RUN_PACKAGE = "google-cloud-run"
_CLOUDRUN_EXTRA = "cloudrun"
[docs]
@dataclass(frozen=True, slots=True)
class CloudRunExecutionStatus:
"""Backend-neutral status for a Cloud Run execution."""
succeeded: "bool" = False
failed: "bool" = False
cancelled: "bool" = False
running: "bool" = True
error: "str | None" = None
class CloudRunExecutionBackend(BaseExecutionBackend):
    """Execution backend that dispatches queued records to Cloud Run Jobs.

    A record is handed to a Cloud Run Job through ``run_job``; the name of the
    resulting execution is stored on the queue record and later polled by
    :meth:`reconcile` to move the record into a terminal state.

    The Jobs and Executions API clients are created lazily on first use unless
    they are injected through the constructor.
    """

    __slots__ = ("_execution_config", "executions_client", "jobs_client")

    def __init__(
        self,
        config: "QueueConfig | None" = None,
        *,
        execution_config: "CloudRunExecutionConfig | None" = None,
        jobs_client: "CloudRunJobsClient | None" = None,
        executions_client: "CloudRunExecutionsClient | None" = None,
    ) -> "None":
        """Initialize the backend.

        Args:
            config: Queue configuration forwarded to the base backend.
            execution_config: Explicit Cloud Run settings. When omitted they
                are derived from the queue config on first access of
                :attr:`execution_config`.
            jobs_client: Pre-built Jobs client. When omitted, one is created
                lazily from ``google.cloud.run_v2``.
            executions_client: Pre-built Executions client. When omitted, one
                is created lazily from ``google.cloud.run_v2``.
        """
        super().__init__(config=config)
        self._execution_config = execution_config
        self.jobs_client = jobs_client
        self.executions_client = executions_client

    @property
    def is_external(self) -> "bool":
        """Whether this backend dispatches records to another process."""
        return True

    @property
    def execution_config(self) -> "CloudRunExecutionConfig":
        """Resolved Cloud Run execution config.

        Built from the queue config on first access when no explicit config
        was passed to the constructor, then cached on the instance.
        """
        if self._execution_config is None:
            self._execution_config = _execution_config_from_queue_config(self.config)
        return self._execution_config

    async def execute(
        self, service: "QueueService", record: "QueuedTaskRecord", *, worker_id: "str | None" = None
    ) -> "QueuedTaskRecord":
        """Dispatch a record and return its persisted state.

        The ``worker_id`` argument is accepted for protocol parity but not
        forwarded: external dispatch does not run ``service.execute_record``
        locally, so the remote runner is responsible for its own worker
        identity binding.

        Args:
            service: Queue service owning the record.
            record: Record to dispatch.
            worker_id: Ignored, see above.

        Returns:
            The persisted queue record after dispatch, or ``record`` itself
            when the queue backend returns nothing for its id.
        """
        del worker_id
        await self.dispatch(service, record)
        return await service.get_queue_backend().get_task(record.id) or record

    async def dispatch(self, service: "QueueService", record: "QueuedTaskRecord") -> "str | None":
        """Dispatch a queue record to Cloud Run Jobs.

        Args:
            service: Queue service owning the record.
            record: Record to dispatch.

        Returns:
            The Cloud Run execution reference, if dispatch succeeds. ``None``
            when dispatch failed and the record was handed over to the
            configured fallback execution backend instead.

        Raises:
            Exception: Whatever the Cloud Run client raised, when no fallback
                execution backend is configured.
        """
        # Building the request and obtaining the client happen outside the
        # ``try`` on purpose: configuration or missing-dependency errors are
        # never routed to the fallback backend.
        request = self.build_run_job_request(service, record)
        client = await self._get_jobs_client()
        try:
            operation = await client.run_job(request=request)
            execution = await operation.result()
        except Exception:
            fallback = self.execution_config.fallback_execution_backend
            if fallback is None:
                raise
            # Best-effort degradation: re-route the record instead of failing.
            await service.get_queue_backend().set_execution_backend(record.id, fallback)
            return None
        execution_ref = str(execution.name)
        await service.get_queue_backend().set_execution_ref(
            record.id, "cloudrun", execution_ref, execution_profile=record.execution_profile
        )
        return execution_ref

    async def reconcile(self, service: "QueueService", record: "QueuedTaskRecord") -> "QueuedTaskRecord | None":
        """Reconcile a Cloud Run execution with the queue record.

        Args:
            service: Queue service owning the record.
            record: Record whose execution should be inspected.

        Returns:
            The terminal queue record when reconciliation completed it.
            ``None`` when the record has no execution reference, is not in the
            ``"running"`` state, or its execution has not finished yet.
        """
        # Cheap local checks come first so that records which can never be
        # reconciled do not cost a Cloud Run API round trip.
        if record.execution_ref is None or record.status != "running":
            return None
        status = await self.check_execution_status(record.execution_ref)
        if status.running:
            return None
        queue_backend = service.get_queue_backend()
        # NOTE(review): success is tested first, so an execution that reports
        # both succeeded and failed tasks is recorded as completed -- confirm
        # this precedence is intended for multi-task jobs.
        if status.succeeded:
            # Keep a result already stored on the record; otherwise store a
            # small marker pointing at the execution.
            result = (
                record.result
                if record.result is not None
                else {"cloudrun_execution": record.execution_ref, "status": "succeeded"}
            )
            return await queue_backend.complete_task(
                record.id, result=result, expected_retry_count=record.retry_count
            )
        if status.cancelled:
            # A cancelled execution is failed with retries explicitly disabled.
            return await queue_backend.fail_task(
                record.id, "Cloud Run execution cancelled", retry=False, expected_retry_count=record.retry_count
            )
        if status.failed:
            return await queue_backend.fail_task(
                record.id, status.error or "Cloud Run execution failed", expected_retry_count=record.retry_count
            )
        return None

    async def cancel(self, service: "QueueService", record: "QueuedTaskRecord") -> "bool":
        """Cloud Run Jobs do not expose per-execution cancellation here.

        Returns:
            Always false because per-execution cancellation is not implemented.
        """
        return False

    async def check_execution_status(self, execution_ref: "str") -> "CloudRunExecutionStatus":
        """Return Cloud Run execution status.

        Transient API failures are treated as still running so reconciliation does
        not create false terminal queue states.

        Args:
            execution_ref: Execution name as stored on the queue record.

        Returns:
            A status whose outcome flags mirror whether the execution's
            ``succeeded_count``, ``failed_count`` and ``cancelled_count`` are
            positive. ``running`` is true when none of them is. ``error`` is
            only populated for failed executions, or with the exception text
            when the lookup itself failed.
        """
        try:
            client = await self._get_executions_client()
            execution = await client.get_execution(name=execution_ref)
        except Exception as exc:
            return CloudRunExecutionStatus(running=True, error=str(exc))
        # Missing or ``None`` counters are treated as zero.
        succeeded, failed, cancelled = (
            int(getattr(execution, counter, 0) or 0) > 0
            for counter in ("succeeded_count", "failed_count", "cancelled_count")
        )
        return CloudRunExecutionStatus(
            succeeded=succeeded,
            failed=failed,
            cancelled=cancelled,
            running=not (succeeded or failed or cancelled),
            error=_execution_error(execution) if failed else None,
        )

    def build_run_job_request(self, service: "QueueService", record: "QueuedTaskRecord") -> "dict[str, Any]":
        """Build the Cloud Run Jobs API request for a queue record.

        The timeout is taken from the record metadata key ``"timeout"``,
        defaulting to the resolved task's ``timeout``. When that value is not
        an ``int`` or ``float``, the execution config's ``timeout`` is used.

        Args:
            service: Queue service used to resolve the task definition.
            record: Record to build the request for.

        Returns:
            Cloud Run Jobs API request data.
        """
        config = self.execution_config
        task_obj = service.resolve_task(record.task_name)
        timeout = record.metadata.get("timeout", task_obj.timeout)
        if not isinstance(timeout, int | float):
            timeout = config.timeout
        timeout_seconds = int(timeout)
        job_name = config.resolve_job_name(record.execution_profile)
        env_entries = [{"name": key, "value": value} for key, value in self.build_environment(record).items()]
        return {
            "name": f"projects/{config.project_id}/locations/{config.region}/jobs/{job_name}",
            "overrides": {
                "container_overrides": [{"env": env_entries}],
                "timeout": f"{timeout_seconds}s",
            },
        }

    def build_environment(self, record: "QueuedTaskRecord") -> "dict[str, str]":
        """Build generic environment variables for a Cloud Run task process.

        Task arguments are serialized as compact JSON. Entries from the
        execution config's ``extra_env`` are applied last and therefore
        override generated variables of the same name.

        Args:
            record: Record whose identity and arguments are exported.

        Returns:
            Environment variables for the Cloud Run task process.
        """
        config = self.execution_config
        env = {
            config.env_name("TASK_ID"): str(record.id),
            config.env_name("TASK_NAME"): record.task_name,
            config.env_name("TASK_ARGS"): json.dumps(list(record.args), separators=(",", ":")),
            config.env_name("TASK_KWARGS"): json.dumps(record.kwargs, separators=(",", ":")),
            config.env_name("EXECUTION_BACKEND"): "cloudrun",
        }
        if record.execution_profile is not None:
            env[config.env_name("EXECUTION_PROFILE")] = record.execution_profile
        env.update(config.extra_env)
        return env

    @staticmethod
    def _import_run_v2() -> "Any":
        """Import ``google.cloud.run_v2`` on demand.

        Returns:
            The imported ``google.cloud.run_v2`` module.

        Raises:
            MissingDependencyError: If the module cannot be imported.
        """
        try:
            return import_module("google.cloud.run_v2")
        except ImportError as exc:
            raise MissingDependencyError(_GOOGLE_CLOUD_RUN_PACKAGE, _CLOUDRUN_EXTRA) from exc

    async def _get_jobs_client(self) -> "CloudRunJobsClient":
        """Return the Jobs client, creating a ``JobsAsyncClient`` on first use."""
        if self.jobs_client is None:
            self.jobs_client = cast("CloudRunJobsClient", self._import_run_v2().JobsAsyncClient())
        return self.jobs_client

    async def _get_executions_client(self) -> "CloudRunExecutionsClient":
        """Return the Executions client, creating an ``ExecutionsAsyncClient`` on first use."""
        if self.executions_client is None:
            self.executions_client = cast(
                "CloudRunExecutionsClient", self._import_run_v2().ExecutionsAsyncClient()
            )
        return self.executions_client
def _execution_error(execution: "CloudRunExecutionLike") -> "str | None":
    """Return the last non-empty condition message of an execution.

    The execution's ``conditions`` are scanned from last to first and the
    first truthy ``message`` found is returned as a string.

    Returns:
        The message text, or ``None`` when the execution has no conditions or
        none of them carries a message.
    """
    last_to_first = reversed(getattr(execution, "conditions", None) or [])
    messages = (getattr(entry, "message", None) for entry in last_to_first)
    # Generators keep the scan lazy: it stops at the first usable message.
    return next((str(text) for text in messages if text), None)