diff --git a/src/mcp/mcp_server_core.rs b/src/mcp/mcp_server_core.rs index e7057e3..d6c3b0a 100644 --- a/src/mcp/mcp_server_core.rs +++ b/src/mcp/mcp_server_core.rs @@ -438,7 +438,7 @@ impl McpServerCore { }) } - fn create_error_response( + pub(crate) fn create_error_response( &self, code: i32, message: String, diff --git a/src/stdio_service.rs b/src/stdio_service.rs index bcf893b..d770b48 100644 --- a/src/stdio_service.rs +++ b/src/stdio_service.rs @@ -5,8 +5,9 @@ use tracing::{debug, error, info}; use crate::logging_utils::{log_mcp_request, log_mcp_response}; use crate::mcp::mcp_server_core::McpServerCore; -use crate::mcp::protocol::JsonRpcRequest; +use crate::mcp::protocol::{error_codes, JsonRpcRequest}; use crate::AppState; +use serde_json::Value; pub async fn run_stdio_service(app_state: Arc, shutdown_tx: OneshotSender<()>) { info!("Starting MCP server in stdio mode..."); @@ -42,75 +43,131 @@ pub async fn run_stdio_service(app_state: Arc, shutdown_tx: OneshotSen continue; } info!("Received from stdin (stdio_service): {}", request_str); + log_mcp_request(request_str); // Log the raw incoming string - // Log the raw request using the utility - log_mcp_request(request_str); - - // Process the request using the core module - let response_json = match serde_json::from_str::(request_str) { - Ok(rpc_request) => { - // Special handling for shutdown to exit the loop - let is_shutdown = rpc_request.method == "shutdown"; - let response = mcp_core.process_request(rpc_request).await; - - if is_shutdown { - // Log the response using the utility - log_mcp_response(&response); - - // Send the response - if let Err(e) = stdout_writer - .write_all(format!("{}\n", response).as_bytes()) - .await - { - error!("Error writing shutdown response to stdout: {}", e); - } - if let Err(e) = stdout_writer.flush().await { - error!("Error flushing stdout for shutdown: {}", e); - } - - debug!("Signaling shutdown and exiting stdio_service due to 'shutdown' request."); - let _ = shutdown_tx.send(()); // Signal main to shutdown Axum - return; // Exit the loop and function - } - - response - } - Err(e) => mcp_core.handle_parse_error(e, request_str), - }; - - // Log the raw response using the utility - log_mcp_response(&response_json); - - info!("Sending to stdout (stdio_service): {}", response_json); - // Prepare the response string with a newline - let response_to_send = format!("{}\n", response_json); - debug!( - "Attempting to write response to stdout. Length: {} bytes. Preview (up to 200 chars): '{}'", - response_to_send.len(), - response_to_send.chars().take(200).collect::() - ); - - // Write the response and handle potential errors - match stdout_writer.write_all(response_to_send.as_bytes()).await { - Ok(_) => { - debug!("Successfully wrote response bytes to stdout buffer."); - // Flush immediately after write - if let Err(e) = stdout_writer.flush().await { - error!("Error flushing stdout after successful write: {}", e); - debug!( - "Signaling shutdown and breaking loop due to stdout flush error." + let parsed_value: Value = match serde_json::from_str(request_str) { + Ok(v) => v, + Err(e) => { + error!("JSON Parse Error: {}", e); + let response_json = mcp_core.handle_parse_error(e, request_str); + log_mcp_response(&response_json); + info!("Sending parse error response to stdout: {}", response_json); + let response_to_send = format!("{}\n", response_json); + if let Err(write_err) = + stdout_writer.write_all(response_to_send.as_bytes()).await + { + error!( + "Error writing parse error response to stdout: {}", + write_err ); let _ = shutdown_tx.send(()); break; - } else { - debug!("Successfully flushed stdout."); + } + if let Err(flush_err) = stdout_writer.flush().await { + error!("Error flushing stdout for parse error: {}", flush_err); + let _ = shutdown_tx.send(()); + break; + } + continue; + } + }; + + if parsed_value.get("id").is_none() + || parsed_value.get("id").map_or(false, |id| id.is_null()) + { + // --- Handle Notification (No ID or ID is null) --- + let method = parsed_value + .get("method") + .and_then(Value::as_str) + .unwrap_or(""); + info!("Received Notification: method='{}'", method); + + match method { + "notifications/initialized" => { + debug!("Client 'initialized' notification received. No action taken, no response sent."); + } + "exit" => { + info!("'exit' notification received. Signaling shutdown immediately."); + let _ = shutdown_tx.send(()); + return; + } + _ => { + debug!( + "Received unknown/unhandled notification method: '{}'. Ignoring.", + method + ); } } - Err(e) => { - error!("Error writing response to stdout: {}", e); - debug!("Signaling shutdown and breaking loop due to stdout write error."); - let _ = shutdown_tx.send(()); - break; + continue; + } else { + let request_id = parsed_value.get("id").cloned().unwrap(); // We know ID exists and is not null here + + match serde_json::from_value::(parsed_value) { + Ok(rpc_request) => { + // --- Successfully parsed a Request --- + let is_shutdown = rpc_request.method == "shutdown"; + let response_json = mcp_core.process_request(rpc_request).await; + + // Log and send the response + log_mcp_response(&response_json); + info!("Sending response to stdout: {}", response_json); + let response_to_send = format!("{}\n", response_json); + + if let Err(e) = + stdout_writer.write_all(response_to_send.as_bytes()).await + { + error!("Error writing response to stdout: {}", e); + let _ = shutdown_tx.send(()); + break; + } + if let Err(e) = stdout_writer.flush().await { + error!("Error flushing stdout: {}", e); + let _ = shutdown_tx.send(()); + break; + } + + // Handle shutdown *after* sending the response + if is_shutdown { + debug!("'shutdown' request processed successfully. Signaling shutdown."); + let _ = shutdown_tx.send(()); // Signal main to shutdown Axum + return; // Exit the service loop + } + } + Err(e) => { + error!("Invalid JSON-RPC Request structure: {}", e); + // Use the ID we extracted earlier + let response_json = mcp_core.create_error_response( + error_codes::INVALID_REQUEST, + format!("Invalid Request structure: {}", e), + None, + request_id, // Use the ID from the original request + ); + + log_mcp_response(&response_json); + info!( + "Sending invalid request error response to stdout: {}", + response_json + ); + let response_to_send = format!("{}\n", response_json); + if let Err(write_err) = + stdout_writer.write_all(response_to_send.as_bytes()).await + { + error!( + "Error writing invalid request error response to stdout: {}", + write_err + ); + let _ = shutdown_tx.send(()); + break; + } + if let Err(flush_err) = stdout_writer.flush().await { + error!( + "Error flushing stdout for invalid request error: {}", + flush_err + ); + let _ = shutdown_tx.send(()); + break; + } + } } } }