Source code for litestar_queues.backends.valkey.backend

"""Valkey queue backend.

The Valkey wire protocol is API-compatible with Redis, so this backend
inherits the full ``RedisQueueBackend`` implementation and only overrides
the client factory (uses ``valkey.asyncio`` instead of ``redis.asyncio``)
plus the ``_backend_name`` ClassVar that drives lock-name error messages
and the ``valkey-pubsub`` notification capability label.
"""

from typing import TYPE_CHECKING, Any, ClassVar, cast

from valkey import asyncio as valkey_asyncio

from litestar_queues.backends.redis.backend import RedisQueueBackend
from litestar_queues.backends.valkey.config import ValkeyBackendConfig

if TYPE_CHECKING:
    from litestar_queues.config import QueueConfig

__all__ = ("ValkeyQueueBackend",)


[docs] class ValkeyQueueBackend(RedisQueueBackend): """Valkey-backed queue backend.""" _backend_name: "ClassVar[str]" = "valkey"
[docs] def __init__( self, config: "QueueConfig | None" = None, *, backend_config: "ValkeyBackendConfig | None" = None ) -> "None": backend_config = backend_config or ValkeyBackendConfig() super().__init__(config=config, backend_config=cast("Any", backend_config))
def _create_client(self, url: "str") -> "Any": from_url = cast("Any", valkey_asyncio.from_url) return from_url(url, decode_responses=True)