# ruff: noqa: PYI034
"""Utilities shared across the litestar-mcp package.
This module is the single home for handler-introspection helpers
(``get_handler_function``), discovery filtering (``should_include_handler``),
MCP metadata decorators (``@mcp_tool``, ``@mcp_resource``, ``@mcp_prompt``,
``get_mcp_metadata``, ``MetadataRegistry``), LLM-facing description
rendering (``render_description``, ``extract_description_sources``,
``DescriptionSources``), and the RFC 6570 Level 1 URI template helpers
(``parse_template``, ``match_uri``, ``expand_template``). Before v0.5.0
these lived in separate modules (``filters.py``, ``decorators.py``,
``_descriptions.py``, ``_uri_template.py``); Ch5 of the v0.5.0 roadmap
flattens them into this single module.
"""
from __future__ import annotations
import re
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal, Optional, TypeVar
if TYPE_CHECKING:
from litestar.handlers import BaseRouteHandler
from litestar_mcp.config import MCPConfig, MCPOptKeys
# Type variable for "any callable"; the decorator factories in this module are
# typed ``Callable[[F], F]`` so a decorated function keeps its own type.
F = TypeVar("F", bound=Callable[..., Any])
# The handler kinds accepted by the description helpers.
Kind = Literal["tool", "resource", "prompt"]
# Names of the optional structured description fields (the ``DescriptionSources``
# attributes other than ``description``).
_STRUCTURED_FIELDS: tuple[str, str, str] = ("when_to_use", "returns", "agent_instructions")
# Matches one ``{name}`` placeholder and captures ``name``: an ASCII letter or
# underscore followed by ASCII letters, digits, or underscores.
_VAR_RE = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")
# Same identifier grammar, anchored; used to validate the variable names passed
# to ``expand_template``.
_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# ---------------------------------------------------------------------------
# Public MetadataRegistry and DescriptionSources
# ---------------------------------------------------------------------------
class MetadataRegistry:
"""Singleton registry for MCP metadata using qualnames as keys."""
_instance: MetadataRegistry | None = None
_data: dict[str, dict[str, Any]]
def __new__(cls) -> MetadataRegistry:
if cls._instance is None:
inst = super().__new__(cls)
inst._data = {}
cls._instance = inst
return cls._instance
def set(self, obj: Any, value: dict[str, Any]) -> None:
key = self._get_key(obj)
self._data[key] = value
def get(self, obj: Any) -> dict[str, Any] | None:
key = self._get_key(obj)
return self._data.get(key) # pyright: ignore[reportReturnType]
def _get_key(self, obj: Any) -> str:
target = obj
if hasattr(obj, "fn"):
target = obj.fn
if hasattr(target, "value"):
target = target.value
if hasattr(target, "__func__"):
target = target.__func__
if hasattr(target, "__wrapped__"):
target = target.__wrapped__
module = getattr(target, "__module__", "unknown")
qualname = getattr(target, "__qualname__", "unknown")
return f"{module}.{qualname}"
# Module-level handle on the singleton registry; the decorators below write to
# it and ``get_mcp_metadata`` reads from it.
_REGISTRY = MetadataRegistry()
@dataclass(frozen=True)
class DescriptionSources:
    """Resolved description fields for a handler.

    Immutable value object produced by ``extract_description_sources`` and
    consumed by ``render_description``.

    Attributes:
        description: The primary LLM-facing description (always set).
        when_to_use: Optional ``## When to use`` section.
        returns: Optional ``## Returns`` section.
        agent_instructions: Optional ``## Instructions`` section.
    """

    description: str
    when_to_use: str | None
    returns: str | None
    agent_instructions: str | None
# ---------------------------------------------------------------------------
# Public functions
# ---------------------------------------------------------------------------
def get_handler_function(handler: BaseRouteHandler) -> Callable[..., Any]:
    """Return the original callable behind a route handler.

    Two wrapper layers are peeled off when present: a container exposing the
    function as ``.value`` (Litestar's AnyCallable wrapper), and a
    Dishka-injected wrapper exposing the original function as
    ``__dishka_orig_func__``. MCP execution needs the original signature so
    dependency-injection hooks see the real handler parameters rather than
    Dishka's synthetic ``request`` wrapper.

    Args:
        handler: The Litestar route handler.

    Returns:
        The underlying callable function.
    """
    outer = handler.fn
    try:
        unwrapped = outer.value
    except AttributeError:
        # Not wrapped in a ``.value`` container; use the object itself.
        unwrapped = outer
    try:
        return unwrapped.__dishka_orig_func__
    except AttributeError:
        return unwrapped
