Source code for litestar_queues.backends.base

import asyncio
from typing import TYPE_CHECKING, Any

from typing_extensions import Self

from litestar_queues.models import QueueBackendCapabilities, QueueStatistics, StaleTaskRecoveryResult

if TYPE_CHECKING:
    from collections.abc import Sequence
    from datetime import datetime, timedelta
    from types import TracebackType
    from uuid import UUID

    from litestar_queues.config import QueueConfig
    from litestar_queues.models import EnqueueSpec, QueuedTaskRecord

__all__ = ("BaseQueueBackend",)


[docs] class BaseQueueBackend: """Base class for queue persistence backends.""" __slots__ = ("config",)
[docs] def __init__(self, config: "QueueConfig | None" = None) -> "None": """Initialize the queue backend.""" self.config = config
@property def capabilities(self) -> "QueueBackendCapabilities": """Backend behavior capabilities.""" return QueueBackendCapabilities()
[docs] async def open(self) -> "bool": """Open queue resources. Returns: True when resources are ready. """ return True
[docs] async def close(self) -> "None": """Close queue resources."""
[docs] async def enqueue( self, task_name: "str", *, args: "tuple[Any, ...]" = (), kwargs: "dict[str, Any] | None" = None, queue: "str" = "default", priority: "int" = 0, max_retries: "int" = 0, scheduled_at: "datetime | None" = None, key: "str | None" = None, execution_backend: "str" = "local", execution_profile: "str | None" = None, metadata: "dict[str, Any] | None" = None, ) -> "QueuedTaskRecord": """Persist a queued task.""" raise NotImplementedError
[docs] async def enqueue_many(self, specs: "Sequence[EnqueueSpec]") -> "list[QueuedTaskRecord]": """Persist multiple queued tasks, returning records in input order. The default implementation issues one :meth:`enqueue` per spec, which preserves per-key deduplication and ordering. Backends with a native bulk path (e.g. SQLSpec COPY/Arrow/``execute_many``) override this for throughput while keeping the same semantics. Returns: Queue task records in the same order as ``specs``. """ return [ await self.enqueue( spec.task_name, args=spec.args, kwargs=spec.kwargs, queue=spec.queue, priority=spec.priority, max_retries=spec.max_retries, scheduled_at=spec.scheduled_at, key=spec.key, execution_backend=spec.execution_backend, execution_profile=spec.execution_profile, metadata=spec.metadata, ) for spec in specs ]
[docs] async def get_task(self, task_id: "UUID") -> "QueuedTaskRecord | None": """Return a queued task by ID.""" raise NotImplementedError
[docs] async def get_task_by_key(self, key: "str") -> "QueuedTaskRecord | None": """Return a queued task by deduplication key.""" raise NotImplementedError
[docs] async def list_pending( self, *, limit: "int" = 1, queue: "str | None" = None, execution_backend: "str | None" = None ) -> "list[QueuedTaskRecord]": """Return due pending or scheduled tasks ordered for execution.""" raise NotImplementedError
[docs] async def claim_task(self, task_id: "UUID") -> "QueuedTaskRecord | None": """Atomically claim a pending task.""" raise NotImplementedError
[docs] async def claim_next( self, *, queue: "str | None" = None, execution_backend: "str | None" = None ) -> "QueuedTaskRecord | None": """Claim the next due task. Returns: The claimed task record, if one was available. """ records = await self.list_pending(limit=1, queue=queue, execution_backend=execution_backend) if not records: return None return await self.claim_task(records[0].id)
[docs] async def complete_task( self, task_id: "UUID", *, result: "Any" = None, expected_retry_count: "int | None" = None ) -> "QueuedTaskRecord | None": """Mark a task as completed. Args: task_id: Queue record identifier. result: Task result payload. expected_retry_count: When provided, update only if the record is still running with this retry count. """ raise NotImplementedError
[docs] async def fail_task( self, task_id: "UUID", error: "str", *, retry: "bool" = True, expected_retry_count: "int | None" = None ) -> "QueuedTaskRecord | None": """Mark a task as failed or retry it. Args: task_id: Queue record identifier. error: Error message to persist. retry: Whether retry policy may requeue the task. expected_retry_count: When provided, update only if the record is still running with this retry count. """ raise NotImplementedError
[docs] async def cancel_task(self, task_id: "UUID") -> "bool": """Cancel a task if it has not started.""" raise NotImplementedError
[docs] async def touch_heartbeat(self, task_id: "UUID", *, expected_retry_count: "int | None" = None) -> "bool": """Update the heartbeat timestamp for a running task. Returns: True when a running record matched the optional retry-count fence. """ return False
[docs] async def null_heartbeats(self, task_ids: "list[UUID]", *, expected_retry_count: "int | None" = None) -> "None": """Clear heartbeat timestamps for task IDs. Args: task_ids: Queue record identifiers. expected_retry_count: When provided, clear only records that still match this retry count. """
[docs] async def requeue_stale_running(self, *, stale_after: "timedelta") -> "StaleTaskRecoveryResult": """Recover running tasks with stale heartbeats. Returns: Summary of requeued, failed, skipped, and handler-needed records. """ return StaleTaskRecoveryResult()
[docs] async def set_execution_ref( self, task_id: "UUID", execution_backend: "str", execution_ref: "str", *, execution_profile: "str | None" = None ) -> "QueuedTaskRecord | None": """Persist an external execution reference for a running task. Returns: The updated queued task record, if one exists. """ record = await self.get_task(task_id) if record is None: return None record.execution_backend = execution_backend record.execution_ref = execution_ref record.execution_profile = execution_profile return record
[docs] async def set_execution_backend( self, task_id: "UUID", execution_backend: "str", *, execution_profile: "str | None" = None ) -> "QueuedTaskRecord | None": """Persist an execution backend/profile change for a queued task. Returns: The updated queued task record, if one exists. """ record = await self.get_task(task_id) if record is None: return None record.execution_backend = execution_backend record.execution_profile = execution_profile record.execution_ref = None return record
[docs] async def list_running_external(self, *, limit: "int | None" = None) -> "list[QueuedTaskRecord]": """Return externally dispatched tasks with references to reconcile.""" return []
[docs] async def get_statistics(self) -> "QueueStatistics": """Return queue status counts.""" return QueueStatistics()
[docs] async def list_completed_by_task( self, task_name: "str", *, since: "datetime | None" = None, limit: "int" = 10 ) -> "list[QueuedTaskRecord]": """Return recent completed records for a task name.""" return []
[docs] async def cleanup_terminal(self, before: "datetime") -> "int": """Delete terminal records completed before a cutoff. Returns: The number of deleted records. """ return 0
[docs] async def notify_new_task(self, record: "QueuedTaskRecord") -> "None": """Notify waiters that a new task is available."""
[docs] async def wait_for_notifications(self, timeout: "float | None" = None) -> "bool": """Wait until backend notification arrives. Returns: True when a notification was observed. """ if timeout is not None: await asyncio.sleep(timeout) return False
async def __aenter__(self) -> "Self": await self.open() return self async def __aexit__( self, exc_type: "type[BaseException] | None", # noqa: PYI036 exc_val: "BaseException | None", # noqa: PYI036 exc_tb: "TracebackType | None", # noqa: PYI036 ) -> "None": await self.close()