Events
Models
Typed realtime event models for queue tasks.
- class litestar_queues.events.models.QueueEvent(*, type: str, scope: Literal['task', 'queue', 'worker', 'global', 'custom'], id: str = <factory>, scope_key: str | None = None, task_id: str | None = None, task_name: str | None = None, queue: str | None = None, worker_id: str | None = None, execution_backend: str | None = None, execution_profile: str | None = None, attempt: int | None = None, sequence: int | None = None, level: str | None = None, message: str | None = None, progress_current: int | float | None = None, progress_total: int | float | None = None, progress_percent: float | None = None, actor: QueueEventActor | None = None, entity: QueueEventEntityRef | None = None, payload: dict[str, Any] = <factory>, occurred_at: datetime = <factory>, schema_version: int = 1, event_key: str | None = None)
Bases: Struct
Stable event envelope for queue lifecycle, progress, log, and custom events.
The wire format is camelCase. Null-valued top-level fields are preserved so subscribers can rely on a stable schema for intermediate progress and log events. Payload contents are passed through verbatim.
- actor: QueueEventActor | None
- entity: QueueEventEntityRef | None
- to_dict() -> dict[str, Any]
Return the stable camelCase JSON-compatible event envelope.
Null-valued top-level fields are preserved so subscribers can rely on a stable schema for intermediate progress and log events. Payload contents are passed through verbatim.
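The envelope rules described above (camelCase top-level keys, null-valued fields kept, payload untouched) can be illustrated with a minimal standard-library sketch. It is not the library's implementation: `SketchEvent`, `to_camel`, the reduced field set, and the `"task.progress"` type string are all hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Optional


def to_camel(name: str) -> str:
    """Convert a snake_case field name to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


@dataclass
class SketchEvent:
    """Hypothetical stand-in carrying a small subset of QueueEvent fields."""

    type: str
    scope: str
    task_id: Optional[str] = None
    progress_percent: Optional[float] = None
    payload: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        # Only top-level keys are renamed. None values are kept, and the
        # payload dict is returned as-is (its keys are not converted).
        return {to_camel(f.name): getattr(self, f.name) for f in fields(self)}


envelope = SketchEvent(
    type="task.progress",  # assumed event type string, for illustration only
    scope="task",
    task_id="t-1",
    payload={"rows_done": 10},
).to_dict()
print(envelope)
```

In this sketch `progressPercent` appears in the output with a `None` value, and the payload key `rows_done` keeps its original spelling, which is the behavior a subscriber relying on a stable schema would expect.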
- class litestar_queues.events.models.QueueEventActor(*, type: str | None = None, id: str | None = None, name: str | None = None)
Bases: Struct
Actor reference for a queue event.
Publisher
Queue event publisher.
- class litestar_queues.events.publisher.QueueEventConfig(enabled: bool = False, sink: QueueEventSink | None = None, channels_backend: object | None = None, strict: bool = False, publish_task_channel: bool = True, publish_queue_channel: bool = True, publish_global_lifecycle: bool = False)
Bases: object
Configuration for queue event publishing.
- sink: QueueEventSink | None
- class litestar_queues.events.publisher.QueueEventPublisher(sink: QueueEventSink | None = None, *, strict: bool = False, publish_task_channel: bool = True, publish_queue_channel: bool = True, publish_global_lifecycle: bool = False)
Bases: object
Publish queue events through a configured sink.
- __init__(sink: QueueEventSink | None = None, *, strict: bool = False, publish_task_channel: bool = True, publish_queue_channel: bool = True, publish_global_lifecycle: bool = False) -> None
- strict
- publish_task_channel
- publish_queue_channel
- publish_global_lifecycle
- property sink: QueueEventSink
Configured event sink.
Task Context
Task execution context and helper APIs for queue event publishing.
- class litestar_queues.events.context.TaskExecutionContext(task_id: str, task_name: str, queue: str, worker_id: str | None, execution_backend: str, execution_profile: str | None, attempt: int, event_publisher: QueueEventPublisher)
Bases: object
Context bound while a queue task is executing.
- event_publisher: QueueEventPublisher
- async progress(*, current: float | None = None, total: float | None = None, percent: float | None = None, message: str | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None
Publish a task progress event.
- async log(message: str, *, level: str = 'info', payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None
Publish a task log event.
- async event(event_type: str, *, message: str | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None
Publish a custom task event.
- async lifecycle(event_type: str, *, message: str | None = None, payload: dict[str, Any] | None = None) -> None
Publish a worker-owned lifecycle event.
- async publish(event_type: str, *, level: str | None = None, message: str | None = None, progress_current: float | None = None, progress_total: float | None = None, progress_percent: float | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> QueueEvent
Build and publish an event for this task context.
- Returns:
The published queue event.
- litestar_queues.events.context.get_current_task_context() -> TaskExecutionContext | None
Return the task execution context for the current async context.
- async litestar_queues.events.context.publish_task_event(event_type: str, *, message: str | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None
Publish a custom event through the currently bound task context.
- async litestar_queues.events.context.publish_task_log(message: str, *, level: str = 'info', payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None
Publish a log event through the currently bound task context.
- async litestar_queues.events.context.publish_task_progress(*, current: float | None = None, total: float | None = None, percent: float | None = None, message: str | None = None, payload: dict[str, Any] | None = None, channels: Sequence[str] | None = None) -> None
Publish progress through the currently bound task context.
- litestar_queues.events.context.require_current_task_context() -> TaskExecutionContext
Return the current task context or raise if none is bound.
- Raises:
RuntimeError – If no task context is bound.
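The get/require pair above follows a familiar pattern for per-task ambient state in async code. The sketch below shows one way such a pattern can be built with the standard library's `contextvars`; whether the library uses this mechanism internally is an assumption, and `SketchTaskContext`, `get_current`, `require_current`, and `run_task` are hypothetical names, not part of `litestar_queues`.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchTaskContext:
    """Hypothetical stand-in for TaskExecutionContext (two fields only)."""

    task_id: str
    attempt: int


_current: ContextVar[Optional[SketchTaskContext]] = ContextVar(
    "current_task_context", default=None
)


def get_current() -> Optional[SketchTaskContext]:
    """Return the bound context, or None when nothing is bound."""
    return _current.get()


def require_current() -> SketchTaskContext:
    """Return the bound context, or raise RuntimeError when nothing is bound."""
    ctx = _current.get()
    if ctx is None:
        raise RuntimeError("no task context is bound")
    return ctx


async def run_task(task_id: str) -> str:
    # Bind a context for the duration of the task body, then restore.
    token = _current.set(SketchTaskContext(task_id=task_id, attempt=1))
    try:
        await asyncio.sleep(0)  # yield so the two tasks interleave
        return require_current().task_id
    finally:
        _current.reset(token)


async def main() -> list:
    # gather() runs each coroutine in its own asyncio task, and each task
    # gets its own copy of the context, so the bindings do not collide.
    return await asyncio.gather(run_task("a"), run_task("b"))


results = asyncio.run(main())
print(results)
```

Each concurrent task sees only its own binding, and code running outside any task body gets `None` from the lenient accessor and a `RuntimeError` from the strict one.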
Channels
Channel naming helpers for queue events.
- class litestar_queues.events.channels.QueueChannels
Bases: object
Canonical channel name factories for queue event scopes.
- classmethod task(task_id: str, *, topic: str = 'events') -> str
Return the channel for task-scoped events.
- classmethod queue(queue: str, *, topic: str = 'events') -> str
Return the channel for queue-scoped events.
- classmethod worker(worker_id: str, *, topic: str = 'events') -> str
Return the channel for worker-scoped events.
Sinks
Queue event sink protocols and core implementations.
- class litestar_queues.events.sinks.InMemoryQueueEventSink
Bases: object
In-process event sink for tests, examples, and local demos.
- property events: list[QueueEvent]
Published events in publish order.
- events_for(channel: str) -> list[QueueEvent]
Return events published to a channel.
- async publish(event: QueueEvent, *, channels: Sequence[str]) -> None
Store an event in process.
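To make the sink contract concrete, here is a minimal standard-library sketch of an in-memory sink exposing the same three members (`events`, `events_for`, `publish`). It is an illustration under assumptions, not the library's code: `SketchSink` and `SketchInMemorySink` are hypothetical names, events are plain dicts instead of `QueueEvent` instances, and the channel names `chan-a` and `chan-b` are placeholders rather than `QueueChannels` output.

```python
import asyncio
from typing import Any, Protocol, Sequence


class SketchSink(Protocol):
    """Hypothetical protocol: anything with this async publish() is a sink."""

    async def publish(self, event: Any, *, channels: Sequence[str]) -> None: ...


class SketchInMemorySink:
    """Keeps every published event together with its target channels."""

    def __init__(self) -> None:
        self._records: list = []  # (event, channels) pairs in publish order

    @property
    def events(self) -> list:
        # All events, in the order they were published.
        return [event for event, _ in self._records]

    def events_for(self, channel: str) -> list:
        # Only the events whose channel list included the given channel.
        return [event for event, channels in self._records if channel in channels]

    async def publish(self, event: Any, *, channels: Sequence[str]) -> None:
        self._records.append((event, tuple(channels)))


async def demo() -> SketchInMemorySink:
    sink = SketchInMemorySink()
    await sink.publish({"type": "started"}, channels=["chan-a", "chan-b"])
    await sink.publish({"type": "finished"}, channels=["chan-a"])
    return sink


sink = asyncio.run(demo())
print(sink.events_for("chan-b"))
```

A sink like this is convenient in tests because assertions can be written against `events` or `events_for(...)` without a running Channels backend.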
Litestar Channels
Litestar Channels helpers for queue events.
- class litestar_queues.events.litestar.ChannelsQueueEventSink(channels_backend: object)
Bases: object
Event sink that publishes to an app-owned Litestar Channels object.
- async publish(event: QueueEvent, *, channels: Sequence[str]) -> None
Publish an event to Litestar Channels.
- async litestar_queues.events.litestar.stream_queue_events(socket: Any, channels: Sequence[str], *, history: int = 0, channels_backend: object | None = None) -> None
Stream queue events from an app-owned Channels subscription to a WebSocket.
The caller owns route paths, guards, tenant filtering, and authorization.
- Raises:
RuntimeError – If no Channels backend or plugin can be resolved.