from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Literal
from uuid import UUID, uuid4
__all__ = (
"TERMINAL_STATUSES",
"EnqueueSpec",
"QueueBackendCapabilities",
"QueueStatistics",
"QueuedTaskRecord",
"StaleTaskRecoveryResult",
"TaskStatus",
)
TaskStatus = Literal["pending", "scheduled", "running", "completed", "failed", "cancelled"]
"""Queue task lifecycle states."""
TERMINAL_STATUSES: "frozenset[TaskStatus]" = frozenset({"completed", "failed", "cancelled"})
"""Statuses that represent finished queue records."""
[docs]
@dataclass(slots=True)
class QueueBackendCapabilities:
"""Behavior advertised by a queue backend."""
supports_notifications: "bool" = False
notification_backend: "str | None" = None
notifications_durable: "bool" = False
supports_heartbeats: "bool" = True
supports_atomic_claim: "bool" = True
supports_atomic_delayed_promotion: "bool" = True
supports_external_refs: "bool" = True
supports_terminal_cleanup: "bool" = True
[docs]
@dataclass(slots=True)
class QueueStatistics:
"""Operational status counts for a queue backend."""
pending: "int" = 0
scheduled: "int" = 0
running: "int" = 0
completed: "int" = 0
failed: "int" = 0
cancelled: "int" = 0
@property
def total(self) -> "int":
"""Total number of known queue records."""
return self.pending + self.scheduled + self.running + self.completed + self.failed + self.cancelled
[docs]
@dataclass(slots=True)
class StaleTaskRecoveryResult:
"""Summary of stale running task recovery."""
requeued: "int" = 0
failed: "int" = 0
skipped: "int" = 0
handler_needed: "int" = 0
failed_task_ids: "list[UUID]" = field(default_factory=list)
handler_needed_task_ids: "list[UUID]" = field(default_factory=list)
[docs]
def to_payload(self) -> "dict[str, int]":
"""Return a JSON-compatible event payload."""
return {
"requeued": self.requeued,
"failed": self.failed,
"skipped": self.skipped,
"handler_needed": self.handler_needed,
}
[docs]
@dataclass(slots=True)
class EnqueueSpec:
"""A single task specification for bulk enqueue via ``enqueue_many``.
Mirrors the keyword arguments of :meth:`BaseQueueBackend.enqueue` so a batch
of tasks can be described declaratively and submitted in one call.
"""
task_name: "str"
args: "tuple[Any, ...]" = ()
kwargs: "dict[str, Any] | None" = None
queue: "str" = "default"
priority: "int" = 0
max_retries: "int" = 0
scheduled_at: "datetime | None" = None
key: "str | None" = None
execution_backend: "str" = "local"
execution_profile: "str | None" = None
metadata: "dict[str, Any] | None" = None
[docs]
@dataclass(slots=True)
class QueuedTaskRecord:
"""Backend-neutral representation of a queued task."""
task_name: "str"
id: "UUID" = field(default_factory=uuid4)
args: "tuple[Any, ...]" = ()
kwargs: "dict[str, Any]" = field(default_factory=dict)
queue: "str" = "default"
execution_backend: "str" = "local"
execution_profile: "str | None" = None
execution_ref: "str | None" = None
status: "TaskStatus" = "pending"
priority: "int" = 0
max_retries: "int" = 0
retry_count: "int" = 0
scheduled_at: "datetime | None" = None
created_at: "datetime" = field(default_factory=lambda: datetime.now(timezone.utc))
started_at: "datetime | None" = None
completed_at: "datetime | None" = None
heartbeat_at: "datetime | None" = None
result: "Any" = None
error: "str | None" = None
key: "str | None" = None
metadata: "dict[str, Any]" = field(default_factory=dict)
@property
def is_terminal(self) -> "bool":
"""Whether the record is in a terminal state."""
return self.status in TERMINAL_STATUSES
@property
def is_due(self) -> "bool":
"""Whether the record is eligible to be claimed now."""
return self.scheduled_at is None or self.scheduled_at <= datetime.now(timezone.utc)