Low-Level Server
For more control, you can use the low-level server implementation directly. This gives you full access to the protocol and allows you to customize every aspect of your server, including lifecycle management through the lifespan API.
Lifespan
"""
Run from the repository root:
uv run examples/snippets/servers/lowlevel/lifespan.py
"""
import sys
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

import mcp.server.stdio
import mcp.types as types
from mcp.server.lowlevel import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Mock database class for example
class Database:
    """Mock database used to demonstrate lifespan-managed resources.

    Status messages are written to stderr rather than stdout: this example
    is served over the stdio transport, where stdout carries the protocol
    messages and must not receive stray output.
    """

    @classmethod
    async def connect(cls) -> "Database":
        """Open a (simulated) connection and return a new instance."""
        print("Database connected", file=sys.stderr)
        return cls()

    async def disconnect(self) -> None:
        """Close the (simulated) connection."""
        print("Database disconnected", file=sys.stderr)

    async def query(self, query_str: str) -> list[dict[str, str]]:
        """Return a canned single-row result that echoes ``query_str``.

        No real database is contacted; the row exists only so that callers
        have something to display.
        """
        return [{"id": "1", "name": "Example", "query": query_str}]
@asynccontextmanager
async def server_lifespan(_server: Server) -> AsyncIterator[dict[str, Any]]:
    """Open the database for the server's lifetime and close it afterwards.

    Args:
        _server: The server this lifespan belongs to (unused here).

    Yields:
        The lifespan context: a dict whose ``"db"`` entry is the connected
        ``Database``.
    """
    # Startup: acquire the resource before handing control back.
    database = await Database.connect()
    lifespan_context: dict[str, Any] = {"db": database}
    try:
        yield lifespan_context
    finally:
        # Shutdown: runs whether the body exited normally or with an error.
        await database.disconnect()
# Create the server and register server_lifespan as its lifespan handler, so the
# context dict it yields can be read from the request context inside handlers.
server = Server("example-server", lifespan=server_lifespan)
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """Advertise the tools this server exposes.

    Returns:
        A single-element list describing ``query_db``, whose only (required)
        argument is the string ``query``.
    """
    query_property = {"type": "string", "description": "SQL query to execute"}
    input_schema = {
        "type": "object",
        "properties": {"query": query_property},
        "required": ["query"],
    }
    query_tool = types.Tool(
        name="query_db",
        description="Query the database",
        inputSchema=input_schema,
    )
    return [query_tool]
@server.call_tool()
async def query_db(name: str, arguments: dict[str, Any]) -> list[types.TextContent]:
    """Run the ``query_db`` tool against the lifespan-managed database.

    Args:
        name: Name of the tool being called; only ``"query_db"`` is accepted.
        arguments: Tool arguments; ``arguments["query"]`` is the query text.

    Returns:
        One text content item containing the formatted query results.

    Raises:
        ValueError: If ``name`` is not ``"query_db"``.
    """
    if name != "query_db":
        raise ValueError(f"Unknown tool: {name}")

    # The dict yielded by the lifespan handler is exposed on the request context.
    database = server.request_context.lifespan_context["db"]
    results = await database.query(arguments["query"])

    reply = types.TextContent(type="text", text=f"Query results: {results}")
    return [reply]
async def run():
    """Run the server with lifespan management over the stdio transport."""
    async with mcp.server.stdio.stdio_server() as streams:
        read_stream, write_stream = streams

        # Capabilities are derived from the handlers registered on the server.
        capabilities = server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="example-server",
            server_version="0.1.0",
            capabilities=capabilities,
        )
        await server.run(read_stream, write_stream, init_options)
# Script entry point: only start the server when this file is executed directly.
if __name__ == "__main__":
    # Imported here rather than at module top; asyncio is only needed to drive run().
    import asyncio
    asyncio.run(run())
Full example: examples/snippets/servers/lowlevel/lifespan.py
The lifespan API provides:
- A way to initialize resources when the server starts and clean them up when it stops
- Access to initialized resources through the request context in handlers
- Type-safe context passing between lifespan and request handlers
Basic Example
"""
Run from the repository root:
uv run examples/snippets/servers/lowlevel/basic.py
"""
import asyncio
import mcp.server.stdio
import mcp.types as types
from mcp.server.lowlevel import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Create the server instance; the prompt handlers defined below register
# themselves on it through its decorators.
server = Server("example-server")
@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
    """Advertise the prompt templates this server offers.

    Returns:
        A single-element list describing ``example-prompt`` and its one
        required argument, ``arg1``.
    """
    arg1 = types.PromptArgument(name="arg1", description="Example argument", required=True)
    example_prompt = types.Prompt(
        name="example-prompt",
        description="An example prompt template",
        arguments=[arg1],
    )
    return [example_prompt]
@server.get_prompt()
async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult:
    """Render the ``example-prompt`` template.

    Args:
        name: Requested prompt name; only ``"example-prompt"`` is accepted.
        arguments: Optional prompt arguments. ``arg1`` falls back to
            ``"default"`` when it is absent or no arguments are given.

    Returns:
        A result holding one user message that embeds the ``arg1`` value.

    Raises:
        ValueError: If ``name`` is not ``"example-prompt"``.
    """
    if name != "example-prompt":
        raise ValueError(f"Unknown prompt: {name}")

    # A missing or empty argument mapping behaves like one without "arg1".
    if arguments:
        arg1_value = arguments.get("arg1", "default")
    else:
        arg1_value = "default"

    prompt_text = types.TextContent(type="text", text=f"Example prompt text with argument: {arg1_value}")
    user_message = types.PromptMessage(role="user", content=prompt_text)
    return types.GetPromptResult(description="Example prompt", messages=[user_message])
async def run():
    """Run the basic low-level server over the stdio transport.

    Opens the stdio read/write streams and serves requests on them using the
    module-level ``server``.
    """
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                # Advertise the same name the Server instance was created
                # with, so clients do not see two names for one server.
                server_name="example-server",
                server_version="0.1.0",
                # Capabilities are derived from the handlers registered on the server.
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
if __name__ == "__main__":
asyncio