Source code for litestar_queues.events.models

"""Typed realtime event models for queue tasks."""

from datetime import datetime, timezone
from typing import Any, Literal, cast
from uuid import uuid4

import msgspec

__all__ = ("QueueEvent", "QueueEventActor", "QueueEventEntityRef", "QueueEventScope", "QueueEventType")

QueueEventScope = Literal["task", "queue", "worker", "global", "custom"]
QueueEventType = Literal[
    "task.started",
    "task.progress",
    "task.log",
    "task.event",
    "task.completed",
    "task.failed",
    "task.cancelled",
    "task.claim_lost",
    "task.stale_failed",
    "worker.heartbeat",
    "worker.stale_recovery",
]


[docs] class QueueEventActor(msgspec.Struct, rename="camel", kw_only=True): """Actor reference for a queue event.""" type: "str | None" = None id: "str | None" = None name: "str | None" = None
[docs] def to_dict(self) -> "dict[str, Any]": """Return the camelCase wire mapping for this actor.""" return cast("dict[str, Any]", msgspec.to_builtins(self))
[docs] @classmethod def from_dict(cls, data: "dict[str, Any]") -> "QueueEventActor": """Build an actor reference from a camelCase mapping. Returns: The actor reference. """ return msgspec.convert(data, cls)
[docs] class QueueEventEntityRef(msgspec.Struct, rename="camel", kw_only=True): """Entity reference for a queue event.""" type: "str" id: "str" name: "str | None" = None
[docs] def to_dict(self) -> "dict[str, Any]": """Return the camelCase wire mapping for this entity reference.""" return cast("dict[str, Any]", msgspec.to_builtins(self))
[docs] @classmethod def from_dict(cls, data: "dict[str, Any]") -> "QueueEventEntityRef": """Build an entity reference from a camelCase mapping. Returns: The entity reference. """ return msgspec.convert(data, cls)
[docs] class QueueEvent(msgspec.Struct, rename="camel", kw_only=True): """Stable event envelope for queue lifecycle, progress, log, and custom events. The wire format is camelCase. Null-valued top-level fields are preserved so subscribers can rely on a stable schema for intermediate progress and log events. Payload contents are passed through verbatim. """ type: "str" scope: "QueueEventScope" id: "str" = msgspec.field(default_factory=lambda: uuid4().hex) scope_key: "str | None" = None task_id: "str | None" = None task_name: "str | None" = None queue: "str | None" = None worker_id: "str | None" = None execution_backend: "str | None" = None execution_profile: "str | None" = None attempt: "int | None" = None sequence: "int | None" = None level: "str | None" = None message: "str | None" = None progress_current: "int | float | None" = None progress_total: "int | float | None" = None progress_percent: "float | None" = None actor: "QueueEventActor | None" = None entity: "QueueEventEntityRef | None" = None payload: "dict[str, Any]" = msgspec.field(default_factory=dict) occurred_at: "datetime" = msgspec.field(default_factory=lambda: datetime.now(timezone.utc)) schema_version: "int" = 1 event_key: "str | None" = None
[docs] def to_dict(self) -> "dict[str, Any]": """Return the stable camelCase JSON-compatible event envelope. Null-valued top-level fields are preserved so subscribers can rely on a stable schema for intermediate progress and log events. Payload contents are passed through verbatim. """ return cast("dict[str, Any]", msgspec.to_builtins(self))
[docs] def to_json(self) -> "bytes": """Return the event envelope as camelCase JSON bytes.""" return msgspec.json.encode(self)
[docs] @classmethod def from_dict(cls, data: "dict[str, Any]") -> "QueueEvent": """Build an event from a camelCase mapping. Returns: The queue event. """ return msgspec.convert(data, cls)
[docs] @classmethod def from_json(cls, data: "str | bytes | bytearray") -> "QueueEvent": """Build an event from camelCase JSON text or bytes. Returns: The queue event. Raises: TypeError: If the decoded JSON value is not an object. """ payload = bytes(data) if isinstance(data, bytearray) else data decoded = msgspec.json.decode(payload) if not isinstance(decoded, dict): msg = "Queue event JSON must decode to an object" raise TypeError(msg) return cls.from_dict(cast("dict[str, Any]", decoded))