Tasks¶
Decorate a callable with litestar_queues.task() to register it in the
process task registry:
from litestar_queues import task
@task("reports.render", queue="reports", priority=10, retries=2, timeout=120)
async def render_report(report_id: str) -> str:
return report_id
The decorated object can still be called directly:
value = await render_report("report-1")
Enqueueing¶
Use QueueService.enqueue() from a route handler, service layer, script, or
worker entry point:
result = await queue_service.enqueue(render_report, "report-1")
await result.wait(timeout=30)
TaskResult caches the last known record state. Call refresh() to reload
the record or wait() to poll until it reaches a terminal status.
Per-Enqueue Overrides¶
Task decorator defaults can be overridden for one enqueue call:
result = await queue_service.enqueue(
render_report,
"report-1",
queue="slow-reports",
priority=1,
retries=5,
timeout=600,
run_after=30,
execution_backend="cloudrun",
execution_profile="heavy",
metadata={"requested_by": "user-123"},
)
Use Task.using() when a configured copy is easier to pass around:
heavy_render = render_report.using(execution_backend="cloudrun", execution_profile="heavy")
await queue_service.enqueue(heavy_render, "report-1")
Deduplication Keys¶
Pass key= to deduplicate active work. Queue backends should reuse or replace
records according to the backend contract instead of creating duplicate active
records for the same key:
await queue_service.enqueue(render_report, "report-1", key="report:report-1")
Task Context¶
When a task accepts _job_id or _task_context, the worker injects those
values during queued execution:
from litestar_queues.events import TaskExecutionContext
@task("imports.run")
async def run_import(path: str, _job_id: object, _task_context: TaskExecutionContext) -> None:
await _task_context.progress(current=1, total=10, message=f"Started {_job_id}")
You can also publish through the helper functions while a task context is bound:
from litestar_queues.events import publish_task_log, publish_task_progress
@task("imports.process")
async def process_import(path: str) -> None:
await publish_task_log("Import started")
await publish_task_progress(current=5, total=10)
Retries¶
Unhandled exceptions retry until max_retries is exhausted. Raise
NonRetryableError or use the non_retryable helper when a failure should
move directly to the terminal failed state.
Background Tasks¶
Litestar can run a small function after it sends the HTTP response. Use this when the route should return first and enqueue the queue job second.
This only saves the job in the queue. Your queue settings decide when the task runs.
If the job must be saved before the response is sent, enqueue inside the route instead:
result = await queue_service.enqueue(background_process, 42)
return {"task_id": str(result.id)}
Native BackgroundTask¶
Pass the task’s enqueue method to Litestar’s BackgroundTask:
from litestar import post, Response
from litestar.background_tasks import BackgroundTask
from litestar_queues import task
@task("tasks.background_process")
async def background_process(value: int) -> None:
pass
@post("/trigger")
async def trigger() -> Response[dict[str, str]]:
return Response(
{"status": "queued"},
background=BackgroundTask(background_process.enqueue, 42)
)
QueuedBackgroundTask Helper¶
Use QueuedBackgroundTask when you want to pass the queue task itself. It
finds the app’s active QueueService for you.
QueuePlugin must be registered on the app before you use this helper.
from litestar import post, Response
from litestar_queues import QueuedBackgroundTask, task
@task("tasks.background_process")
async def background_process(value: int) -> None:
pass
@post("/trigger")
async def trigger() -> Response[dict[str, str]]:
return Response(
{"status": "queued"},
background=QueuedBackgroundTask(background_process, 42)
)
If the route already receives QueueService from Litestar, pass it to the
helper:
from litestar.di import NamedDependency
@post("/trigger")
async def trigger(queue_service: NamedDependency[QueueService]) -> Response[dict[str, str]]:
return Response(
{"status": "queued"},
background=QueuedBackgroundTask(background_process, 42, service=queue_service)
)
Which Form To Use¶
Use await queue_service.enqueue(...) when the job must be saved before the
route returns.
Use BackgroundTask(background_process.enqueue, ...) when you want Litestar’s
standard background task behavior.
Use QueuedBackgroundTask(background_process, ...) when you want a short
helper that finds QueueService for you.