Added support for handling initialized and exit notifications

This commit is contained in:
Gianluca Brigandi 2025-05-12 20:31:34 -07:00
parent 13494cf101
commit cd8e1e1c00
2 changed files with 122 additions and 65 deletions

View File

@ -438,7 +438,7 @@ impl McpServerCore {
})
}
fn create_error_response(
pub(crate) fn create_error_response(
&self,
code: i32,
message: String,

View File

@ -5,8 +5,9 @@ use tracing::{debug, error, info};
use crate::logging_utils::{log_mcp_request, log_mcp_response};
use crate::mcp::mcp_server_core::McpServerCore;
use crate::mcp::protocol::JsonRpcRequest;
use crate::mcp::protocol::{error_codes, JsonRpcRequest};
use crate::AppState;
use serde_json::Value;
pub async fn run_stdio_service(app_state: Arc<AppState>, shutdown_tx: OneshotSender<()>) {
info!("Starting MCP server in stdio mode...");
@ -42,75 +43,131 @@ pub async fn run_stdio_service(app_state: Arc<AppState>, shutdown_tx: OneshotSen
continue;
}
info!("Received from stdin (stdio_service): {}", request_str);
log_mcp_request(request_str); // Log the raw incoming string
// Log the raw request using the utility
log_mcp_request(request_str);
// Process the request using the core module
let response_json = match serde_json::from_str::<JsonRpcRequest>(request_str) {
Ok(rpc_request) => {
// Special handling for shutdown to exit the loop
let is_shutdown = rpc_request.method == "shutdown";
let response = mcp_core.process_request(rpc_request).await;
if is_shutdown {
// Log the response using the utility
log_mcp_response(&response);
// Send the response
if let Err(e) = stdout_writer
.write_all(format!("{}\n", response).as_bytes())
.await
{
error!("Error writing shutdown response to stdout: {}", e);
}
if let Err(e) = stdout_writer.flush().await {
error!("Error flushing stdout for shutdown: {}", e);
}
debug!("Signaling shutdown and exiting stdio_service due to 'shutdown' request.");
let _ = shutdown_tx.send(()); // Signal main to shutdown Axum
return; // Exit the loop and function
}
response
}
Err(e) => mcp_core.handle_parse_error(e, request_str),
};
// Log the raw response using the utility
log_mcp_response(&response_json);
info!("Sending to stdout (stdio_service): {}", response_json);
// Prepare the response string with a newline
let response_to_send = format!("{}\n", response_json);
debug!(
"Attempting to write response to stdout. Length: {} bytes. Preview (up to 200 chars): '{}'",
response_to_send.len(),
response_to_send.chars().take(200).collect::<String>()
);
// Write the response and handle potential errors
match stdout_writer.write_all(response_to_send.as_bytes()).await {
Ok(_) => {
debug!("Successfully wrote response bytes to stdout buffer.");
// Flush immediately after write
if let Err(e) = stdout_writer.flush().await {
error!("Error flushing stdout after successful write: {}", e);
debug!(
"Signaling shutdown and breaking loop due to stdout flush error."
let parsed_value: Value = match serde_json::from_str(request_str) {
Ok(v) => v,
Err(e) => {
error!("JSON Parse Error: {}", e);
let response_json = mcp_core.handle_parse_error(e, request_str);
log_mcp_response(&response_json);
info!("Sending parse error response to stdout: {}", response_json);
let response_to_send = format!("{}\n", response_json);
if let Err(write_err) =
stdout_writer.write_all(response_to_send.as_bytes()).await
{
error!(
"Error writing parse error response to stdout: {}",
write_err
);
let _ = shutdown_tx.send(());
break;
} else {
debug!("Successfully flushed stdout.");
}
if let Err(flush_err) = stdout_writer.flush().await {
error!("Error flushing stdout for parse error: {}", flush_err);
let _ = shutdown_tx.send(());
break;
}
continue;
}
};
if parsed_value.get("id").is_none()
|| parsed_value.get("id").map_or(false, |id| id.is_null())
{
// --- Handle Notification (No ID or ID is null) ---
let method = parsed_value
.get("method")
.and_then(Value::as_str)
.unwrap_or("");
info!("Received Notification: method='{}'", method);
match method {
"notifications/initialized" => {
debug!("Client 'initialized' notification received. No action taken, no response sent.");
}
"exit" => {
info!("'exit' notification received. Signaling shutdown immediately.");
let _ = shutdown_tx.send(());
return;
}
_ => {
debug!(
"Received unknown/unhandled notification method: '{}'. Ignoring.",
method
);
}
}
Err(e) => {
error!("Error writing response to stdout: {}", e);
debug!("Signaling shutdown and breaking loop due to stdout write error.");
let _ = shutdown_tx.send(());
break;
continue;
} else {
let request_id = parsed_value.get("id").cloned().unwrap(); // We know ID exists and is not null here
match serde_json::from_value::<JsonRpcRequest>(parsed_value) {
Ok(rpc_request) => {
// --- Successfully parsed a Request ---
let is_shutdown = rpc_request.method == "shutdown";
let response_json = mcp_core.process_request(rpc_request).await;
// Log and send the response
log_mcp_response(&response_json);
info!("Sending response to stdout: {}", response_json);
let response_to_send = format!("{}\n", response_json);
if let Err(e) =
stdout_writer.write_all(response_to_send.as_bytes()).await
{
error!("Error writing response to stdout: {}", e);
let _ = shutdown_tx.send(());
break;
}
if let Err(e) = stdout_writer.flush().await {
error!("Error flushing stdout: {}", e);
let _ = shutdown_tx.send(());
break;
}
// Handle shutdown *after* sending the response
if is_shutdown {
debug!("'shutdown' request processed successfully. Signaling shutdown.");
let _ = shutdown_tx.send(()); // Signal main to shutdown Axum
return; // Exit the service loop
}
}
Err(e) => {
error!("Invalid JSON-RPC Request structure: {}", e);
// Use the ID we extracted earlier
let response_json = mcp_core.create_error_response(
error_codes::INVALID_REQUEST,
format!("Invalid Request structure: {}", e),
None,
request_id, // Use the ID from the original request
);
log_mcp_response(&response_json);
info!(
"Sending invalid request error response to stdout: {}",
response_json
);
let response_to_send = format!("{}\n", response_json);
if let Err(write_err) =
stdout_writer.write_all(response_to_send.as_bytes()).await
{
error!(
"Error writing invalid request error response to stdout: {}",
write_err
);
let _ = shutdown_tx.send(());
break;
}
if let Err(flush_err) = stdout_writer.flush().await {
error!(
"Error flushing stdout for invalid request error: {}",
flush_err
);
let _ = shutdown_tx.send(());
break;
}
}
}
}
}