def should_include_handler(name: str, tags: set[str], config: MCPConfig) -> bool:
    """Decide whether a handler passes the configured include/exclude filters.

    Filters are applied in this order, and the first one that rejects wins:
    excluded tags, included tags, excluded operations, included operations.
    An unset (empty or ``None``) filter is skipped.

    Args:
        name: The handler/tool name.
        tags: Set of tags associated with the handler.
        config: MCP configuration with filter fields.

    Returns:
        True if the handler should be included, False otherwise.
    """
    blocked_tags = config.exclude_tags
    if blocked_tags and not tags.isdisjoint(blocked_tags):
        return False

    required_tags = config.include_tags
    if required_tags and tags.isdisjoint(required_tags):
        return False

    blocked_operations = config.exclude_operations
    if blocked_operations and name in blocked_operations:
        return False

    allowed_operations = config.include_operations
    if allowed_operations and name not in allowed_operations:
        return False
    return True
# [docs] -- documentation source-link marker left over from extraction; not code
def mcp_resource(
    name: str,
    *,
    uri_template: str | None = None,
    description: str | None = None,
    agent_instructions: str | None = None,
    when_to_use: str | None = None,
    returns: str | None = None,
) -> Callable[[F], F]:
    """Build a decorator that registers a route handler as an MCP resource.

    The decorator records a metadata dictionary for the function in the
    module registry and returns the function unchanged. Only arguments that
    are not ``None`` are recorded.

    Args:
        name: The name of the MCP resource.
        uri_template: Optional RFC 6570 Level 1 URI template
            (e.g. ``"app://workspaces/{workspace_id}/files/{file_id}"``).
            Concrete URIs matching the template dispatch to this handler
            with extracted variables passed as kwargs. Stored under the
            ``"resource_template"`` metadata key.
        description: LLM-facing description. Overrides ``fn.__doc__``.
        agent_instructions: Mandatory-context block rendered in the
            ``## Instructions`` section.
        when_to_use: Optional structured hint rendered as the
            ``## When to use`` section.
        returns: Optional return-shape hint rendered as the ``## Returns``
            section.

    Returns:
        Decorator function that adds MCP metadata to the handler.

    Raises:
        ValueError: If ``uri_template`` is given and ``parse_template``
            rejects it. This happens when the factory is called, before any
            function is decorated.

    Example:
        MCP metadata can also be passed straight through the route
        decorator, since Litestar funnels unknown kwargs into
        ``handler.opt``:

        ```python
        @get("/config", mcp_resource="app_config")
        async def get_config() -> dict:
            return {"debug": True}

        @get(
            "/workspaces/{workspace_id:str}/files/{file_id:str}",
            mcp_resource="workspace_file",
            mcp_resource_template="app://workspaces/{workspace_id}/files/{file_id}",
        )
        async def read_workspace_file(workspace_id: str, file_id: str) -> dict:
            ...
        ```

        The decorator form is retained for parity; it carries the same
        metadata to the registry.
    """
    if uri_template is not None:
        # Validate eagerly so a malformed template fails at decoration time.
        parse_template(uri_template)

    # (metadata key, supplied value) pairs, in the order they are recorded.
    optional_entries = (
        ("resource_template", uri_template),
        ("description", description),
        ("agent_instructions", agent_instructions),
        ("when_to_use", when_to_use),
        ("returns", returns),
    )

    def decorator(fn: F) -> F:
        metadata: dict[str, Any] = {"type": "resource", "name": name}
        metadata.update((key, supplied) for key, supplied in optional_entries if supplied is not None)
        _REGISTRY.set(fn, metadata)
        return fn

    return decorator
