From 4e39ca6bba8a4d72e63c26aa64be025e394fb6f7 Mon Sep 17 00:00:00 2001 From: Gianluca Brigandi Date: Fri, 27 Jun 2025 09:32:31 -0700 Subject: [PATCH] Improved design by factoring out tool-speciic logic into their own module leaving the main MCP server entrypoint as just a dispatcher of the former. --- Cargo.toml | 2 +- src/lib.rs | 8 +- src/main.rs | 1566 ++++------------------------------ src/tools/agents.rs | 563 ++++++++++++ src/tools/alerts.rs | 105 +++ src/tools/mod.rs | 60 ++ src/tools/rules.rs | 142 +++ src/tools/stats.rs | 493 +++++++++++ src/tools/vulnerabilities.rs | 412 +++++++++ 9 files changed, 1931 insertions(+), 1420 deletions(-) create mode 100644 src/tools/agents.rs create mode 100644 src/tools/alerts.rs create mode 100644 src/tools/mod.rs create mode 100644 src/tools/rules.rs create mode 100644 src/tools/stats.rs create mode 100644 src/tools/vulnerabilities.rs diff --git a/Cargo.toml b/Cargo.toml index 9cf53da..287a9ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mcp-server-wazuh" -version = "0.2.3" +version = "0.2.4" edition = "2021" description = "Wazuh SIEM MCP Server" authors = ["Gianluca Brigandi "] diff --git a/src/lib.rs b/src/lib.rs index bb8e9b4..6f6f4c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,2 @@ -// Re-export the wazuh-client crate types for convenience -pub use wazuh_client::{ - WazuhClientFactory, WazuhClients, WazuhIndexerClient, WazuhApiError, - AgentsClient, RulesClient, ConfigurationClient, VulnerabilityClient, - ActiveResponseClient, ClusterClient, LogsClient, ConnectivityStatus -}; +pub mod tools; + diff --git a/src/main.rs b/src/main.rs index bb5014c..fcbbcbe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,46 +5,72 @@ // bridge to a Wazuh instance. It exposes various Wazuh functionalities as tools that can // be invoked by MCP clients (e.g., AI models, automation scripts). // +// Architecture: +// The application follows a modular design where the main MCP server delegates to +// domain-specific tool modules, promoting separation of concerns and maintainability. +// // Structure: // - `main()`: Entry point of the application. Initializes logging (tracing), -// sets up the `WazuhToolsServer`, and starts the MCP server using either stdio or HTTP-SSE transport. +// sets up the `WazuhToolsServer`, and starts the MCP server using stdio transport. // -// - `WazuhToolsServer`: The core struct that implements the `rmcp::ServerHandler` trait -// and the `#[tool(tool_box)]` attribute. -// - It holds the configuration for connecting to the Wazuh Indexer API. -// - Its methods, decorated with `#[tool(...)]`, define the actual tools available -// to MCP clients (e.g., `get_wazuh_alert_summary`). +// - `WazuhToolsServer`: The core orchestrator struct that implements the `rmcp::ServerHandler` trait +// and the `#[tool(tool_box)]` attribute. It acts as a facade that delegates tool calls to +// specialized domain modules: +// - Holds instances of domain-specific tool modules (AgentTools, AlertTools, RuleTools, etc.) +// - Its methods, decorated with `#[tool(...)]`, define the MCP tool interface and delegate +// to the appropriate domain module for actual implementation +// - Manages the lifecycle and configuration of Wazuh client connections +// +// - Domain-Specific Tool Modules (in `tools/` package): +// - `AlertTools` (`tools/alerts.rs`): Handles alert-related operations via Wazuh Indexer +// - `RuleTools` (`tools/rules.rs`): Manages security rule queries via Wazuh Manager API +// - `VulnerabilityTools` (`tools/vulnerabilities.rs`): Processes vulnerability data via Wazuh Manager API +// - `AgentTools` (`tools/agents.rs`): Handles agent management and system information queries +// - `StatsTools` (`tools/stats.rs`): Provides logging, statistics, and cluster health monitoring +// Each module encapsulates: +// - Domain-specific business logic and data formatting +// - Parameter validation and error handling +// - Client interaction patterns for their respective Wazuh APIs +// - Rich output formatting with structured text and emojis // // - Tool Parameter Structs (e.g., `GetAlertSummaryParams`): // - These structs define the expected input parameters for each tool. // - They use `serde::Deserialize` for parsing input and `schemars::JsonSchema` // for generating a schema that MCP clients can use to understand how to call the tools. +// - Located within their respective domain modules for better organization // // - `wazuh_client` crate: // - This external crate is used to interact with both the Wazuh Manager API and the Wazuh Indexer API. // - `WazuhClientFactory` is used to create specific clients (e.g., `WazuhIndexerClient`, `RulesClient`, `AgentsClient`, `LogsClient`, `ClusterClient`, `VulnerabilityClient`). +// - Clients are wrapped in `Arc>` for thread-safe access across async operations // // Workflow: -// 1. Server starts and listens for MCP requests on stdio -// 2. MCP client sends a `call_tool` request. -// 3. `WazuhToolsServer` dispatches to the appropriate tool method based on the tool name. -// 4. The tool method parses parameters, interacts with the Wazuh client to fetch data. -// 5. The result (success with data or error) is packaged into a `CallToolResult` -// and sent back to the MCP client. +// 1. Server starts and listens for MCP requests on stdio +// 2. MCP client sends a `call_tool` request to `WazuhToolsServer` +// 3. `WazuhToolsServer` routes the request to the appropriate domain-specific tool module +// 4. The domain module validates parameters, interacts with the relevant Wazuh client, and formats results +// 5. The result (success with formatted data or error) is packaged into a `CallToolResult` +// and sent back to the MCP client via the main server // // Exposed Tools: // The server exposes a set of tools categorized by the Wazuh component they interact with: // -// Wazuh Indexer Tools: +// Alert Management (via AlertTools): // - `get_wazuh_alert_summary`: Retrieves a summary of security alerts from the Wazuh Indexer. // -// Wazuh Manager Tools: +// Rule Management (via RuleTools): // - `get_wazuh_rules_summary`: Fetches security rules defined in the Wazuh Manager. +// +// Vulnerability Management (via VulnerabilityTools): // - `get_wazuh_vulnerability_summary`: Gets vulnerability scan results for a specific agent from the Wazuh Manager. // - `get_wazuh_critical_vulnerabilities`: Retrieves critical vulnerabilities for an agent. -// - `get_wazuh_running_agents`: Lists active and inactive agents connected to the Wazuh Manager. -// - `get_wazuh_agent_processes`: Retrieves running processes on a specific agent (via Syscollector, data reported to Manager). -// - `get_wazuh_agent_ports`: Lists open network ports on a specific agent (via Syscollector, data reported to Manager). +// +// Agent Management (via AgentTools): +// - `get_wazuh_agents`: Lists active and inactive agents connected to the Wazuh Manager. +// - `get_wazuh_agent_processes`: Retrieves running processes on a specific agent (via Syscollector). +// - `get_wazuh_agent_ports`: Lists open network ports on a specific agent (via Syscollector). +// +// Statistics and Monitoring (via StatsTools): // - `search_wazuh_manager_logs`: Searches logs generated by the Wazuh Manager. // - `get_wazuh_manager_error_logs`: Retrieves error-specific logs from the Wazuh Manager. // - `get_wazuh_log_collector_stats`: Gets log collection statistics for an agent. @@ -69,21 +95,31 @@ // - `WAZUH_TEST_PROTOCOL`: (Optional) Protocol to use for Wazuh API/Indexer connections, e.g., "http" or "https" (default: "https"). // Logging behavior is controlled by the `RUST_LOG` environment variable (e.g., `RUST_LOG=info,mcp_server_wazuh=debug`). -use reqwest::StatusCode; -use rmcp::{ - Error as McpError, ServerHandler, ServiceExt, - model::{ - CallToolResult, Content, Implementation, ProtocolVersion, ServerCapabilities, ServerInfo, - }, - schemars, tool, - transport::stdio, -}; -use std::sync::Arc; -use std::env; use clap::Parser; use dotenv::dotenv; +use rmcp::{ + model::{CallToolResult, Implementation, ProtocolVersion, ServerCapabilities, ServerInfo}, + tool, + transport::stdio, + Error as McpError, ServerHandler, ServiceExt, +}; +use std::env; +use std::sync::Arc; -use wazuh_client::{WazuhClientFactory, WazuhIndexerClient, RulesClient, VulnerabilityClient, VulnerabilitySeverity, AgentsClient, LogsClient, ClusterClient, Port as WazuhPort}; +use wazuh_client::WazuhClientFactory; + +mod tools; +use tools::agents::{AgentTools, GetAgentPortsParams, GetAgentProcessesParams, GetAgentsParams}; +use tools::alerts::{AlertTools, GetAlertSummaryParams}; +use tools::rules::{GetRulesSummaryParams, RuleTools}; +use tools::stats::{ + GetClusterHealthParams, GetClusterNodesParams, GetLogCollectorStatsParams, + GetManagerErrorLogsParams, GetRemotedStatsParams, GetWeeklyStatsParams, + SearchManagerLogsParams, StatsTools, +}; +use tools::vulnerabilities::{ + GetCriticalVulnerabilitiesParams, GetVulnerabilitiesSummaryParams, VulnerabilityTools, +}; #[derive(Parser, Debug)] #[command(name = "mcp-server-wazuh")] @@ -93,209 +129,60 @@ struct Args { // Future versions may add HTTP-SSE transport } -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetAlertSummaryParams { - #[schemars(description = "Maximum number of alerts to retrieve (default: 100)")] - limit: Option, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetRulesSummaryParams { - #[schemars(description = "Maximum number of rules to retrieve (default: 100)")] - limit: Option, - #[schemars(description = "Rule level to filter by (optional)")] - level: Option, - #[schemars(description = "Rule group to filter by (optional)")] - group: Option, - #[schemars(description = "Filename to filter by (optional)")] - filename: Option, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetVulnerabilitySummaryParams { - #[schemars(description = "Maximum number of vulnerabilities to retrieve (default: 100)")] - limit: Option, - #[schemars(description = "Agent ID to filter vulnerabilities by (required, e.g., \"0\", \"1\", \"001\")")] - agent_id: String, - #[schemars(description = "Severity level to filter by (Low, Medium, High, Critical) (optional)")] - severity: Option, - #[schemars(description = "CVE ID to search for (optional)")] - cve: Option, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetAgentsParams { - #[schemars(description = "Maximum number of agents to retrieve (default: 100)")] - limit: Option, - #[schemars(description = "Agent status filter (active, disconnected, pending, never_connected)")] - status: String, - #[schemars(description = "Agent name to search for (optional)")] - name: Option, - #[schemars(description = "Agent IP address to filter by (optional)")] - ip: Option, - #[schemars(description = "Agent group to filter by (optional)")] - group: Option, - #[schemars(description = "Operating system platform to filter by (optional)")] - os_platform: Option, - #[schemars(description = "Agent version to filter by (optional)")] - version: Option, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetCriticalVulnerabilitiesParams { - #[schemars(description = "Agent ID to get critical vulnerabilities for (required, e.g., \"0\", \"1\", \"001\")")] - agent_id: String, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetAgentProcessesParams { - #[schemars(description = "Agent ID to get processes for (required, e.g., \"0\", \"1\", \"001\")")] - agent_id: String, - #[schemars(description = "Maximum number of processes to retrieve (default: 100)")] - limit: Option, - #[schemars(description = "Search string to filter processes by name or command (optional)")] - search: Option, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetAgentPortsParams { - #[schemars(description = "Agent ID to get network ports for (required, e.g., \"001\", \"002\", \"003\")")] - agent_id: String, - #[schemars(description = "Maximum number of ports to retrieve (default: 100)")] - limit: Option, - #[schemars(description = "Protocol to filter by (e.g., \"tcp\", \"udp\")")] - protocol: String, - #[schemars(description = "State to filter by (e.g., \"LISTENING\", \"ESTABLISHED\")")] - state: String, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct SearchManagerLogsParams { - #[schemars(description = "Maximum number of log entries to retrieve (default: 100)")] - limit: Option, - #[schemars(description = "Number of log entries to skip (default: 0)")] - offset: Option, - #[schemars(description = "Log level to filter by (e.g., \"error\", \"warning\", \"info\")")] - level: String, - #[schemars(description = "Log tag to filter by (e.g., \"wazuh-modulesd\") (optional)")] - tag: Option, - #[schemars(description = "Search term to filter log descriptions (optional)")] - search_term: Option, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetManagerErrorLogsParams { - #[schemars(description = "Maximum number of error log entries to retrieve (default: 100)")] - limit: Option, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetLogCollectorStatsParams { - #[schemars(description = "Agent ID to get log collector stats for (required, e.g., \"0\", \"1\", \"001\")")] - agent_id: String, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetRemotedStatsParams { - // No parameters needed for remoted stats -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetWeeklyStatsParams { - // No parameters needed for weekly stats -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetClusterHealthParams { - // No parameters needed for cluster health -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -struct GetClusterNodesParams { - #[schemars(description = "Maximum number of nodes to retrieve (optional, Wazuh API default is 500)")] - limit: Option, - #[schemars(description = "Number of nodes to skip (offset) (optional, default: 0)")] - offset: Option, - #[schemars(description = "Filter by node type (e.g., 'master', 'worker') (optional)")] - node_type: Option, -} - #[derive(Clone)] struct WazuhToolsServer { - #[allow(dead_code)] // Kept for future expansion to other Wazuh clients - wazuh_factory: Arc, - wazuh_indexer_client: Arc, - wazuh_rules_client: Arc>, - wazuh_vulnerability_client: Arc>, - wazuh_agents_client: Arc>, - wazuh_logs_client: Arc>, - wazuh_cluster_client: Arc>, + agent_tools: AgentTools, + alert_tools: AlertTools, + rule_tools: RuleTools, + stats_tools: StatsTools, + vulnerability_tools: VulnerabilityTools, } #[tool(tool_box)] impl WazuhToolsServer { - fn format_agent_id(agent_id_str: &str) -> Result { - // Attempt to parse as a number first - if let Ok(num) = agent_id_str.parse::() { - if num > 999 { - Err(format!( - "Agent ID '{}' is too large. Must be a number between 0 and 999.", - agent_id_str - )) - } else { - Ok(format!("{:03}", num)) - } - } else if agent_id_str.len() == 3 && agent_id_str.chars().all(|c| c.is_ascii_digit()) { - // Already correctly formatted (e.g., "001") - Ok(agent_id_str.to_string()) - } else { - Err(format!( - "Invalid agent_id format: '{}'. Must be a number (e.g., 1, 12) or a 3-digit string (e.g., 001, 012).", - agent_id_str - )) - } - } - fn new() -> Result { dotenv().ok(); - let api_host = env::var("WAZUH_API_HOST").unwrap_or_else(|_| "localhost".to_string()); - let api_port: u16 = env::var("WAZUH_API_PORT") - .unwrap_or_else(|_| "55000".to_string()) - .parse() - .unwrap_or(55000); - let api_username = env::var("WAZUH_API_USERNAME").unwrap_or_else(|_| "wazuh".to_string()); - let api_password = env::var("WAZUH_API_PASSWORD").unwrap_or_else(|_| "wazuh".to_string()); + let api_host = env::var("WAZUH_API_HOST").unwrap_or_else(|_| "localhost".to_string()); + let api_port: u16 = env::var("WAZUH_API_PORT") + .unwrap_or_else(|_| "55000".to_string()) + .parse() + .unwrap_or(55000); + let api_username = env::var("WAZUH_API_USERNAME").unwrap_or_else(|_| "wazuh".to_string()); + let api_password = env::var("WAZUH_API_PASSWORD").unwrap_or_else(|_| "wazuh".to_string()); - let indexer_host = env::var("WAZUH_INDEXER_HOST").unwrap_or_else(|_| "localhost".to_string()); - let indexer_port: u16 = env::var("WAZUH_INDEXER_PORT") - .unwrap_or_else(|_| "9200".to_string()) - .parse() - .unwrap_or(9200); - let indexer_username = - env::var("WAZUH_INDEXER_USERNAME").unwrap_or_else(|_| "admin".to_string()); - let indexer_password = - env::var("WAZUH_INDEXER_PASSWORD").unwrap_or_else(|_| "admin".to_string()); + let indexer_host = + env::var("WAZUH_INDEXER_HOST").unwrap_or_else(|_| "localhost".to_string()); + let indexer_port: u16 = env::var("WAZUH_INDEXER_PORT") + .unwrap_or_else(|_| "9200".to_string()) + .parse() + .unwrap_or(9200); + let indexer_username = + env::var("WAZUH_INDEXER_USERNAME").unwrap_or_else(|_| "admin".to_string()); + let indexer_password = + env::var("WAZUH_INDEXER_PASSWORD").unwrap_or_else(|_| "admin".to_string()); - let verify_ssl = env::var("WAZUH_VERIFY_SSL") - .unwrap_or_else(|_| "false".to_string()) - .parse() - .unwrap_or(false); + let verify_ssl = env::var("WAZUH_VERIFY_SSL") + .unwrap_or_else(|_| "false".to_string()) + .parse() + .unwrap_or(false); - let test_protocol = env::var("WAZUH_TEST_PROTOCOL") - .ok().or_else(|| Some("https".to_string())); + let test_protocol = env::var("WAZUH_TEST_PROTOCOL") + .ok() + .or_else(|| Some("https".to_string())); - let wazuh_factory = WazuhClientFactory::new( - api_host, - api_port, - api_username, - api_password, - indexer_host, - indexer_port, - indexer_username, - indexer_password, - verify_ssl, - test_protocol); + let wazuh_factory = WazuhClientFactory::new( + api_host, + api_port, + api_username, + api_password, + indexer_host, + indexer_port, + indexer_username, + indexer_password, + verify_ssl, + test_protocol, + ); let wazuh_indexer_client = wazuh_factory.create_indexer_client(); let wazuh_rules_client = wazuh_factory.create_rules_client(); @@ -304,16 +191,29 @@ impl WazuhToolsServer { let wazuh_logs_client = wazuh_factory.create_logs_client(); let wazuh_cluster_client = wazuh_factory.create_cluster_client(); + let indexer_client_arc = Arc::new(wazuh_indexer_client); + let rules_client_arc = Arc::new(tokio::sync::Mutex::new(wazuh_rules_client)); + let vulnerability_client_arc = + Arc::new(tokio::sync::Mutex::new(wazuh_vulnerability_client)); + let agents_client_arc = Arc::new(tokio::sync::Mutex::new(wazuh_agents_client)); + let logs_client_arc = Arc::new(tokio::sync::Mutex::new(wazuh_logs_client)); + let cluster_client_arc = Arc::new(tokio::sync::Mutex::new(wazuh_cluster_client)); + + let agent_tools = + AgentTools::new(agents_client_arc.clone(), vulnerability_client_arc.clone()); + let alert_tools = AlertTools::new(indexer_client_arc.clone()); + let rule_tools = RuleTools::new(rules_client_arc.clone()); + let stats_tools = StatsTools::new(logs_client_arc.clone(), cluster_client_arc.clone()); + let vulnerability_tools = VulnerabilityTools::new(vulnerability_client_arc.clone()); + Ok(Self { - wazuh_factory: Arc::new(wazuh_factory), - wazuh_indexer_client: Arc::new(wazuh_indexer_client), - wazuh_rules_client: Arc::new(tokio::sync::Mutex::new(wazuh_rules_client)), - wazuh_vulnerability_client: Arc::new(tokio::sync::Mutex::new(wazuh_vulnerability_client)), - wazuh_agents_client: Arc::new(tokio::sync::Mutex::new(wazuh_agents_client)), - wazuh_logs_client: Arc::new(tokio::sync::Mutex::new(wazuh_logs_client)), - wazuh_cluster_client: Arc::new(tokio::sync::Mutex::new(wazuh_cluster_client)), + agent_tools, + alert_tools, + rule_tools, + stats_tools, + vulnerability_tools, }) - } + } #[tool( name = "get_wazuh_alert_summary", @@ -323,72 +223,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetAlertSummaryParams, ) -> Result { - let limit = params.limit.unwrap_or(100); - - tracing::info!(limit = %limit, "Retrieving Wazuh alert summary"); - - match self.wazuh_indexer_client.get_alerts().await { - Ok(raw_alerts) => { - let alerts_to_process: Vec<_> = raw_alerts.into_iter().take(limit as usize).collect(); - - if alerts_to_process.is_empty() { - tracing::info!("No Wazuh alerts found to process. Returning standard message."); - // Ensure this directly returns a Vec with one Content::text item - return Ok(CallToolResult::success(vec![Content::text( - "No Wazuh alerts found.", - )])); - } - - // Process non-empty alerts - // This part should already be correct if alerts_to_process is not empty, - // as it maps each alert to Content::text directly. - let num_alerts_to_process = alerts_to_process.len(); // Get length before moving - let mcp_content_items: Vec = alerts_to_process - .into_iter() - .map(|alert_value| { - let source = alert_value.get("_source").unwrap_or(&alert_value); - - let id = source.get("id") - .and_then(|v| v.as_str()) - .or_else(|| alert_value.get("_id").and_then(|v| v.as_str())) - .unwrap_or("Unknown ID"); - - let description = source.get("rule") - .and_then(|r| r.get("description")) - .and_then(|d| d.as_str()) - .unwrap_or("No description available"); - - let timestamp = source.get("timestamp") - .and_then(|t| t.as_str()) - .unwrap_or("Unknown time"); - - let agent_name = source.get("agent") - .and_then(|a| a.get("name")) - .and_then(|n| n.as_str()) - .unwrap_or("Unknown agent"); - - let rule_level = source.get("rule") - .and_then(|r| r.get("level")) - .and_then(|l| l.as_u64()) - .unwrap_or(0); - - let formatted_text = format!( - "Alert ID: {}\nTime: {}\nAgent: {}\nLevel: {}\nDescription: {}", - id, timestamp, agent_name, rule_level, description - ); - Content::text(formatted_text) - }) - .collect(); - - tracing::info!("Successfully processed {} alerts into {} MCP content items", num_alerts_to_process, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - let err_msg = format!("Error retrieving alerts from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.alert_tools.get_wazuh_alert_summary(params).await } #[tool( @@ -399,100 +234,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetRulesSummaryParams, ) -> Result { - let limit = params.limit.unwrap_or(100); - - tracing::info!( - limit = %limit, - level = ?params.level, - group = ?params.group, - filename = ?params.filename, - "Retrieving Wazuh rules summary" - ); - - let mut rules_client = self.wazuh_rules_client.lock().await; - - match rules_client.get_rules( - Some(limit), - None, // offset - params.level, - params.group.as_deref(), - params.filename.as_deref(), - ).await { - Ok(rules) => { - if rules.is_empty() { - tracing::info!("No Wazuh rules found matching criteria. Returning standard message."); - return Ok(CallToolResult::success(vec![Content::text( - "No Wazuh rules found matching the specified criteria.", - )])); - } - - let num_rules = rules.len(); - let mcp_content_items: Vec = rules - .into_iter() - .map(|rule| { - let groups_str = rule.groups.join(", "); - - let compliance_info = { - let mut compliance = Vec::new(); - if let Some(gdpr) = &rule.gdpr { - if !gdpr.is_empty() { - compliance.push(format!("GDPR: {}", gdpr.join(", "))); - } - } - if let Some(hipaa) = &rule.hipaa { - if !hipaa.is_empty() { - compliance.push(format!("HIPAA: {}", hipaa.join(", "))); - } - } - if let Some(pci) = &rule.pci_dss { - if !pci.is_empty() { - compliance.push(format!("PCI DSS: {}", pci.join(", "))); - } - } - if let Some(nist) = &rule.nist_800_53 { - if !nist.is_empty() { - compliance.push(format!("NIST 800-53: {}", nist.join(", "))); - } - } - if compliance.is_empty() { - String::new() - } else { - format!("\nCompliance: {}", compliance.join(" | ")) - } - }; - - let severity = match rule.level { - 0..=3 => "Low", - 4..=7 => "Medium", - 8..=12 => "High", - 13..=15 => "Critical", - _ => "Unknown", - }; - - let formatted_text = format!( - "Rule ID: {}\nLevel: {} ({})\nDescription: {}\nGroups: {}\nFile: {}\nStatus: {}{}", - rule.id, - rule.level, - severity, - rule.description, - groups_str, - rule.filename, - rule.status, - compliance_info - ); - Content::text(formatted_text) - }) - .collect(); - - tracing::info!("Successfully processed {} rules into {} MCP content items", num_rules, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - let err_msg = format!("Error retrieving rules from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.rule_tools.get_wazuh_rules_summary(params).await } #[tool( @@ -501,175 +243,11 @@ impl WazuhToolsServer { )] async fn get_wazuh_vulnerability_summary( &self, - #[tool(aggr)] params: GetVulnerabilitySummaryParams, + #[tool(aggr)] params: GetVulnerabilitiesSummaryParams, ) -> Result { - let limit = params.limit.unwrap_or(100); - let offset = 0; // Default offset, can be extended in future if needed - - // agent_id is now a required String. If missing, serde would have failed deserialization. - // We just need to format it. - let agent_id = match Self::format_agent_id(¶ms.agent_id) { - Ok(formatted_id) => formatted_id, - Err(err_msg) => { - tracing::error!("Error formatting agent_id for vulnerability summary: {}", err_msg); - return Ok(CallToolResult::error(vec![Content::text(err_msg)])); - } - }; - - tracing::info!( - limit = %limit, - agent_id = %agent_id, - severity = ?params.severity, - cve = ?params.cve, - "Retrieving Wazuh vulnerability summary" - ); - - let mut vulnerability_client = self.wazuh_vulnerability_client.lock().await; - - // Filter by CVE if specified by searching through results - let vulnerabilities = if let Some(cve_filter) = ¶ms.cve { - match vulnerability_client.get_agent_vulnerabilities( - &agent_id, - Some(1000), // Get more results to filter - Some(offset), - params.severity.as_deref().and_then(VulnerabilitySeverity::from_str) - ).await { - Ok(all_vulns) => { - let filtered: Vec<_> = all_vulns - .into_iter() - .filter(|v| v.cve.to_lowercase().contains(&cve_filter.to_lowercase())) - .take(limit as usize) - .collect(); - Ok(filtered) - } - Err(e) => Err(e), - } - } else { - vulnerability_client.get_agent_vulnerabilities( - &agent_id, - Some(limit), - Some(offset), - params.severity.as_deref().and_then(VulnerabilitySeverity::from_str) - ).await - }; - - match vulnerabilities { - Ok(vulnerabilities) => { - if vulnerabilities.is_empty() { - tracing::info!("No Wazuh vulnerabilities found matching criteria. Returning standard message."); - return Ok(CallToolResult::success(vec![Content::text( - "No Wazuh vulnerabilities found matching the specified criteria.", - )])); - } - - let num_vulnerabilities = vulnerabilities.len(); - let mcp_content_items: Vec = vulnerabilities - .into_iter() - .map(|vuln| { - let severity_indicator = match vuln.severity { - VulnerabilitySeverity::Critical => "🔴 CRITICAL", - VulnerabilitySeverity::High => "🟠 HIGH", - VulnerabilitySeverity::Medium => "🟡 MEDIUM", - VulnerabilitySeverity::Low => "🟢 LOW", - }; - - let published_info = if let Some(published) = &vuln.published { - format!("\nPublished: {}", published) - } else { - String::new() - }; - - let updated_info = if let Some(updated) = &vuln.updated { - format!("\nUpdated: {}", updated) - } else { - String::new() - }; - - let detection_time_info = if let Some(detection_time) = &vuln.detection_time { - format!("\nDetection Time: {}", detection_time) - } else { - String::new() - }; - - let agent_info = { - let id_str = vuln.agent_id.as_deref(); - let name_str = vuln.agent_name.as_deref(); - - match (id_str, name_str) { - (Some("000"), Some(name)) => format!("\nAgent: {} (Wazuh Manager, ID: 000)", name), - (Some("000"), None) => "\nAgent: Wazuh Manager (ID: 000)".to_string(), - (Some(id), Some(name)) => format!("\nAgent: {} (ID: {})", name, id), - (Some(id), None) => format!("\nAgent ID: {}", id), - (None, Some(name)) => format!("\nAgent: {} (ID: Unknown)", name), // Should ideally not happen if ID is a primary key for agent context - (None, None) => String::new(), // No agent information available - } - }; - - let cvss_info = if let Some(cvss) = &vuln.cvss { - let mut cvss_parts = Vec::new(); - if let Some(cvss2) = &cvss.cvss2 { - if let Some(score) = cvss2.base_score { - cvss_parts.push(format!("CVSS2: {}", score)); - } - } - if let Some(cvss3) = &cvss.cvss3 { - if let Some(score) = cvss3.base_score { - cvss_parts.push(format!("CVSS3: {}", score)); - } - } - if !cvss_parts.is_empty() { - format!("\nCVSS Scores: {}", cvss_parts.join(", ")) - } else { - String::new() - } - } else { - String::new() - }; - - let reference_info = if let Some(reference) = &vuln.reference { - format!("\nReference: {}", reference) - } else { - String::new() - }; - - let description = vuln.description.as_deref().unwrap_or("No description available"); - - let formatted_text = format!( - "CVE: {}\nSeverity: {}\nTitle: {}\nDescription: {}{}{}{}{}{}{}", - vuln.cve, - severity_indicator, - vuln.title, - description, - published_info, - updated_info, - detection_time_info, - agent_info, - cvss_info, - reference_info - ); - Content::text(formatted_text) - }) - .collect(); - - tracing::info!("Successfully processed {} vulnerabilities into {} MCP content items", num_vulnerabilities, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - match e { - wazuh_client::WazuhApiError::HttpError { status, message: _, url: _ } if status == StatusCode::NOT_FOUND => { - tracing::info!("No vulnerability summary found for agent {}. Returning standard message.", agent_id); - return Ok(CallToolResult::success(vec![Content::text( - format!("No vulnerability summary found for agent {}.", agent_id), - )])); - } - _ => { - } - } - let err_msg = format!("Error retrieving vulnerabilities from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.vulnerability_tools + .get_wazuh_vulnerability_summary(params) + .await } #[tool( @@ -680,124 +258,9 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetCriticalVulnerabilitiesParams, ) -> Result { - let agent_id = match Self::format_agent_id(¶ms.agent_id) { - Ok(formatted_id) => formatted_id, - Err(err_msg) => { - tracing::error!("Error formatting agent_id for critical vulnerabilities: {}", err_msg); - return Ok(CallToolResult::error(vec![Content::text(err_msg)])); - } - }; - - tracing::info!( - agent_id = %agent_id, - "Retrieving critical vulnerabilities for Wazuh agent" - ); - - let mut vulnerability_client = self.wazuh_vulnerability_client.lock().await; - - match vulnerability_client.get_critical_vulnerabilities(&agent_id).await { - Ok(vulnerabilities) => { - if vulnerabilities.is_empty() { - tracing::info!("No critical vulnerabilities found for agent {}. Returning standard message.", agent_id); - return Ok(CallToolResult::success(vec![Content::text( - format!("No critical vulnerabilities found for agent {}.", agent_id), - )])); - } - - let num_vulnerabilities = vulnerabilities.len(); - let mcp_content_items: Vec = vulnerabilities - .into_iter() - .map(|vuln| { - let published_info = if let Some(published) = &vuln.published { - format!("\nPublished: {}", published) - } else { - String::new() - }; - - let updated_info = if let Some(updated) = &vuln.updated { - format!("\nUpdated: {}", updated) - } else { - String::new() - }; - - let detection_time_info = if let Some(detection_time) = &vuln.detection_time { - format!("\nDetection Time: {}", detection_time) - } else { - String::new() - }; - - let agent_info = if let Some(agent_name) = &vuln.agent_name { - format!("\nAgent: {} (ID: {})", agent_name, vuln.agent_id.as_deref().unwrap_or("Unknown")) - } else if let Some(agent_id) = &vuln.agent_id { - format!("\nAgent ID: {}", agent_id) - } else { - String::new() - }; - - let cvss_info = if let Some(cvss) = &vuln.cvss { - let mut cvss_parts = Vec::new(); - if let Some(cvss2) = &cvss.cvss2 { - if let Some(score) = cvss2.base_score { - cvss_parts.push(format!("CVSS2: {}", score)); - } - } - if let Some(cvss3) = &cvss.cvss3 { - if let Some(score) = cvss3.base_score { - cvss_parts.push(format!("CVSS3: {}", score)); - } - } - if !cvss_parts.is_empty() { - format!("\nCVSS Scores: {}", cvss_parts.join(", ")) - } else { - String::new() - } - } else { - String::new() - }; - - let reference_info = if let Some(reference) = &vuln.reference { - format!("\nReference: {}", reference) - } else { - String::new() - }; - - let description = vuln.description.as_deref().unwrap_or("No description available"); - - let formatted_text = format!( - "🔴 CRITICAL VULNERABILITY\nCVE: {}\nTitle: {}\nDescription: {}{}{}{}{}{}{}", - vuln.cve, - vuln.title, - description, - published_info, - updated_info, - detection_time_info, - agent_info, - cvss_info, - reference_info - ); - Content::text(formatted_text) - }) - .collect(); - - tracing::info!("Successfully processed {} critical vulnerabilities into {} MCP content items", num_vulnerabilities, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - match e { - wazuh_client::WazuhApiError::HttpError { status, message: _, url: _ } if status == StatusCode::NOT_FOUND => { - tracing::info!("No critical vulnerabilities found for agent {}. Returning standard message.", agent_id); - return Ok(CallToolResult::success(vec![Content::text( - format!("No critical vulnerabilities found for agent {}.", agent_id), - )])); - } - _ => { - } - } - let err_msg = format!("Error retrieving critical vulnerabilities from Wazuh for agent {}: {}", agent_id, e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.vulnerability_tools + .get_wazuh_critical_vulnerabilities(params) + .await } #[tool( @@ -808,178 +271,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetAgentsParams, ) -> Result { - let limit = params.limit.unwrap_or(100); - - tracing::info!( - limit = %limit, - status = ?params.status, - name = ?params.name, - ip = ?params.ip, - group = ?params.group, - os_platform = ?params.os_platform, - version = ?params.version, - "Retrieving Wazuh agents" - ); - - let mut agents_client = self.wazuh_agents_client.lock().await; - - match agents_client.get_agents( - Some(limit), - None, // offset - None, // select - None, // sort - None, // search - Some(¶ms.status), - None, // query - None, // older_than - params.os_platform.as_deref(), - None, // os_version - None, // os_name - None, // manager_host - params.version.as_deref(), - params.group.as_deref(), - None, // node_name - params.name.as_deref(), - params.ip.as_deref(), - None, // register_ip - None, // group_config_status - None, // distinct - ).await { - Ok(agents) => { - if agents.is_empty() { - tracing::info!("No Wazuh agents found matching criteria. Returning standard message."); - return Ok(CallToolResult::success(vec![Content::text( - format!("No Wazuh agents found matching the specified criteria (status: {}).", ¶ms.status), - )])); - } - - let num_agents = agents.len(); - let mcp_content_items: Vec = agents - .into_iter() - .map(|agent| { - let status_indicator = match agent.status.to_lowercase().as_str() { - "active" => "🟢 ACTIVE", - "disconnected" => "🔴 DISCONNECTED", - "pending" => "🟡 PENDING", - "never_connected" => "⚪ NEVER CONNECTED", - _ => &agent.status, - }; - - let ip_info = if let Some(ip) = &agent.ip { - format!("\nIP: {}", ip) - } else { - String::new() - }; - - let register_ip_info = if let Some(register_ip) = &agent.register_ip { - if agent.ip.as_ref() != Some(register_ip) { - format!("\nRegistered IP: {}", register_ip) - } else { - String::new() - } - } else { - String::new() - }; - - let os_info = if let Some(os) = &agent.os { - let mut os_parts = Vec::new(); - if let Some(name) = &os.name { - os_parts.push(name.clone()); - } - if let Some(version) = &os.version { - os_parts.push(version.clone()); - } - if let Some(arch) = &os.arch { - os_parts.push(format!("({})", arch)); - } - if !os_parts.is_empty() { - format!("\nOS: {}", os_parts.join(" ")) - } else { - String::new() - } - } else { - String::new() - }; - - let version_info = if let Some(version) = &agent.version { - format!("\nAgent Version: {}", version) - } else { - String::new() - }; - - let group_info = if let Some(groups) = &agent.group { - if !groups.is_empty() { - format!("\nGroups: {}", groups.join(", ")) - } else { - String::new() - } - } else { - String::new() - }; - - let last_keep_alive_info = if let Some(last_keep_alive) = &agent.last_keep_alive { - format!("\nLast Keep Alive: {}", last_keep_alive) - } else { - String::new() - }; - - let date_add_info = if let Some(date_add) = &agent.date_add { - format!("\nRegistered: {}", date_add) - } else { - String::new() - }; - - let node_info = if let Some(node_name) = &agent.node_name { - format!("\nNode: {}", node_name) - } else { - String::new() - }; - - let config_status_info = if let Some(config_status) = &agent.group_config_status { - let config_indicator = match config_status.to_lowercase().as_str() { - "synced" => "✅ SYNCED", - "not synced" => "❌ NOT SYNCED", - _ => config_status, - }; - format!("\nConfig Status: {}", config_indicator) - } else { - String::new() - }; - - let agent_id_display = if agent.id == "000" { - format!("{} (Wazuh Manager)", agent.id) - } else { - agent.id.clone() - }; - - let formatted_text = format!( - "Agent ID: {}\nName: {}\nStatus: {}{}{}{}{}{}{}{}{}{}", - agent_id_display, - agent.name, - status_indicator, - ip_info, - register_ip_info, - os_info, - version_info, - group_info, - last_keep_alive_info, - date_add_info, - node_info, - config_status_info - ); - Content::text(formatted_text) - }) - .collect(); - - tracing::info!("Successfully processed {} agents into {} MCP content items", num_agents, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - let err_msg = format!("Error retrieving agents from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.agent_tools.get_wazuh_agents(params).await } #[tool( @@ -990,104 +282,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetAgentProcessesParams, ) -> Result { - let agent_id = match Self::format_agent_id(¶ms.agent_id) { - Ok(formatted_id) => formatted_id, - Err(err_msg) => { - tracing::error!("Error formatting agent_id for agent processes: {}", err_msg); - return Ok(CallToolResult::error(vec![Content::text(err_msg)])); - } - }; - let limit = params.limit.unwrap_or(100); - let offset = 0; - - tracing::info!( - agent_id = %agent_id, - limit = %limit, - search = ?params.search, - "Retrieving Wazuh agent processes" - ); - - let mut vulnerability_client = self.wazuh_vulnerability_client.lock().await; - - match vulnerability_client.get_agent_processes( - &agent_id, - Some(limit), - Some(offset), - params.search.as_deref(), - ).await { - Ok(processes) => { - if processes.is_empty() { - tracing::info!("No processes found for agent {} with current filters. Returning standard message.", agent_id); - return Ok(CallToolResult::success(vec![Content::text( - format!("No processes found for agent {} matching the specified criteria.", agent_id), - )])); - } - - let num_processes = processes.len(); - let mcp_content_items: Vec = processes - .into_iter() - .map(|process| { - let mut details = vec![ - format!("PID: {}", process.pid), - format!("Name: {}", process.name), - ]; - - if let Some(state) = &process.state { - details.push(format!("State: {}", state)); - } - if let Some(ppid) = &process.ppid { - details.push(format!("PPID: {}", ppid)); - } - if let Some(euser) = &process.euser { - details.push(format!("User: {}", euser)); - } - if let Some(cmd) = &process.cmd { - details.push(format!("Command: {}", cmd)); - } - if let Some(start_time_str) = &process.start_time { - if let Ok(start_time_unix) = start_time_str.parse::() { - // Assuming start_time is a Unix timestamp in seconds - use chrono::DateTime; - if let Some(dt) = DateTime::from_timestamp(start_time_unix, 0) { - details.push(format!("Start Time: {}", dt.format("%Y-%m-%d %H:%M:%S UTC"))); - } else { - details.push(format!("Start Time: {} (raw)", start_time_str)); - } - } else { - // If it's not a simple number, print as is - details.push(format!("Start Time: {}", start_time_str)); - } - } - if let Some(resident_mem) = process.resident { - details.push(format!("Memory (Resident): {} KB", resident_mem / 1024)); // Assuming resident is in bytes - } - if let Some(vm_size) = process.vm_size { - details.push(format!("Memory (VM Size): {} KB", vm_size / 1024)); // Assuming vm_size is in bytes - } - - Content::text(details.join("\n")) - }) - .collect(); - - tracing::info!("Successfully processed {} processes for agent {} into {} MCP content items", num_processes, agent_id, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - match e { - wazuh_client::WazuhApiError::HttpError { status, message: _, url: _ } if status == StatusCode::NOT_FOUND => { - tracing::info!("No process data found for agent {}. Syscollector might not have run or data is unavailable.", agent_id); - Ok(CallToolResult::success(vec![Content::text( - format!("No process data found for agent {}. The agent might not exist, syscollector data might be unavailable, or the agent is not active.", agent_id), - )])) - } - _ => { - let err_msg = format!("Error retrieving processes for agent {} from Wazuh: {}", agent_id, e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } - } - } + self.agent_tools.get_wazuh_agent_processes(params).await } #[tool( @@ -1096,55 +291,9 @@ impl WazuhToolsServer { )] async fn get_wazuh_cluster_health( &self, - #[tool(aggr)] _params: GetClusterHealthParams, // No params used + #[tool(aggr)] params: GetClusterHealthParams, ) -> Result { - tracing::info!("Retrieving Wazuh cluster health"); - - let mut cluster_client = self.wazuh_cluster_client.lock().await; - - match cluster_client.is_cluster_healthy().await { - Ok(is_healthy) => { - let health_status_text = if is_healthy { - "Cluster is healthy: Yes".to_string() - } else { - // To provide more context, we can fetch the basic status - match cluster_client.get_cluster_status().await { - Ok(status) => { - let mut reasons = Vec::new(); - if !status.enabled.eq_ignore_ascii_case("yes") { - reasons.push("cluster is not enabled"); - } - if !status.running.eq_ignore_ascii_case("yes") { - reasons.push("cluster is not running"); - } - // is_cluster_healthy already checks n_connected_nodes > 0 if enabled and running - // If it's still false, and enabled/running are "yes", it implies no connected nodes or healthcheck failed. - if status.enabled.eq_ignore_ascii_case("yes") && status.running.eq_ignore_ascii_case("yes") { - match cluster_client.get_cluster_healthcheck().await { - Ok(hc) if hc.n_connected_nodes == 0 => reasons.push("no nodes are connected"), - Err(_) => reasons.push("healthcheck endpoint failed or reported issues"), - _ => {} // Healthy implies connected nodes - } - } - if reasons.is_empty() && !is_healthy { // Should not happen if logic is correct - reasons.push("unknown reason, check detailed logs or healthcheck endpoint"); - } - format!("Cluster is healthy: No. Reasons: {}", reasons.join("; ")) - } - Err(_) => { - "Cluster is healthy: No. Additionally, failed to retrieve basic cluster status for more details.".to_string() - } - } - }; - tracing::info!("Successfully retrieved cluster health: {}", health_status_text); - Ok(CallToolResult::success(vec![Content::text(health_status_text)])) - } - Err(e) => { - let err_msg = format!("Error retrieving cluster health from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.stats_tools.get_wazuh_cluster_health(params).await } #[tool( @@ -1155,54 +304,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetClusterNodesParams, ) -> Result { - tracing::info!( - limit = ?params.limit, - offset = ?params.offset, - node_type = ?params.node_type, - "Retrieving Wazuh cluster nodes" - ); - - let mut cluster_client = self.wazuh_cluster_client.lock().await; - - match cluster_client.get_cluster_nodes( - params.limit, - params.offset, - params.node_type.as_deref(), - ).await { - Ok(nodes) => { - if nodes.is_empty() { - tracing::info!("No Wazuh cluster nodes found matching criteria. Returning standard message."); - return Ok(CallToolResult::success(vec![Content::text( - "No Wazuh cluster nodes found matching the specified criteria.", - )])); - } - - let num_nodes = nodes.len(); - let mcp_content_items: Vec = nodes - .into_iter() - .map(|node| { - let status_indicator = match node.status.to_lowercase().as_str() { - "connected" | "active" => "🟢 CONNECTED", // Assuming 'active' is also a good state - "disconnected" => "🔴 DISCONNECTED", - _ => &node.status, - }; - let formatted_text = format!( - "Node Name: {}\nType: {}\nVersion: {}\nIP: {}\nStatus: {}", - node.name, node.node_type, node.version, node.ip, status_indicator - ); - Content::text(formatted_text) - }) - .collect(); - - tracing::info!("Successfully processed {} cluster nodes into {} MCP content items", num_nodes, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - let err_msg = format!("Error retrieving cluster nodes from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.stats_tools.get_wazuh_cluster_nodes(params).await } #[tool( @@ -1213,56 +315,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: SearchManagerLogsParams, ) -> Result { - let limit = params.limit.unwrap_or(100); - let offset = params.offset.unwrap_or(0); - - tracing::info!( - limit = %limit, - offset = %offset, - level = ?params.level, - tag = ?params.tag, - search_term = ?params.search_term, - "Searching Wazuh manager logs" - ); - - let mut logs_client = self.wazuh_logs_client.lock().await; - - match logs_client.get_manager_logs( - Some(limit), - Some(offset), - Some(¶ms.level), - params.tag.as_deref(), - params.search_term.as_deref(), - ).await { - Ok(log_entries) => { - if log_entries.is_empty() { - tracing::info!("No Wazuh manager logs found matching criteria. Returning standard message."); - return Ok(CallToolResult::success(vec![Content::text( - "No Wazuh manager logs found matching the specified criteria.", - )])); - } - - let num_logs = log_entries.len(); - let mcp_content_items: Vec = log_entries - .into_iter() - .map(|log_entry| { - let formatted_text = format!( - "Timestamp: {}\nTag: {}\nLevel: {}\nDescription: {}", - log_entry.timestamp, log_entry.tag, log_entry.level, log_entry.description - ); - Content::text(formatted_text) - }) - .collect(); - - tracing::info!("Successfully processed {} manager log entries into {} MCP content items", num_logs, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - let err_msg = format!("Error searching manager logs from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.stats_tools.search_wazuh_manager_logs(params).await } #[tool( @@ -1273,42 +326,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetManagerErrorLogsParams, ) -> Result { - let limit = params.limit.unwrap_or(100); - - tracing::info!(limit = %limit, "Retrieving Wazuh manager error logs"); - - let mut logs_client = self.wazuh_logs_client.lock().await; - - match logs_client.get_error_logs(Some(limit)).await { - Ok(log_entries) => { - if log_entries.is_empty() { - tracing::info!("No Wazuh manager error logs found. Returning standard message."); - return Ok(CallToolResult::success(vec![Content::text( - "No Wazuh manager error logs found.", - )])); - } - - let num_logs = log_entries.len(); - let mcp_content_items: Vec = log_entries - .into_iter() - .map(|log_entry| { - let formatted_text = format!( - "Timestamp: {}\nTag: {}\nLevel: {}\nDescription: {}", - log_entry.timestamp, log_entry.tag, log_entry.level, log_entry.description - ); - Content::text(formatted_text) - }) - .collect(); - - tracing::info!("Successfully processed {} manager error log entries into {} MCP content items", num_logs, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - let err_msg = format!("Error retrieving manager error logs from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.stats_tools.get_wazuh_manager_error_logs(params).await } #[tool( @@ -1319,101 +337,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetLogCollectorStatsParams, ) -> Result { - let agent_id = match Self::format_agent_id(¶ms.agent_id) { - Ok(formatted_id) => formatted_id, - Err(err_msg) => { - tracing::error!("Error formatting agent_id for log collector stats: {}", err_msg); - return Ok(CallToolResult::error(vec![Content::text(err_msg)])); - } - }; - - tracing::info!(agent_id = %agent_id, "Retrieving Wazuh log collector stats"); - - let mut logs_client = self.wazuh_logs_client.lock().await; - - match logs_client.get_logcollector_stats(&agent_id).await { - Ok(stats) => { // stats is wazuh_client::logs::LogCollectorStats - // Helper closure to format a LogCollectorPeriod - let format_period = |period_name: &str, period_data: &wazuh_client::logs::LogCollectorPeriod| -> String { - let files_info: String = period_data.files.iter() - .map(|file: &wazuh_client::logs::LogFile| { - let targets_str: String = file.targets.iter() - .map(|target: &wazuh_client::logs::LogTarget| { - format!(" - Name: {}, Drops: {}", target.name, target.drops) - }) - .collect::>() - .join("\n"); - let targets_display = if targets_str.is_empty() { - " (No specific targets with drops for this file)".to_string() - } else { - format!(" Targets:\n{}", targets_str) - }; - format!( - " - Location: {}\n Events: {}\n Bytes: {}\n{}", - file.location, file.events, file.bytes, targets_display - ) - }) - .collect::>() - .join("\n\n"); - - let files_display = if files_info.is_empty() { - " (No files processed in this period)".to_string() - } else { - files_info - }; - - format!( - "{}:\n Start: {}\n End: {}\n Files:\n{}", - period_name, period_data.start, period_data.end, files_display - ) - }; - - let global_period_info = format_period("Global Period", &stats.global); - let interval_period_info = format_period("Interval Period", &stats.interval); - - let formatted_text = format!( - "Log Collector Stats for Agent: {}\n\n{}\n\n{}", - agent_id, // Use the agent_id from params - global_period_info, - interval_period_info - ); - - tracing::info!("Successfully retrieved and formatted log collector stats for agent {}", agent_id); - Ok(CallToolResult::success(vec![Content::text(formatted_text)])) - } - Err(e) => { - // Check if the error is due to agent not found or stats not available - if let wazuh_client::WazuhApiError::ApiError(msg) = &e { - // This specific message is returned by the client if affected_items is empty/missing - if msg.contains(&format!("Log collector stats for agent {} not found", agent_id)) { - tracing::info!("No log collector stats found for agent {} (API error). Returning standard message.", agent_id); - return Ok(CallToolResult::success(vec![Content::text( - format!("No log collector stats found for agent {}. The agent might not exist, stats are unavailable, or the agent is not active.", agent_id), - )])); - } - // General "Agent Not Found" might also come as an ApiError from other layers - if msg.contains("Agent Not Found") { - tracing::info!("Agent {} not found (API error). Returning standard message.", agent_id); - return Ok(CallToolResult::success(vec![Content::text( - format!("Agent {} not found. Cannot retrieve log collector stats.", agent_id), - )])); - } - } - // HTTP 404 can also indicate agent not found or endpoint issues - if let wazuh_client::WazuhApiError::HttpError { status, .. } = &e { - if *status == StatusCode::NOT_FOUND { - tracing::info!("No log collector stats found for agent {} (HTTP 404). Agent might not exist or endpoint unavailable.", agent_id); - return Ok(CallToolResult::success(vec![Content::text( - format!("No log collector stats found for agent {}. The agent might not exist, stats are unavailable, or the agent is not active.", agent_id), - )])); - } - } - // Default error handling for other cases - let err_msg = format!("Error retrieving log collector stats for agent {} from Wazuh: {}", agent_id, e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.stats_tools.get_wazuh_log_collector_stats(params).await } #[tool( @@ -1422,35 +346,9 @@ impl WazuhToolsServer { )] async fn get_wazuh_remoted_stats( &self, - #[tool(aggr)] _params: GetRemotedStatsParams, // No params used + #[tool(aggr)] params: GetRemotedStatsParams, ) -> Result { - tracing::info!("Retrieving Wazuh remoted stats"); - - let mut logs_client = self.wazuh_logs_client.lock().await; - - match logs_client.get_remoted_stats().await { - Ok(stats) => { - let formatted_text = format!( - "Wazuh Remoted Statistics:\nQueue Size: {}\nTotal Queue Size: {}\nTCP Sessions: {}\nControl Message Count: {}\nDiscarded Message Count: {}\nMessages Sent (Bytes): {}\nBytes Received: {}\nDequeued After Close: {}", - stats.queue_size, - stats.total_queue_size, - stats.tcp_sessions, - stats.ctrl_msg_count, - stats.discarded_count, - stats.sent_bytes, - stats.recv_bytes, - stats.dequeued_after_close - ); - - tracing::info!("Successfully retrieved remoted stats"); - Ok(CallToolResult::success(vec![Content::text(formatted_text)])) - } - Err(e) => { - let err_msg = format!("Error retrieving remoted stats from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.stats_tools.get_wazuh_remoted_stats(params).await } #[tool( @@ -1461,140 +359,7 @@ impl WazuhToolsServer { &self, #[tool(aggr)] params: GetAgentPortsParams, ) -> Result { - let agent_id = match Self::format_agent_id(¶ms.agent_id) { - Ok(formatted_id) => formatted_id, - Err(err_msg) => { - tracing::error!("Error formatting agent_id for agent ports: {}", err_msg); - return Ok(CallToolResult::error(vec![Content::text(err_msg)])); - } - }; - let limit = params.limit.unwrap_or(100); - let offset = 0; // Default offset - - tracing::info!( - agent_id = %agent_id, - limit = %limit, - protocol = ?params.protocol, - state = ?params.state, - "Retrieving Wazuh agent network ports" - ); - - let mut vulnerability_client = self.wazuh_vulnerability_client.lock().await; - - // Note: The wazuh_client::VulnerabilityClient::get_agent_ports provided in the prompt - // only supports filtering by protocol. If state filtering is needed, the client would need an update. - // For now, we pass params.protocol and ignore params.state for the API call, - // but we can filter by state client-side if necessary, or acknowledge this limitation. - // The current wazuh-client `get_agent_ports` does not support state filtering directly in its parameters. - // We will filter client-side for now if `params.state` is provided. - match vulnerability_client.get_agent_ports( - &agent_id, - Some(limit * 2), // Fetch more to allow for client-side state filtering - Some(offset), - Some(¶ms.protocol), - ).await { - Ok(mut ports) => { - let requested_state_is_listening = params.state.trim().eq_ignore_ascii_case("listening"); - - ports.retain(|port| { - tracing::debug!( - "Pre-filter port: {:?} (State: {:?}), requested_state_is_listening: {}", - port.inode, // Using inode for a concise port identifier in log - port.state, - requested_state_is_listening - ); - let result = match port.state.as_ref().map(|s| s.trim()) { - Some(actual_port_state_str) => { // Port has a state string - if actual_port_state_str.is_empty() { - // Filter out ports where state is present but an empty string - false - } else if requested_state_is_listening { - // User requested "listening": keep only if actual state is "listening" - actual_port_state_str.eq_ignore_ascii_case("listening") - } else { - // User requested non-"listening": keep if actual state is not "listening" - !actual_port_state_str.eq_ignore_ascii_case("listening") - } - } - None => { // Port has no state (port.state is None) - if requested_state_is_listening { - // If user wants "listening" ports, a port with no state is not a match. - false - } else { - // If user wants non-"listening" ports, a port with no state is a match. - true - } - } - }; - - tracing::debug!("Post-filter decision for port: {:?}, Keep: {}", port.inode, result); - result - }); - - // Apply limit after client-side filtering - ports.truncate(limit as usize); - - if ports.is_empty() { - tracing::info!("No network ports found for agent {} with current filters. Returning standard message.", agent_id); - return Ok(CallToolResult::success(vec![Content::text( - format!("No network ports found for agent {} matching the specified criteria.", agent_id), - )])); - } - - let num_ports = ports.len(); - let mcp_content_items: Vec = ports - .into_iter() - .map(|port: WazuhPort| { // Explicitly type port - let mut details = vec![ - format!("Protocol: {}", port.protocol), - format!("Local: {}:{}", port.local.ip.clone().unwrap_or("N/A".to_string()), port.local.port), - ]; - - if let Some(remote) = &port.remote { - details.push(format!("Remote: {}:{}", remote.ip.clone().unwrap_or("N/A".to_string()), remote.port)); - } - if let Some(state) = &port.state { - details.push(format!("State: {}", state)); - } - if let Some(process_name) = &port.process { // process field in WazuhPort is Option - details.push(format!("Process Name: {}", process_name)); - } - if let Some(pid) = port.pid { // pid field in WazuhPort is Option - details.push(format!("PID: {}", pid)); - } - if let Some(inode) = port.inode { - details.push(format!("Inode: {}", inode)); - } - if let Some(tx_queue) = port.tx_queue { - details.push(format!("TX Queue: {}", tx_queue)); - } - if let Some(rx_queue) = port.rx_queue { - details.push(format!("RX Queue: {}", rx_queue)); - } - - Content::text(details.join("\n")) - }) - .collect(); - - tracing::info!("Successfully processed {} network ports for agent {} into {} MCP content items", num_ports, agent_id, mcp_content_items.len()); - Ok(CallToolResult::success(mcp_content_items)) - } - Err(e) => { - match e { - wazuh_client::WazuhApiError::HttpError { status, message: _, url: _ } if status == StatusCode::NOT_FOUND => { - tracing::info!("No network port data found for agent {}. Syscollector might not have run or data is unavailable.", agent_id); - Ok(CallToolResult::success(vec![Content::text( - format!("No network port data found for agent {}. The agent might not exist, syscollector data might be unavailable, or the agent is not active.", agent_id), - )])) - } - _ => { - let err_msg = format!("Error retrieving network ports for agent {} from Wazuh: {}", agent_id, e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } - } - } + self.agent_tools.get_wazuh_agent_ports(params).await } #[tool( @@ -1603,32 +368,9 @@ impl WazuhToolsServer { )] async fn get_wazuh_weekly_stats( &self, - #[tool(aggr)] _params: GetWeeklyStatsParams, // No params used + #[tool(aggr)] params: GetWeeklyStatsParams, ) -> Result { - tracing::info!("Retrieving Wazuh weekly stats"); - - let mut logs_client = self.wazuh_logs_client.lock().await; - - match logs_client.get_weekly_stats().await { - Ok(stats_value) => { - match serde_json::to_string_pretty(&stats_value) { - Ok(formatted_json) => { - tracing::info!("Successfully retrieved and formatted weekly stats."); - Ok(CallToolResult::success(vec![Content::text(formatted_json)])) - } - Err(e) => { - let err_msg = format!("Error formatting weekly stats JSON: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } - } - Err(e) => { - let err_msg = format!("Error retrieving weekly stats from Wazuh: {}", e); - tracing::error!("{}", err_msg); - Ok(CallToolResult::error(vec![Content::text(err_msg)])) - } - } + self.stats_tools.get_wazuh_weekly_stats(params).await } } @@ -1696,14 +438,12 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Starting Wazuh MCP Server..."); // Create an instance of our Wazuh tools server - let server = WazuhToolsServer::new() - .expect("Error initializing Wazuh tools server"); + let server = WazuhToolsServer::new().expect("Error initializing Wazuh tools server"); tracing::info!("Using stdio transport"); - let service = server.serve(stdio()).await - .inspect_err(|e| { - tracing::error!("serving error: {:?}", e); - })?; + let service = server.serve(stdio()).await.inspect_err(|e| { + tracing::error!("serving error: {:?}", e); + })?; service.waiting().await?; Ok(()) diff --git a/src/tools/agents.rs b/src/tools/agents.rs new file mode 100644 index 0000000..a8f508c --- /dev/null +++ b/src/tools/agents.rs @@ -0,0 +1,563 @@ +//! Agent tools module for Wazuh MCP Server +//! +//! This module contains all agent-related tool implementations including: +//! - Agent listing and filtering +//! - Agent process monitoring +//! - Agent network port monitoring + +use super::{ToolModule, ToolUtils}; +use reqwest::StatusCode; +use rmcp::model::{CallToolResult, Content}; +use rmcp::Error as McpError; +use std::sync::Arc; +use tokio::sync::Mutex; +use wazuh_client::{AgentsClient, Port as WazuhPort, VulnerabilityClient}; + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetAgentsParams { + #[schemars(description = "Maximum number of agents to retrieve (default: 100)")] + pub limit: Option, + #[schemars( + description = "Agent status filter (active, disconnected, pending, never_connected)" + )] + pub status: String, + #[schemars(description = "Agent name to search for (optional)")] + pub name: Option, + #[schemars(description = "Agent IP address to filter by (optional)")] + pub ip: Option, + #[schemars(description = "Agent group to filter by (optional)")] + pub group: Option, + #[schemars(description = "Operating system platform to filter by (optional)")] + pub os_platform: Option, + #[schemars(description = "Agent version to filter by (optional)")] + pub version: Option, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetAgentProcessesParams { + #[schemars( + description = "Agent ID to get processes for (required, e.g., \"0\", \"1\", \"001\")" + )] + pub agent_id: String, + #[schemars(description = "Maximum number of processes to retrieve (default: 100)")] + pub limit: Option, + #[schemars(description = "Search string to filter processes by name or command (optional)")] + pub search: Option, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetAgentPortsParams { + #[schemars( + description = "Agent ID to get network ports for (required, e.g., \"001\", \"002\", \"003\")" + )] + pub agent_id: String, + #[schemars(description = "Maximum number of ports to retrieve (default: 100)")] + pub limit: Option, + #[schemars(description = "Protocol to filter by (e.g., \"tcp\", \"udp\")")] + pub protocol: String, + #[schemars(description = "State to filter by (e.g., \"LISTENING\", \"ESTABLISHED\")")] + pub state: String, +} + +#[derive(Clone)] +pub struct AgentTools { + agents_client: Arc>, + vulnerability_client: Arc>, +} + +impl AgentTools { + pub fn new( + agents_client: Arc>, + vulnerability_client: Arc>, + ) -> Self { + Self { + agents_client, + vulnerability_client, + } + } + + pub async fn get_wazuh_agents( + &self, + params: GetAgentsParams, + ) -> Result { + let limit = params.limit.unwrap_or(100); + + tracing::info!( + limit = %limit, + status = ?params.status, + name = ?params.name, + ip = ?params.ip, + group = ?params.group, + os_platform = ?params.os_platform, + version = ?params.version, + "Retrieving Wazuh agents" + ); + + let mut agents_client = self.agents_client.lock().await; + + match agents_client + .get_agents( + Some(limit), + None, // offset + None, // select + None, // sort + None, // search + Some(¶ms.status), + None, // query + None, // older_than + params.os_platform.as_deref(), + None, // os_version + None, // os_name + None, // manager_host + params.version.as_deref(), + params.group.as_deref(), + None, // node_name + params.name.as_deref(), + params.ip.as_deref(), + None, // register_ip + None, // group_config_status + None, // distinct + ) + .await + { + Ok(agents) => { + if agents.is_empty() { + tracing::info!( + "No Wazuh agents found matching criteria. Returning standard message." + ); + return Self::not_found_result(&format!( + "Wazuh agents matching the specified criteria (status: {})", + ¶ms.status + )); + } + + let num_agents = agents.len(); + let mcp_content_items: Vec = agents + .into_iter() + .map(|agent| { + let status_indicator = match agent.status.to_lowercase().as_str() { + "active" => "🟢 ACTIVE", + "disconnected" => "🔴 DISCONNECTED", + "pending" => "🟡 PENDING", + "never_connected" => "⚪ NEVER CONNECTED", + _ => &agent.status, + }; + + let ip_info = if let Some(ip) = &agent.ip { + format!("\nIP: {}", ip) + } else { + String::new() + }; + + let register_ip_info = if let Some(register_ip) = &agent.register_ip { + if agent.ip.as_ref() != Some(register_ip) { + format!("\nRegistered IP: {}", register_ip) + } else { + String::new() + } + } else { + String::new() + }; + + let os_info = if let Some(os) = &agent.os { + let mut os_parts = Vec::new(); + if let Some(name) = &os.name { + os_parts.push(name.clone()); + } + if let Some(version) = &os.version { + os_parts.push(version.clone()); + } + if let Some(arch) = &os.arch { + os_parts.push(format!("({})", arch)); + } + if !os_parts.is_empty() { + format!("\nOS: {}", os_parts.join(" ")) + } else { + String::new() + } + } else { + String::new() + }; + + let version_info = if let Some(version) = &agent.version { + format!("\nAgent Version: {}", version) + } else { + String::new() + }; + + let group_info = if let Some(groups) = &agent.group { + if !groups.is_empty() { + format!("\nGroups: {}", groups.join(", ")) + } else { + String::new() + } + } else { + String::new() + }; + + let last_keep_alive_info = + if let Some(last_keep_alive) = &agent.last_keep_alive { + format!("\nLast Keep Alive: {}", last_keep_alive) + } else { + String::new() + }; + + let date_add_info = if let Some(date_add) = &agent.date_add { + format!("\nRegistered: {}", date_add) + } else { + String::new() + }; + + let node_info = if let Some(node_name) = &agent.node_name { + format!("\nNode: {}", node_name) + } else { + String::new() + }; + + let config_status_info = + if let Some(config_status) = &agent.group_config_status { + let config_indicator = match config_status.to_lowercase().as_str() { + "synced" => "✅ SYNCED", + "not synced" => "❌ NOT SYNCED", + _ => config_status, + }; + format!("\nConfig Status: {}", config_indicator) + } else { + String::new() + }; + + let agent_id_display = if agent.id == "000" { + format!("{} (Wazuh Manager)", agent.id) + } else { + agent.id.clone() + }; + + let formatted_text = format!( + "Agent ID: {}\nName: {}\nStatus: {}{}{}{}{}{}{}{}{}{}", + agent_id_display, + agent.name, + status_indicator, + ip_info, + register_ip_info, + os_info, + version_info, + group_info, + last_keep_alive_info, + date_add_info, + node_info, + config_status_info + ); + Content::text(formatted_text) + }) + .collect(); + + tracing::info!( + "Successfully processed {} agents into {} MCP content items", + num_agents, + mcp_content_items.len() + ); + Self::success_result(mcp_content_items) + } + Err(e) => { + let err_msg = Self::format_error("agents", "retrieving agents", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } + + pub async fn get_wazuh_agent_processes( + &self, + params: GetAgentProcessesParams, + ) -> Result { + let agent_id = match ToolUtils::format_agent_id(¶ms.agent_id) { + Ok(formatted_id) => formatted_id, + Err(err_msg) => { + tracing::error!("Error formatting agent_id for agent processes: {}", err_msg); + return Self::error_result(err_msg); + } + }; + let limit = params.limit.unwrap_or(100); + let offset = 0; + + tracing::info!( + agent_id = %agent_id, + limit = %limit, + search = ?params.search, + "Retrieving Wazuh agent processes" + ); + + let mut vulnerability_client = self.vulnerability_client.lock().await; + + match vulnerability_client + .get_agent_processes( + &agent_id, + Some(limit), + Some(offset), + params.search.as_deref(), + ) + .await + { + Ok(processes) => { + if processes.is_empty() { + tracing::info!("No processes found for agent {} with current filters. Returning standard message.", agent_id); + return Self::not_found_result(&format!( + "processes for agent {} matching the specified criteria", + agent_id + )); + } + + let num_processes = processes.len(); + let mcp_content_items: Vec = processes + .into_iter() + .map(|process| { + let mut details = vec![ + format!("PID: {}", process.pid), + format!("Name: {}", process.name), + ]; + + if let Some(state) = &process.state { + details.push(format!("State: {}", state)); + } + if let Some(ppid) = &process.ppid { + details.push(format!("PPID: {}", ppid)); + } + if let Some(euser) = &process.euser { + details.push(format!("User: {}", euser)); + } + if let Some(cmd) = &process.cmd { + details.push(format!("Command: {}", cmd)); + } + if let Some(start_time_str) = &process.start_time { + if let Ok(start_time_unix) = start_time_str.parse::() { + // Assuming start_time is a Unix timestamp in seconds + use chrono::DateTime; + if let Some(dt) = DateTime::from_timestamp(start_time_unix, 0) { + details.push(format!( + "Start Time: {}", + dt.format("%Y-%m-%d %H:%M:%S UTC") + )); + } else { + details.push(format!("Start Time: {} (raw)", start_time_str)); + } + } else { + // If it's not a simple number, print as is + details.push(format!("Start Time: {}", start_time_str)); + } + } + if let Some(resident_mem) = process.resident { + details.push(format!("Memory (Resident): {} KB", resident_mem / 1024)); + // Assuming resident is in bytes + } + if let Some(vm_size) = process.vm_size { + details.push(format!("Memory (VM Size): {} KB", vm_size / 1024)); + // Assuming vm_size is in bytes + } + + Content::text(details.join("\n")) + }) + .collect(); + + tracing::info!( + "Successfully processed {} processes for agent {} into {} MCP content items", + num_processes, + agent_id, + mcp_content_items.len() + ); + Self::success_result(mcp_content_items) + } + Err(e) => match e { + wazuh_client::WazuhApiError::HttpError { + status, + message: _, + url: _, + } if status == StatusCode::NOT_FOUND => { + tracing::info!("No process data found for agent {}. Syscollector might not have run or data is unavailable.", agent_id); + Self::success_result(vec![Content::text( + format!("No process data found for agent {}. The agent might not exist, syscollector data might be unavailable, or the agent is not active.", agent_id), + )]) + } + _ => { + let err_msg = Self::format_error( + "agent processes", + &format!("retrieving processes for agent {}", agent_id), + &e, + ); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + }, + } + } + + pub async fn get_wazuh_agent_ports( + &self, + params: GetAgentPortsParams, + ) -> Result { + let agent_id = match ToolUtils::format_agent_id(¶ms.agent_id) { + Ok(formatted_id) => formatted_id, + Err(err_msg) => { + tracing::error!("Error formatting agent_id for agent ports: {}", err_msg); + return Self::error_result(err_msg); + } + }; + let limit = params.limit.unwrap_or(100); + let offset = 0; // Default offset + + tracing::info!( + agent_id = %agent_id, + limit = %limit, + protocol = ?params.protocol, + state = ?params.state, + "Retrieving Wazuh agent network ports" + ); + + let mut vulnerability_client = self.vulnerability_client.lock().await; + + // Note: The wazuh_client::VulnerabilityClient::get_agent_ports provided in the prompt + // only supports filtering by protocol. If state filtering is needed, the client would need an update. + // For now, we pass params.protocol and ignore params.state for the API call, + // but we can filter by state client-side if necessary, or acknowledge this limitation. + // The current wazuh-client `get_agent_ports` does not support state filtering directly in its parameters. + // We will filter client-side for now if `params.state` is provided. + match vulnerability_client + .get_agent_ports( + &agent_id, + Some(limit * 2), // Fetch more to allow for client-side state filtering + Some(offset), + Some(¶ms.protocol), + ) + .await + { + Ok(mut ports) => { + let requested_state_is_listening = + params.state.trim().eq_ignore_ascii_case("listening"); + + ports.retain(|port| { + tracing::debug!( + "Pre-filter port: {:?} (State: {:?}), requested_state_is_listening: {}", + port.inode, // Using inode for a concise port identifier in log + port.state, + requested_state_is_listening + ); + let result = match port.state.as_ref().map(|s| s.trim()) { + Some(actual_port_state_str) => { + // Port has a state string + if actual_port_state_str.is_empty() { + // Filter out ports where state is present but an empty string + false + } else if requested_state_is_listening { + // User requested "listening": keep only if actual state is "listening" + actual_port_state_str.eq_ignore_ascii_case("listening") + } else { + // User requested non-"listening": keep if actual state is not "listening" + !actual_port_state_str.eq_ignore_ascii_case("listening") + } + } + None => { + // Port has no state (port.state is None) + if requested_state_is_listening { + // If user wants "listening" ports, a port with no state is not a match. + false + } else { + // If user wants non-"listening" ports, a port with no state is a match. + true + } + } + }; + + tracing::debug!( + "Post-filter decision for port: {:?}, Keep: {}", + port.inode, + result + ); + result + }); + + // Apply limit after client-side filtering + ports.truncate(limit as usize); + + if ports.is_empty() { + tracing::info!("No network ports found for agent {} with current filters. Returning standard message.", agent_id); + return Self::not_found_result(&format!( + "network ports for agent {} matching the specified criteria", + agent_id + )); + } + + let num_ports = ports.len(); + let mcp_content_items: Vec = ports + .into_iter() + .map(|port: WazuhPort| { + // Explicitly type port + let mut details = vec![ + format!("Protocol: {}", port.protocol), + format!( + "Local: {}:{}", + port.local.ip.clone().unwrap_or("N/A".to_string()), + port.local.port + ), + ]; + + if let Some(remote) = &port.remote { + details.push(format!( + "Remote: {}:{}", + remote.ip.clone().unwrap_or("N/A".to_string()), + remote.port + )); + } + if let Some(state) = &port.state { + details.push(format!("State: {}", state)); + } + if let Some(process_name) = &port.process { + // process field in WazuhPort is Option + details.push(format!("Process Name: {}", process_name)); + } + if let Some(pid) = port.pid { + // pid field in WazuhPort is Option + details.push(format!("PID: {}", pid)); + } + if let Some(inode) = port.inode { + details.push(format!("Inode: {}", inode)); + } + if let Some(tx_queue) = port.tx_queue { + details.push(format!("TX Queue: {}", tx_queue)); + } + if let Some(rx_queue) = port.rx_queue { + details.push(format!("RX Queue: {}", rx_queue)); + } + + Content::text(details.join("\n")) + }) + .collect(); + + tracing::info!("Successfully processed {} network ports for agent {} into {} MCP content items", num_ports, agent_id, mcp_content_items.len()); + Self::success_result(mcp_content_items) + } + Err(e) => match e { + wazuh_client::WazuhApiError::HttpError { + status, + message: _, + url: _, + } if status == StatusCode::NOT_FOUND => { + tracing::info!("No network port data found for agent {}. Syscollector might not have run or data is unavailable.", agent_id); + Self::success_result(vec![Content::text( + format!("No network port data found for agent {}. The agent might not exist, syscollector data might be unavailable, or the agent is not active.", agent_id), + )]) + } + _ => { + let err_msg = Self::format_error( + "agent ports", + &format!("retrieving network ports for agent {}", agent_id), + &e, + ); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + }, + } + } +} + +impl ToolModule for AgentTools {} + diff --git a/src/tools/alerts.rs b/src/tools/alerts.rs new file mode 100644 index 0000000..4c4b0dc --- /dev/null +++ b/src/tools/alerts.rs @@ -0,0 +1,105 @@ +//! Wazuh Indexer alert tools +//! +//! This module contains tools for retrieving and analyzing Wazuh security alerts +//! from the Wazuh Indexer. + +use rmcp::{ + Error as McpError, + model::{CallToolResult, Content}, + tool, +}; +use std::sync::Arc; +use wazuh_client::WazuhIndexerClient; +use super::ToolModule; + +/// Parameters for getting alert summary +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetAlertSummaryParams { + #[schemars(description = "Maximum number of alerts to retrieve (default: 100)")] + pub limit: Option, +} + +/// Alert tools implementation +#[derive(Clone)] +pub struct AlertTools { + indexer_client: Arc, +} + +impl AlertTools { + pub fn new(indexer_client: Arc) -> Self { + Self { indexer_client } + } + + #[tool( + name = "get_wazuh_alert_summary", + description = "Retrieves a summary of Wazuh security alerts. Returns formatted alert information including ID, timestamp, and description." + )] + pub async fn get_wazuh_alert_summary( + &self, + #[tool(aggr)] params: GetAlertSummaryParams, + ) -> Result { + let limit = params.limit.unwrap_or(100); + + tracing::info!(limit = %limit, "Retrieving Wazuh alert summary"); + + match self.indexer_client.get_alerts().await { + Ok(raw_alerts) => { + let alerts_to_process: Vec<_> = raw_alerts.into_iter().take(limit as usize).collect(); + + if alerts_to_process.is_empty() { + tracing::info!("No Wazuh alerts found to process. Returning standard message."); + return Self::not_found_result("Wazuh alerts"); + } + + let num_alerts_to_process = alerts_to_process.len(); + let mcp_content_items: Vec = alerts_to_process + .into_iter() + .map(|alert_value| { + let source = alert_value.get("_source").unwrap_or(&alert_value); + + let id = source.get("id") + .and_then(|v| v.as_str()) + .or_else(|| alert_value.get("_id").and_then(|v| v.as_str())) + .unwrap_or("Unknown ID"); + + let description = source.get("rule") + .and_then(|r| r.get("description")) + .and_then(|d| d.as_str()) + .unwrap_or("No description available"); + + let timestamp = source.get("timestamp") + .and_then(|t| t.as_str()) + .unwrap_or("Unknown time"); + + let agent_name = source.get("agent") + .and_then(|a| a.get("name")) + .and_then(|n| n.as_str()) + .unwrap_or("Unknown agent"); + + let rule_level = source.get("rule") + .and_then(|r| r.get("level")) + .and_then(|l| l.as_u64()) + .unwrap_or(0); + + let formatted_text = format!( + "Alert ID: {}\nTime: {}\nAgent: {}\nLevel: {}\nDescription: {}", + id, timestamp, agent_name, rule_level, description + ); + Content::text(formatted_text) + }) + .collect(); + + tracing::info!("Successfully processed {} alerts into {} MCP content items", num_alerts_to_process, mcp_content_items.len()); + Self::success_result(mcp_content_items) + } + Err(e) => { + let err_msg = Self::format_error("Indexer", "retrieving alerts", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } +} + +impl ToolModule for AlertTools {} + diff --git a/src/tools/mod.rs b/src/tools/mod.rs new file mode 100644 index 0000000..17359e6 --- /dev/null +++ b/src/tools/mod.rs @@ -0,0 +1,60 @@ +//! Tools module for Wazuh MCP Server +//! +//! This module contains all the tool implementations organized by Wazuh component domains. +//! Each submodule handles a specific area of Wazuh functionality. + +pub mod agents; +pub mod alerts; +pub mod rules; +pub mod stats; +pub mod vulnerabilities; + +use rmcp::model::{CallToolResult, Content}; +use rmcp::Error as McpError; + +pub trait ToolModule { + fn format_error(component: &str, operation: &str, error: &dyn std::fmt::Display) -> String { + format!("Error {} from Wazuh {}: {}", operation, component, error) + } + + fn success_result(content: Vec) -> Result { + Ok(CallToolResult::success(content)) + } + + fn error_result(message: String) -> Result { + Ok(CallToolResult::error(vec![Content::text(message)])) + } + + fn not_found_result(resource: &str) -> Result { + Ok(CallToolResult::success(vec![Content::text(format!( + "No {} found matching the specified criteria.", resource + ))])) + } +} + +pub struct ToolUtils; + +impl ToolUtils { + pub fn format_agent_id(agent_id_str: &str) -> Result { + // Attempt to parse as a number first + if let Ok(num) = agent_id_str.parse::() { + if num > 999 { + Err(format!( + "Agent ID '{}' is too large. Must be a number between 0 and 999.", + agent_id_str + )) + } else { + Ok(format!("{:03}", num)) + } + } else if agent_id_str.len() == 3 && agent_id_str.chars().all(|c| c.is_ascii_digit()) { + // Already correctly formatted (e.g., "001") + Ok(agent_id_str.to_string()) + } else { + Err(format!( + "Invalid agent_id format: '{}'. Must be a number (e.g., 1, 12) or a 3-digit string (e.g., 001, 012).", + agent_id_str + )) + } + } +} + diff --git a/src/tools/rules.rs b/src/tools/rules.rs new file mode 100644 index 0000000..3094375 --- /dev/null +++ b/src/tools/rules.rs @@ -0,0 +1,142 @@ +//! Wazuh Manager rule tools +//! +//! This module contains tools for retrieving and analyzing Wazuh security rules +//! from the Wazuh Manager. + +use rmcp::{ + Error as McpError, + model::{CallToolResult, Content}, + tool, +}; +use std::sync::Arc; +use tokio::sync::Mutex; +use wazuh_client::RulesClient; +use super::ToolModule; + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetRulesSummaryParams { + #[schemars(description = "Maximum number of rules to retrieve (default: 100)")] + pub limit: Option, + #[schemars(description = "Rule level to filter by (optional)")] + pub level: Option, + #[schemars(description = "Rule group to filter by (optional)")] + pub group: Option, + #[schemars(description = "Filename to filter by (optional)")] + pub filename: Option, +} + +#[derive(Clone)] +pub struct RuleTools { + rules_client: Arc>, +} + +impl RuleTools { + pub fn new(rules_client: Arc>) -> Self { + Self { rules_client } + } + + #[tool( + name = "get_wazuh_rules_summary", + description = "Retrieves a summary of Wazuh security rules. Returns formatted rule information including ID, level, description, and groups. Supports filtering by level, group, and filename." + )] + pub async fn get_wazuh_rules_summary( + &self, + #[tool(aggr)] params: GetRulesSummaryParams, + ) -> Result { + let limit = params.limit.unwrap_or(100); + + tracing::info!( + limit = %limit, + level = ?params.level, + group = ?params.group, + filename = ?params.filename, + "Retrieving Wazuh rules summary" + ); + + let mut rules_client = self.rules_client.lock().await; + + match rules_client.get_rules( + Some(limit), + None, // offset + params.level, + params.group.as_deref(), + params.filename.as_deref(), + ).await { + Ok(rules) => { + if rules.is_empty() { + tracing::info!("No Wazuh rules found matching criteria. Returning standard message."); + return Self::not_found_result("Wazuh rules matching the specified criteria"); + } + + let num_rules = rules.len(); + let mcp_content_items: Vec = rules + .into_iter() + .map(|rule| { + let groups_str = rule.groups.join(", "); + + let compliance_info = { + let mut compliance = Vec::new(); + if let Some(gdpr) = &rule.gdpr { + if !gdpr.is_empty() { + compliance.push(format!("GDPR: {}", gdpr.join(", "))); + } + } + if let Some(hipaa) = &rule.hipaa { + if !hipaa.is_empty() { + compliance.push(format!("HIPAA: {}", hipaa.join(", "))); + } + } + if let Some(pci) = &rule.pci_dss { + if !pci.is_empty() { + compliance.push(format!("PCI DSS: {}", pci.join(", "))); + } + } + if let Some(nist) = &rule.nist_800_53 { + if !nist.is_empty() { + compliance.push(format!("NIST 800-53: {}", nist.join(", "))); + } + } + if compliance.is_empty() { + String::new() + } else { + format!("\nCompliance: {}", compliance.join(" | ")) + } + }; + + let severity = match rule.level { + 0..=3 => "Low", + 4..=7 => "Medium", + 8..=12 => "High", + 13..=15 => "Critical", + _ => "Unknown", + }; + + let formatted_text = format!( + "Rule ID: {}\nLevel: {} ({})\nDescription: {}\nGroups: {}\nFile: {}\nStatus: {}{}", + rule.id, + rule.level, + severity, + rule.description, + groups_str, + rule.filename, + rule.status, + compliance_info + ); + Content::text(formatted_text) + }) + .collect(); + + tracing::info!("Successfully processed {} rules into {} MCP content items", num_rules, mcp_content_items.len()); + Self::success_result(mcp_content_items) + } + Err(e) => { + let err_msg = Self::format_error("Manager", "retrieving rules", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } +} + +impl ToolModule for RuleTools {} + diff --git a/src/tools/stats.rs b/src/tools/stats.rs new file mode 100644 index 0000000..797d955 --- /dev/null +++ b/src/tools/stats.rs @@ -0,0 +1,493 @@ +//! Stats tools for Wazuh MCP Server +//! +//! This module contains tools for retrieving various statistics from Wazuh components, +//! including manager logs, remoted daemon stats, log collector stats, and weekly statistics. + +use super::{ToolModule, ToolUtils}; +use reqwest::StatusCode; +use rmcp::model::{CallToolResult, Content}; +use rmcp::Error as McpError; +use std::sync::Arc; +use tokio::sync::Mutex; +use wazuh_client::{ClusterClient, LogsClient}; + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct SearchManagerLogsParams { + #[schemars(description = "Maximum number of log entries to retrieve (default: 100)")] + pub limit: Option, + #[schemars(description = "Number of log entries to skip (default: 0)")] + pub offset: Option, + #[schemars(description = "Log level to filter by (e.g., \"error\", \"warning\", \"info\")")] + pub level: String, + #[schemars(description = "Log tag to filter by (e.g., \"wazuh-modulesd\") (optional)")] + pub tag: Option, + #[schemars(description = "Search term to filter log descriptions (optional)")] + pub search_term: Option, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetManagerErrorLogsParams { + #[schemars(description = "Maximum number of error log entries to retrieve (default: 100)")] + pub limit: Option, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetLogCollectorStatsParams { + #[schemars( + description = "Agent ID to get log collector stats for (required, e.g., \"0\", \"1\", \"001\")" + )] + pub agent_id: String, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetRemotedStatsParams { + // No parameters needed for remoted stats +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetWeeklyStatsParams { + // No parameters needed for weekly stats +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetClusterHealthParams { + // No parameters needed for cluster health +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetClusterNodesParams { + #[schemars( + description = "Maximum number of nodes to retrieve (optional, Wazuh API default is 500)" + )] + pub limit: Option, + #[schemars(description = "Number of nodes to skip (offset) (optional, default: 0)")] + pub offset: Option, + #[schemars(description = "Filter by node type (e.g., 'master', 'worker') (optional)")] + pub node_type: Option, +} + +#[derive(Clone)] +pub struct StatsTools { + logs_client: Arc>, + cluster_client: Arc>, +} + +impl StatsTools { + pub fn new( + logs_client: Arc>, + cluster_client: Arc>, + ) -> Self { + Self { + logs_client, + cluster_client, + } + } + + pub async fn search_wazuh_manager_logs( + &self, + params: SearchManagerLogsParams, + ) -> Result { + let limit = params.limit.unwrap_or(100); + let offset = params.offset.unwrap_or(0); + + tracing::info!( + limit = %limit, + offset = %offset, + level = ?params.level, + tag = ?params.tag, + search_term = ?params.search_term, + "Searching Wazuh manager logs" + ); + + let mut logs_client = self.logs_client.lock().await; + + match logs_client + .get_manager_logs( + Some(limit), + Some(offset), + Some(¶ms.level), + params.tag.as_deref(), + params.search_term.as_deref(), + ) + .await + { + Ok(log_entries) => { + if log_entries.is_empty() { + tracing::info!("No Wazuh manager logs found matching criteria. Returning standard message."); + return Self::not_found_result("Wazuh manager logs"); + } + + let num_logs = log_entries.len(); + let mcp_content_items: Vec = log_entries + .into_iter() + .map(|log_entry| { + let formatted_text = format!( + "Timestamp: {}\nTag: {}\nLevel: {}\nDescription: {}", + log_entry.timestamp, + log_entry.tag, + log_entry.level, + log_entry.description + ); + Content::text(formatted_text) + }) + .collect(); + + tracing::info!( + "Successfully processed {} manager log entries into {} MCP content items", + num_logs, + mcp_content_items.len() + ); + Self::success_result(mcp_content_items) + } + Err(e) => { + let err_msg = Self::format_error("Manager", "searching logs", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } + + pub async fn get_wazuh_manager_error_logs( + &self, + params: GetManagerErrorLogsParams, + ) -> Result { + let limit = params.limit.unwrap_or(100); + + tracing::info!(limit = %limit, "Retrieving Wazuh manager error logs"); + + let mut logs_client = self.logs_client.lock().await; + + match logs_client.get_error_logs(Some(limit)).await { + Ok(log_entries) => { + if log_entries.is_empty() { + tracing::info!( + "No Wazuh manager error logs found. Returning standard message." + ); + return Self::not_found_result("Wazuh manager error logs"); + } + + let num_logs = log_entries.len(); + let mcp_content_items: Vec = log_entries + .into_iter() + .map(|log_entry| { + let formatted_text = format!( + "Timestamp: {}\nTag: {}\nLevel: {}\nDescription: {}", + log_entry.timestamp, + log_entry.tag, + log_entry.level, + log_entry.description + ); + Content::text(formatted_text) + }) + .collect(); + + tracing::info!( + "Successfully processed {} manager error log entries into {} MCP content items", + num_logs, + mcp_content_items.len() + ); + Self::success_result(mcp_content_items) + } + Err(e) => { + let err_msg = Self::format_error("Manager", "retrieving error logs", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } + + pub async fn get_wazuh_log_collector_stats( + &self, + params: GetLogCollectorStatsParams, + ) -> Result { + let agent_id = match ToolUtils::format_agent_id(¶ms.agent_id) { + Ok(formatted_id) => formatted_id, + Err(err_msg) => { + tracing::error!( + "Error formatting agent_id for log collector stats: {}", + err_msg + ); + return Self::error_result(err_msg); + } + }; + + tracing::info!(agent_id = %agent_id, "Retrieving Wazuh log collector stats"); + + let mut logs_client = self.logs_client.lock().await; + + match logs_client.get_logcollector_stats(&agent_id).await { + Ok(stats) => { + // Helper closure to format a LogCollectorPeriod + let format_period = |period_name: &str, + period_data: &wazuh_client::logs::LogCollectorPeriod| + -> String { + let files_info: String = period_data + .files + .iter() + .map(|file: &wazuh_client::logs::LogFile| { + let targets_str: String = file + .targets + .iter() + .map(|target: &wazuh_client::logs::LogTarget| { + format!( + " - Name: {}, Drops: {}", + target.name, target.drops + ) + }) + .collect::>() + .join("\n"); + let targets_display = if targets_str.is_empty() { + " (No specific targets with drops for this file)".to_string() + } else { + format!(" Targets:\n{}", targets_str) + }; + format!( + " - Location: {}\n Events: {}\n Bytes: {}\n{}", + file.location, file.events, file.bytes, targets_display + ) + }) + .collect::>() + .join("\n\n"); + + let files_display = if files_info.is_empty() { + " (No files processed in this period)".to_string() + } else { + files_info + }; + + format!( + "{}:\n Start: {}\n End: {}\n Files:\n{}", + period_name, period_data.start, period_data.end, files_display + ) + }; + + let global_period_info = format_period("Global Period", &stats.global); + let interval_period_info = format_period("Interval Period", &stats.interval); + + let formatted_text = format!( + "Log Collector Stats for Agent: {}\n\n{}\n\n{}", + agent_id, + global_period_info, + interval_period_info + ); + + tracing::info!( + "Successfully retrieved and formatted log collector stats for agent {}", + agent_id + ); + Self::success_result(vec![Content::text(formatted_text)]) + } + Err(e) => { + // Check if the error is due to agent not found or stats not available + if let wazuh_client::WazuhApiError::ApiError(msg) = &e { + if msg.contains(&format!( + "Log collector stats for agent {} not found", + agent_id + )) { + tracing::info!("No log collector stats found for agent {} (API error). Returning standard message.", agent_id); + return Self::success_result(vec![Content::text( + format!("No log collector stats found for agent {}. The agent might not exist, stats are unavailable, or the agent is not active.", agent_id), + )]); + } + if msg.contains("Agent Not Found") { + tracing::info!( + "Agent {} not found (API error). Returning standard message.", + agent_id + ); + return Self::success_result(vec![Content::text(format!( + "Agent {} not found. Cannot retrieve log collector stats.", + agent_id + ))]); + } + } + if let wazuh_client::WazuhApiError::HttpError { status, .. } = &e { + if *status == StatusCode::NOT_FOUND { + tracing::info!("No log collector stats found for agent {} (HTTP 404). Agent might not exist or endpoint unavailable.", agent_id); + return Self::success_result(vec![Content::text( + format!("No log collector stats found for agent {}. The agent might not exist, stats are unavailable, or the agent is not active.", agent_id), + )]); + } + } + let err_msg = format!( + "Error retrieving log collector stats for agent {} from Wazuh: {}", + agent_id, e + ); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } + + pub async fn get_wazuh_remoted_stats( + &self, + _params: GetRemotedStatsParams, + ) -> Result { + tracing::info!("Retrieving Wazuh remoted stats"); + + let mut logs_client = self.logs_client.lock().await; + + match logs_client.get_remoted_stats().await { + Ok(stats) => { + let formatted_text = format!( + "Wazuh Remoted Statistics:\nQueue Size: {}\nTotal Queue Size: {}\nTCP Sessions: {}\nControl Message Count: {}\nDiscarded Message Count: {}\nMessages Sent (Bytes): {}\nBytes Received: {}\nDequeued After Close: {}", + stats.queue_size, + stats.total_queue_size, + stats.tcp_sessions, + stats.ctrl_msg_count, + stats.discarded_count, + stats.sent_bytes, + stats.recv_bytes, + stats.dequeued_after_close + ); + + tracing::info!("Successfully retrieved remoted stats"); + Self::success_result(vec![Content::text(formatted_text)]) + } + Err(e) => { + let err_msg = Self::format_error("Manager", "retrieving remoted stats", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } + + pub async fn get_wazuh_weekly_stats( + &self, + _params: GetWeeklyStatsParams, + ) -> Result { + tracing::info!("Retrieving Wazuh weekly stats"); + + let mut logs_client = self.logs_client.lock().await; + + match logs_client.get_weekly_stats().await { + Ok(stats_value) => match serde_json::to_string_pretty(&stats_value) { + Ok(formatted_json) => { + tracing::info!("Successfully retrieved and formatted weekly stats."); + Self::success_result(vec![Content::text(formatted_json)]) + } + Err(e) => { + let err_msg = format!("Error formatting weekly stats JSON: {}", e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + }, + Err(e) => { + let err_msg = Self::format_error("Manager", "retrieving weekly stats", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } + + pub async fn get_wazuh_cluster_health( + &self, + _params: GetClusterHealthParams, + ) -> Result { + tracing::info!("Retrieving Wazuh cluster health"); + + let mut cluster_client = self.cluster_client.lock().await; + + match cluster_client.is_cluster_healthy().await { + Ok(is_healthy) => { + let health_status_text = if is_healthy { + "Cluster is healthy: Yes".to_string() + } else { + // To provide more context, we can fetch the basic status + match cluster_client.get_cluster_status().await { + Ok(status) => { + let mut reasons = Vec::new(); + if !status.enabled.eq_ignore_ascii_case("yes") { + reasons.push("cluster is not enabled"); + } + if !status.running.eq_ignore_ascii_case("yes") { + reasons.push("cluster is not running"); + } + if status.enabled.eq_ignore_ascii_case("yes") && status.running.eq_ignore_ascii_case("yes") { + match cluster_client.get_cluster_healthcheck().await { + Ok(hc) if hc.n_connected_nodes == 0 => reasons.push("no nodes are connected"), + Err(_) => reasons.push("healthcheck endpoint failed or reported issues"), + _ => {} // Healthy implies connected nodes + } + } + if reasons.is_empty() && !is_healthy { + reasons.push("unknown reason, check detailed logs or healthcheck endpoint"); + } + format!("Cluster is healthy: No. Reasons: {}", reasons.join("; ")) + } + Err(_) => { + "Cluster is healthy: No. Additionally, failed to retrieve basic cluster status for more details.".to_string() + } + } + }; + tracing::info!( + "Successfully retrieved cluster health: {}", + health_status_text + ); + Self::success_result(vec![Content::text(health_status_text)]) + } + Err(e) => { + let err_msg = Self::format_error("Cluster", "retrieving health", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } + + pub async fn get_wazuh_cluster_nodes( + &self, + params: GetClusterNodesParams, + ) -> Result { + tracing::info!( + limit = ?params.limit, + offset = ?params.offset, + node_type = ?params.node_type, + "Retrieving Wazuh cluster nodes" + ); + + let mut cluster_client = self.cluster_client.lock().await; + + match cluster_client + .get_cluster_nodes(params.limit, params.offset, params.node_type.as_deref()) + .await + { + Ok(nodes) => { + if nodes.is_empty() { + tracing::info!("No Wazuh cluster nodes found matching criteria. Returning standard message."); + return Self::not_found_result("Wazuh cluster nodes"); + } + + let num_nodes = nodes.len(); + let mcp_content_items: Vec = nodes + .into_iter() + .map(|node| { + let status_indicator = match node.status.to_lowercase().as_str() { + "connected" | "active" => "🟢 CONNECTED", + "disconnected" => "🔴 DISCONNECTED", + _ => &node.status, + }; + let formatted_text = format!( + "Node Name: {}\nType: {}\nVersion: {}\nIP: {}\nStatus: {}", + node.name, node.node_type, node.version, node.ip, status_indicator + ); + Content::text(formatted_text) + }) + .collect(); + + tracing::info!( + "Successfully processed {} cluster nodes into {} MCP content items", + num_nodes, + mcp_content_items.len() + ); + Self::success_result(mcp_content_items) + } + Err(e) => { + let err_msg = Self::format_error("Cluster", "retrieving nodes", &e); + tracing::error!("{}", err_msg); + Self::error_result(err_msg) + } + } + } +} + +impl ToolModule for StatsTools {} + diff --git a/src/tools/vulnerabilities.rs b/src/tools/vulnerabilities.rs new file mode 100644 index 0000000..5ec8da2 --- /dev/null +++ b/src/tools/vulnerabilities.rs @@ -0,0 +1,412 @@ +//! Wazuh Manager vulnerability tools +//! +//! This module contains tools for retrieving and analyzing vulnerability information +//! from the Wazuh Manager. + +use rmcp::{ + Error as McpError, + model::{CallToolResult, Content}, + schemars, +}; +use std::sync::Arc; +use tokio::sync::Mutex; +use wazuh_client::{VulnerabilityClient, VulnerabilitySeverity}; +use super::ToolModule; + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetVulnerabilitiesSummaryParams { + #[schemars(description = "Maximum number of vulnerabilities to retrieve (default: 100)")] + pub limit: Option, + #[schemars(description = "Agent ID to filter vulnerabilities by (required, e.g., \"0\", \"1\", \"001\")")] + pub agent_id: String, + #[schemars(description = "Severity level to filter by (Low, Medium, High, Critical) (optional)")] + pub severity: Option, + #[schemars(description = "CVE ID to search for (optional)")] + pub cve: Option, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetCriticalVulnerabilitiesParams { + #[schemars(description = "Agent ID to get critical vulnerabilities for (required, e.g., \"0\", \"1\", \"001\")")] + pub agent_id: String, +} + +#[derive(Clone)] +pub struct VulnerabilityTools { + vulnerability_client: Arc>, +} + +impl VulnerabilityTools { + pub fn new(vulnerability_client: Arc>) -> Self { + Self { vulnerability_client } + } + + fn format_agent_id(agent_id_str: &str) -> Result { + // Attempt to parse as a number first + if let Ok(num) = agent_id_str.parse::() { + if num > 999 { + Err(format!( + "Agent ID '{}' is too large. Must be a number between 0 and 999.", + agent_id_str + )) + } else { + Ok(format!("{:03}", num)) + } + } else if agent_id_str.len() == 3 && agent_id_str.chars().all(|c| c.is_ascii_digit()) { + // Already correctly formatted (e.g., "001") + Ok(agent_id_str.to_string()) + } else { + Err(format!( + "Invalid agent_id format: '{}'. Must be a number (e.g., 1, 12) or a 3-digit string (e.g., 001, 012).", + agent_id_str + )) + } + } + + pub async fn get_wazuh_vulnerability_summary( + &self, + params: GetVulnerabilitiesSummaryParams, + ) -> Result { + let limit = params.limit.unwrap_or(100); + let offset = 0; // Default offset, can be extended in future if needed + + let agent_id = match Self::format_agent_id(¶ms.agent_id) { + Ok(formatted_id) => formatted_id, + Err(err_msg) => { + tracing::error!( + "Error formatting agent_id for vulnerability summary: {}", + err_msg + ); + return Ok(CallToolResult::error(vec![Content::text(err_msg)])); + } + }; + + tracing::info!( + limit = %limit, + agent_id = %agent_id, + severity = ?params.severity, + cve = ?params.cve, + "Retrieving Wazuh vulnerability summary" + ); + + let mut vulnerability_client = self.vulnerability_client.lock().await; + + let vulnerabilities = if let Some(cve_filter) = ¶ms.cve { + match vulnerability_client + .get_agent_vulnerabilities( + &agent_id, + Some(1000), // Get more results to filter + Some(offset), + params + .severity + .as_deref() + .and_then(VulnerabilitySeverity::from_str), + ) + .await + { + Ok(all_vulns) => { + let filtered: Vec<_> = all_vulns + .into_iter() + .filter(|v| v.cve.to_lowercase().contains(&cve_filter.to_lowercase())) + .take(limit as usize) + .collect(); + Ok(filtered) + } + Err(e) => Err(e), + } + } else { + vulnerability_client + .get_agent_vulnerabilities( + &agent_id, + Some(limit), + Some(offset), + params + .severity + .as_deref() + .and_then(VulnerabilitySeverity::from_str), + ) + .await + }; + + match vulnerabilities { + Ok(vulnerabilities) => { + if vulnerabilities.is_empty() { + tracing::info!("No Wazuh vulnerabilities found matching criteria. Returning standard message."); + return Ok(CallToolResult::success(vec![Content::text( + "No Wazuh vulnerabilities found matching the specified criteria.", + )])); + } + + let num_vulnerabilities = vulnerabilities.len(); + let mcp_content_items: Vec = vulnerabilities + .into_iter() + .map(|vuln| { + let severity_indicator = match vuln.severity { + VulnerabilitySeverity::Critical => "🔴 CRITICAL", + VulnerabilitySeverity::High => "🟠 HIGH", + VulnerabilitySeverity::Medium => "🟡 MEDIUM", + VulnerabilitySeverity::Low => "🟢 LOW", + }; + + let published_info = if let Some(published) = &vuln.published { + format!("\nPublished: {}", published) + } else { + String::new() + }; + + let updated_info = if let Some(updated) = &vuln.updated { + format!("\nUpdated: {}", updated) + } else { + String::new() + }; + + let detection_time_info = if let Some(detection_time) = &vuln.detection_time + { + format!("\nDetection Time: {}", detection_time) + } else { + String::new() + }; + + let agent_info = { + let id_str = vuln.agent_id.as_deref(); + let name_str = vuln.agent_name.as_deref(); + + match (id_str, name_str) { + (Some("000"), Some(name)) => { + format!("\nAgent: {} (Wazuh Manager, ID: 000)", name) + } + (Some("000"), None) => { + "\nAgent: Wazuh Manager (ID: 000)".to_string() + } + (Some(id), Some(name)) => format!("\nAgent: {} (ID: {})", name, id), + (Some(id), None) => format!("\nAgent ID: {}", id), + (None, Some(name)) => format!("\nAgent: {} (ID: Unknown)", name), // Should ideally not happen if ID is a primary key for agent context + (None, None) => String::new(), // No agent information available + } + }; + + let cvss_info = if let Some(cvss) = &vuln.cvss { + let mut cvss_parts = Vec::new(); + if let Some(cvss2) = &cvss.cvss2 { + if let Some(score) = cvss2.base_score { + cvss_parts.push(format!("CVSS2: {}", score)); + } + } + if let Some(cvss3) = &cvss.cvss3 { + if let Some(score) = cvss3.base_score { + cvss_parts.push(format!("CVSS3: {}", score)); + } + } + if !cvss_parts.is_empty() { + format!("\nCVSS Scores: {}", cvss_parts.join(", ")) + } else { + String::new() + } + } else { + String::new() + }; + + let reference_info = if let Some(reference) = &vuln.reference { + format!("\nReference: {}", reference) + } else { + String::new() + }; + + let description = vuln + .description + .as_deref() + .unwrap_or("No description available"); + + let formatted_text = format!( + "CVE: {}\nSeverity: {}\nTitle: {}\nDescription: {}{}{}{}{}{}{}", + vuln.cve, + severity_indicator, + vuln.title, + description, + published_info, + updated_info, + detection_time_info, + agent_info, + cvss_info, + reference_info + ); + Content::text(formatted_text) + }) + .collect(); + + tracing::info!( + "Successfully processed {} vulnerabilities into {} MCP content items", + num_vulnerabilities, + mcp_content_items.len() + ); + Ok(CallToolResult::success(mcp_content_items)) + } + Err(e) => { + use reqwest::StatusCode; + match e { + wazuh_client::WazuhApiError::HttpError { + status, + message: _, + url: _, + } if status == StatusCode::NOT_FOUND => { + tracing::info!("No vulnerability summary found for agent {}. Returning standard message.", agent_id); + return Ok(CallToolResult::success(vec![Content::text(format!( + "No vulnerability summary found for agent {}.", + agent_id + ))])); + } + _ => {} + } + let err_msg = format!("Error retrieving vulnerabilities from Wazuh: {}", e); + tracing::error!("{}", err_msg); + Ok(CallToolResult::error(vec![Content::text(err_msg)])) + } + } + } + + pub async fn get_wazuh_critical_vulnerabilities( + &self, + params: GetCriticalVulnerabilitiesParams, + ) -> Result { + let agent_id = match Self::format_agent_id(¶ms.agent_id) { + Ok(formatted_id) => formatted_id, + Err(err_msg) => { + tracing::error!( + "Error formatting agent_id for critical vulnerabilities: {}", + err_msg + ); + return Ok(CallToolResult::error(vec![Content::text(err_msg)])); + } + }; + + tracing::info!( + agent_id = %agent_id, + "Retrieving critical vulnerabilities for Wazuh agent" + ); + + let mut vulnerability_client = self.vulnerability_client.lock().await; + + match vulnerability_client + .get_critical_vulnerabilities(&agent_id) + .await + { + Ok(vulnerabilities) => { + if vulnerabilities.is_empty() { + tracing::info!("No critical vulnerabilities found for agent {}. Returning standard message.", agent_id); + return Ok(CallToolResult::success(vec![Content::text(format!( + "No critical vulnerabilities found for agent {}.", + agent_id + ))])); + } + + let num_vulnerabilities = vulnerabilities.len(); + let mcp_content_items: Vec = vulnerabilities + .into_iter() + .map(|vuln| { + let published_info = if let Some(published) = &vuln.published { + format!("\nPublished: {}", published) + } else { + String::new() + }; + + let updated_info = if let Some(updated) = &vuln.updated { + format!("\nUpdated: {}", updated) + } else { + String::new() + }; + + let detection_time_info = if let Some(detection_time) = &vuln.detection_time { + format!("\nDetection Time: {}", detection_time) + } else { + String::new() + }; + + let agent_info = if let Some(agent_name) = &vuln.agent_name { + format!("\nAgent: {} (ID: {})", agent_name, vuln.agent_id.as_deref().unwrap_or("Unknown")) + } else if let Some(agent_id) = &vuln.agent_id { + format!("\nAgent ID: {}", agent_id) + } else { + String::new() + }; + + let cvss_info = if let Some(cvss) = &vuln.cvss { + let mut cvss_parts = Vec::new(); + if let Some(cvss2) = &cvss.cvss2 { + if let Some(score) = cvss2.base_score { + cvss_parts.push(format!("CVSS2: {}", score)); + } + } + if let Some(cvss3) = &cvss.cvss3 { + if let Some(score) = cvss3.base_score { + cvss_parts.push(format!("CVSS3: {}", score)); + } + } + if !cvss_parts.is_empty() { + format!("\nCVSS Scores: {}", cvss_parts.join(", ")) + } else { + String::new() + } + } else { + String::new() + }; + + let reference_info = if let Some(reference) = &vuln.reference { + format!("\nReference: {}", reference) + } else { + String::new() + }; + + let description = vuln.description.as_deref().unwrap_or("No description available"); + + let formatted_text = format!( + "🔴 CRITICAL VULNERABILITY\nCVE: {}\nTitle: {}\nDescription: {}{}{}{}{}{}{}", + vuln.cve, + vuln.title, + description, + published_info, + updated_info, + detection_time_info, + agent_info, + cvss_info, + reference_info + ); + Content::text(formatted_text) + }) + .collect(); + + tracing::info!( + "Successfully processed {} critical vulnerabilities into {} MCP content items", + num_vulnerabilities, + mcp_content_items.len() + ); + Ok(CallToolResult::success(mcp_content_items)) + } + Err(e) => { + use reqwest::StatusCode; + match e { + wazuh_client::WazuhApiError::HttpError { + status, + message: _, + url: _, + } if status == StatusCode::NOT_FOUND => { + tracing::info!("No critical vulnerabilities found for agent {}. Returning standard message.", agent_id); + return Ok(CallToolResult::success(vec![Content::text(format!( + "No critical vulnerabilities found for agent {}.", + agent_id + ))])); + } + _ => {} + } + let err_msg = format!( + "Error retrieving critical vulnerabilities from Wazuh for agent {}: {}", + agent_id, e + ); + tracing::error!("{}", err_msg); + Ok(CallToolResult::error(vec![Content::text(err_msg)])) + } + } + } +} + +impl ToolModule for VulnerabilityTools {} +