Source code for litestar_queues.execution.immediate

from typing import TYPE_CHECKING

from litestar_queues.execution.base import BaseExecutionBackend

if TYPE_CHECKING:
    from litestar_queues.models import QueuedTaskRecord
    from litestar_queues.service import QueueService

# Public API of this module: the only name exported by a star-import.
__all__ = ("ImmediateExecutionBackend",)


class ImmediateExecutionBackend(BaseExecutionBackend):
    """Execution backend that runs records inline."""

    # No per-instance attributes are declared by this class.
    __slots__ = ()

    async def execute(
        self,
        service: "QueueService",
        record: "QueuedTaskRecord",
        *,
        worker_id: "str | None" = None,
    ) -> "QueuedTaskRecord":
        """Execute a task immediately in the current event loop.

        The call is delegated directly to ``service.execute_record`` and
        awaited in place.

        Args:
            service: Queue service whose ``execute_record`` coroutine is awaited.
            record: The queued task record to execute.
            worker_id: Optional worker identifier, forwarded unchanged to
                ``service.execute_record``.

        Returns:
            The updated queue record.
        """
        return await service.execute_record(record, worker_id=worker_id)