# [docs] -- documentation source-link marker left over from extraction; not code
def mcp_prompt(
    name: str,
    *,
    title: str | None = None,
    description: str | None = None,
    arguments: list[dict[str, Any]] | None = None,
    icons: list[dict[str, Any]] | None = None,
) -> Callable[[F], F]:
    """Build a decorator that registers a callable as an MCP prompt template.

    The decorator records a metadata dictionary for the function in the
    module registry and returns the function unchanged. Only arguments that
    are not ``None`` are recorded.

    Prompt functions take keyword arguments matching the declared prompt
    arguments and return prompt messages. The return value is normalised
    to a list of ``PromptMessage`` dicts:

    * ``str`` → single ``{"role": "user", "content": {"type": "text", "text": ...}}``
    * ``dict`` → treated as a single message and wrapped in a list
    * ``list[dict]`` → used directly
    * Any other type → ``str(result)`` wrapped as a single user text message

    Both sync and async callables are supported.

    Args:
        name: Unique identifier for the prompt (used in ``prompts/get``).
        title: Optional human-readable display name for UI clients.
        description: LLM-facing description. When omitted, ``fn.__doc__``
            is used as the fallback during registration.
        arguments: Optional explicit argument definitions — each entry is a
            dict with ``name`` (required), and optional ``title``,
            ``description``, and ``required`` keys. ``title`` is the
            human-readable display name from the MCP ``BaseMetadata`` mixin
            (a hint for UI clients; not used for matching). When omitted,
            the argument list is derived automatically from the decorated
            function's signature and Google-style docstring — note that
            signature introspection cannot infer ``title``, so set it
            explicitly when needed.
        icons: Optional list of icon objects for UI display. Each entry is a
            dict with ``src`` (URL), ``mimeType``, and optionally ``sizes``
            per the MCP spec.

    Returns:
        Decorator function that adds MCP metadata to the callable.

    Example:
        Standalone prompt function registered via
        ``LitestarMCP(prompts=[summarize_text])``:

        ```python
        @mcp_prompt(name="summarize", description="Summarise a document.")
        async def summarize_text(text: str, style: str = "concise") -> str:
            return f"Please summarise the following in a {style} style: {text}"
        ```
    """
    # (metadata key, supplied value) pairs, in the order they are recorded.
    optional_entries = (
        ("title", title),
        ("description", description),
        ("arguments", arguments),
        ("icons", icons),
    )

    def decorator(fn: F) -> F:
        metadata: dict[str, Any] = {"type": "prompt", "name": name}
        metadata.update((key, supplied) for key, supplied in optional_entries if supplied is not None)
        _REGISTRY.set(fn, metadata)
        return fn

    return decorator
def get_mcp_metadata(obj: Any) -> dict[str, Any] | None:
    """Look up the MCP metadata registered for ``obj``.

    Args:
        obj: A decorated callable, or a wrapper around one (the registry
            resolves wrappers to the underlying function's key).

    Returns:
        The registered metadata dictionary, or ``None`` if there is none.
    """
    registry = _REGISTRY
    return registry.get(obj)
def extract_description_sources(
    handler: Any,
    fn: Any,
    *,
    kind: Kind,
    fallback_name: str,
    opt_keys: MCPOptKeys | None = None,
) -> DescriptionSources:
    """Resolve every description field for a handler.

    The primary description comes from the first available source: the
    configured field (handler ``opt`` or registered metadata), then the
    function's docstring, then a generated ``"<Kind>: <fallback_name>"``
    label. The three structured fields have no fallback and are ``None``
    when unset.

    Args:
        handler: The route handler whose ``opt`` mapping is consulted.
        fn: The underlying callable (docstring and metadata source).
        kind: Which MCP kind the handler is being described as.
        fallback_name: Name used in the generated label of last resort.
        opt_keys: Opt-key mapping; a default instance is created when omitted.

    Returns:
        The resolved :class:`DescriptionSources`.
    """
    keys = _default_opt_keys() if opt_keys is None else opt_keys

    def lookup(field_name: str) -> str | None:
        return _read_field(handler, fn, field_name, kind, keys)

    primary = lookup("description")
    if primary is None:
        # ``_clean`` yields either a non-empty string or None, so ``or`` is
        # an exact "is None" fallback here.
        primary = _clean(getattr(fn, "__doc__", None)) or f"{kind.title()}: {fallback_name}"

    structured = {field_name: lookup(field_name) for field_name in _STRUCTURED_FIELDS}
    return DescriptionSources(description=primary, **structured)
