Source code for litestar_queues.events.sinks

"""Queue event sink protocols and core implementations."""

import asyncio
from collections import defaultdict
from typing import TYPE_CHECKING, Protocol

if TYPE_CHECKING:
    from collections.abc import Sequence

    from litestar_queues.events.models import QueueEvent

__all__ = ("InMemoryQueueEventSink", "NoopQueueEventSink", "QueueEventSink")


[docs] class QueueEventSink(Protocol): """Transport boundary for queue event delivery."""
[docs] async def publish(self, event: "QueueEvent", *, channels: "Sequence[str]") -> "None": """Publish an event to the requested channels."""
[docs] class NoopQueueEventSink: """Event sink that accepts events and drops them.""" __slots__ = ()
[docs] async def publish(self, event: "QueueEvent", *, channels: "Sequence[str]") -> "None": """Drop an event publish."""
[docs] class InMemoryQueueEventSink: """In-process event sink for tests, examples, and local demos.""" __slots__ = ("_channel_events", "_lock", "_published")
[docs] def __init__(self) -> "None": self._published: "list[tuple[QueueEvent, tuple[str, ...]]]" = [] self._channel_events: "defaultdict[str, list[QueueEvent]]" = defaultdict(list) self._lock = asyncio.Lock()
@property def events(self) -> "list[QueueEvent]": """Published events in publish order.""" return [event for event, _ in self._published] @property def published(self) -> "list[tuple[QueueEvent, tuple[str, ...]]]": """Published events with their channels.""" return list(self._published)
[docs] def events_for(self, channel: "str") -> "list[QueueEvent]": """Return events published to a channel.""" return list(self._channel_events.get(channel, []))
[docs] async def publish(self, event: "QueueEvent", *, channels: "Sequence[str]") -> "None": """Store an event in process.""" channel_tuple = tuple(channels) async with self._lock: self._published.append((event, channel_tuple)) for channel in channel_tuple: self._channel_events[channel].append(event)