The Model Context Protocol (MCP) gives LLMs a standard way to pull in external data, for example weather data or database contents.
Full code: https://github.com/MrNiebit/hot-topic-mcp
You can also debug a server with the command mcp dev xxx-server.py, which opens the MCP Inspector against it. The first listing below exposes its tools over the Streamable HTTP transport.
import asyncio
import uvicorn
from mcp.server.lowlevel.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.routing import Mount
import mcp.types as types
class StreamableHTTPMCPServer:
def __init__(self, name: str = "StreamableHTTPServer"):
self.server = Server(name)
self.session_manager = None
self._setup_handlers()
def _setup_handlers(self):
"""设置 MCP 协议处理器"""
@self.server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
return [
types.Tool(
name="echo",
description="Echo back the input message",
inputSchema={
"type": "object",
"properties": {
"message": {"type": "string"}
},
"required": ["message"]
}
),
types.Tool(
name="add",
description="Add two numbers",
inputSchema={
"type": "object",
"properties": {
"x": {"type": "number"},
"y": {"type": "number"}
},
"required": ["x", "y"]
}
)
]
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]:
if name == "echo":
message = arguments.get("message", "")
return [types.TextContent(type="text", text=f"Echo: {message}")]
elif name == "add":
x = arguments.get("x", 0)
y = arguments.get("y", 0)
result = x + y
return [types.TextContent(type="text", text=f"Result: {result}")]
raise ValueError(f"Unknown tool: {name}")
@self.server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
return [
types.Resource(
uri="resource://greeting",
name="greeting",
description="A simple greeting resource",
mimeType="text/plain"
),
types.Resource(
uri="resource://status",
name="status",
description="Server status information",
mimeType="application/json"
)
]
@self.server.read_resource()
        async def handle_read_resource(uri) -> str:
            # The SDK passes the URI as a pydantic AnyUrl, so compare its
            # string form (stripping any trailing slash pydantic may add)
            uri_str = str(uri).rstrip("/")
            if uri_str == "resource://greeting":
                return "Hello from Streamable HTTP server!"
            elif uri_str == "resource://status":
                return '{"status": "running", "transport": "streamable-http"}'
            raise ValueError(f"Unknown resource: {uri}")
@self.server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
return [
types.Prompt(
name="analyze",
description="Analyze the given data",
arguments=[
types.PromptArgument(
name="data",
description="Data to analyze",
required=True
)
]
)
]
@self.server.get_prompt()
        async def handle_get_prompt(name: str, arguments: dict | None) -> types.GetPromptResult:
            if name == "analyze":
                data = (arguments or {}).get("data", "")
return types.GetPromptResult(
description="Analysis prompt",
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"Please analyze this data: {data}"
)
)
]
)
raise ValueError(f"Unknown prompt: {name}")
def create_app(self, stateless: bool = False) -> Starlette:
"""创建 Starlette ASGI 应用"""
# 创建会话管理器
self.session_manager = StreamableHTTPSessionManager(
app=self.server,
            event_store=None,  # an event store could be plugged in here for resumability
json_response=False,
stateless=stateless
)
        # Request handler that delegates to the session manager
async def streamable_http_handler(scope, receive, send):
await self.session_manager.handle_request(scope, receive, send)
        # Build the Starlette app; the lifespan keeps the session manager running
app = Starlette(
routes=[
Mount("/mcp", app=streamable_http_handler)
],
lifespan=lambda app: self.session_manager.run()
)
return app
async def run_async(self, host: str = "127.0.0.1", port: int = 8000, stateless: bool = False):
"""异步运行服务器"""
app = self.create_app(stateless=stateless)
config = uvicorn.Config(
app,
host=host,
port=port,
log_level="info"
)
server = uvicorn.Server(config)
await server.serve()
def run(self, host: str = "127.0.0.1", port: int = 8000, stateless: bool = False):
"""同步运行服务器"""
asyncio.run(self.run_async(host, port, stateless))
# Usage example
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Streamable HTTP MCP Server")
parser.add_argument("--host", default="127.0.0.1", help="Host to bind to")
parser.add_argument("--port", type=int, default=8000, help="Port to bind to")
parser.add_argument("--stateless", action="store_true", help="Run in stateless mode")
args = parser.parse_args()
server = StreamableHTTPMCPServer()
print(f"Starting Streamable HTTP MCP server on {args.host}:{args.port}")
print(f"Mode: {'Stateless' if args.stateless else 'Stateful'}")
print(f"Endpoint: http://{args.host}:{args.port}/mcp")
server.run(host=args.host, port=args.port, stateless=args.stateless)
The same low-level Server API also works over stdio. The server below, from the hot-topic-mcp repo, registers a single tool that fetches Sina Weibo trending searches:
from mcp.server.lowlevel.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
import mcp.types as types
from items.hot_topic_factory import HotTopicFactory
import json
server = Server("hot-topic-mcp")
TOOLS = [
types.Tool(
name="sina_weibo_hot_topic",
description="这是一个用来获取新浪微博热搜的工具",
inputSchema={
"type": "object",
"properties": {
"top": {
"type": "integer",
"description": "热搜的数量",
"default": 10
}
}
}
)
]
TOOLS_NAME_LIST = {tool.name: tool for tool in TOOLS}
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return TOOLS
@server.call_tool()
async def call_tool(name: str, arguments: dict | None) -> list[types.TextContent]:
if name not in TOOLS_NAME_LIST:
raise ValueError("未找到对应的工具")
# print(arguments)
result = HotTopicFactory.get_hot_topics(name, **arguments)
# 这里返回的是列表
return [types.TextContent(
type="text",
text=json.dumps(result, ensure_ascii=False)
)]
async def run():
async with stdio_server() as (read, write):
await server.run(
read_stream=read,
write_stream=write,
initialization_options=InitializationOptions(
server_name="host-topic-mcp",
server_version="2025.03.16",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={}
)
)
)
if __name__ == "__main__":
import asyncio
asyncio.run(run())
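The HotTopicFactory imported above lives in the repo's items package and isn't shown here. As a rough sketch of the interface this server assumes (the class body below is an illustrative stub, not the repo's actual code):
# items/hot_topic_factory.py -- illustrative stub; see the repo for the real implementation
class HotTopicFactory:
    @staticmethod
    def get_hot_topics(name: str, top: int = 10, **kwargs) -> list[dict]:
        """Dispatch on the tool name and return a JSON-serializable list."""
        if name != "sina_weibo_hot_topic":
            raise ValueError(f"Unknown topic source: {name}")
        # The real code fetches live Weibo data; canned rows stand in here
        sample = [{"rank": i + 1, "topic": f"sample-topic-{i + 1}"} for i in range(50)]
        return sample[:top]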
With the Streamable HTTP server from the first listing running on port 8000, a minimal client looks like this:
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
async def test_client():
async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
            # Initialize the session
result = await session.initialize()
print(f"Server: {result.serverInfo.name}")
            # Call a tool
tool_result = await session.call_tool("echo", {"message": "Hello World"})
print(f"Tool result: {tool_result.content[0].text}")
            # Read a resource
resource_result = await session.read_resource("resource://greeting")
print(f"Resource: {resource_result.contents[0].text}")
if __name__ == "__main__":
asyncio.run(test_client())
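The server's analyze prompt can be exercised through the same session API; a short sketch reusing the connection pattern above (the sample data string is arbitrary):
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def test_prompt():
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            # Fetch the "analyze" prompt registered by the server
            prompt = await session.get_prompt("analyze", {"data": "1, 2, 3, 5, 8"})
            print(prompt.messages[0].content.text)

if __name__ == "__main__":
    asyncio.run(test_prompt())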
The stdio client spawns the server as a subprocess and talks to it over stdin/stdout:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
server_params = StdioServerParameters(
command="python",
args=["../main.py"]
)
async def run():
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
            # Initialize the session
print(await session.initialize())
result = await session.list_tools()
print(result)
result = await session.call_tool(name="sina_weibo_hot_topic", arguments={"top": 5, "type": "all"})
print(result)
if __name__ == "__main__":
import asyncio
asyncio.run(run())
Testing here used the Claude Desktop App; clients such as Cherry Studio and the Cline extension for VS Code support MCP as well.
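For Claude Desktop, the stdio server is registered in claude_desktop_config.json; a minimal entry might look like the following, where the server name and path are placeholders to adjust:
{
  "mcpServers": {
    "hot-topic-mcp": {
      "command": "python",
      "args": ["/absolute/path/to/main.py"]
    }
  }
}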
Code: https://github.com/MrNiebit/llm-mcp-invoke