def render_description(
    handler: Any,
    fn: Any,
    *,
    kind: Kind,
    fallback_name: str,
    structured: bool = True,
    opt_keys: MCPOptKeys | None = None,
) -> str:
    """Render the final description string for a handler.

    Args:
        handler: The route handler being described.
        fn: The underlying callable.
        kind: Which MCP kind the handler is being described as.
        fallback_name: Name used when no description source is available.
        structured: When False, only the primary description is returned.
            When True, each non-empty structured field is appended as a
            ``## <heading>`` section.
        opt_keys: Opt-key mapping forwarded to the field lookup.

    Returns:
        The primary description, optionally followed by the ``When to use``,
        ``Returns`` and ``Instructions`` sections, separated by blank lines.
    """
    sources = extract_description_sources(
        handler,
        fn,
        kind=kind,
        fallback_name=fallback_name,
        opt_keys=opt_keys,
    )
    if not structured:
        return sources.description

    headed_sections = (
        ("When to use", sources.when_to_use),
        ("Returns", sources.returns),
        ("Instructions", sources.agent_instructions),
    )
    blocks: list[str] = [sources.description]
    blocks.extend(f"## {heading}\n{body}" for heading, body in headed_sections if body)
    return "\n\n".join(blocks)
def parse_template(template: str) -> list[Segment]:
    """Split ``template`` into its literal and ``{variable}`` segments.

    Args:
        template: A URI template whose placeholders look like ``{name}``.

    Returns:
        The segments in template order. Literal text between placeholders
        becomes a ``_Literal``; each placeholder becomes a ``_Variable``.

    Raises:
        ValueError: If the counts of ``{`` and ``}`` differ, if any brace is
            left over outside a well-formed placeholder, or if the template
            is empty.
    """
    if template.count("{") != template.count("}"):
        msg = f"Unbalanced braces in template: {template!r}"
        raise ValueError(msg)

    parsed: list[Segment] = []

    def add_literal(text: str) -> None:
        # A brace surviving in literal text means a placeholder did not match
        # the identifier grammar (e.g. ``{1abc}`` or ``{}``).
        if "{" in text or "}" in text:
            msg = f"Invalid variable in template: {template!r}"
            raise ValueError(msg)
        parsed.append(_Literal(text))

    cursor = 0
    for placeholder in _VAR_RE.finditer(template):
        begin, finish = placeholder.span()
        if begin > cursor:
            add_literal(template[cursor:begin])
        parsed.append(_Variable(placeholder.group(1)))
        cursor = finish
    if cursor < len(template):
        add_literal(template[cursor:])

    if not parsed:
        msg = f"Empty template: {template!r}"
        raise ValueError(msg)
    return parsed
def match_uri(template: str, uri: str) -> dict[str, str] | None:
    """Match ``uri`` against ``template`` and extract variable values.

    Literal segments must appear verbatim. A variable captures the text up to
    the first occurrence of the next literal segment, or the rest of the URI
    when no literal follows. A captured value must be non-empty and must not
    contain ``/``.

    Args:
        template: The URI template to match against.
        uri: The concrete URI.

    Returns:
        A mapping of variable name to captured value, or ``None`` when the
        URI does not match the template.

    Raises:
        ValueError: Propagated from ``parse_template`` for a malformed
            template.
    """
    segments = parse_template(template)
    captured: dict[str, str] = {}
    tail = uri

    for position, segment in enumerate(segments):
        if isinstance(segment, _Variable):
            later_literals = [s.text for s in segments[position + 1 :] if isinstance(s, _Literal)]
            if later_literals:
                # Split at the first occurrence; the delimiter stays at the
                # front of the tail so its literal segment consumes it.
                head, delimiter, after = tail.partition(later_literals[0])
                if not delimiter:
                    return None
                tail = delimiter + after
            else:
                head, tail = tail, ""
            if head == "" or "/" in head:
                return None
            captured[segment.name] = head
        elif tail.startswith(segment.text):
            tail = tail[len(segment.text) :]
        else:
            return None

    # Anything left over means the URI is longer than the template allows.
    return None if tail else captured
