"""Advanced Alchemy queue backend."""
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, cast
from litestar_queues.backends.advanced_alchemy.config import AdvancedAlchemyBackendConfig
from litestar_queues.backends.advanced_alchemy.mixins import QueueTaskModelMixin
from litestar_queues.backends.advanced_alchemy.service import QueueTaskService
from litestar_queues.backends.base import BaseQueueBackend
from litestar_queues.exceptions import QueueConfigurationError
from litestar_queues.models import QueueBackendCapabilities, QueuedTaskRecord, QueueStatistics, StaleTaskRecoveryResult
if TYPE_CHECKING:
from collections.abc import AsyncIterator
from datetime import datetime, timedelta
from uuid import UUID
from litestar_queues.config import QueueConfig
__all__ = ("AdvancedAlchemyQueueBackend",)
[docs]
class AdvancedAlchemyQueueBackend(BaseQueueBackend):
"""Advanced Alchemy-backed queue backend."""
__slots__ = (
"_create_schema",
"_heartbeat_session_maker",
"_model_class",
"_opened",
"_service_class",
"_sqlalchemy_config",
)
[docs]
def __init__(
self, config: "QueueConfig | None" = None, *, backend_config: "AdvancedAlchemyBackendConfig | None" = None
) -> "None":
super().__init__(config=config)
backend_config = backend_config or AdvancedAlchemyBackendConfig()
self._sqlalchemy_config = backend_config.sqlalchemy_config
self._heartbeat_session_maker = backend_config.heartbeat_session_maker
self._model_class, self._service_class = self._resolve_model_classes(backend_config.model_class)
self._create_schema = backend_config.create_schema
self._opened = False
@property
def capabilities(self) -> "QueueBackendCapabilities":
"""Backend behavior capabilities."""
return QueueBackendCapabilities()
[docs]
async def open(self) -> "bool":
"""Open Advanced Alchemy resources.
Returns:
True when resources are ready.
"""
if self._opened:
return True
self._ensure_configured()
if self._create_schema:
await self.create_schema()
self._opened = True
return True
[docs]
async def close(self) -> "None":
"""Close backend-owned resources."""
self._opened = False
[docs]
async def create_schema(self) -> "None":
"""Create the queue task table and indexes."""
if self._sqlalchemy_config is not None:
engine = self._sqlalchemy_config.get_engine()
async with engine.begin() as connection:
await connection.run_sync(cast("Any", self._model_class.__table__).create, checkfirst=True)
else:
async with self._session() as session, session.begin():
connection = await session.connection()
await connection.run_sync(cast("Any", self._model_class.__table__).create, checkfirst=True)
[docs]
async def enqueue(
self,
task_name: "str",
*,
args: "tuple[Any, ...]" = (),
kwargs: "dict[str, Any] | None" = None,
queue: "str" = "default",
priority: "int" = 0,
max_retries: "int" = 0,
scheduled_at: "datetime | None" = None,
key: "str | None" = None,
execution_backend: "str" = "local",
execution_profile: "str | None" = None,
metadata: "dict[str, Any] | None" = None,
) -> "QueuedTaskRecord":
async with self._operation() as service:
record = await service.enqueue(
task_name,
args=args,
kwargs=dict(kwargs or {}),
queue=queue,
priority=priority,
max_retries=max_retries,
scheduled_at=scheduled_at,
key=key,
execution_backend=execution_backend,
execution_profile=execution_profile,
metadata=dict(metadata or {}),
)
await self.notify_new_task(record)
return record
[docs]
async def get_task(self, task_id: "UUID") -> "QueuedTaskRecord | None":
async with self._service() as service:
return await service.get_task(task_id)
[docs]
async def get_task_by_key(self, key: "str") -> "QueuedTaskRecord | None":
async with self._service() as service:
return await service.get_task_by_key(key)
[docs]
async def list_pending(
self, *, limit: "int" = 1, queue: "str | None" = None, execution_backend: "str | None" = None
) -> "list[QueuedTaskRecord]":
async with self._service() as service:
return await service.list_pending(limit=limit, queue=queue, execution_backend=execution_backend)
[docs]
async def claim_task(self, task_id: "UUID") -> "QueuedTaskRecord | None":
async with self._operation() as service:
return await service.claim_task(task_id)
[docs]
async def claim_next(
self, *, queue: "str | None" = None, execution_backend: "str | None" = None
) -> "QueuedTaskRecord | None":
async with self._operation() as service:
return await service.claim_next(queue=queue, execution_backend=execution_backend)
[docs]
async def complete_task(
self, task_id: "UUID", *, result: "Any" = None, expected_retry_count: "int | None" = None
) -> "QueuedTaskRecord | None":
async with self._operation() as service:
return await service.complete_task(task_id, result=result, expected_retry_count=expected_retry_count)
[docs]
async def fail_task(
self, task_id: "UUID", error: "str", *, retry: "bool" = True, expected_retry_count: "int | None" = None
) -> "QueuedTaskRecord | None":
async with self._operation() as service:
return await service.fail_task(task_id, error, retry=retry, expected_retry_count=expected_retry_count)
[docs]
async def cancel_task(self, task_id: "UUID") -> "bool":
async with self._operation() as service:
return await service.cancel_task(task_id)
[docs]
async def touch_heartbeat(self, task_id: "UUID", *, expected_retry_count: "int | None" = None) -> "bool":
async with self._heartbeat_operation() as service:
return await service.touch_heartbeat(task_id, expected_retry_count=expected_retry_count)
[docs]
async def null_heartbeats(self, task_ids: "list[UUID]", *, expected_retry_count: "int | None" = None) -> "None":
async with self._heartbeat_operation() as service:
await service.null_heartbeats(task_ids, expected_retry_count=expected_retry_count)
[docs]
async def requeue_stale_running(self, *, stale_after: "timedelta") -> "StaleTaskRecoveryResult":
async with self._operation() as service:
return await service.requeue_stale_running(stale_after=stale_after)
[docs]
async def set_execution_ref(
self, task_id: "UUID", execution_backend: "str", execution_ref: "str", *, execution_profile: "str | None" = None
) -> "QueuedTaskRecord | None":
async with self._operation() as service:
return await service.set_execution_ref(
task_id, execution_backend, execution_ref, execution_profile=execution_profile
)
[docs]
async def set_execution_backend(
self, task_id: "UUID", execution_backend: "str", *, execution_profile: "str | None" = None
) -> "QueuedTaskRecord | None":
async with self._operation() as service:
record = await service.set_execution_backend(
task_id, execution_backend, execution_profile=execution_profile
)
if record is not None:
await self.notify_new_task(record)
return record
[docs]
async def list_running_external(self, *, limit: "int | None" = None) -> "list[QueuedTaskRecord]":
async with self._service() as service:
return await service.list_running_external(limit=limit)
[docs]
async def get_statistics(self) -> "QueueStatistics":
async with self._service() as service:
return await service.get_statistics()
[docs]
async def list_completed_by_task(
self, task_name: "str", *, since: "datetime | None" = None, limit: "int" = 10
) -> "list[QueuedTaskRecord]":
async with self._service() as service:
return await service.list_completed_by_task(task_name, since=since, limit=limit)
[docs]
async def cleanup_terminal(self, before: "datetime") -> "int":
async with self._operation() as service:
return await service.cleanup_terminal(before)
def _ensure_configured(self) -> "None":
if self._sqlalchemy_config is None:
msg = "AdvancedAlchemyQueueBackend requires sqlalchemy_config."
raise QueueConfigurationError(msg)
def _ensure_opened(self) -> "None":
if not self._opened:
msg = "AdvancedAlchemyQueueBackend.open() must be called before using the backend."
raise RuntimeError(msg)
def _resolve_model_classes(self, model_class: "type[Any] | None") -> 'tuple[type[Any], type["QueueTaskService"]]':
if model_class is None:
msg = "AdvancedAlchemyBackendConfig.model_class must inherit QueueTaskModelMixin."
raise QueueConfigurationError(msg)
try:
valid_model = issubclass(model_class, QueueTaskModelMixin)
except TypeError:
valid_model = False
if not valid_model:
msg = "AdvancedAlchemyBackendConfig.model_class must inherit QueueTaskModelMixin."
raise QueueConfigurationError(msg)
if "__tablename__" not in model_class.__dict__:
msg = "AdvancedAlchemyBackendConfig.model_class must declare __tablename__."
raise QueueConfigurationError(msg)
missing_columns = {
"id",
"created_at",
"task_name",
"args_json",
"kwargs_json",
"queue",
"execution_backend",
"execution_profile",
"execution_ref",
"status",
"priority",
"max_retries",
"retry_count",
"scheduled_at",
"started_at",
"completed_at",
"heartbeat_at",
"result_json",
"error",
"task_key",
"metadata_json",
} - {column.name for column in model_class.__table__.columns}
if missing_columns:
columns = ", ".join(sorted(missing_columns))
msg = f"AdvancedAlchemyBackendConfig.model_class is missing queue columns: {columns}."
raise QueueConfigurationError(msg)
return model_class, QueueTaskService.for_model(model_class)
@asynccontextmanager
async def _session(self) -> "AsyncIterator[Any]":
self._ensure_configured()
sqlalchemy_config = self._sqlalchemy_config
if sqlalchemy_config is None:
msg = "AdvancedAlchemyQueueBackend requires sqlalchemy_config."
raise QueueConfigurationError(msg)
session_maker = sqlalchemy_config.create_session_maker()
async with session_maker() as session:
yield session
@asynccontextmanager
async def _service(self) -> 'AsyncIterator["QueueTaskService"]':
self._ensure_opened()
async with self._session() as session:
yield self._service_class(session=session)
@asynccontextmanager
async def _operation(self) -> 'AsyncIterator["QueueTaskService"]':
self._ensure_opened()
async with self._session() as session, session.begin():
yield self._service_class(session=session)
@asynccontextmanager
async def _heartbeat_operation(self) -> 'AsyncIterator["QueueTaskService"]':
"""Yield a ``QueueTaskService`` bound to the dedicated heartbeat session maker.
Falls back to :meth:`_operation` when ``heartbeat_session_maker`` is not
configured. The dedicated engine is supplied and owned by the adopter;
:meth:`close` does not dispose it.
Yields:
Queue task service bound to the heartbeat or default operation.
"""
self._ensure_opened()
if self._heartbeat_session_maker is None:
async with self._operation() as service:
yield service
else:
async with self._heartbeat_session_maker() as session, session.begin():
yield self._service_class(session=session)