Skip to content

Commit 6de03a1

Browse files
committed
example: add example in progress
1 parent 4c34b64 commit 6de03a1

File tree

6 files changed

+367
-2
lines changed

6 files changed

+367
-2
lines changed

examples/clients/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ This directory contains Model Context Protocol (MCP) client examples implemented
44

55
## Example List
66

7+
### Test scripts
8+
- `progress_test_client.py`: A client that communicates with an MCP server using progress notifications.
9+
```bash
10+
python3 progress_test_client.py
11+
```
12+
713
### SSE Client (`sse.rs`)
814

915
A client that communicates with an MCP server using Server-Sent Events (SSE) transport.
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Simple Python client to test progress notification functionality
4+
"""
5+
6+
import json
7+
import sys
8+
import subprocess
9+
import time
10+
import threading
11+
from typing import Dict, Any
12+
13+
class MCPClient:
14+
def __init__(self):
15+
self.request_id = 1
16+
self.process = None
17+
18+
def start_server(self):
19+
"""Start the server process"""
20+
try:
21+
self.process = subprocess.Popen(
22+
["cargo", "run", "--example", "servers_progress_demo_stdio"],
23+
cwd="../../servers",
24+
stdin=subprocess.PIPE,
25+
stdout=subprocess.PIPE,
26+
stderr=subprocess.PIPE,
27+
text=True,
28+
bufsize=0
29+
)
30+
return True
31+
except Exception as e:
32+
print(f"Failed to start server: {e}")
33+
return False
34+
35+
def send_message(self, message: Dict[str, Any]) -> str:
36+
"""Send message to server"""
37+
if not self.process:
38+
return None
39+
40+
json_msg = json.dumps(message)
41+
print(f"→ Sending: {json_msg}")
42+
43+
try:
44+
self.process.stdin.write(json_msg + "\n")
45+
self.process.stdin.flush()
46+
return json_msg
47+
except Exception as e:
48+
print(f"Failed to send message: {e}")
49+
return None
50+
51+
def read_response(self, timeout=5):
52+
"""Read server response"""
53+
if not self.process:
54+
return None
55+
56+
try:
57+
# Read one line of response
58+
line = self.process.stdout.readline()
59+
if line:
60+
print(f"← Received: {line.strip()}")
61+
return json.loads(line.strip())
62+
except json.JSONDecodeError as e:
63+
print(f"JSON parsing error: {e}, raw data: {line}")
64+
except Exception as e:
65+
print(f"Failed to read response: {e}")
66+
return None
67+
68+
def initialize(self):
69+
"""Initialize connection"""
70+
init_message = {
71+
"jsonrpc": "2.0",
72+
"id": self.request_id,
73+
"method": "initialize",
74+
"params": {
75+
"protocolVersion": "2024-11-05",
76+
"capabilities": {},
77+
"clientInfo": {
78+
"name": "progress-test-client",
79+
"version": "1.0.0"
80+
}
81+
}
82+
}
83+
self.request_id += 1
84+
85+
self.send_message(init_message)
86+
response = self.read_response()
87+
88+
if response and response.get("result"):
89+
print("✅ Initialization successful")
90+
91+
# Send initialization complete notification
92+
initialized_notification = {
93+
"jsonrpc": "2.0",
94+
"method": "notifications/initialized"
95+
}
96+
self.send_message(initialized_notification)
97+
return True
98+
else:
99+
print("❌ Initialization failed")
100+
return False
101+
102+
def list_tools(self):
103+
"""List available tools"""
104+
list_tools_message = {
105+
"jsonrpc": "2.0",
106+
"id": self.request_id,
107+
"method": "tools/list"
108+
}
109+
self.request_id += 1
110+
111+
self.send_message(list_tools_message)
112+
response = self.read_response()
113+
114+
if response and response.get("result"):
115+
tools = response["result"].get("tools", [])
116+
print(f"✅ Available tools: {[tool['name'] for tool in tools]}")
117+
return tools
118+
else:
119+
print("❌ Failed to get tool list")
120+
return []
121+
122+
def call_stream_processor(self, record_count=5):
123+
"""Call the stream processor tool"""
124+
call_tool_message = {
125+
"jsonrpc": "2.0",
126+
"id": self.request_id,
127+
"method": "tools/call",
128+
"params": {
129+
"name": "stream_processor",
130+
"arguments": {
131+
"record_count": record_count
132+
}
133+
}
134+
}
135+
self.request_id += 1
136+
137+
print(f"\n🚀 Starting to process {record_count} records...")
138+
self.send_message(call_tool_message)
139+
140+
# Listen for progress notifications and final result
141+
progress_count = 0
142+
start_time = time.time()
143+
144+
while True:
145+
response = self.read_response(timeout=10)
146+
if not response:
147+
print("Timeout or connection lost")
148+
break
149+
150+
# Handle progress notifications
151+
if response.get("method") == "notifications/progress":
152+
params = response.get("params", {})
153+
progress = params.get("progress", 0)
154+
total = params.get("total", 0)
155+
message = params.get("message", "")
156+
token = params.get("progressToken", "")
157+
158+
elapsed = time.time() - start_time
159+
print(f"📊 Progress update [{token}]: {progress}/{total} - {message} (elapsed: {elapsed:.1f}s)")
160+
progress_count += 1
161+
162+
# Handle tool call result
163+
elif "result" in response:
164+
result = response["result"]
165+
if result.get("content"):
166+
content = result["content"][0]["text"]
167+
elapsed = time.time() - start_time
168+
print(f"✅ Processing completed: {content}")
169+
print(f"📈 Total progress notifications received: {progress_count}")
170+
print(f"⏱️ Total time elapsed: {elapsed:.2f}s")
171+
break
172+
173+
# Handle errors
174+
elif "error" in response:
175+
error = response["error"]
176+
print(f"❌ Error: {error}")
177+
break
178+
179+
def stop_server(self):
180+
"""Stop the server"""
181+
if self.process:
182+
self.process.terminate()
183+
self.process.wait()
184+
185+
def main():
186+
print("🧪 RMCP Progress Notification Test Client")
187+
print("=" * 50)
188+
189+
client = MCPClient()
190+
191+
try:
192+
# Start server
193+
print("🔧 Starting server...")
194+
if not client.start_server():
195+
return
196+
197+
time.sleep(1) # Wait for server to start
198+
199+
# Initialize connection
200+
print("\n🤝 Initializing connection...")
201+
if not client.initialize():
202+
return
203+
204+
# List tools
205+
print("\n🔧 Getting available tools...")
206+
tools = client.list_tools()
207+
208+
# Test stream processor tool
209+
if any(tool["name"] == "stream_processor" for tool in tools):
210+
client.call_stream_processor(record_count=50)
211+
else:
212+
print("❌ stream_processor tool not found")
213+
214+
except KeyboardInterrupt:
215+
print("\n\n⏹️ User interrupted")
216+
except Exception as e:
217+
print(f"\n❌ Error occurred: {e}")
218+
finally:
219+
print("\n🔚 Closing connection...")
220+
client.stop_server()
221+
222+
if __name__ == "__main__":
223+
main()

examples/servers/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
2-
31
[package]
42
name = "mcp-server-examples"
53
version = "0.1.5"
@@ -9,6 +7,7 @@ publish = false
97
[dependencies]
108
rmcp = { path = "../../crates/rmcp", features = [
119
"server",
10+
"client",
1211
"transport-sse-server",
1312
"transport-io",
1413
"transport-streamable-http-server",
@@ -78,3 +77,7 @@ path = "src/simple_auth_sse.rs"
7877
[[example]]
7978
name = "counter_hyper_streamable_http"
8079
path = "src/counter_hyper_streamable_http.rs"
80+
81+
[[example]]
82+
name = "servers_progress_demo_stdio"
83+
path = "src/progress_demo_stdio.rs"

examples/servers/src/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod calculator;
22
pub mod counter;
33
pub mod generic_service;
4+
pub mod progress_demo;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use std::time::Duration;
2+
3+
use rmcp::{
4+
Error as McpError, RoleServer, ServerHandler, model::*, schemars, service::RequestContext, tool,
5+
};
6+
use serde_json::json;
7+
use tracing::{debug, info};
8+
9+
#[derive(Clone)]
10+
pub struct ProgressDemo {}
11+
12+
#[tool(tool_box)]
13+
impl ProgressDemo {
14+
#[allow(dead_code)]
15+
pub fn new() -> Self {
16+
Self {}
17+
}
18+
#[tool(description = "Process data stream with progress updates")]
19+
async fn stream_processor(
20+
&self,
21+
#[tool(param)]
22+
#[schemars(description = "Number of records to simulate")]
23+
record_count: u32,
24+
ctx: RequestContext<RoleServer>,
25+
) -> Result<CallToolResult, McpError> {
26+
if record_count == 0 {
27+
return Ok(CallToolResult::success(vec![Content::text(
28+
"No records to process",
29+
)]));
30+
}
31+
32+
info!("Starting stream processing of {} records", record_count);
33+
let mut processed_records = Vec::new();
34+
let mut counter = 0u32;
35+
36+
// simulate data stream processing
37+
for i in 0..record_count {
38+
// simulate getting record from data stream
39+
tokio::time::sleep(Duration::from_millis(200)).await;
40+
41+
let record = format!("Record_{:04}", i + 1);
42+
processed_records.push(record.clone());
43+
counter += 1;
44+
45+
// create progress notification param
46+
let progress_param = ProgressNotificationParam {
47+
progress_token: ProgressToken(NumberOrString::Number(counter)),
48+
progress: counter,
49+
total: Some(record_count),
50+
message: Some(record.clone()),
51+
};
52+
53+
match ctx.peer.notify_progress(progress_param).await {
54+
Ok(_) => {
55+
debug!("Processed record: {}", record);
56+
}
57+
Err(e) => {
58+
return Err(McpError::internal_error(
59+
format!("Failed to notify progress: {}", e),
60+
Some(json!({
61+
"record": record,
62+
"progress": counter,
63+
"error": e.to_string()
64+
})),
65+
));
66+
}
67+
}
68+
}
69+
70+
Ok(CallToolResult::success(vec![Content::text(format!(
71+
"Processed {} records successfully",
72+
processed_records.len()
73+
))]))
74+
}
75+
}
76+
77+
#[tool(tool_box)]
78+
impl ServerHandler for ProgressDemo {
79+
fn get_info(&self) -> ServerInfo {
80+
ServerInfo {
81+
protocol_version: ProtocolVersion::V_2024_11_05,
82+
capabilities: ServerCapabilities::builder()
83+
.enable_tools()
84+
.build(),
85+
server_info: Implementation::from_build_env(),
86+
instructions: Some(
87+
"This server demonstrates progress notifications during long-running operations. \
88+
Use the tools to see real-time progress updates for batch processing, file downloads, \
89+
and stream processing operations.".to_string()
90+
),
91+
}
92+
}
93+
94+
async fn initialize(
95+
&self,
96+
_request: ClientInfo,
97+
_context: RequestContext<RoleServer>,
98+
) -> Result<InitializeResult, McpError> {
99+
Ok(InitializeResult {
100+
protocol_version: ProtocolVersion::V_2024_11_05,
101+
capabilities: ServerCapabilities::builder().enable_tools().build(),
102+
server_info: Implementation::from_build_env(),
103+
instructions: Some(
104+
"Progress notification demo server initialized. \
105+
Ready to demonstrate real-time progress updates."
106+
.to_string(),
107+
),
108+
})
109+
}
110+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use rmcp::{ServiceExt, transport::stdio};
2+
use tracing_subscriber::EnvFilter;
3+
4+
mod common;
5+
use common::progress_demo::ProgressDemo;
6+
7+
#[tokio::main]
8+
async fn main() -> anyhow::Result<()> {
9+
// Initialize tracing for better debugging
10+
tracing_subscriber::fmt()
11+
.with_env_filter(EnvFilter::from_default_env())
12+
.init();
13+
14+
let server = ProgressDemo::new();
15+
let service = server.serve(stdio()).await.inspect_err(|e| {
16+
tracing::error!("serving error: {:?}", e);
17+
})?;
18+
19+
service.waiting().await?;
20+
21+
Ok(())
22+
}

0 commit comments

Comments
 (0)