def expand_template(template: str, values: dict[str, str]) -> str:
    """Substitute ``{var}`` placeholders with values from ``values``.

    Every key in ``values`` is validated before the template is parsed, so
    an invalid key is rejected even if the template never references it.

    Args:
        template: The URI template to expand.
        values: Mapping of variable name to replacement text.

    Returns:
        The template with each placeholder replaced by its value.

    Raises:
        ValueError: If a key in ``values`` is not a valid identifier, or if
            ``parse_template`` rejects the template.
        KeyError: If the template references a variable missing from
            ``values``.
    """
    for candidate in values:
        if _IDENT_RE.match(candidate) is None:
            msg = f"Invalid variable name: {candidate!r}"
            raise ValueError(msg)

    return "".join(
        piece.text if isinstance(piece, _Literal) else values[piece.name]
        for piece in parse_template(template)
    )
# ---------------------------------------------------------------------------
# Private helper functions and classes
# ---------------------------------------------------------------------------
def _clean(value: Any) -> str | None:
    """Normalise an optional text value.

    Returns the whitespace-stripped string when ``value`` is a string with
    non-whitespace content; returns ``None`` for anything else.
    """
    if not isinstance(value, str):
        return None
    # An all-whitespace string strips to "", which collapses to None.
    return value.strip() or None
def _default_opt_keys() -> MCPOptKeys:
    """Return a default :class:`MCPOptKeys` without creating an import cycle.

    At module level ``litestar_mcp.config`` is imported only under
    ``TYPE_CHECKING``; the runtime import is deferred to call time here.
    """
    # Function-scope import: resolved on each call, not at module import.
    from litestar_mcp.config import MCPOptKeys

    return MCPOptKeys()
def _read_field(
    handler: Any,
    fn: Any,
    field_name: str,
    kind: Kind,
    opt_keys: MCPOptKeys,
) -> str | None:
    """Resolve one description field for a handler.

    The handler's ``opt`` mapping takes precedence. If it holds no usable
    text for the field, the registered MCP metadata is consulted, looking up
    the handler first and then the underlying function.

    Args:
        handler: The route handler; a missing or empty ``opt`` is treated as
            an empty mapping.
        fn: The underlying callable.
        field_name: The description field to resolve.
        kind: The MCP kind, used to pick the opt key for the field.
        opt_keys: Maps ``(field_name, kind)`` to an opt key via ``for_field``.

    Returns:
        The cleaned (stripped, non-empty) text, or ``None`` when no source
        supplies one.
    """
    handler_opt = getattr(handler, "opt", None) or {}
    opt_key = opt_keys.for_field(field_name, kind)
    from_opt = _clean(handler_opt.get(opt_key))
    if from_opt is not None:
        return from_opt

    registered = get_mcp_metadata(handler)
    if not registered:
        registered = get_mcp_metadata(fn)
    return _clean((registered or {}).get(field_name))
@dataclass(frozen=True, slots=True)
class _Variable:
    """A ``{name}`` placeholder segment of a parsed URI template."""

    # The identifier captured from between the braces.
    name: str
@dataclass(frozen=True, slots=True)
class _Literal:
    """A run of literal text in a parsed URI template."""

    # The literal text exactly as it appears in the template.
    text: str
# One element of a parsed template: either a placeholder or literal text.
Segment = _Variable | _Literal
# Public API of this module, in alphabetical order.
# NOTE(review): "mcp_tool" is exported here but its definition is not visible
# in this view of the file -- confirm it is defined in this module.
__all__ = (
    "DescriptionSources",
    "MetadataRegistry",
    "expand_template",
    "extract_description_sources",
    "get_handler_function",
    "get_mcp_metadata",
    "match_uri",
    "mcp_prompt",
    "mcp_resource",
    "mcp_tool",
    "parse_template",
    "render_description",
    "should_include_handler",
)