@@ -1288,7 +1288,7 @@ impl Agent {
12881288 let mut combined = stream:: select_all( with_id) ;
12891289 let mut all_install_successful = true ;
12901290
1291- while let Some ( ( request_id , item ) ) = combined . next ( ) . await {
1291+ loop {
12921292 if is_token_cancelled( & cancel_token) {
12931293 break ;
12941294 }
@@ -1297,43 +1297,55 @@ impl Agent {
12971297 yield AgentEvent :: Message ( msg) ;
12981298 }
12991299
1300- match item {
1301- ToolStreamItem :: Result ( output) => {
1302- let output = call_tool_result:: validate( output) ;
1303-
1304- // Platform extensions use meta as a way to publish notifications. Ideally we'd
1305- // send the notifications directly, but the current plumbing doesn't support that
1306- // well:
1307- if let Ok ( ref call_result) = output {
1308- if let Some ( ref meta) = call_result. meta {
1309- if let Some ( notification_data) = meta. 0 . get( "platform_notification" ) {
1310- if let Some ( method) = notification_data. get( "method" ) . and_then( |v| v. as_str( ) ) {
1311- let params = notification_data. get( "params" ) . cloned( ) ;
1312- let custom_notification = rmcp:: model:: CustomNotification :: new(
1313- method. to_string( ) ,
1314- params,
1315- ) ;
1316-
1317- let server_notification = rmcp:: model:: ServerNotification :: CustomNotification ( custom_notification) ;
1318- yield AgentEvent :: McpNotification ( ( request_id. clone( ) , server_notification) ) ;
1300+ tokio:: select! {
1301+ biased;
1302+
1303+ tool_item = combined. next( ) => {
1304+ match tool_item {
1305+ Some ( ( request_id, item) ) => {
1306+ match item {
1307+ ToolStreamItem :: Result ( output) => {
1308+ let output = call_tool_result:: validate( output) ;
1309+
1310+ if let Ok ( ref call_result) = output {
1311+ if let Some ( ref meta) = call_result. meta {
1312+ if let Some ( notification_data) = meta. 0 . get( "platform_notification" ) {
1313+ if let Some ( method) = notification_data. get( "method" ) . and_then( |v| v. as_str( ) ) {
1314+ let params = notification_data. get( "params" ) . cloned( ) ;
1315+ let custom_notification = rmcp:: model:: CustomNotification :: new(
1316+ method. to_string( ) ,
1317+ params,
1318+ ) ;
1319+
1320+ let server_notification = rmcp:: model:: ServerNotification :: CustomNotification ( custom_notification) ;
1321+ yield AgentEvent :: McpNotification ( ( request_id. clone( ) , server_notification) ) ;
1322+ }
1323+ }
1324+ }
1325+ }
1326+
1327+ if enable_extension_request_ids. contains( & request_id)
1328+ && output. is_err( )
1329+ {
1330+ all_install_successful = false ;
1331+ }
1332+ if let Some ( response_msg) = request_to_response_map. get( & request_id) {
1333+ let metadata = request_metadata. get( & request_id) . and_then( |m| m. as_ref( ) ) ;
1334+ let mut response = response_msg. lock( ) . await ;
1335+ * response = response. clone( ) . with_tool_response_with_metadata( request_id, output, metadata) ;
1336+ }
1337+ }
1338+ ToolStreamItem :: Message ( msg) => {
1339+ yield AgentEvent :: McpNotification ( ( request_id, msg) ) ;
13191340 }
13201341 }
13211342 }
1322- }
1323-
1324- if enable_extension_request_ids. contains( & request_id)
1325- && output. is_err( )
1326- {
1327- all_install_successful = false ;
1328- }
1329- if let Some ( response_msg) = request_to_response_map. get( & request_id) {
1330- let metadata = request_metadata. get( & request_id) . and_then( |m| m. as_ref( ) ) ;
1331- let mut response = response_msg. lock( ) . await ;
1332- * response = response. clone( ) . with_tool_response_with_metadata( request_id, output, metadata) ;
1343+ None => break ,
13331344 }
13341345 }
1335- ToolStreamItem :: Message ( msg) => {
1336- yield AgentEvent :: McpNotification ( ( request_id, msg) ) ;
1346+
1347+ _ = tokio:: time:: sleep( std:: time:: Duration :: from_millis( 100 ) ) => {
1348+ // Continue loop to drain elicitation messages
13371349 }
13381350 }
13391351 }
0 commit comments