Source code for litestar_queues.plugin

import asyncio
import contextlib
from datetime import timedelta
from typing import TYPE_CHECKING, Any, cast

from litestar.plugins import InitPlugin

from litestar_queues.config import QueueConfig
from litestar_queues.service import QueueService
from litestar_queues.task import load_task_modules, set_default_service
from litestar_queues.worker import Worker

if TYPE_CHECKING:
    from litestar import Litestar
    from litestar.config.app import AppConfig
    from litestar.datastructures import State

    from litestar_queues.backends import BaseQueueBackend
    from litestar_queues.events import QueueEventPublisher

__all__ = ("QueuePlugin",)


[docs] class QueuePlugin(InitPlugin): """Litestar plugin for queue service dependency registration and lifecycle.""" __slots__ = ("_config", "_event_publisher", "_queue_backend", "_service", "_worker", "_worker_task")
[docs] def __init__(self, config: "QueueConfig | None" = None) -> "None": """Initialize the queue plugin.""" self._config = config or QueueConfig() self._service: "QueueService | None" = None self._queue_backend: "BaseQueueBackend | None" = None self._event_publisher: "QueueEventPublisher | None" = None self._worker: "Worker | None" = None self._worker_task: "asyncio.Task[None] | None" = None
@property def config(self) -> "QueueConfig": """Plugin configuration.""" return self._config
[docs] def get_service(self, state: "State | None" = None) -> "QueueService": """Return a QueueService for this plugin.""" if self._service is not None: return self._service return QueueService(self._config, queue_backend=self._queue_backend, event_publisher=self._event_publisher)
[docs] def on_app_init(self, app_config: "AppConfig") -> "AppConfig": """Register queue dependencies, signature namespace, state, and lifecycle hooks. Returns: The updated application configuration. """ self._queue_backend = self._config.get_queue_backend() self._event_publisher = self._config.get_event_publisher() app_config.dependencies.update(self._config.dependencies) app_config.signature_namespace.update(self._config.signature_namespace) state = { self._config.queue_service_state_key: self._config, self._config.queue_event_publisher_state_key: self._event_publisher, } if self._config.event_config.channels_backend is not None: state[self._config.queue_event_channels_backend_state_key] = self._config.event_config.channels_backend app_config.state.update(state) app_config.on_startup.append(self._on_startup) app_config.on_shutdown.append(self._on_shutdown) return app_config
[docs] def on_cli_init(self, cli: "object") -> "None": """Attach the ``queues`` subcommand group to the Litestar CLI. Args: cli: The root ``click.Group`` of the Litestar CLI. Typed as :class:`object` so importing this module does not pull ``click`` into ``sys.modules`` — Litestar's :class:`~litestar.plugins.CLIPluginProtocol` enforces the runtime type. """ from litestar_queues._cli import register register(cast("Any", cli))
async def _on_startup(self, app: "Litestar") -> "None": if self._config.task_modules: load_task_modules(self._config.task_modules) self._service = QueueService( self._config, queue_backend=self._queue_backend, event_publisher=self._event_publisher ) await self._service.open() set_default_service(self._service) app.state[self._config.queue_service_state_key] = self._service app.state[self._config.queue_event_publisher_state_key] = self._service.get_event_publisher() if self._config.event_config.channels_backend is not None: app.state[self._config.queue_event_channels_backend_state_key] = self._config.event_config.channels_backend if self._config.initialize_schedules: await self._service.initialize_schedules() if self._config.in_app_worker: self._worker = Worker( self._service, batch_size=self._config.worker_batch_size, poll_interval=self._config.worker_poll_interval, max_concurrency=self._config.worker_max_concurrency, heartbeat_interval=self._config.worker_heartbeat_interval, reconcile_interval=self._config.worker_reconcile_interval, stale_after=( timedelta(seconds=self._config.worker_stale_after) if self._config.worker_stale_after is not None else None ), stale_check_interval=self._config.worker_stale_check_interval, graceful_shutdown_timeout=self._config.worker_graceful_shutdown_timeout, final_cancel_timeout=self._config.worker_final_cancel_timeout, queues=self._config.worker_queues, ) self._worker_task = asyncio.create_task(self._worker.start()) await asyncio.sleep(0) app.state[self._config.queue_worker_state_key] = self._worker async def _on_shutdown(self, app: "Litestar") -> "None": if self._worker is not None: await self._worker.stop() if self._worker_task is not None: with contextlib.suppress(asyncio.CancelledError): await self._worker_task self._worker_task = None if self._service is not None: set_default_service(None) await self._service.close() self._service = None