-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Open
Labels
Description
Initial Checks
- I confirm that I'm using the latest version of MCP Python SDK
- I confirm that I searched for my issue in https://github.com/modelcontextprotocol/python-sdk/issues before opening this issue
Description
I ran the example code at the following URL, but an error was logged when using MCP version 1.13.1:
https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py
The logged error message is as follows:
2025-08-22 20:39:59,254 - mcp.server.streamable_http - ERROR - Error in message router
Traceback (most recent call last):
File "xx\.venv\Lib\site-packages\mcp\server\streamable_http.py", line 831, in message_router
async for session_message in write_stream_reader:
File "xx\.venv\Lib\site-packages\anyio\abc\_streams.py", line 41, in __anext__
return await self.receive()
^^^^^^^^^^^^^^^^^^^^
File "xx\.venv\Lib\site-packages\anyio\streams\memory.py", line 111, in receive
return self.receive_nowait()
^^^^^^^^^^^^^^^^^^^^^
File "xx\.venv\Lib\site-packages\anyio\streams\memory.py", line 93, in receive_nowait
raise ClosedResourceError
anyio.ClosedResourceError
Example Code
import contextlib
import logging
from collections.abc import AsyncIterator
from typing import Any
import anyio
import click
import mcp.types as types
from mcp.server.lowlevel import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.middleware.cors import CORSMiddleware
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send
# Module-level logger, named from this module's __name__.
logger = logging.getLogger(__name__)
def main(
    port: int,
    log_level: str,
    json_response: bool,
) -> int:
    """Run the stateless MCP StreamableHTTP demo server under uvicorn.

    Builds a low-level MCP ``Server`` exposing one tool
    (``start-notification-stream``), mounts it at ``/mcp`` in a Starlette
    application wrapped with CORS middleware, and serves it on all
    interfaces (``0.0.0.0``).

    Args:
        port: TCP port passed to ``uvicorn.run``.
        log_level: Name of a ``logging`` level, matched case-insensitively
            (for example ``"debug"``).
        json_response: Forwarded unchanged to ``StreamableHTTPSessionManager``
            as its ``json_response`` argument.

    Returns:
        ``0`` after ``uvicorn.run`` returns.

    Raises:
        AttributeError: If ``log_level.upper()`` is not an attribute of the
            ``logging`` module.
    """
    # Configure logging
    logging.basicConfig(
        level=getattr(logging, log_level.upper()),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    app = Server("mcp-streamable-http-stateless-demo")

    @app.call_tool()
    async def call_tool(name: str, arguments: dict[str, Any]) -> list[types.ContentBlock]:
        """Send ``count`` log notifications, sleeping ``interval`` seconds between them."""
        ctx = app.request_context
        interval = arguments.get("interval", 1.0)
        # The input schema declares "count" as a JSON number, so a client may
        # send a float such as 5.0; range() only accepts integers, so coerce.
        count = int(arguments.get("count", 5))
        caller = arguments.get("caller", "unknown")

        # Send the specified number of notifications with the given interval
        for i in range(count):
            await ctx.session.send_log_message(
                level="info",
                data=f"Notification {i + 1}/{count} from caller: {caller}",
                logger="notification_stream",
                related_request_id=ctx.request_id,
            )
            if i < count - 1:  # Don't wait after the last notification
                await anyio.sleep(interval)

        return [
            types.TextContent(
                type="text",
                text=(f"Sent {count} notifications with {interval}s interval for caller: {caller}"),
            )
        ]

    @app.list_tools()
    async def list_tools() -> list[types.Tool]:
        """Return the single tool definition advertised by this server."""
        return [
            types.Tool(
                name="start-notification-stream",
                description=("Sends a stream of notifications with configurable count and interval"),
                inputSchema={
                    "type": "object",
                    "required": ["interval", "count", "caller"],
                    "properties": {
                        "interval": {
                            "type": "number",
                            "description": "Interval between notifications in seconds",
                        },
                        "count": {
                            "type": "number",
                            "description": "Number of notifications to send",
                        },
                        "caller": {
                            "type": "string",
                            "description": ("Identifier of the caller to include in notifications"),
                        },
                    },
                },
            )
        ]

    # Create the session manager with true stateless mode
    session_manager = StreamableHTTPSessionManager(
        app=app,
        event_store=None,
        json_response=json_response,
        stateless=True,
    )

    async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
        """ASGI entry point that hands each request to the session manager."""
        await session_manager.handle_request(scope, receive, send)

    @contextlib.asynccontextmanager
    async def lifespan(app: Starlette) -> AsyncIterator[None]:
        """Context manager for session manager."""
        async with session_manager.run():
            logger.info("Application started with StreamableHTTP session manager!")
            try:
                yield
            finally:
                logger.info("Application shutting down...")

    # Create an ASGI application using the transport
    starlette_app = Starlette(
        debug=True,
        routes=[
            Mount("/mcp", app=handle_streamable_http),
        ],
        lifespan=lifespan,
    )

    # Wrap ASGI application with CORS middleware to expose Mcp-Session-Id header
    # for browser-based clients (ensures 500 errors get proper CORS headers)
    starlette_app = CORSMiddleware(
        starlette_app,
        allow_origins=["*"],  # Allow all origins - adjust as needed for production
        allow_methods=["GET", "POST", "DELETE"],  # MCP streamable HTTP methods
        expose_headers=["Mcp-Session-Id"],
    )

    # Imported here, as in the original, so uvicorn is only required when serving.
    import uvicorn

    uvicorn.run(starlette_app, host="0.0.0.0", port=port)

    return 0
if __name__ == "__main__":
    # Script entry point: serve on port 8001 with debug logging and JSON responses.
    main(port=8001, log_level="debug", json_response=True)
Python & MCP Python SDK
My Python version number:
Python 3.12.4
Code running mode
python main.py
OS version:
Windows 11