@@ -428,56 +428,83 @@ where
428
428
if chunk. choices. is_empty( ) {
429
429
yield ( None , usage)
430
430
} else if let Some ( tool_calls) = & chunk. choices[ 0 ] . delta. tool_calls {
431
- let tool_call = & tool_calls[ 0 ] ;
432
- let id = tool_call. id. clone( ) . ok_or( anyhow!( "No tool call ID" ) ) ?;
433
- let function_name = tool_call. function. name. clone( ) . ok_or( anyhow!( "No function name" ) ) ?;
434
- let mut arguments = tool_call. function. arguments. clone( ) ;
435
-
436
- while let Some ( response_chunk) = stream. next( ) . await {
437
- if response_chunk. as_ref( ) . is_ok_and( |s| s == "data: [DONE]" ) {
438
- break ' outer;
431
+ let mut tool_call_data: std:: collections:: HashMap <i32 , ( String , String , String ) > = std:: collections:: HashMap :: new( ) ;
432
+
433
+ for tool_call in tool_calls {
434
+ if let ( Some ( index) , Some ( id) , Some ( name) ) = ( tool_call. index, & tool_call. id, & tool_call. function. name) {
435
+ tool_call_data. insert( index, ( id. clone( ) , name. clone( ) , tool_call. function. arguments. clone( ) ) ) ;
439
436
}
440
- let response_str = response_chunk?;
441
- if let Some ( line) = strip_data_prefix( & response_str) {
442
- let tool_chunk: StreamingChunk = serde_json:: from_str( line)
443
- . map_err( |e| anyhow!( "Failed to parse streaming chunk: {}: {:?}" , e, & line) ) ?;
444
- let more_args = tool_chunk. choices[ 0 ] . delta. tool_calls. as_ref( )
445
- . and_then( |calls| calls. first( ) )
446
- . map( |call| call. function. arguments. as_str( ) ) ;
447
- if let Some ( more_args) = more_args {
448
- arguments. push_str( more_args) ;
449
- } else {
450
- break ;
437
+ }
438
+
439
+ let mut done = false ;
440
+ while !done {
441
+ if let Some ( response_chunk) = stream. next( ) . await {
442
+ if response_chunk. as_ref( ) . is_ok_and( |s| s == "data: [DONE]" ) {
443
+ break ' outer;
451
444
}
445
+ let response_str = response_chunk?;
446
+ if let Some ( line) = strip_data_prefix( & response_str) {
447
+ let tool_chunk: StreamingChunk = serde_json:: from_str( line)
448
+ . map_err( |e| anyhow!( "Failed to parse streaming chunk: {}: {:?}" , e, & line) ) ?;
449
+
450
+ if let Some ( delta_tool_calls) = & tool_chunk. choices[ 0 ] . delta. tool_calls {
451
+ for delta_call in delta_tool_calls {
452
+ if let Some ( index) = delta_call. index {
453
+ if let Some ( ( _, _, ref mut args) ) = tool_call_data. get_mut( & index) {
454
+ args. push_str( & delta_call. function. arguments) ;
455
+ } else if let ( Some ( id) , Some ( name) ) = ( & delta_call. id, & delta_call. function. name) {
456
+ tool_call_data. insert( index, ( id. clone( ) , name. clone( ) , delta_call. function. arguments. clone( ) ) ) ;
457
+ }
458
+ }
459
+ }
460
+ } else {
461
+ done = true ;
462
+ }
463
+
464
+ if tool_chunk. choices[ 0 ] . finish_reason == Some ( "tool_calls" . to_string( ) ) {
465
+ done = true ;
466
+ }
467
+ }
468
+ } else {
469
+ break ;
452
470
}
453
471
}
454
472
455
- let parsed = if arguments. is_empty( ) {
456
- Ok ( json!( { } ) )
457
- } else {
458
- serde_json:: from_str:: <Value >( & arguments)
459
- } ;
460
-
461
- let content = match parsed {
462
- Ok ( params) => MessageContent :: tool_request(
463
- id,
464
- Ok ( ToolCall :: new( function_name, params) ) ,
465
- ) ,
466
- Err ( e) => {
467
- let error = ToolError :: InvalidParameters ( format!(
468
- "Could not interpret tool use parameters for id {}: {}" ,
469
- id, e
470
- ) ) ;
471
- MessageContent :: tool_request( id, Err ( error) )
473
+ let mut contents = Vec :: new( ) ;
474
+ let mut sorted_indices: Vec <_> = tool_call_data. keys( ) . cloned( ) . collect( ) ;
475
+ sorted_indices. sort( ) ;
476
+
477
+ for index in sorted_indices {
478
+ if let Some ( ( id, function_name, arguments) ) = tool_call_data. get( & index) {
479
+ let parsed = if arguments. is_empty( ) {
480
+ Ok ( json!( { } ) )
481
+ } else {
482
+ serde_json:: from_str:: <Value >( arguments)
483
+ } ;
484
+
485
+ let content = match parsed {
486
+ Ok ( params) => MessageContent :: tool_request(
487
+ id. clone( ) ,
488
+ Ok ( ToolCall :: new( function_name. clone( ) , params) ) ,
489
+ ) ,
490
+ Err ( e) => {
491
+ let error = ToolError :: InvalidParameters ( format!(
492
+ "Could not interpret tool use parameters for id {}: {}" ,
493
+ id, e
494
+ ) ) ;
495
+ MessageContent :: tool_request( id. clone( ) , Err ( error) )
496
+ }
497
+ } ;
498
+ contents. push( content) ;
472
499
}
473
- } ;
500
+ }
474
501
475
502
yield (
476
503
Some ( Message {
477
504
id: chunk. id,
478
505
role: Role :: Assistant ,
479
506
created: chrono:: Utc :: now( ) . timestamp( ) ,
480
- content: vec! [ content ] ,
507
+ content: contents ,
481
508
} ) ,
482
509
usage,
483
510
)
@@ -601,6 +628,8 @@ mod tests {
601
628
use super :: * ;
602
629
use mcp_core:: content:: Content ;
603
630
use serde_json:: json;
631
+ use tokio:: pin;
632
+ use tokio_stream:: { self , StreamExt } ;
604
633
605
634
#[ test]
606
635
fn test_validate_tool_schemas ( ) {
@@ -1088,4 +1117,54 @@ mod tests {
1088
1117
1089
1118
Ok ( ( ) )
1090
1119
}
1120
+
1121
+ #[ tokio:: test]
1122
+ async fn test_streamed_multi_tool_response_to_messages ( ) -> anyhow:: Result < ( ) > {
1123
+ let response_lines = r#"
1124
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":"I'll run both"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
1125
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" `ls` commands in a"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
1126
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" single turn for you -"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
1127
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" one on the current directory an"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
1128
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":"d one on the `working_dir`."},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
1129
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"id":"toolu_bdrk_01RMTd7R9DzQjEEWgDwzcBsU","type":"function","function":{"name":"developer__shell","arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
1130
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
1131
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"{\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
1132
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"command\": \"l"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
1133
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"s\"}"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
1134
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"id":"toolu_bdrk_016bgVTGZdpjP8ehjMWp9cWW","type":"function","function":{"name":"developer__shell","arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
1135
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
1136
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"{\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
1137
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"command\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
1138
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":": \"ls wor"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
1139
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"king_dir"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
1140
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"\"}"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
1141
+ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":""},"index":0,"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":4982,"completion_tokens":122,"total_tokens":5104},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
1142
+ data: [DONE]
1143
+ "# ;
1144
+
1145
+ let response_stream =
1146
+ tokio_stream:: iter ( response_lines. lines ( ) . map ( |line| Ok ( line. to_string ( ) ) ) ) ;
1147
+ let messages = response_to_streaming_message ( response_stream) ;
1148
+ pin ! ( messages) ;
1149
+
1150
+ while let Some ( Ok ( ( message, _usage) ) ) = messages. next ( ) . await {
1151
+ if let Some ( msg) = message {
1152
+ println ! ( "{:?}" , msg) ;
1153
+ if msg. content . len ( ) == 2 {
1154
+ if let ( MessageContent :: ToolRequest ( req1) , MessageContent :: ToolRequest ( req2) ) =
1155
+ ( & msg. content [ 0 ] , & msg. content [ 1 ] )
1156
+ {
1157
+ if req1. tool_call . is_ok ( ) && req2. tool_call . is_ok ( ) {
1158
+ // We expect two tool calls in the response
1159
+ assert_eq ! ( req1. tool_call. as_ref( ) . unwrap( ) . name, "developer__shell" ) ;
1160
+ assert_eq ! ( req2. tool_call. as_ref( ) . unwrap( ) . name, "developer__shell" ) ;
1161
+ return Ok ( ( ) ) ;
1162
+ }
1163
+ }
1164
+ }
1165
+ }
1166
+ }
1167
+
1168
+ panic ! ( "Expected tool call message with two calls, but did not see it" ) ;
1169
+ }
1091
1170
}
0 commit comments