Events

Models

Typed realtime event models for queue tasks.

class litestar_queues.events.models.QueueEvent(*, type: str, scope: Literal['task', 'queue', 'worker', 'global', 'custom'], id: str = <factory>, scope_key: str | None = None, task_id: str | None = None, task_name: str | None = None, queue: str | None = None, worker_id: str | None = None, execution_backend: str | None = None, execution_profile: str | None = None, attempt: int | None = None, sequence: int | None = None, level: str | None = None, message: str | None = None, progress_current: int | float | None = None, progress_total: int | float | None = None, progress_percent: float | None = None, actor: QueueEventActor | None = None, entity: QueueEventEntityRef | None = None, payload: dict[str, Any] = <factory>, occurred_at: datetime = <factory>, schema_version: int = 1, event_key: str | None = None)

Bases: Struct

Stable event envelope for queue lifecycle, progress, log, and custom events.

The wire format is camelCase. Null-valued top-level fields are preserved so subscribers can rely on a stable schema for intermediate progress and log events. Payload contents are passed through verbatim.

type: str
scope: Literal['task', 'queue', 'worker', 'global', 'custom']
id: str
scope_key: str | None
task_id: str | None
task_name: str | None
queue: str | None
worker_id: str | None
execution_backend: str | None
execution_profile: str | None
attempt: int | None
sequence: int | None
level: str | None
message: str | None
progress_current: int | float | None
progress_total: int | float | None
progress_percent: float | None
actor: QueueEventActor | None
entity: QueueEventEntityRef | None
payload: dict[str, Any]
occurred_at: datetime
schema_version: int
event_key: str | None
to_dict() -> dict[str, Any]

Return the stable camelCase JSON-compatible event envelope.

Null-valued top-level fields are preserved so subscribers can rely on a stable schema for intermediate progress and log events. Payload contents are passed through verbatim.

to_json() -> bytes

Return the event envelope as camelCase JSON bytes.
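
As a rough illustration of the wire format described above, the sketch below renames top-level fields to camelCase, keeps None values in the mapping, and leaves payload keys untouched. SketchEvent, to_camel, and the 'task.progress' type string are hypothetical stand-ins, and the standard json module is used for simplicity; this is not the library's implementation.

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field, fields
from typing import Any


def to_camel(name: str) -> str:
    """Convert a snake_case field name to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


@dataclass
class SketchEvent:
    """Hypothetical stand-in carrying a few QueueEvent-like fields."""

    type: str
    scope: str
    task_id: str | None = None
    progress_percent: float | None = None
    payload: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # Only top-level keys are renamed. None values stay in the mapping,
        # and the payload dict is passed through with its keys unchanged.
        return {to_camel(f.name): getattr(self, f.name) for f in fields(self)}

    def to_json(self) -> bytes:
        return json.dumps(self.to_dict()).encode("utf-8")


event = SketchEvent(
    type="task.progress",  # hypothetical event type name
    scope="task",
    task_id="t-1",
    payload={"rows_done": 10},
)
wire = event.to_dict()
```

Here wire contains a progressPercent key whose value is None, so a subscriber can read the same set of keys on every event, while rows_done inside the payload keeps its original spelling.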

classmethod from_dict(data: dict[str, Any]) -> QueueEvent

Build an event from a camelCase mapping.

Returns:

The queue event.

classmethod from_json(data: str | bytes | bytearray) -> QueueEvent

Build an event from camelCase JSON text or bytes.

Returns:

The queue event.

Raises:

TypeError – If the decoded JSON value is not an object.
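
The TypeError case can be pictured with a small decoder that accepts text or bytes and rejects any top-level JSON value that is not an object. The helper name decode_event_mapping is hypothetical, and the standard json module stands in for whatever decoder the library actually uses.

```python
from __future__ import annotations

import json
from typing import Any


def decode_event_mapping(data: str | bytes | bytearray) -> dict[str, Any]:
    """Decode JSON and insist that the top-level value is an object."""
    decoded = json.loads(data)
    if not isinstance(decoded, dict):
        # Arrays, strings, numbers, and null are valid JSON but cannot be
        # turned into an event envelope.
        raise TypeError(f"expected a JSON object, got {type(decoded).__name__}")
    return decoded
```

A caller reading from an untrusted transport would typically catch both the decoder's own parse error and this TypeError before building an event.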

class litestar_queues.events.models.QueueEventActor(*, type: str | None = None, id: str | None = None, name: str | None = None)

Bases: Struct

Actor reference for a queue event.

type: str | None
id: str | None
name: str | None
to_dict() -> dict[str, Any]

Return the camelCase wire mapping for this actor.

classmethod from_dict(data: dict[str, Any]) -> QueueEventActor

Build an actor reference from a camelCase mapping.

Returns:

The actor reference.

class litestar_queues.events.models.QueueEventEntityRef(*, type: str, id: str, name: str | None = None)

Bases: Struct

Entity reference for a queue event.

type: str
id: str
name: str | None
to_dict() -> dict[str, Any]

Return the camelCase wire mapping for this entity reference.

classmethod from_dict(data: dict[str, Any]) -> QueueEventEntityRef

Build an entity reference from a camelCase mapping.

Returns:

The entity reference.

Publisher

Queue event publisher.

class litestar_queues.events.publisher.QueueEventConfig(enabled: bool = False, sink: QueueEventSink | None = None, channels_backend: object | None = None, strict: bool = False, publish_task_channel: bool = True, publish_queue_channel: bool = True, publish_global_lifecycle: bool = False)

Bases: object

Configuration for queue event publishing.

enabled: bool
sink: QueueEventSink | None
channels_backend: object | None
strict: bool
publish_task_channel: bool
publish_queue_channel: bool
publish_global_lifecycle: bool
__init__(enabled: bool = False, sink: QueueEventSink | None = None, channels_backend: object | None = None, strict: bool = False, publish_task_channel: bool = True, publish_queue_channel: bool = True, publish_global_lifecycle: bool = False) -> None

class litestar_queues.events.publisher.QueueEventPublisher(sink: QueueEventSink | None = None, *, strict: bool = False, publish_task_channel: bool = True, publish_queue_channel: bool = True, publish_global_lifecycle: bool = False)

Bases: object

Publish queue events through a configured sink.

__init__(sink: QueueEventSink | None = None, *, strict: bool = False, publish_task_channel: bool = True, publish_queue_channel: bool = True, publish_global_lifecycle: bool = False) -> None
strict
publish_task_channel
publish_queue_channel
publish_global_lifecycle
property sink: QueueEventSink

Configured event sink.

async publish(event: QueueEvent, *, channels: Sequence[str] | None = None) -> None

Publish an event to canonical and explicitly supplied channels.

resolve_channels(event: QueueEvent, *, channels: Sequence[str] | None = None) -> tuple[str, ...]

Return canonical publish channels for an event plus explicit extras.
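
How canonical channels and explicit extras are combined is not spelled out here. One plausible reading is sketched below, assuming duplicates are dropped and first-seen order is kept; both behaviors are assumptions, as is the helper name resolve_channels_sketch. Channel names are treated as opaque strings.

```python
from __future__ import annotations

from collections.abc import Sequence


def resolve_channels_sketch(
    canonical: Sequence[str],
    extras: Sequence[str] | None = None,
) -> tuple[str, ...]:
    """Merge canonical and extra channel names, keeping first-seen order."""
    merged: dict[str, None] = {}
    for name in (*canonical, *(extras or ())):
        # dict preserves insertion order, so a repeated name keeps the
        # position of its first occurrence and is not added twice.
        merged.setdefault(name, None)
    return tuple(merged)
```

Returning a tuple mirrors the documented return type and keeps the resolved channel list immutable once it is handed to a sink.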

Task Context

Task execution context and helper APIs for queue event publishing.

class litestar_queues.events.context.TaskExecutionContext(task_id: str, task_name: str, queue: str, worker_id: str | None, execution_backend: str, execution_profile: str | None, attempt: int, event_publisher: QueueEventPublisher)

Bases: object

Context bound while a queue task is executing.

task_id: str
task_name: str
queue: str
worker_id: str | None
execution_backend: str
execution_profile: str | None
attempt: int
event_publisher: QueueEventPublisher
async progress(*, current: float | None = None, total: float | None = None, percent: float | None = None, message: str | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None

Publish a task progress event.

async log(message: str, *, level: str = 'info', payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None

Publish a task log event.

async event(event_type: str, *, message: str | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None

Publish a custom task event.

async lifecycle(event_type: str, *, message: str | None = None, payload: dict[str, Any] | None = None) -> None

Publish a worker-owned lifecycle event.

async publish(event_type: str, *, level: str | None = None, message: str | None = None, progress_current: float | None = None, progress_total: float | None = None, progress_percent: float | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> QueueEvent

Build and publish an event for this task context.

Returns:

The published queue event.

__init__(task_id: str, task_name: str, queue: str, worker_id: str | None, execution_backend: str, execution_profile: str | None, attempt: int, event_publisher: QueueEventPublisher) -> None

litestar_queues.events.context.get_current_task_context() -> TaskExecutionContext | None

Return the task execution context for the current async context, or None if no context is bound.

async litestar_queues.events.context.publish_task_event(event_type: str, *, message: str | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None

Publish a custom event through the currently bound task context.

async litestar_queues.events.context.publish_task_log(message: str, *, level: str = 'info', payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None

Publish a log event through the currently bound task context.

async litestar_queues.events.context.publish_task_progress(*, current: float | None = None, total: float | None = None, percent: float | None = None, message: str | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None

Publish progress through the currently bound task context.

litestar_queues.events.context.require_current_task_context() -> TaskExecutionContext

Return the current task context or raise if none is bound.

Raises:

RuntimeError – If no task context is bound.
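
The difference between the optional getter and the raising getter can be sketched with contextvars, which is one common way to bind per-task state in asyncio code. Whether the library uses contextvars internally is an assumption; SketchTaskContext, get_current, require_current, and run_task are hypothetical names that stand in for the documented API.

```python
from __future__ import annotations

import asyncio
from contextvars import ContextVar
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchTaskContext:
    """Hypothetical stand-in for TaskExecutionContext."""

    task_id: str
    attempt: int


_current: ContextVar[SketchTaskContext | None] = ContextVar(
    "current_task_context", default=None
)


def get_current() -> SketchTaskContext | None:
    """Return the bound context, or None outside a task."""
    return _current.get()


def require_current() -> SketchTaskContext:
    """Return the bound context or raise if none is bound."""
    context = _current.get()
    if context is None:
        raise RuntimeError("no task context is bound")
    return context


async def run_task(task_id: str) -> str:
    # A worker would bind the context before calling the task body and
    # reset it afterwards, so helpers deeper in the call stack can find it.
    token = _current.set(SketchTaskContext(task_id=task_id, attempt=1))
    try:
        await asyncio.sleep(0)  # yield so concurrent tasks interleave
        return require_current().task_id
    finally:
        _current.reset(token)
```

Because each asyncio task runs in its own copy of the context, two tasks running concurrently each see their own binding, and code outside any task sees none.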

Channels

Channel naming helpers for queue events.

class litestar_queues.events.channels.QueueChannels

Bases: object

Canonical channel name factories for queue event scopes.

prefix: ClassVar[str] = 'litestar_queues'
classmethod task(task_id: str, *, topic: str = 'events') -> str

Return the channel for task-scoped events.

classmethod queue(queue: str, *, topic: str = 'events') -> str

Return the channel for queue-scoped events.

classmethod worker(worker_id: str, *, topic: str = 'events') -> str

Return the channel for worker-scoped events.

classmethod global_channel(*, topic: str = 'events') -> str

Return the global queue event channel.

classmethod custom(scope_key: str, *, topic: str = 'events') -> str

Return a custom queue event channel.

Sinks

Queue event sink protocols and core implementations.

class litestar_queues.events.sinks.InMemoryQueueEventSink

Bases: object

In-process event sink for tests, examples, and local demos.

__init__() -> None
property events: list[QueueEvent]

Published events in publish order.

property published: list[tuple[QueueEvent, tuple[str, ...]]]

Published events with their channels.

events_for(channel: str) -> list[QueueEvent]

Return events published to a channel.

async publish(event: QueueEvent, *, channels: Sequence[str]) -> None

Store the event in process memory together with its channels.

class litestar_queues.events.sinks.NoopQueueEventSink

Bases: object

Event sink that accepts events and drops them.

async publish(event: QueueEvent, *, channels: Sequence[str]) -> None

Accept the event and discard it without publishing.

class litestar_queues.events.sinks.QueueEventSink(*args, **kwargs)

Bases: Protocol

Transport boundary for queue event delivery.

async publish(event: QueueEvent, *, channels: Sequence[str]) -> None

Publish an event to the requested channels.
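
Since the sink is a Protocol, any object with a matching async publish method should qualify structurally. The sketch below defines a local SinkLike protocol that mirrors the documented signature and a RecordingSink that satisfies it; both names, and the fan_out helper, are hypothetical, and events are typed as Any so the example runs without the library installed.

```python
from __future__ import annotations

import asyncio
from collections.abc import Sequence
from typing import Any, Protocol


class SinkLike(Protocol):
    """Local stand-in mirroring the documented publish signature."""

    async def publish(self, event: Any, *, channels: Sequence[str]) -> None: ...


class RecordingSink:
    """Keeps (event, channels) pairs in publish order."""

    def __init__(self) -> None:
        self.published: list[tuple[Any, tuple[str, ...]]] = []

    async def publish(self, event: Any, *, channels: Sequence[str]) -> None:
        # Copy the channels into a tuple so later mutation by the caller
        # cannot change what was recorded.
        self.published.append((event, tuple(channels)))

    def events_for(self, channel: str) -> list[Any]:
        return [event for event, names in self.published if channel in names]


async def fan_out(sink: SinkLike, event: Any, channels: Sequence[str]) -> None:
    """Hand one event to any object that satisfies the sink protocol."""
    await sink.publish(event, channels=channels)
```

RecordingSink never inherits from SinkLike; a type checker accepts it wherever SinkLike is expected because the method shapes match, which is what lets an application plug in its own transport.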

__init__(*args, **kwargs)

Litestar Channels

Litestar Channels helpers for queue events.

class litestar_queues.events.litestar.ChannelsQueueEventSink(channels_backend: object)

Bases: object

Event sink that publishes to an app-owned Litestar Channels object.

__init__(channels_backend: object) -> None
property channels_backend: object

Wrapped Channels backend or plugin.

async publish(event: QueueEvent, *, channels: Sequence[str]) -> None

Publish an event to Litestar Channels.

async litestar_queues.events.litestar.stream_queue_events(socket: Any, channels: Sequence[str], *, history: int = 0, channels_backend: object | None = None) -> None

Stream queue events from an app-owned Channels subscription to a WebSocket.

The caller owns route paths, guards, tenant filtering, and authorization.

Raises:

RuntimeError – If no Channels backend or plugin can be resolved.