Quickstart

Register a task with litestar_queues.task():

from litestar_queues import task


@task("accounts.sync", queue="accounts", retries=3, timeout=300)
async def sync_account(account_id: str) -> dict[str, str]:
    return {"account_id": account_id, "status": "synced"}

Attach the queue plugin to a Litestar application:

from litestar import Litestar, post
from litestar.di import NamedDependency
from litestar_queues import QueueConfig, QueuePlugin, QueueService


@post("/accounts/{account_id:str}/sync")
async def create_task(account_id: str, queue_service: NamedDependency[QueueService]) -> dict[str, str]:
    result = await queue_service.enqueue(sync_account, account_id)
    return {"task_id": str(result.id), "status": result.status or "queued"}


app = Litestar(route_handlers=[create_task], plugins=[QueuePlugin(config=QueueConfig())])

The default configuration runs a worker inside the Litestar application process. That keeps local and lightweight deployments simple. For heavier deployments, use a shared queue backend, disable the in-app worker in the web process, and run workers separately:

config = QueueConfig(in_app_worker=False)
$ LITESTAR_APP=app.asgi:app litestar queues run --drain-timeout 30

Next Steps

  • Configure runtime settings in Configuration.

  • Add retries, keys, metadata, and execution overrides in Tasks.

  • Choose a persistent backend in Backends.