from typing import TYPE_CHECKING
from typing_extensions import Self
if TYPE_CHECKING:
from types import TracebackType
from litestar_queues.config import QueueConfig
from litestar_queues.models import QueuedTaskRecord
from litestar_queues.service import QueueService
__all__ = ("BaseExecutionBackend",)
[docs]
class BaseExecutionBackend:
"""Base class for queue execution backends."""
__slots__ = ("config",)
[docs]
def __init__(self, config: "QueueConfig | None" = None) -> "None":
"""Initialize the execution backend."""
self.config = config
@property
def is_external(self) -> "bool":
"""Whether this backend dispatches records to another process."""
return False
[docs]
async def open(self) -> "bool":
"""Open execution resources.
Returns:
True when resources are ready.
"""
return True
[docs]
async def close(self) -> "None":
"""Close execution resources."""
[docs]
async def execute(
self, service: "QueueService", record: "QueuedTaskRecord", *, worker_id: "str | None" = None
) -> "QueuedTaskRecord":
"""Execute a queue record."""
raise NotImplementedError
[docs]
async def dispatch(self, service: "QueueService", record: "QueuedTaskRecord") -> "str | None":
"""Dispatch a queue record to an external executor.
Returns:
The external execution reference, if one was created.
"""
await self.execute(service, record)
return record.execution_ref
[docs]
async def reconcile(self, service: "QueueService", record: "QueuedTaskRecord") -> "QueuedTaskRecord | None":
"""Reconcile an externally running queue record.
Returns:
The updated record when reconciliation changes state.
"""
return None
[docs]
async def cancel(self, service: "QueueService", record: "QueuedTaskRecord") -> "bool":
"""Cancel an externally running queue record if possible.
Returns:
True when cancellation succeeds.
"""
return False
async def __aenter__(self) -> "Self":
await self.open()
return self
async def __aexit__(
self,
exc_type: "type[BaseException] | None", # noqa: PYI036
exc_val: "BaseException | None", # noqa: PYI036
exc_tb: "TracebackType | None", # noqa: PYI036
) -> "None":
await self.close()