// mcp-server-wazuh/src/main.rs

//
// Purpose:
//
// This Rust application implements an MCP (Model Context Protocol) server that acts as a
// bridge to a Wazuh instance. It exposes various Wazuh functionalities as tools that can
// be invoked by MCP clients (e.g., AI models, automation scripts).
//
// Structure:
// - `main()`: Entry point of the application. Initializes logging (tracing),
//   sets up the `WazuhToolsServer`, and starts the MCP server over the stdio transport.
//
// - `WazuhToolsServer`: The core struct. It implements the `rmcp::ServerHandler` trait,
//   and its `impl` block is annotated with `#[tool(tool_box)]`.
//   - It holds the Wazuh clients (Indexer, rules, vulnerabilities, agents, logs, cluster)
//     created by `WazuhClientFactory`.
//   - Its methods, decorated with `#[tool(...)]`, define the actual tools available
//     to MCP clients (e.g., `get_wazuh_alert_summary`).
//
// - Tool Parameter Structs (e.g., `GetAlertSummaryParams`):
//   - These structs define the expected input parameters for each tool.
//   - They use `serde::Deserialize` for parsing input and `schemars::JsonSchema`
//     for generating a schema that MCP clients can use to understand how to call the tools.
//
// - `wazuh_client` crate:
//   - This external crate is used to interact with both the Wazuh Manager API and the Wazuh Indexer API.
//   - `WazuhClientFactory` is used to create specific clients (e.g., `WazuhIndexerClient`, `RulesClient`,
//     `AgentsClient`, `LogsClient`, `ClusterClient`, `VulnerabilityClient`).
//
// Workflow:
// 1. Server starts and listens for MCP requests on stdio.
// 2. MCP client sends a `call_tool` request (MCP method `tools/call`).
// 3. `WazuhToolsServer` dispatches to the appropriate tool method based on the tool name.
// 4. The tool method parses its parameters and queries the relevant Wazuh client for data.
// 5. The result (success with data or error) is packaged into a `CallToolResult`
// and sent back to the MCP client.
//
// Exposed Tools:
// The server exposes a set of tools categorized by the Wazuh component they interact with:
//
// Wazuh Indexer Tools:
// - `get_wazuh_alert_summary`: Retrieves a summary of security alerts from the Wazuh Indexer.
//
// Wazuh Manager Tools:
// - `get_wazuh_rules_summary`: Fetches security rules defined in the Wazuh Manager.
// - `get_wazuh_vulnerability_summary`: Gets vulnerability scan results for a specific agent from the Wazuh Manager.
// - `get_wazuh_critical_vulnerabilities`: Retrieves critical vulnerabilities for an agent.
// - `get_wazuh_running_agents`: Lists agents registered with the Wazuh Manager, filtered by status (default: active).
// - `get_wazuh_agent_processes`: Retrieves running processes on a specific agent (via Syscollector, data reported to Manager).
// - `get_wazuh_agent_ports`: Lists open network ports on a specific agent (via Syscollector, data reported to Manager).
// - `search_wazuh_manager_logs`: Searches logs generated by the Wazuh Manager.
// - `get_wazuh_manager_error_logs`: Retrieves error-specific logs from the Wazuh Manager.
// - `get_wazuh_log_collector_stats`: Gets log collection statistics for an agent.
// - `get_wazuh_remoted_stats`: Fetches statistics from the Wazuh Manager's remoted daemon.
// - `get_wazuh_weekly_stats`: Retrieves aggregated weekly statistics from the Wazuh Manager.
// - `get_wazuh_cluster_health`: Checks the health status of the Wazuh Manager cluster.
// - `get_wazuh_cluster_nodes`: Lists nodes participating in the Wazuh Manager cluster.
//
// (Detailed parameters and descriptions for each tool are available via the MCP `list_tools` request (MCP method `tools/list`) or in the server's `get_info` response.)
//
// Configuration:
// The server reads the following environment variables (a `.env` file is loaded if present).
// All are optional; an unset variable falls back to the default shown:
// - `WAZUH_API_HOST`: Hostname or IP address of the Wazuh API (default: "localhost").
// - `WAZUH_API_PORT`: Port number for the Wazuh API (default: 55000).
// - `WAZUH_API_USERNAME`: Username for Wazuh API authentication (default: "wazuh").
// - `WAZUH_API_PASSWORD`: Password for Wazuh API authentication (default: "wazuh").
// - `WAZUH_INDEXER_HOST`: Hostname or IP address of the Wazuh Indexer (default: "localhost").
// - `WAZUH_INDEXER_PORT`: Port number for the Wazuh Indexer API (default: 9200).
// - `WAZUH_INDEXER_USERNAME`: Username for Wazuh Indexer authentication (default: "admin").
// - `WAZUH_INDEXER_PASSWORD`: Password for Wazuh Indexer authentication (default: "admin").
// - `WAZUH_VERIFY_SSL`: Set to "true" to enable SSL certificate verification; any other value disables it (default: false).
// - `WAZUH_TEST_PROTOCOL`: (Optional) Protocol to use for Wazuh API/Indexer connections, e.g., "http" or "https" (default: "https").
//
// Logging behavior is controlled by the `RUST_LOG` environment variable (e.g., `RUST_LOG=info,mcp_server_wazuh=debug`).

use reqwest::StatusCode;
use rmcp::{
    Error as McpError, ServerHandler, ServiceExt,
    model::{
        CallToolResult, Content, Implementation, ProtocolVersion, ServerCapabilities, ServerInfo,
    },
    schemars, tool,
    transport::stdio,
};
use std::sync::Arc;
use std::env;
use clap::Parser;
use dotenv::dotenv;
use wazuh_client::{
    AgentsClient, ClusterClient, LogsClient, Port as WazuhPort, RulesClient, VulnerabilityClient,
    VulnerabilitySeverity, WazuhClientFactory, WazuhIndexerClient,
};

#[derive(Parser, Debug)]
#[command(name = "mcp-server-wazuh")]
#[command(about = "Wazuh SIEM MCP Server")]
struct Args {
// Currently only stdio transport is supported
// Future versions may add HTTP-SSE transport
}
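
// Illustrative sketch (not used by the server): how the configuration values described
// in the header comment fall back to their defaults. `WazuhToolsServer::new()` applies
// the same pattern inline with `unwrap_or`. The helper names below are hypothetical; they
// take the raw value as an argument so the behavior can be checked without touching the
// process environment.
#[allow(dead_code)]
fn sketch_port_or_default(raw: Option<&str>, default: u16) -> u16 {
    // An unset or unparsable value (including one outside the u16 range) yields the default.
    raw.and_then(|s| s.parse::<u16>().ok()).unwrap_or(default)
}

#[allow(dead_code)]
fn sketch_verify_ssl(raw: Option<&str>) -> bool {
    // `bool::from_str` accepts only the exact strings "true" and "false",
    // so values such as "TRUE" or "1" leave verification disabled.
    raw.and_then(|s| s.parse::<bool>().ok()).unwrap_or(false)
}
// Possible usage, assuming the variable name from the header comment:
//   sketch_port_or_default(std::env::var("WAZUH_API_PORT").ok().as_deref(), 55000)
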
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetAlertSummaryParams {
#[schemars(description = "Maximum number of alerts to retrieve (default: 100)")]
limit: Option<u32>,
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetRulesSummaryParams {
#[schemars(description = "Maximum number of rules to retrieve (default: 100)")]
limit: Option<u32>,
#[schemars(description = "Rule level to filter by (optional)")]
level: Option<u32>,
#[schemars(description = "Rule group to filter by (optional)")]
group: Option<String>,
#[schemars(description = "Filename to filter by (optional)")]
filename: Option<String>,
}
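
// Illustrative sketch (not used by the server): the rules summary tool later in this file
// labels each rule level with a severity band, which may help when choosing a `level`
// filter. `sketch_level_to_severity` is a hypothetical standalone name; the actual mapping
// is written inline in `get_wazuh_rules_summary`, and the `u32` level type is an assumption.
#[allow(dead_code)]
fn sketch_level_to_severity(level: u32) -> &'static str {
    match level {
        0..=3 => "Low",
        4..=7 => "Medium",
        8..=12 => "High",
        13..=15 => "Critical",
        // Anything above 15 falls outside the bands handled by the tool.
        _ => "Unknown",
    }
}
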
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetVulnerabilitySummaryParams {
#[schemars(description = "Maximum number of vulnerabilities to retrieve (default: 100)")]
limit: Option<u32>,
#[schemars(description = "Agent ID to filter vulnerabilities by (required, e.g., \"0\", \"1\", \"001\")")]
agent_id: String,
#[schemars(description = "Severity level to filter by (Low, Medium, High, Critical) (optional)")]
severity: Option<String>,
#[schemars(description = "CVE ID to search for (optional)")]
cve: Option<String>,
}
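
// Illustrative sketch (not used by the server): the `cve` parameter is matched as a
// case-insensitive substring, and `limit` is applied after matching. The function name
// and the plain `&str` inputs are assumptions made for this example; the tool itself
// filters the vulnerability records returned by `wazuh_client`.
#[allow(dead_code)]
fn sketch_filter_cves<'a>(cves: &[&'a str], needle: &str, limit: usize) -> Vec<&'a str> {
    // Lowercase the search term once instead of once per candidate.
    let needle = needle.to_lowercase();
    cves.iter()
        .copied()
        .filter(|cve| cve.to_lowercase().contains(&needle))
        .take(limit)
        .collect()
}
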
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetRunningAgentsParams {
#[schemars(description = "Maximum number of agents to retrieve (default: 100)")]
limit: Option<u32>,
#[schemars(description = "Agent status filter (active, disconnected, pending, never_connected) (default: active)")]
status: Option<String>,
#[schemars(description = "Agent name to search for (optional)")]
name: Option<String>,
#[schemars(description = "Agent IP address to filter by (optional)")]
ip: Option<String>,
#[schemars(description = "Agent group to filter by (optional)")]
group: Option<String>,
#[schemars(description = "Operating system platform to filter by (optional)")]
os_platform: Option<String>,
#[schemars(description = "Agent version to filter by (optional)")]
version: Option<String>,
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetCriticalVulnerabilitiesParams {
#[schemars(description = "Agent ID to get critical vulnerabilities for (required, e.g., \"0\", \"1\", \"001\")")]
agent_id: String,
}
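
// Illustrative sketch (not used by the server): what the `agent_id` examples in the
// descriptions ("0", "1", "001") normalize to. It mirrors the rule applied by
// `WazuhToolsServer::format_agent_id` (an integer in 0..=999, zero-padded to three
// digits), but returns `Option` for brevity. The name is hypothetical.
#[allow(dead_code)]
fn sketch_normalize_agent_id(raw: &str) -> Option<String> {
    match raw.parse::<u32>() {
        // "1" -> "001", "42" -> "042", "001" -> "001"
        Ok(num) if num <= 999 => Some(format!("{:03}", num)),
        // Too large, or not a number at all.
        _ => None,
    }
}
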
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetAgentProcessesParams {
#[schemars(description = "Agent ID to get processes for (required, e.g., \"0\", \"1\", \"001\")")]
agent_id: String,
#[schemars(description = "Maximum number of processes to retrieve (default: 100)")]
limit: Option<u32>,
#[schemars(description = "Search string to filter processes by name or command (optional)")]
search: Option<String>,
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetAgentPortsParams {
#[schemars(description = "Agent ID to get network ports for (required, e.g., \"0\", \"1\", \"001\")")]
agent_id: String,
#[schemars(description = "Maximum number of ports to retrieve (default: 100)")]
limit: Option<u32>,
#[schemars(description = "Protocol to filter by (e.g., \"tcp\", \"udp\") (optional)")]
protocol: Option<String>,
#[schemars(description = "State to filter by (e.g., \"LISTEN\", \"ESTABLISHED\") (optional)")]
state: Option<String>,
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct SearchManagerLogsParams {
#[schemars(description = "Maximum number of log entries to retrieve (default: 100)")]
limit: Option<u32>,
#[schemars(description = "Number of log entries to skip (default: 0)")]
offset: Option<u32>,
#[schemars(description = "Log level to filter by (e.g., \"error\", \"warning\", \"info\") (optional)")]
level: Option<String>,
#[schemars(description = "Log tag to filter by (e.g., \"wazuh-modulesd\") (optional)")]
tag: Option<String>,
#[schemars(description = "Search term to filter log descriptions (optional)")]
search_term: Option<String>,
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetManagerErrorLogsParams {
#[schemars(description = "Maximum number of error log entries to retrieve (default: 100)")]
limit: Option<u32>,
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetLogCollectorStatsParams {
#[schemars(description = "Agent ID to get log collector stats for (required, e.g., \"0\", \"1\", \"001\")")]
agent_id: String,
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetRemotedStatsParams {
// No parameters needed for remoted stats
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetWeeklyStatsParams {
// No parameters needed for weekly stats
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetClusterHealthParams {
// No parameters needed for cluster health
}
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetClusterNodesParams {
#[schemars(description = "Maximum number of nodes to retrieve (optional, Wazuh API default is 500)")]
limit: Option<u32>,
#[schemars(description = "Number of nodes to skip (offset) (optional, default: 0)")]
offset: Option<u32>,
#[schemars(description = "Filter by node type (e.g., 'master', 'worker') (optional)")]
node_type: Option<String>,
}
#[derive(Clone)]
struct WazuhToolsServer {
#[allow(dead_code)] // Kept for future expansion to other Wazuh clients
wazuh_factory: Arc<WazuhClientFactory>,
wazuh_indexer_client: Arc<WazuhIndexerClient>,
wazuh_rules_client: Arc<tokio::sync::Mutex<RulesClient>>,
wazuh_vulnerability_client: Arc<tokio::sync::Mutex<VulnerabilityClient>>,
wazuh_agents_client: Arc<tokio::sync::Mutex<AgentsClient>>,
wazuh_logs_client: Arc<tokio::sync::Mutex<LogsClient>>,
wazuh_cluster_client: Arc<tokio::sync::Mutex<ClusterClient>>,
}
#[tool(tool_box)]
impl WazuhToolsServer {
    /// Normalizes an agent ID to the zero-padded 3-digit form (e.g., "1" -> "001").
    /// Accepts any string that parses as an integer between 0 and 999.
    fn format_agent_id(agent_id_str: &str) -> Result<String, String> {
        match agent_id_str.parse::<u32>() {
            Ok(num) if num > 999 => Err(format!(
                "Agent ID '{}' is too large. Must be a number between 0 and 999.",
                agent_id_str
            )),
            // Already-padded input such as "001" also parses as a number,
            // so it needs no separate branch.
            Ok(num) => Ok(format!("{:03}", num)),
            Err(_) => Err(format!(
                "Invalid agent_id format: '{}'. Must be a number (e.g., 1, 12) or a 3-digit string (e.g., 001, 012).",
                agent_id_str
            )),
        }
    }
    fn new() -> Result<Self, anyhow::Error> {
        // Load variables from a `.env` file if one exists; a missing file is not an error.
        dotenv().ok();

        // Every setting falls back to a default when unset. The ports and
        // `WAZUH_VERIFY_SSL` also fall back silently when the value cannot be parsed.
        let api_host = env::var("WAZUH_API_HOST").unwrap_or_else(|_| "localhost".to_string());
        let api_port: u16 = env::var("WAZUH_API_PORT")
            .unwrap_or_else(|_| "55000".to_string())
            .parse()
            .unwrap_or(55000);
        let api_username = env::var("WAZUH_API_USERNAME").unwrap_or_else(|_| "wazuh".to_string());
        let api_password = env::var("WAZUH_API_PASSWORD").unwrap_or_else(|_| "wazuh".to_string());
        let indexer_host = env::var("WAZUH_INDEXER_HOST").unwrap_or_else(|_| "localhost".to_string());
        let indexer_port: u16 = env::var("WAZUH_INDEXER_PORT")
            .unwrap_or_else(|_| "9200".to_string())
            .parse()
            .unwrap_or(9200);
        let indexer_username =
            env::var("WAZUH_INDEXER_USERNAME").unwrap_or_else(|_| "admin".to_string());
        let indexer_password =
            env::var("WAZUH_INDEXER_PASSWORD").unwrap_or_else(|_| "admin".to_string());
        let verify_ssl = env::var("WAZUH_VERIFY_SSL")
            .unwrap_or_else(|_| "false".to_string())
            .parse()
            .unwrap_or(false);
        let test_protocol = env::var("WAZUH_TEST_PROTOCOL")
            .ok()
            .or_else(|| Some("https".to_string()));

        let wazuh_factory = WazuhClientFactory::new(
            api_host,
            api_port,
            api_username,
            api_password,
            indexer_host,
            indexer_port,
            indexer_username,
            indexer_password,
            verify_ssl,
            test_protocol,
        );

        let wazuh_indexer_client = wazuh_factory.create_indexer_client();
        let wazuh_rules_client = wazuh_factory.create_rules_client();
        let wazuh_vulnerability_client = wazuh_factory.create_vulnerability_client();
        let wazuh_agents_client = wazuh_factory.create_agents_client();
        let wazuh_logs_client = wazuh_factory.create_logs_client();
        let wazuh_cluster_client = wazuh_factory.create_cluster_client();

        Ok(Self {
            wazuh_factory: Arc::new(wazuh_factory),
            wazuh_indexer_client: Arc::new(wazuh_indexer_client),
            wazuh_rules_client: Arc::new(tokio::sync::Mutex::new(wazuh_rules_client)),
            wazuh_vulnerability_client: Arc::new(tokio::sync::Mutex::new(wazuh_vulnerability_client)),
            wazuh_agents_client: Arc::new(tokio::sync::Mutex::new(wazuh_agents_client)),
            wazuh_logs_client: Arc::new(tokio::sync::Mutex::new(wazuh_logs_client)),
            wazuh_cluster_client: Arc::new(tokio::sync::Mutex::new(wazuh_cluster_client)),
        })
    }
#[tool(
name = "get_wazuh_alert_summary",
description = "Retrieves a summary of Wazuh security alerts. Returns formatted alert information including ID, timestamp, and description."
)]
async fn get_wazuh_alert_summary(
&self,
#[tool(aggr)] params: GetAlertSummaryParams,
) -> Result<CallToolResult, McpError> {
let limit = params.limit.unwrap_or(100);
tracing::info!(limit = %limit, "Retrieving Wazuh alert summary");
match self.wazuh_indexer_client.get_alerts().await {
Ok(raw_alerts) => {
let alerts_to_process: Vec<_> = raw_alerts.into_iter().take(limit as usize).collect();
                if alerts_to_process.is_empty() {
                    tracing::info!("No Wazuh alerts found to process. Returning standard message.");
                    // Return a single text item so the client always receives content.
                    return Ok(CallToolResult::success(vec![Content::text(
                        "No Wazuh alerts found.",
                    )]));
                }
                // Map each alert to one `Content::text` item.
                // Record the count first: `into_iter()` below consumes the vector.
                let num_alerts_to_process = alerts_to_process.len();
let mcp_content_items: Vec<Content> = alerts_to_process
.into_iter()
.map(|alert_value| {
let source = alert_value.get("_source").unwrap_or(&alert_value);
let id = source.get("id")
.and_then(|v| v.as_str())
.or_else(|| alert_value.get("_id").and_then(|v| v.as_str()))
.unwrap_or("Unknown ID");
let description = source.get("rule")
.and_then(|r| r.get("description"))
.and_then(|d| d.as_str())
.unwrap_or("No description available");
let timestamp = source.get("timestamp")
.and_then(|t| t.as_str())
.unwrap_or("Unknown time");
let agent_name = source.get("agent")
.and_then(|a| a.get("name"))
.and_then(|n| n.as_str())
.unwrap_or("Unknown agent");
let rule_level = source.get("rule")
.and_then(|r| r.get("level"))
.and_then(|l| l.as_u64())
.unwrap_or(0);
let formatted_text = format!(
"Alert ID: {}\nTime: {}\nAgent: {}\nLevel: {}\nDescription: {}",
id, timestamp, agent_name, rule_level, description
);
Content::text(formatted_text)
})
.collect();
tracing::info!("Successfully processed {} alerts into {} MCP content items", num_alerts_to_process, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
let err_msg = format!("Error retrieving alerts from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "get_wazuh_rules_summary",
description = "Retrieves a summary of Wazuh security rules. Returns formatted rule information including ID, level, description, and groups. Supports filtering by level, group, and filename."
)]
async fn get_wazuh_rules_summary(
&self,
#[tool(aggr)] params: GetRulesSummaryParams,
) -> Result<CallToolResult, McpError> {
let limit = params.limit.unwrap_or(100);
tracing::info!(
limit = %limit,
level = ?params.level,
group = ?params.group,
filename = ?params.filename,
"Retrieving Wazuh rules summary"
);
let mut rules_client = self.wazuh_rules_client.lock().await;
match rules_client.get_rules(
Some(limit),
None, // offset
params.level,
params.group.as_deref(),
params.filename.as_deref(),
).await {
Ok(rules) => {
if rules.is_empty() {
tracing::info!("No Wazuh rules found matching criteria. Returning standard message.");
return Ok(CallToolResult::success(vec![Content::text(
"No Wazuh rules found matching the specified criteria.",
)]));
}
let num_rules = rules.len();
let mcp_content_items: Vec<Content> = rules
.into_iter()
.map(|rule| {
let groups_str = rule.groups.join(", ");
let compliance_info = {
let mut compliance = Vec::new();
if let Some(gdpr) = &rule.gdpr {
if !gdpr.is_empty() {
compliance.push(format!("GDPR: {}", gdpr.join(", ")));
}
}
if let Some(hipaa) = &rule.hipaa {
if !hipaa.is_empty() {
compliance.push(format!("HIPAA: {}", hipaa.join(", ")));
}
}
if let Some(pci) = &rule.pci_dss {
if !pci.is_empty() {
compliance.push(format!("PCI DSS: {}", pci.join(", ")));
}
}
if let Some(nist) = &rule.nist_800_53 {
if !nist.is_empty() {
compliance.push(format!("NIST 800-53: {}", nist.join(", ")));
}
}
if compliance.is_empty() {
String::new()
} else {
format!("\nCompliance: {}", compliance.join(" | "))
}
};
let severity = match rule.level {
0..=3 => "Low",
4..=7 => "Medium",
8..=12 => "High",
13..=15 => "Critical",
_ => "Unknown",
};
let formatted_text = format!(
"Rule ID: {}\nLevel: {} ({})\nDescription: {}\nGroups: {}\nFile: {}\nStatus: {}{}",
rule.id,
rule.level,
severity,
rule.description,
groups_str,
rule.filename,
rule.status,
compliance_info
);
Content::text(formatted_text)
})
.collect();
tracing::info!("Successfully processed {} rules into {} MCP content items", num_rules, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
let err_msg = format!("Error retrieving rules from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "get_wazuh_vulnerability_summary",
        description = "Retrieves a summary of Wazuh vulnerability detections for a specific agent. Returns formatted vulnerability information including CVE ID, severity, detection time, and agent details. Supports filtering by severity level and CVE ID."
)]
async fn get_wazuh_vulnerability_summary(
&self,
#[tool(aggr)] params: GetVulnerabilitySummaryParams,
) -> Result<CallToolResult, McpError> {
let limit = params.limit.unwrap_or(100);
        let offset = 0; // Always start at the first result; pagination is not exposed yet.
        // `agent_id` is a required `String`, so deserialization has already failed if it
        // was missing. It only needs normalizing to the 3-digit form here.
let agent_id = match Self::format_agent_id(&params.agent_id) {
Ok(formatted_id) => formatted_id,
Err(err_msg) => {
tracing::error!("Error formatting agent_id for vulnerability summary: {}", err_msg);
return Ok(CallToolResult::error(vec![Content::text(err_msg)]));
}
};
tracing::info!(
limit = %limit,
agent_id = %agent_id,
severity = ?params.severity,
cve = ?params.cve,
"Retrieving Wazuh vulnerability summary"
);
let mut vulnerability_client = self.wazuh_vulnerability_client.lock().await;
        // `from_str` returns `None` for an unrecognized severity string, in which case
        // no severity filter is applied.
        let severity_filter = params.severity.as_deref().and_then(VulnerabilitySeverity::from_str);
        let vulnerabilities = if let Some(cve_filter) = &params.cve {
            // This call takes no CVE argument, so fetch a larger page (up to 1000 entries)
            // and match client-side. Matches beyond that first page are not seen.
            let needle = cve_filter.to_lowercase();
            vulnerability_client
                .get_agent_vulnerabilities(&agent_id, Some(1000), Some(offset), severity_filter)
                .await
                .map(|all_vulns| {
                    all_vulns
                        .into_iter()
                        .filter(|v| v.cve.to_lowercase().contains(&needle))
                        .take(limit as usize)
                        .collect::<Vec<_>>()
                })
        } else {
            vulnerability_client
                .get_agent_vulnerabilities(&agent_id, Some(limit), Some(offset), severity_filter)
                .await
        };
match vulnerabilities {
Ok(vulnerabilities) => {
if vulnerabilities.is_empty() {
tracing::info!("No Wazuh vulnerabilities found matching criteria. Returning standard message.");
return Ok(CallToolResult::success(vec![Content::text(
"No Wazuh vulnerabilities found matching the specified criteria.",
)]));
}
let num_vulnerabilities = vulnerabilities.len();
let mcp_content_items: Vec<Content> = vulnerabilities
.into_iter()
.map(|vuln| {
let severity_indicator = match vuln.severity {
VulnerabilitySeverity::Critical => "🔴 CRITICAL",
VulnerabilitySeverity::High => "🟠 HIGH",
VulnerabilitySeverity::Medium => "🟡 MEDIUM",
VulnerabilitySeverity::Low => "🟢 LOW",
};
let published_info = if let Some(published) = &vuln.published {
format!("\nPublished: {}", published)
} else {
String::new()
};
let updated_info = if let Some(updated) = &vuln.updated {
format!("\nUpdated: {}", updated)
} else {
String::new()
};
let detection_time_info = if let Some(detection_time) = &vuln.detection_time {
format!("\nDetection Time: {}", detection_time)
} else {
String::new()
};
let agent_info = {
let id_str = vuln.agent_id.as_deref();
let name_str = vuln.agent_name.as_deref();
match (id_str, name_str) {
(Some("000"), Some(name)) => format!("\nAgent: {} (Wazuh Manager, ID: 000)", name),
(Some("000"), None) => "\nAgent: Wazuh Manager (ID: 000)".to_string(),
(Some(id), Some(name)) => format!("\nAgent: {} (ID: {})", name, id),
(Some(id), None) => format!("\nAgent ID: {}", id),
(None, Some(name)) => format!("\nAgent: {} (ID: Unknown)", name), // Should ideally not happen if ID is a primary key for agent context
(None, None) => String::new(), // No agent information available
}
};
let cvss_info = if let Some(cvss) = &vuln.cvss {
let mut cvss_parts = Vec::new();
if let Some(cvss2) = &cvss.cvss2 {
if let Some(score) = cvss2.base_score {
cvss_parts.push(format!("CVSS2: {}", score));
}
}
if let Some(cvss3) = &cvss.cvss3 {
if let Some(score) = cvss3.base_score {
cvss_parts.push(format!("CVSS3: {}", score));
}
}
if !cvss_parts.is_empty() {
format!("\nCVSS Scores: {}", cvss_parts.join(", "))
} else {
String::new()
}
} else {
String::new()
};
let reference_info = if let Some(reference) = &vuln.reference {
format!("\nReference: {}", reference)
} else {
String::new()
};
let description = vuln.description.as_deref().unwrap_or("No description available");
let formatted_text = format!(
"CVE: {}\nSeverity: {}\nTitle: {}\nDescription: {}{}{}{}{}{}{}",
vuln.cve,
severity_indicator,
vuln.title,
description,
published_info,
updated_info,
detection_time_info,
agent_info,
cvss_info,
reference_info
);
Content::text(formatted_text)
})
.collect();
tracing::info!("Successfully processed {} vulnerabilities into {} MCP content items", num_vulnerabilities, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
                // Treat a 404 as "no data for this agent" rather than as an error.
                if matches!(
                    &e,
                    wazuh_client::WazuhApiError::HttpError { status, .. } if *status == StatusCode::NOT_FOUND
                ) {
                    tracing::info!("No vulnerability summary found for agent {}. Returning standard message.", agent_id);
                    return Ok(CallToolResult::success(vec![Content::text(
                        format!("No vulnerability summary found for agent {}.", agent_id),
                    )]));
                }
let err_msg = format!("Error retrieving vulnerabilities from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "get_wazuh_critical_vulnerabilities",
description = "Retrieves critical vulnerabilities for a specific Wazuh agent. Returns formatted vulnerability information including CVE ID, title, description, CVSS scores, and detection details. Only shows vulnerabilities with 'Critical' severity level."
)]
async fn get_wazuh_critical_vulnerabilities(
&self,
#[tool(aggr)] params: GetCriticalVulnerabilitiesParams,
) -> Result<CallToolResult, McpError> {
let agent_id = match Self::format_agent_id(&params.agent_id) {
Ok(formatted_id) => formatted_id,
Err(err_msg) => {
tracing::error!("Error formatting agent_id for critical vulnerabilities: {}", err_msg);
return Ok(CallToolResult::error(vec![Content::text(err_msg)]));
}
};
tracing::info!(
agent_id = %agent_id,
"Retrieving critical vulnerabilities for Wazuh agent"
);
let mut vulnerability_client = self.wazuh_vulnerability_client.lock().await;
match vulnerability_client.get_critical_vulnerabilities(&agent_id).await {
Ok(vulnerabilities) => {
if vulnerabilities.is_empty() {
tracing::info!("No critical vulnerabilities found for agent {}. Returning standard message.", agent_id);
return Ok(CallToolResult::success(vec![Content::text(
format!("No critical vulnerabilities found for agent {}.", agent_id),
)]));
}
let num_vulnerabilities = vulnerabilities.len();
let mcp_content_items: Vec<Content> = vulnerabilities
.into_iter()
.map(|vuln| {
let published_info = if let Some(published) = &vuln.published {
format!("\nPublished: {}", published)
} else {
String::new()
};
let updated_info = if let Some(updated) = &vuln.updated {
format!("\nUpdated: {}", updated)
} else {
String::new()
};
let detection_time_info = if let Some(detection_time) = &vuln.detection_time {
format!("\nDetection Time: {}", detection_time)
} else {
String::new()
};
let agent_info = if let Some(agent_name) = &vuln.agent_name {
format!("\nAgent: {} (ID: {})", agent_name, vuln.agent_id.as_deref().unwrap_or("Unknown"))
} else if let Some(agent_id) = &vuln.agent_id {
format!("\nAgent ID: {}", agent_id)
} else {
String::new()
};
let cvss_info = if let Some(cvss) = &vuln.cvss {
let mut cvss_parts = Vec::new();
if let Some(cvss2) = &cvss.cvss2 {
if let Some(score) = cvss2.base_score {
cvss_parts.push(format!("CVSS2: {}", score));
}
}
if let Some(cvss3) = &cvss.cvss3 {
if let Some(score) = cvss3.base_score {
cvss_parts.push(format!("CVSS3: {}", score));
}
}
if !cvss_parts.is_empty() {
format!("\nCVSS Scores: {}", cvss_parts.join(", "))
} else {
String::new()
}
} else {
String::new()
};
let reference_info = if let Some(reference) = &vuln.reference {
format!("\nReference: {}", reference)
} else {
String::new()
};
let description = vuln.description.as_deref().unwrap_or("No description available");
let formatted_text = format!(
"🔴 CRITICAL VULNERABILITY\nCVE: {}\nTitle: {}\nDescription: {}{}{}{}{}{}{}",
vuln.cve,
vuln.title,
description,
published_info,
updated_info,
detection_time_info,
agent_info,
cvss_info,
reference_info
);
Content::text(formatted_text)
})
.collect();
tracing::info!("Successfully processed {} critical vulnerabilities into {} MCP content items", num_vulnerabilities, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
                // Treat a 404 as "no data for this agent" rather than as an error.
                if matches!(
                    &e,
                    wazuh_client::WazuhApiError::HttpError { status, .. } if *status == StatusCode::NOT_FOUND
                ) {
                    tracing::info!("No critical vulnerabilities found for agent {}. Returning standard message.", agent_id);
                    return Ok(CallToolResult::success(vec![Content::text(
                        format!("No critical vulnerabilities found for agent {}.", agent_id),
                    )]));
                }
let err_msg = format!("Error retrieving critical vulnerabilities from Wazuh for agent {}: {}", agent_id, e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "get_wazuh_running_agents",
description = "Retrieves a list of Wazuh agents with their current status and details. Returns formatted agent information including ID, name, IP, status, OS details, and last activity. Supports filtering by status, name, IP, group, OS platform, and version."
)]
async fn get_wazuh_running_agents(
&self,
#[tool(aggr)] params: GetRunningAgentsParams,
) -> Result<CallToolResult, McpError> {
let limit = params.limit.unwrap_or(100);
let status = params.status.as_deref().unwrap_or("active"); // Default to active agents
tracing::info!(
limit = %limit,
status = %status,
name = ?params.name,
ip = ?params.ip,
group = ?params.group,
os_platform = ?params.os_platform,
version = ?params.version,
"Retrieving Wazuh agents"
);
let mut agents_client = self.wazuh_agents_client.lock().await;
match agents_client.get_agents(
Some(limit),
None, // offset
None, // select
None, // sort
None, // search
Some(status),
None, // query
None, // older_than
params.os_platform.as_deref(),
None, // os_version
None, // os_name
None, // manager_host
params.version.as_deref(),
params.group.as_deref(),
None, // node_name
params.name.as_deref(),
params.ip.as_deref(),
None, // register_ip
None, // group_config_status
None, // distinct
).await {
Ok(agents) => {
if agents.is_empty() {
tracing::info!("No Wazuh agents found matching criteria. Returning standard message.");
return Ok(CallToolResult::success(vec![Content::text(
format!("No Wazuh agents found matching the specified criteria (status: {}).", status),
)]));
}
let num_agents = agents.len();
let mcp_content_items: Vec<Content> = agents
.into_iter()
.map(|agent| {
let status_indicator = match agent.status.to_lowercase().as_str() {
"active" => "🟢 ACTIVE",
"disconnected" => "🔴 DISCONNECTED",
"pending" => "🟡 PENDING",
"never_connected" => "⚪ NEVER CONNECTED",
_ => &agent.status,
};
let ip_info = if let Some(ip) = &agent.ip {
format!("\nIP: {}", ip)
} else {
String::new()
};
let register_ip_info = if let Some(register_ip) = &agent.register_ip {
if agent.ip.as_ref() != Some(register_ip) {
format!("\nRegistered IP: {}", register_ip)
} else {
String::new()
}
} else {
String::new()
};
let os_info = if let Some(os) = &agent.os {
let mut os_parts = Vec::new();
if let Some(name) = &os.name {
os_parts.push(name.clone());
}
if let Some(version) = &os.version {
os_parts.push(version.clone());
}
if let Some(arch) = &os.arch {
os_parts.push(format!("({})", arch));
}
if !os_parts.is_empty() {
format!("\nOS: {}", os_parts.join(" "))
} else {
String::new()
}
} else {
String::new()
};
let version_info = if let Some(version) = &agent.version {
format!("\nAgent Version: {}", version)
} else {
String::new()
};
let group_info = if let Some(groups) = &agent.group {
if !groups.is_empty() {
format!("\nGroups: {}", groups.join(", "))
} else {
String::new()
}
} else {
String::new()
};
let last_keep_alive_info = if let Some(last_keep_alive) = &agent.last_keep_alive {
format!("\nLast Keep Alive: {}", last_keep_alive)
} else {
String::new()
};
let date_add_info = if let Some(date_add) = &agent.date_add {
format!("\nRegistered: {}", date_add)
} else {
String::new()
};
let node_info = if let Some(node_name) = &agent.node_name {
format!("\nNode: {}", node_name)
} else {
String::new()
};
let config_status_info = if let Some(config_status) = &agent.group_config_status {
let config_indicator = match config_status.to_lowercase().as_str() {
"synced" => "✅ SYNCED",
"not synced" => "❌ NOT SYNCED",
_ => config_status,
};
format!("\nConfig Status: {}", config_indicator)
} else {
String::new()
};
let agent_id_display = if agent.id == "000" {
format!("{} (Wazuh Manager)", agent.id)
} else {
agent.id.clone()
};
let formatted_text = format!(
"Agent ID: {}\nName: {}\nStatus: {}{}{}{}{}{}{}{}{}{}",
agent_id_display,
agent.name,
status_indicator,
ip_info,
register_ip_info,
os_info,
version_info,
group_info,
last_keep_alive_info,
date_add_info,
node_info,
config_status_info
);
Content::text(formatted_text)
})
.collect();
tracing::info!("Successfully processed {} agents into {} MCP content items", num_agents, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
let err_msg = format!("Error retrieving agents from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "get_wazuh_agent_processes",
description = "Retrieves a list of running processes for a specific Wazuh agent. Returns formatted process information including PID, name, state, user, and command. Supports filtering by process name/command."
)]
async fn get_wazuh_agent_processes(
&self,
#[tool(aggr)] params: GetAgentProcessesParams,
) -> Result<CallToolResult, McpError> {
let agent_id = match Self::format_agent_id(&params.agent_id) {
Ok(formatted_id) => formatted_id,
Err(err_msg) => {
tracing::error!("Error formatting agent_id for agent processes: {}", err_msg);
return Ok(CallToolResult::error(vec![Content::text(err_msg)]));
}
};
let limit = params.limit.unwrap_or(100);
let offset = 0;
tracing::info!(
agent_id = %agent_id,
limit = %limit,
search = ?params.search,
"Retrieving Wazuh agent processes"
);
let mut vulnerability_client = self.wazuh_vulnerability_client.lock().await;
match vulnerability_client.get_agent_processes(
&agent_id,
Some(limit),
Some(offset),
params.search.as_deref(),
).await {
Ok(processes) => {
if processes.is_empty() {
tracing::info!("No processes found for agent {} with current filters. Returning standard message.", agent_id);
return Ok(CallToolResult::success(vec![Content::text(
format!("No processes found for agent {} matching the specified criteria.", agent_id),
)]));
}
let num_processes = processes.len();
let mcp_content_items: Vec<Content> = processes
.into_iter()
.map(|process| {
let mut details = vec![
format!("PID: {}", process.pid),
format!("Name: {}", process.name),
];
if let Some(state) = &process.state {
details.push(format!("State: {}", state));
}
if let Some(ppid) = &process.ppid {
details.push(format!("PPID: {}", ppid));
}
if let Some(euser) = &process.euser {
details.push(format!("User: {}", euser));
}
if let Some(cmd) = &process.cmd {
details.push(format!("Command: {}", cmd));
}
if let Some(start_time_str) = &process.start_time {
if let Ok(start_time_unix) = start_time_str.parse::<i64>() {
// Assuming start_time is a Unix timestamp in seconds
use chrono::DateTime;
if let Some(dt) = DateTime::from_timestamp(start_time_unix, 0) {
details.push(format!("Start Time: {}", dt.format("%Y-%m-%d %H:%M:%S UTC")));
} else {
details.push(format!("Start Time: {} (raw)", start_time_str));
}
} else {
// If it's not a simple number, print as is
details.push(format!("Start Time: {}", start_time_str));
}
}
if let Some(resident_mem) = process.resident {
details.push(format!("Memory (Resident): {} KB", resident_mem / 1024)); // Assuming resident is in bytes
}
if let Some(vm_size) = process.vm_size {
details.push(format!("Memory (VM Size): {} KB", vm_size / 1024)); // Assuming vm_size is in bytes
}
Content::text(details.join("\n"))
})
.collect();
tracing::info!("Successfully processed {} processes for agent {} into {} MCP content items", num_processes, agent_id, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
match e {
wazuh_client::WazuhApiError::HttpError { status, .. } if status == StatusCode::NOT_FOUND => {
tracing::info!("No process data found for agent {}. Syscollector might not have run or data is unavailable.", agent_id);
Ok(CallToolResult::success(vec![Content::text(
format!("No process data found for agent {}. The agent might not exist, syscollector data might be unavailable, or the agent is not active.", agent_id),
)]))
}
_ => {
let err_msg = format!("Error retrieving processes for agent {} from Wazuh: {}", agent_id, e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
}
}
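/*
Illustrative note on the error handling in `get_wazuh_agent_processes` above: an HTTP 404 is
reported to the MCP client as a successful call carrying an explanatory message, while every
other failure becomes an error result. The sketch below restates that split with simplified,
hypothetical stand-ins (`ApiFailure`, `ToolOutcome`, `classify_failure`); none of these names
exist in this file or in `wazuh_client`, and the status code is modelled as a plain `u16`.

```rust
// Hypothetical, simplified stand-in for the client's error type.
#[derive(Debug, PartialEq)]
pub enum ApiFailure {
    Http { status: u16 },
    Other(String),
}

// Hypothetical stand-in for the two kinds of tool result.
#[derive(Debug, PartialEq)]
pub enum ToolOutcome {
    // Sent back as a successful call with an explanatory message.
    Informational(String),
    // Sent back as a failed call.
    Failure(String),
}

// Map a failure to an outcome: 404 means "no data", anything else is a real error.
pub fn classify_failure(agent_id: &str, failure: &ApiFailure) -> ToolOutcome {
    match failure {
        ApiFailure::Http { status } if *status == 404 => ToolOutcome::Informational(
            format!("No process data found for agent {}.", agent_id),
        ),
        other => ToolOutcome::Failure(
            format!("Error retrieving processes for agent {}: {:?}", agent_id, other),
        ),
    }
}
```

Under these assumptions, `classify_failure("001", &ApiFailure::Http { status: 404 })` yields an
`Informational` outcome, so a missing syscollector inventory does not surface as a tool failure.
*/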
#[tool(
name = "get_wazuh_cluster_health",
description = "Checks the health of the Wazuh cluster. Returns whether the cluster is enabled, running, and if nodes are connected."
)]
async fn get_wazuh_cluster_health(
&self,
#[tool(aggr)] _params: GetClusterHealthParams, // No params used
) -> Result<CallToolResult, McpError> {
tracing::info!("Retrieving Wazuh cluster health");
let mut cluster_client = self.wazuh_cluster_client.lock().await;
match cluster_client.is_cluster_healthy().await {
Ok(is_healthy) => {
let health_status_text = if is_healthy {
"Cluster is healthy: Yes".to_string()
} else {
// To provide more context, we can fetch the basic status
match cluster_client.get_cluster_status().await {
Ok(status) => {
let mut reasons = Vec::new();
if !status.enabled.eq_ignore_ascii_case("yes") {
reasons.push("cluster is not enabled");
}
if !status.running.eq_ignore_ascii_case("yes") {
reasons.push("cluster is not running");
}
// is_cluster_healthy already checks n_connected_nodes > 0 if enabled and running
// If it's still false, and enabled/running are "yes", it implies no connected nodes or healthcheck failed.
if status.enabled.eq_ignore_ascii_case("yes") && status.running.eq_ignore_ascii_case("yes") {
match cluster_client.get_cluster_healthcheck().await {
Ok(hc) if hc.n_connected_nodes == 0 => reasons.push("no nodes are connected"),
Err(_) => reasons.push("healthcheck endpoint failed or reported issues"),
_ => {} // Healthy implies connected nodes
}
}
if reasons.is_empty() { // Reached only when the follow-up checks found nothing wrong, e.g. the cluster state changed between calls
reasons.push("unknown reason, check detailed logs or healthcheck endpoint");
}
format!("Cluster is healthy: No. Reasons: {}", reasons.join("; "))
}
Err(_) => {
"Cluster is healthy: No. Additionally, failed to retrieve basic cluster status for more details.".to_string()
}
}
};
tracing::info!("Successfully retrieved cluster health: {}", health_status_text);
Ok(CallToolResult::success(vec![Content::text(health_status_text)]))
}
Err(e) => {
let err_msg = format!("Error retrieving cluster health from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
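/*
The reason-collection logic in `get_wazuh_cluster_health` above can be read as a pure function
of three observations. A minimal sketch follows, assuming the healthcheck result is modelled as
`Option<u32>` (`None` meaning the healthcheck call failed). `unhealthy_reasons` and its
parameters are hypothetical names for illustration; they are not part of this file or of
`wazuh_client`.

```rust
// Hypothetical helper, for illustration only.
// `enabled` and `running` mirror the "yes"/"no" strings compared in the tool above.
// `connected_nodes` is Some(n) when the healthcheck call succeeded, None when it failed.
pub fn unhealthy_reasons(
    enabled: &str,
    running: &str,
    connected_nodes: Option<u32>,
) -> Vec<&'static str> {
    let is_enabled = enabled.eq_ignore_ascii_case("yes");
    let is_running = running.eq_ignore_ascii_case("yes");
    let mut reasons = Vec::new();
    if !is_enabled {
        reasons.push("cluster is not enabled");
    }
    if !is_running {
        reasons.push("cluster is not running");
    }
    // The node count is only consulted when the cluster is enabled and running.
    if is_enabled && is_running {
        match connected_nodes {
            Some(0) => reasons.push("no nodes are connected"),
            None => reasons.push("healthcheck endpoint failed or reported issues"),
            Some(_) => {}
        }
    }
    // Fallback so the caller never reports an empty reason list.
    if reasons.is_empty() {
        reasons.push("unknown reason, check detailed logs or healthcheck endpoint");
    }
    reasons
}
```

Keeping the decision in a function without I/O like this would make each branch testable
without a live Wazuh cluster; the tool itself would still perform the three API calls.
*/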
#[tool(
name = "get_wazuh_cluster_nodes",
description = "Retrieves a list of nodes in the Wazuh cluster. Returns formatted node information including name, type, version, IP, and status. Supports pagination via limit and offset, and filtering by node type."
)]
async fn get_wazuh_cluster_nodes(
&self,
#[tool(aggr)] params: GetClusterNodesParams,
) -> Result<CallToolResult, McpError> {
tracing::info!(
limit = ?params.limit,
offset = ?params.offset,
node_type = ?params.node_type,
"Retrieving Wazuh cluster nodes"
);
let mut cluster_client = self.wazuh_cluster_client.lock().await;
match cluster_client.get_cluster_nodes(
params.limit,
params.offset,
params.node_type.as_deref(),
).await {
Ok(nodes) => {
if nodes.is_empty() {
tracing::info!("No Wazuh cluster nodes found matching criteria. Returning standard message.");
return Ok(CallToolResult::success(vec![Content::text(
"No Wazuh cluster nodes found matching the specified criteria.",
)]));
}
let num_nodes = nodes.len();
let mcp_content_items: Vec<Content> = nodes
.into_iter()
.map(|node| {
let status_indicator = match node.status.to_lowercase().as_str() {
"connected" | "active" => "🟢 CONNECTED", // Assuming 'active' is also a good state
"disconnected" => "🔴 DISCONNECTED",
_ => &node.status,
};
let formatted_text = format!(
"Node Name: {}\nType: {}\nVersion: {}\nIP: {}\nStatus: {}",
node.name, node.node_type, node.version, node.ip, status_indicator
);
Content::text(formatted_text)
})
.collect();
tracing::info!("Successfully processed {} cluster nodes into {} MCP content items", num_nodes, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
let err_msg = format!("Error retrieving cluster nodes from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "search_wazuh_manager_logs",
description = "Searches Wazuh manager logs. Returns formatted log entries including timestamp, tag, level, and description. Supports pagination via limit and offset, and filtering by level, tag, and a search term."
)]
async fn search_wazuh_manager_logs(
&self,
#[tool(aggr)] params: SearchManagerLogsParams,
) -> Result<CallToolResult, McpError> {
let limit = params.limit.unwrap_or(100);
let offset = params.offset.unwrap_or(0);
tracing::info!(
limit = %limit,
offset = %offset,
level = ?params.level,
tag = ?params.tag,
search_term = ?params.search_term,
"Searching Wazuh manager logs"
);
let mut logs_client = self.wazuh_logs_client.lock().await;
match logs_client.get_manager_logs(
Some(limit),
Some(offset),
params.level.as_deref(),
params.tag.as_deref(),
params.search_term.as_deref(),
).await {
Ok(log_entries) => {
if log_entries.is_empty() {
tracing::info!("No Wazuh manager logs found matching criteria. Returning standard message.");
return Ok(CallToolResult::success(vec![Content::text(
"No Wazuh manager logs found matching the specified criteria.",
)]));
}
let num_logs = log_entries.len();
let mcp_content_items: Vec<Content> = log_entries
.into_iter()
.map(|log_entry| {
let formatted_text = format!(
"Timestamp: {}\nTag: {}\nLevel: {}\nDescription: {}",
log_entry.timestamp, log_entry.tag, log_entry.level, log_entry.description
);
Content::text(formatted_text)
})
.collect();
tracing::info!("Successfully processed {} manager log entries into {} MCP content items", num_logs, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
let err_msg = format!("Error searching manager logs from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "get_wazuh_manager_error_logs",
description = "Retrieves Wazuh manager error logs. Returns formatted log entries including timestamp, tag, level (error), and description."
)]
async fn get_wazuh_manager_error_logs(
&self,
#[tool(aggr)] params: GetManagerErrorLogsParams,
) -> Result<CallToolResult, McpError> {
let limit = params.limit.unwrap_or(100);
tracing::info!(limit = %limit, "Retrieving Wazuh manager error logs");
let mut logs_client = self.wazuh_logs_client.lock().await;
match logs_client.get_error_logs(Some(limit)).await {
Ok(log_entries) => {
if log_entries.is_empty() {
tracing::info!("No Wazuh manager error logs found. Returning standard message.");
return Ok(CallToolResult::success(vec![Content::text(
"No Wazuh manager error logs found.",
)]));
}
let num_logs = log_entries.len();
let mcp_content_items: Vec<Content> = log_entries
.into_iter()
.map(|log_entry| {
let formatted_text = format!(
"Timestamp: {}\nTag: {}\nLevel: {}\nDescription: {}",
log_entry.timestamp, log_entry.tag, log_entry.level, log_entry.description
);
Content::text(formatted_text)
})
.collect();
tracing::info!("Successfully processed {} manager error log entries into {} MCP content items", num_logs, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
let err_msg = format!("Error retrieving manager error logs from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "get_wazuh_log_collector_stats",
description = "Retrieves log collector statistics for a specific Wazuh agent. Returns information about events processed, dropped, bytes, and target log files."
)]
async fn get_wazuh_log_collector_stats(
&self,
#[tool(aggr)] params: GetLogCollectorStatsParams,
) -> Result<CallToolResult, McpError> {
let agent_id = match Self::format_agent_id(&params.agent_id) {
Ok(formatted_id) => formatted_id,
Err(err_msg) => {
tracing::error!("Error formatting agent_id for log collector stats: {}", err_msg);
return Ok(CallToolResult::error(vec![Content::text(err_msg)]));
}
};
tracing::info!(agent_id = %agent_id, "Retrieving Wazuh log collector stats");
let mut logs_client = self.wazuh_logs_client.lock().await;
match logs_client.get_logcollector_stats(&agent_id).await {
Ok(stats) => {
let targets_info: String = stats.targets.iter()
.map(|target| format!(" - Name: {}, Drops: {}", target.name, target.drops))
.collect::<Vec<String>>()
.join("\n");
let formatted_text = format!(
"Log Collector Stats for Agent: {}\nTotal Events: {}\nEvents Dropped: {}\nBytes Processed: {}\nTargets:\n{}",
stats.agent_id, stats.events, stats.events_dropped, stats.bytes, targets_info
);
tracing::info!("Successfully retrieved log collector stats for agent {}", agent_id);
Ok(CallToolResult::success(vec![Content::text(formatted_text)]))
}
Err(e) => {
// Check if the error is due to agent not found or stats not available
if let wazuh_client::WazuhApiError::ApiError(msg) = &e {
if msg.contains(&format!("Log collector stats for agent {} not found", agent_id)) ||
msg.contains("Agent Not Found") { // General agent not found
tracing::info!("No log collector stats found for agent {}. Returning standard message.", agent_id);
return Ok(CallToolResult::success(vec![Content::text(
format!("No log collector stats found for agent {}. The agent might not exist or stats are unavailable.", agent_id),
)]));
}
}
match e {
wazuh_client::WazuhApiError::HttpError { status, .. } if status == StatusCode::NOT_FOUND => {
tracing::info!("No log collector stats found for agent {} (HTTP 404). Returning standard message.", agent_id);
Ok(CallToolResult::success(vec![Content::text(
format!("No log collector stats found for agent {}. The agent might not exist or stats are unavailable.", agent_id),
)]))
}
_ => {
let err_msg = format!("Error retrieving log collector stats for agent {} from Wazuh: {}", agent_id, e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
}
}
#[tool(
name = "get_wazuh_remoted_stats",
description = "Retrieves statistics from the Wazuh remoted daemon. Returns information about queue size, TCP sessions, event counts, and message traffic."
)]
async fn get_wazuh_remoted_stats(
&self,
#[tool(aggr)] _params: GetRemotedStatsParams, // No params used
) -> Result<CallToolResult, McpError> {
tracing::info!("Retrieving Wazuh remoted stats");
let mut logs_client = self.wazuh_logs_client.lock().await;
match logs_client.get_remoted_stats().await {
Ok(stats) => {
let formatted_text = format!(
"Wazuh Remoted Statistics:\nTotal Queue Size: {}\nTCP Sessions: {}\nEvent Count: {}\nControl Message Count: {}\nDiscarded Message Count: {}\nMessages Sent: {}\nBytes Received: {}\nDequeued After Close: {}",
stats.total_queue_size,
stats.tcp_sessions,
stats.evt_count,
stats.ctrl_count,
stats.discarded_count,
stats.msg_sent,
stats.recv_bytes,
stats.dequeued_after_close
);
tracing::info!("Successfully retrieved remoted stats");
Ok(CallToolResult::success(vec![Content::text(formatted_text)]))
}
Err(e) => {
let err_msg = format!("Error retrieving remoted stats from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
#[tool(
name = "get_wazuh_agent_ports",
description = "Retrieves a list of open network ports for a specific Wazuh agent. Returns formatted port information including local/remote IP and port, protocol, state, and associated process/PID. Supports filtering by protocol and state."
)]
async fn get_wazuh_agent_ports(
&self,
#[tool(aggr)] params: GetAgentPortsParams,
) -> Result<CallToolResult, McpError> {
let agent_id = match Self::format_agent_id(&params.agent_id) {
Ok(formatted_id) => formatted_id,
Err(err_msg) => {
tracing::error!("Error formatting agent_id for agent ports: {}", err_msg);
return Ok(CallToolResult::error(vec![Content::text(err_msg)]));
}
};
let limit = params.limit.unwrap_or(100);
let offset = 0; // Default offset
tracing::info!(
agent_id = %agent_id,
limit = %limit,
protocol = ?params.protocol,
state = ?params.state,
"Retrieving Wazuh agent network ports"
);
let mut vulnerability_client = self.wazuh_vulnerability_client.lock().await;
// Note: `wazuh_client::VulnerabilityClient::get_agent_ports` accepts only a protocol filter,
// so `params.state` is not sent to the API. When a state filter is provided, it is applied
// client-side after the results are fetched (see below).
match vulnerability_client.get_agent_ports(
&agent_id,
Some(limit.saturating_mul(2)), // Fetch more to allow for client-side state filtering; saturating avoids overflow on very large limits
Some(offset),
params.protocol.as_deref(),
).await {
Ok(mut ports) => {
// Client-side filtering for state if provided
if let Some(state_filter) = &params.state {
ports.retain(|port| {
port.state.as_ref().is_some_and(|s| s.eq_ignore_ascii_case(state_filter))
});
}
// Apply limit after client-side filtering
ports.truncate(limit as usize);
if ports.is_empty() {
tracing::info!("No network ports found for agent {} with current filters. Returning standard message.", agent_id);
return Ok(CallToolResult::success(vec![Content::text(
format!("No network ports found for agent {} matching the specified criteria.", agent_id),
)]));
}
let num_ports = ports.len();
let mcp_content_items: Vec<Content> = ports
.into_iter()
.map(|port: WazuhPort| { // Explicitly type port
let mut details = vec![
format!("Protocol: {}", port.protocol),
format!("Local: {}:{}", port.local.ip, port.local.port),
];
if let Some(remote) = &port.remote {
details.push(format!("Remote: {}:{}", remote.ip, remote.port));
}
if let Some(state) = &port.state {
details.push(format!("State: {}", state));
}
if let Some(process_name) = &port.process { // process field in WazuhPort is Option<String>
details.push(format!("Process Name: {}", process_name));
}
if let Some(pid) = port.pid { // pid field in WazuhPort is Option<u32>
details.push(format!("PID: {}", pid));
}
if let Some(inode) = port.inode {
details.push(format!("Inode: {}", inode));
}
if let Some(tx_queue) = port.tx_queue {
details.push(format!("TX Queue: {}", tx_queue));
}
if let Some(rx_queue) = port.rx_queue {
details.push(format!("RX Queue: {}", rx_queue));
}
Content::text(details.join("\n"))
})
.collect();
tracing::info!("Successfully processed {} network ports for agent {} into {} MCP content items", num_ports, agent_id, mcp_content_items.len());
Ok(CallToolResult::success(mcp_content_items))
}
Err(e) => {
match e {
wazuh_client::WazuhApiError::HttpError { status, .. } if status == StatusCode::NOT_FOUND => {
tracing::info!("No network port data found for agent {}. Syscollector might not have run or data is unavailable.", agent_id);
Ok(CallToolResult::success(vec![Content::text(
format!("No network port data found for agent {}. The agent might not exist, syscollector data might be unavailable, or the agent is not active.", agent_id),
)]))
}
_ => {
let err_msg = format!("Error retrieving network ports for agent {} from Wazuh: {}", agent_id, e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
}
}
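/*
The client-side state filter used by `get_wazuh_agent_ports` above follows a
"filter, then truncate" order. A minimal sketch of that step is shown here, assuming a
simplified record type; `PortRow` and `filter_by_state` are hypothetical names for
illustration and do not correspond to the real `WazuhPort` type.

```rust
// Hypothetical stand-in for a port record; only the fields needed here are modelled.
#[derive(Debug, Clone, PartialEq)]
pub struct PortRow {
    pub local_port: u16,
    pub state: Option<String>,
}

// Keep rows whose state matches `state_filter` case-insensitively, then cap at `limit`.
// Rows without a state are dropped whenever a filter is given.
pub fn filter_by_state(
    mut rows: Vec<PortRow>,
    state_filter: Option<&str>,
    limit: usize,
) -> Vec<PortRow> {
    if let Some(wanted) = state_filter {
        rows.retain(|row| {
            row.state
                .as_deref()
                .is_some_and(|s| s.eq_ignore_ascii_case(wanted))
        });
    }
    // Truncating after filtering keeps the first `limit` matching rows.
    rows.truncate(limit);
    rows
}
```

One consequence of this design: because only a bounded number of rows is fetched before
filtering, the result can contain fewer than `limit` entries even when more matching ports
exist on the agent.
*/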
#[tool(
name = "get_wazuh_weekly_stats",
description = "Retrieves weekly statistics from the Wazuh manager. Returns a JSON object detailing various metrics aggregated over the past week."
)]
async fn get_wazuh_weekly_stats(
&self,
#[tool(aggr)] _params: GetWeeklyStatsParams, // No params used
) -> Result<CallToolResult, McpError> {
tracing::info!("Retrieving Wazuh weekly stats");
let mut logs_client = self.wazuh_logs_client.lock().await;
match logs_client.get_weekly_stats().await {
Ok(stats_value) => {
match serde_json::to_string_pretty(&stats_value) {
Ok(formatted_json) => {
tracing::info!("Successfully retrieved and formatted weekly stats.");
Ok(CallToolResult::success(vec![Content::text(formatted_json)]))
}
Err(e) => {
let err_msg = format!("Error formatting weekly stats JSON: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
Err(e) => {
let err_msg = format!("Error retrieving weekly stats from Wazuh: {}", e);
tracing::error!("{}", err_msg);
Ok(CallToolResult::error(vec![Content::text(err_msg)]))
}
}
}
}
#[tool(tool_box)]
impl ServerHandler for WazuhToolsServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
protocol_version: ProtocolVersion::V_2024_11_05,
capabilities: ServerCapabilities::builder()
.enable_prompts()
.enable_resources()
.enable_tools()
.build(),
server_info: Implementation::from_build_env(),
instructions: Some(
"This server provides tools to interact with a Wazuh SIEM instance for security monitoring and analysis.\n\
Available tools:\n\
- 'get_wazuh_alert_summary': Retrieves a summary of Wazuh security alerts. \
Optionally takes 'limit' parameter to control the number of alerts returned (defaults to 100).\n\
- 'get_wazuh_rules_summary': Retrieves a summary of Wazuh security rules. \
Supports filtering by 'level', 'group', and 'filename' parameters, with 'limit' to control the number of rules returned (defaults to 100).\n\
- 'get_wazuh_vulnerability_summary': Retrieves a summary of Wazuh vulnerability detections for a specific agent. \
Requires an 'agent_id' parameter. This must be provided as a string, representing the numeric ID of the agent (e.g., \"0\", \"1\", \"12\", \"001\", \"012\"). The server will automatically format this string into a three-digit, zero-padded identifier. For instance, an input of \"0\" will be treated as \"000\", \"1\" as \"001\", and \"12\" as \"012\". Supports filtering by 'severity' and 'cve' parameters, with 'limit' to control the number of vulnerabilities returned (defaults to 100).\n\
- 'get_wazuh_critical_vulnerabilities': Retrieves only critical vulnerabilities for a specific agent. \
Requires an 'agent_id' parameter. This must be provided as a string, representing the numeric ID of the agent (e.g., \"0\", \"1\", \"12\", \"001\", \"012\"). The server will automatically format this string into a three-digit, zero-padded identifier. For instance, an input of \"0\" will be treated as \"000\", \"1\" as \"001\", and \"12\" as \"012\". Returns detailed information about vulnerabilities with 'Critical' severity level.\n\
- 'get_wazuh_running_agents': Retrieves a list of Wazuh agents with their current status and details. \
Supports filtering by 'status' (active, disconnected, pending, never_connected), 'name', 'ip', 'group', 'os_platform', and 'version' parameters, with 'limit' to control the number of agents returned (defaults to 100, status defaults to 'active').\n\
- 'get_wazuh_agent_processes': Retrieves a list of running processes for a specific Wazuh agent. \
Requires an 'agent_id' parameter (formatted as described for other agent-specific tools). Supports 'limit' (default 100) and 'search' (to filter by process name or command line) parameters.\n\
- 'get_wazuh_agent_ports': Retrieves a list of open network ports for a specific Wazuh agent. \
Requires an 'agent_id' parameter (formatted as described for other agent-specific tools). Supports 'limit' (default 100), 'protocol' (e.g., \"tcp\", \"udp\"), and 'state' (e.g., \"LISTEN\", \"ESTABLISHED\") parameters to filter the results. Note: State filtering is performed client-side by this server.\n\
- 'search_wazuh_manager_logs': Searches Wazuh manager logs. \
Optional parameters: 'limit' (default 100), 'offset' (default 0), 'level' (e.g., \"error\", \"info\"), 'tag' (e.g., \"wazuh-modulesd\"), 'search_term' (for free-text search in log descriptions).\n\
- 'get_wazuh_manager_error_logs': Retrieves Wazuh manager error logs. \
Optional parameter: 'limit' (default 100).\n\
- 'get_wazuh_log_collector_stats': Retrieves log collector statistics for a specific Wazuh agent. \
Requires an 'agent_id' parameter (formatted as described for other agent-specific tools).\n\
- 'get_wazuh_remoted_stats': Retrieves statistics from the Wazuh remoted daemon (manager-wide).\n\
- 'get_wazuh_weekly_stats': Retrieves weekly statistics from the Wazuh manager. Returns a JSON object detailing various metrics aggregated over the past week. No parameters required.\n\
- 'get_wazuh_cluster_health': Checks the health of the Wazuh cluster. Returns a textual summary of the cluster's health status (e.g., enabled, running, connected nodes). No parameters required.\n\
- 'get_wazuh_cluster_nodes': Retrieves a list of nodes in the Wazuh cluster. \
Optional parameters: 'limit' (max nodes, API default 500), 'offset' (default 0), 'node_type' (e.g., \"master\", \"worker\")."
.to_string(),
),
}
}
}
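/*
The instructions above describe how an 'agent_id' string is normalised to a three-digit,
zero-padded identifier ("0" becomes "000", "12" becomes "012"). A minimal sketch of that rule
follows. `pad_agent_id` is a hypothetical name and is not the `format_agent_id` helper used by
the tools; its error messages and its handling of IDs above 999 (left unpadded) are assumptions.

```rust
// Hypothetical helper, for illustration only.
// Accepts a non-empty string of ASCII digits and returns it zero-padded to three digits.
pub fn pad_agent_id(input: &str) -> Result<String, String> {
    if input.is_empty() || !input.bytes().all(|b| b.is_ascii_digit()) {
        return Err(format!("agent_id must be a numeric string, got {:?}", input));
    }
    let n = input
        .parse::<u32>()
        .map_err(|e| format!("agent_id {:?} is out of range: {}", input, e))?;
    // `{:03}` pads with leading zeros up to a width of three; wider numbers are unchanged.
    Ok(format!("{:03}", n))
}
```

Under these assumptions, `pad_agent_id("1")` returns `Ok("001")`, an already padded input such
as "012" is returned unchanged, and a non-numeric input such as "abc" is rejected.
*/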
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Arguments are parsed and validated here, but their values are not used yet.
let _args = Args::parse();
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive(tracing::Level::DEBUG.into()),
)
.with_writer(std::io::stderr)
.init();
tracing::info!("Starting Wazuh MCP Server...");
// Create an instance of our Wazuh tools server
let server = WazuhToolsServer::new()
.expect("Error initializing Wazuh tools server");
tracing::info!("Using stdio transport");
let service = server.serve(stdio()).await
.inspect_err(|e| {
tracing::error!("serving error: {:?}", e);
})?;
service.waiting().await?;
Ok(())
}