Plugin API

This module contains the main plugin class for the Litestar MCP Plugin.

LitestarMCP

class litestar_mcp.plugin.LitestarMCP[source]

Bases: InitPluginProtocol, CLIPlugin

Litestar plugin for Model Context Protocol integration.

The main plugin class that implements litestar.plugins.InitPluginProtocol. It discovers routes marked with mcp_tool or mcp_resource in their opt dictionary and exposes them through the MCP Streamable HTTP transport surface.

__init__(config=None)[source]

Initialize the MCP plugin.

property config: MCPConfig

Get the plugin configuration.

property registry: Registry

Get the central registry.

property discovered_tools: dict[str, BaseRouteHandler]

Get discovered MCP tools.

property discovered_resources: dict[str, BaseRouteHandler]

Get discovered MCP resources.

on_cli_init(cli)[source]

Configure CLI commands for MCP operations.

Return type:

None

on_app_init(app_config)[source]

Initialize the MCP integration when the Litestar app starts.

Return type:

AppConfig

on_startup(app)[source]

Perform discovery after app is fully initialized and routes are built.

Return type:

None

Registry

class litestar_mcp.registry.Registry[source]

Bases: object

Central registry for MCP tools and resources.

This class decouples metadata storage and discovery from the route handlers themselves, avoiding issues with __slots__ or object mutation.

__init__()[source]

Initialize the registry.

set_sse_manager(manager)[source]

Set the SSE manager for notifications.

Return type:

None

property sse_manager: SSEManager

Return the configured SSE manager.

property tools: dict[str, BaseRouteHandler]

Get registered tools.

property resources: dict[str, BaseRouteHandler]

Get registered resources.

register_tool(name, handler)[source]

Register a tool.

Parameters:
Return type:

None

register_resource(name, handler)[source]

Register a resource.

Parameters:
Return type:

None

property templates: dict[str, ResourceTemplate]

Get registered resource templates, keyed by resource name.

register_resource_template(name, handler, template)[source]

Register an RFC 6570 Level 1 URI template for a resource.

Parameters:
  • name (str) -- The resource name (same key as register_resource).

  • handler (BaseRouteHandler) -- The route handler bound to the template.

  • template (str) -- The URI template string. Validated at registration; invalid templates raise ValueError.

Return type:

None

async publish_notification(method, params, session_id=None)[source]

Publish a JSON-RPC 2.0 notification to connected clients.

Parameters:
  • method (str) -- The notification method (e.g., 'notifications/resources/updated').

  • params (dict[str, Any]) -- The notification parameters.

  • session_id (Optional[str]) -- Optional session to target; when omitted the notification fans out to every attached session.

Return type:

None

async notify_resource_updated(uri)[source]

Notify clients that a resource has been updated.

Parameters:

uri (str) -- The URI of the updated resource.

Return type:

None

async notify_tools_list_changed()[source]

Notify clients that the tool list has changed.

Return type:

None

SSEManager

class litestar_mcp.sse.SSEManager[source]

Bases: object

Manages Streamable HTTP SSE connections and message delivery.

The manager keeps one _StreamState per open stream and a side index from session_id to the set of stream ids currently attached to that session, so notifications can be fanned out to every stream belonging to a given MCP session.

__init__(*, max_streams=10000, max_idle_seconds=3600.0)[source]

Initialize the SSE manager.

Parameters:
  • max_streams (int) -- Hard cap on concurrent open streams. Excess raises StreamLimitExceeded.

  • max_idle_seconds (float) -- Idle window (seconds) after which a stream is eligible for lazy pruning on the next open_stream.

async open_stream(session_id=None, last_event_id=None)[source]

Open a new stream (or resume from last_event_id).

Parameters:
  • session_id (Optional[str]) -- Optional session id to bind this stream to. When provided, the manager updates the session→streams index so publish() can fan out to all streams belonging to the session.

  • last_event_id (Optional[str]) -- Optional Last-Event-ID value. On a match, pending messages from the existing stream's history are replayed before normal delivery resumes.

Return type:

tuple[str, AsyncGenerator[SSEMessage, None]]

Returns:

A (stream_id, async_generator) pair.

disconnect(stream_id)[source]

Explicitly remove a stream and its buffered state.

Return type:

None

async enqueue(stream_id, message)[source]

Enqueue a raw JSON payload onto a single stream.

Return type:

None

async publish(message, session_id=None)[source]

Publish a JSON payload to one or all sessions.

When session_id is provided the message fans out to every stream attached to that session; otherwise it fans out to every stream attached to any session.

Return type:

None

async replay_from(stream_id, last_event_id)[source]

Return buffered messages after last_event_id for a stream.

Return type:

list[SSEMessage]

close_session_streams(session_id)[source]

Close every stream attached to session_id. Returns closed ids.

Return type:

list[str]

SSEMessage

class litestar_mcp.sse.SSEMessage[source]

Bases: object

Represents a single SSE message.

__init__(data, event='message', id=None)