first commit

This commit is contained in:
Gianluca Brigandi 2025-05-07 11:16:24 -07:00
commit ce2460928a
22 changed files with 2163 additions and 0 deletions

9
.env.example Normal file
View File

@ -0,0 +1,9 @@
# Wazuh API Configuration
WAZUH_HOST=localhost
WAZUH_PORT=55000
WAZUH_USER=admin
WAZUH_PASS=admin
VERIFY_SSL=false
# MCP Server Configuration
MCP_SERVER_PORT=8000

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/target
**/*.rs.bk
Cargo.lock
.env

36
Cargo.toml Normal file
View File

@ -0,0 +1,36 @@
[package]
name = "mcp-server-wazuh"
version = "0.1.0"
edition = "2021"
description = "Wazuh SIEM MCP Server"
authors = ["Gianluca Brigandi <gbrigand@gmail.com>"]
[dependencies]
tokio = { version = "1.45", features = ["full"] }
axum = "0.8"
reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
dotenv = "0.15"
thiserror = "2.0"
jsonwebtoken = "9.3"
tower-http = { version = "0.6", features = ["trace"] }
async-trait = "0.1"
anyhow = "1.0"
[dev-dependencies]
mockito = "1.7"
anyhow = "1.0"
httpmock = "0.7"
uuid = { version = "1.16", features = ["v4"] }
once_cell = "1.21"
async-trait = "0.1"
regex = "1.11"
[[bin]]
name = "mcp_client_cli"
path = "tests/mcp_client_cli.rs"

26
Dockerfile Normal file
View File

@ -0,0 +1,26 @@
FROM rust:1.86-slim as builder
WORKDIR /usr/src/app
COPY . .
RUN apt-get update && \
apt-get install -y pkg-config libssl-dev && \
cargo build --release
FROM debian:bullseye-slim
RUN apt-get update && \
apt-get install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY --from=builder /usr/src/app/target/release/mcp-server-wazuh /app/mcp-server-wazuh
COPY .env.example /app/.env.example
RUN useradd -m wazuh
USER wazuh
EXPOSE 8000
CMD ["./mcp-server-wazuh"]

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Gianluca Brigandi
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

296
README.md Normal file
View File

@ -0,0 +1,296 @@
# Wazuh MCP Server
A Rust-based server designed to bridge the gap between a Wazuh Security Information and Event Management (SIEM) system and applications requiring contextual security data, specifically tailored for the Claude Desktop Integration using the Model Context Protocol (MCP).
## Overview
Modern AI assistants like Claude can benefit significantly from real-time context about the user's environment. For security operations, this means providing relevant security alerts and events. Wazuh is a popular open-source SIEM, but its API output isn't directly consumable by systems expecting MCP format.
This server acts as a middleware:
1. It connects to the Wazuh API.
2. Authenticates using credentials (fetching a JWT token).
3. Periodically fetches security alerts from Wazuh.
4. Transforms these alerts from Wazuh's native format into the standardized MCP JSON format.
5. Exposes a simple HTTP endpoint (`/mcp`) where clients like Claude Desktop can poll for the latest transformed security context.
## Architecture
The server facilitates communication between Claude Desktop (or any MCP client) and the Wazuh API.
```ascii
```ascii
+-----------------+ +--------------------+ +-----------------+
| Claude Desktop | | Wazuh MCP Server | | Wazuh |
| (MCP Client) | | (This Application) | | (SIEM) |
+-----------------+ +--------------------+ +-----------------+
| | |
| 1. GET /mcp Request | |
|------------------------->| |
| | 2. Check/Refresh JWT |
| |------------------------->| 3. Authenticate (if needed via API)
| | |
| | 4. JWT Response (if auth)|
| |<-------------------------|
| | |
| | 5. GET /wazuh-alerts-*_search (with JWT)
| |------------------------->| 6. Fetch Alerts (via API)
| | |
| | 7. Wazuh Alert Data |
| |<-------------------------|
| | |
| | 8. Transform Data to MCP|
| | (Internal Logic) |
| | |
| 9. MCP JSON Response | |
|<-------------------------| |
| | |
```
**Data Flow:**
1. An MCP client (e.g., Claude Desktop) sends an HTTP GET request to the `/mcp` endpoint of this server.
2. The server checks if it has a valid JWT for the Wazuh API.
3. If the JWT is missing or expired, it authenticates with the Wazuh API using configured credentials (`WAZUH_USER`, `WAZUH_PASS`) via the `/security/user/authenticate` endpoint.
4. The Wazuh API returns a JWT.
5. The server uses the valid JWT to make a request to the Wazuh alerts search endpoint (e.g., `/wazuh-alerts-*_search`).
6. The Wazuh API executes the search (currently fetches all recent alerts).
7. The Wazuh API returns the alert data in its native JSON format.
8. The server's transformation logic (`src/mcp/transform.rs`) processes each alert, mapping Wazuh fields (like `rule.level`, `rule.description`, `agent.name`, `data`, `timestamp`) to the corresponding MCP fields (`severity`, `description`, `agent`, `data`, `timestamp`). It also sets default values for missing fields.
9. The server responds to the MCP client with a JSON array of transformed alerts in the MCP format.
## Features
- **Wazuh API Integration:** Connects securely to the Wazuh API.
- **JWT Authentication:** Handles authentication with Wazuh using username/password to obtain a JWT.
- **Automatic Token Refresh:** Monitors JWT validity and automatically re-authenticates when the token expires or is close to expiring. Retries API calls once upon receiving a 401 Unauthorized response.
- **Alert Retrieval:** Fetches alerts from the Wazuh API (currently configured to retrieve all recent alerts via a `match_all` query).
- **MCP Transformation:** Converts Wazuh alert JSON objects into MCP v1.0 compliant JSON messages. This includes:
- Mapping Wazuh `rule.level` to MCP `severity` (e.g., 0-3 -> "low", 8-11 -> "high").
- Extracting `rule.description`, `id`, `timestamp`, `agent` details, and the `data` payload.
- Taking the first group from `rule.groups` as the MCP `category`.
- Handling potential differences in Wazuh response structure (e.g., presence or absence of `_source` nesting).
- Providing default values (e.g., "unknown_severity", "unknown_category", current time for invalid timestamps).
- **HTTP Server:** Exposes endpoints using the Axum web framework.
- `/mcp`: Serves the transformed MCP messages.
- `/health`: Provides a simple health check.
- **Configuration:** Easily configurable via environment variables or a `.env` file.
- **Containerization:** Includes a `Dockerfile` and `docker-compose.yml` for easy deployment.
- **Logging:** Uses the `tracing` library for request and application logging (configurable via `RUST_LOG`).
## Requirements
- Rust (latest stable recommended, see `Cargo.toml` for specific dependencies)
- A running Wazuh server (v4.x recommended) with the API enabled and accessible.
- Network connectivity between this server and the Wazuh API.
## Configuration
Configuration is managed through environment variables. A `.env` file can be placed in the project root for local development.
| Variable | Description | Default | Required |
| ----------------- | ------------------------------------------------- | ----------- | -------- |
| `WAZUH_HOST` | Hostname or IP address of the Wazuh API server. | `localhost` | Yes |
| `WAZUH_PORT` | Port number for the Wazuh API. | `55000` | Yes |
| `WAZUH_USER` | Username for Wazuh API authentication. | `admin` | Yes |
| `WAZUH_PASS` | Password for Wazuh API authentication. | `admin` | Yes |
| `VERIFY_SSL` | Set to `true` to verify the Wazuh API's SSL cert. | `false` | No |
| `MCP_SERVER_PORT` | Port for this MCP server to listen on. | `8000` | No |
| `RUST_LOG` | Log level (e.g., `info`, `debug`, `trace`). | `info` | No |
**Note on `VERIFY_SSL`:** For production environments, it is strongly recommended to set `VERIFY_SSL=true` and ensure proper certificate validation. Setting it to `false` disables certificate checks, which is insecure.
## Building and Running
### Prerequisites
- Install Rust: [https://www.rust-lang.org/tools/install](https://www.rust-lang.org/tools/install)
- Install Docker and Docker Compose (optional, for containerized deployment): [https://docs.docker.com/get-docker/](https://docs.docker.com/get-docker/)
### Local Development
1. **Clone the repository:**
```bash
git clone https://github.com/yourusername/mcp-server-wazuh.git # Replace with your repo URL
cd mcp-server-wazuh
```
2. **Configure:**
- Copy the example environment file: `cp .env.example .env`
- Edit the `.env` file with your specific Wazuh API details (`WAZUH_HOST`, `WAZUH_PORT`, `WAZUH_USER`, `WAZUH_PASS`).
3. **Build:**
```bash
cargo build
```
4. **Run:**
```bash
cargo run
# Or use the run script:
# ./run.sh
```
The server will start listening on the port specified by `MCP_SERVER_PORT` (default 8000).
### Docker Deployment
1. **Clone the repository** (if not already done).
2. **Configure:** Ensure you have a `.env` file with your Wazuh credentials in the project root, or set the environment variables directly in the `docker-compose.yml` or your deployment environment.
3. **Build and Run:**
```bash
docker-compose up --build -d
```
This will build the Docker image and start the container in detached mode.
## API Endpoints
### `GET /mcp`
Fetches the latest alerts from the configured Wazuh API, transforms them into MCP format, and returns them as a JSON array.
- **Method:** `GET`
- **Success Response:** `200 OK`
- **Body:** `application/json` - An array of MCP message objects.
```json
[
{
"protocol_version": "1.0",
"source": "Wazuh",
"timestamp": "2023-10-27T10:30:00Z", // ISO 8601 format (RFC3339)
"event_type": "alert",
"context": {
"id": "wazuh_alert_id_1", // Wazuh alert ID
"category": "gdpr", // Derived from rule groups
"severity": "high", // Derived from rule level
"description": "High severity rule triggered",
"agent": { // Wazuh agent details
"id": "001",
"name": "server-db"
},
"data": { // Original Wazuh alert data field
"srcip": "1.2.3.4",
"dstport": "22"
}
// Other relevant context fields might be added here
},
"metadata": {
"integration": "Wazuh-MCP",
"notes": "Data fetched via Wazuh API"
// Other metadata like Wazuh rule ID could be added
// "rule_id": "1002"
}
},
// ... more MCP messages
]
```
- **Error Responses:**
- `401 Unauthorized`: If Wazuh authentication fails persistently.
- `500 Internal Server Error`: If there's an issue fetching/parsing data from Wazuh, or an internal server problem.
- `502 Bad Gateway`: If the server cannot connect to the Wazuh API or the API returns an unexpected error.
### `GET /health`
A simple health check endpoint.
- **Method:** `GET`
- **Success Response:** `200 OK`
- **Body:** `application/json`
```json
{
"status": "ok",
"service": "wazuh-mcp-server",
"timestamp": "2023-10-27T12:00:00Z" // Current server time in ISO 8601
}
```
- **Error Responses:** None expected for this endpoint itself, but the server might be unreachable if down.
## Running the All-in-One Demo (Wazuh + MCP Server)
For a complete local demo environment that includes Wazuh (Indexer, Manager, Dashboard) and the Wazuh MCP Server pre-configured to connect to it, you can use the `docker-compose.all-in-one.yml` file.
This setup is ideal for testing the end-to-end flow from Wazuh alerts to MCP messages.
**1. Launch the Environment:**
Navigate to the project root directory in your terminal and run:
```bash
docker-compose -f docker-compose.all-in-one.yml up -d
This command will:
- Download the necessary Wazuh and OpenSearch images (if not already present).
- Start the Wazuh Indexer, Wazuh Manager, and Wazuh Dashboard services.
- Build and start the Wazuh MCP Server.
- All services are configured to communicate with each other on an internal Docker network.
**2. Accessing Services:**
* **Wazuh Dashboard:**
* URL: `https://localhost:8443` (Note: Uses HTTPS with a self-signed certificate, so your browser will likely show a warning).
* Default Username: `admin`
* Default Password: `AdminPassword123!` (This is set by `WAZUH_INITIAL_PASSWORD` in the `wazuh-indexer` service).
* **Wazuh MCP Server:**
* The MCP server will be running and accessible on port `8000` by default (or the port specified by `MCP_SERVER_PORT` if you've set it as an environment variable on your host machine before running docker-compose).
* Example MCP endpoint: `http://localhost:8000/mcp`
* Example Health endpoint: `http://localhost:8000/health`
* **Configuration:** The `mcp-server` service within `docker-compose.all-in-one.yml` is already configured with the necessary environment variables to connect to the `wazuh-manager` service:
* `WAZUH_HOST=wazuh-manager`
* `WAZUH_PORT=55000`
* `WAZUH_USER=wazuh_user_demo`
* `WAZUH_PASS=wazuh_password_demo`
* `VERIFY_SSL=false`
You do not need to set these in a separate `.env` file when using this all-in-one compose file, as they are defined directly in the service's environment.
**3. Stopping the Environment:**
To stop all services, run:
```bash
docker-compose -f docker-compose.all-in-one.yml down
```
To stop and remove volumes (deleting Wazuh data):
```bash
docker-compose -f docker-compose.all-in-one.yml down -v
```
This approach simplifies setup by bundling all necessary components and their configurations.
## Claude Desktop Integration
To use this Wazuh MCP Server with Claude Desktop (or any other MCP-compatible client), you need to configure the client to poll the `/mcp` endpoint exposed by this server.
1. **Ensure the Wazuh MCP Server is running** and accessible from the machine where Claude Desktop is operating. This might involve:
* Running the server locally (e.g., `cargo run` or via Docker, including the all-in-one setup described above).
* Deploying the server to a reachable host.
2. **Identify the server's address and port.**
* If using the all-in-one demo: `http://localhost:8000/mcp` (or your `MCP_SERVER_PORT`).
* If running `mcp-server` standalone: `http://localhost:8000` by default, or `http://<your-server-ip-or-hostname>:<MCP_SERVER_PORT>` if deployed elsewhere. The `MCP_SERVER_PORT` is configurable via environment variables (defaults to `8000`).
3. **Configure Claude Desktop:**
* In Claude Desktop's settings or configuration area for external context sources, add a new MCP endpoint.
* Set the URL to `http://<server_address>:<port>/mcp`. For example:
* If running the all-in-one demo or locally: `http://localhost:8000/mcp`
* If running on a remote server `192.168.1.100` on port `8080`: `http://192.168.1.100:8080/mcp`
4. **Verify Firewall Rules:** Ensure that any firewalls between Claude Desktop and the Wazuh MCP Server allow traffic on the configured `MCP_SERVER_PORT`.
Once configured, Claude Desktop should start polling the `/mcp` endpoint periodically to fetch the latest Wazuh security alerts in MCP format.
`
## Development & Testing
- **Code Style:** Uses standard Rust formatting (`cargo fmt`).
- **Linting:** Uses Clippy (`cargo clippy`).
- **Testing:** Contains unit tests for transformation logic and integration tests using a mock Wazuh API server (`httpmock`) and a test MCP client.
```bash
# Run all tests
cargo test
# Run specific integration test
cargo test --test integration_test
# Run tests with detailed logging
RUST_LOG=debug cargo test
```
- See `tests/README.md` for more details on running tests and using the test client CLI.
## License
This project is licensed under the [MIT License](LICENSE).

View File

@ -0,0 +1,98 @@
version: '3.8'
services:
# Wazuh Indexer
wazuh-indexer:
image: wazuh/wazuh-indexer:4.7.2
platform: linux/amd64
hostname: wazuh-indexer
restart: always
ports:
- "9200:9200"
- "9300:9300"
environment:
- WAZUH_INITIAL_USER=admin
- WAZUH_INITIAL_PASSWORD=AdminPassword123!
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- wazuh-indexer-data:/var/lib/opensearch/data
# Wazuh Manager
wazuh-manager:
image: wazuh/wazuh-manager:4.7.2
platform: linux/amd64
hostname: wazuh-manager
restart: always
ports:
# Wazuh agent communication
- "1514:1514/udp"
- "1515:1515/tcp"
# Wazuh API
- "55000:55000"
environment:
- WAZUH_INDEXER_URL=http://wazuh-indexer:9200
- WAZUH_API_USER=wazuh_user_demo # API User for MCP Server to use
- WAZUH_API_PASSWORD=wazuh_password_demo # API Password for MCP Server to use
depends_on:
wazuh-indexer:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=50s"]
interval: 30s
timeout: 10s
retries: 5
volumes:
- wazuh-manager-data:/var/ossec/data
- wazuh-manager-logs:/var/ossec/logs
- wazuh-manager-etc:/var/ossec/etc
# Wazuh Dashboard
wazuh-dashboard:
image: wazuh/wazuh-dashboard:4.7.2
platform: linux/amd64
hostname: wazuh-dashboard
restart: always
ports:
- "8443:443"
environment:
- WAZUH_INDEXER_URL=http://wazuh-indexer:9200
- WAZUH_API_URL=http://wazuh-manager:55000
- WAZUH_USER=admin
- WAZUH_PASSWORD=AdminPassword123!
# - OPENSEARCH_DASHBOARDS_SERVER_SSL_ENABLED=true # Default is true
# - OPENSEARCH_DASHBOARDS_SERVER_SSL_KEY=/usr/share/wazuh-dashboard/certs/key.pem # Provide your certs if needed
# - OPENSEARCH_DASHBOARDS_SERVER_SSL_CERTIFICATE=/usr/share/wazuh-dashboard/certs/cert.pem
depends_on:
wazuh-manager:
condition: service_started
wazuh-indexer:
condition: service_healthy
# MCP Server
mcp-server:
build: .
ports:
- "${MCP_SERVER_PORT:-8000}:${MCP_SERVER_PORT:-8000}"
environment:
- WAZUH_HOST=wazuh-manager
- WAZUH_PORT=55000
- WAZUH_USER=wazuh_user_demo
- WAZUH_PASS=wazuh_password_demo
- VERIFY_SSL=false
- MCP_SERVER_PORT=${MCP_SERVER_PORT:-8000}
- RUST_LOG=${RUST_LOG:-info}
restart: unless-stopped
depends_on:
wazuh-manager:
condition: service_started
volumes:
wazuh-indexer-data:
wazuh-manager-data:
wazuh-manager-logs:
wazuh-manager-etc:

View File

@ -0,0 +1,86 @@
version: '3.8'
volumes:
wazuh-indexer-data:
driver: local
wazuh-manager-config:
driver: local
wazuh-manager-logs:
driver: local
services:
wazuh-indexer:
image: wazuh/wazuh-indexer:4.7.3
container_name: wazuh-indexer-demo
hostname: wazuh-indexer
restart: always
ports:
- "9200:9200"
- "9300:9300"
environment:
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=YourChosenStrongPassword!
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
volumes:
- wazuh-indexer-data:/var/lib/opensearch/data
healthcheck:
test: ["CMD-SHELL", "curl -k -u admin:YourChosenStrongPassword! https://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=5s"]
interval: 30s
timeout: 10s
retries: 5
start_period: 120s
networks:
- wazuh-net-demo
wazuh-manager:
image: wazuh/wazuh-manager:4.7.3
container_name: wazuh-manager-demo
hostname: wazuh-manager
restart: always
ports:
- "1514:1514/udp"
- "1515:1515/tcp"
- "55000:55000/tcp"
environment:
- WAZUH_INDEXER_URL=https://wazuh-indexer:9200
- WAZUH_INDEXER_USER=admin
- WAZUH_INDEXER_PASSWORD=YourChosenStrongPassword!
- WAZUH_API_USER=wazuh
- WAZUH_API_PASSWORD=wazuh
- INDEXER_SSL_VERIFY=false
- WAZUH_CLUSTER_KEY=myDemoClusterKey
volumes:
- wazuh-manager-config:/var/ossec/etc
- wazuh-manager-logs:/var/ossec/logs
- wazuh-manager-config:/var/ossec/api/agent_keys
depends_on:
wazuh-indexer:
condition: service_healthy
networks:
- wazuh-net-demo
wazuh-dashboard:
image: wazuh/wazuh-dashboard:4.7.3
container_name: wazuh-dashboard-demo
hostname: wazuh-dashboard
restart: always
ports:
- "8443:5601"
environment:
- OPENSEARCH_HOSTS=["https://wazuh-indexer:9200"]
- OPENSEARCH_USERNAME=admin
- OPENSEARCH_PASSWORD=YourChosenStrongPassword!
- OPENSEARCH_SSL_VERIFICATIONMODE=none
- SERVER_SSL_ENABLED=true
depends_on:
wazuh-indexer:
condition: service_healthy
wazuh-manager:
condition: service_started
networks:
- wazuh-net-demo
networks:
wazuh-net-demo:
driver: bridge

15
docker-compose.yml Normal file
View File

@ -0,0 +1,15 @@
version: '3.8'
services:
mcp-server:
build: .
ports:
- "8000:8000"
environment:
- WAZUH_HOST=${WAZUH_HOST:-localhost}
- WAZUH_PORT=${WAZUH_PORT:-55000}
- WAZUH_USER=${WAZUH_USER:-admin}
- WAZUH_PASS=${WAZUH_PASS:-admin}
- VERIFY_SSL=${VERIFY_SSL:-false}
- MCP_SERVER_PORT=${MCP_SERVER_PORT:-8000}
restart: unless-stopped

10
run.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/bash
if [ ! -f .env ]; then
echo "No .env file found. Creating from .env.example..."
cp .env.example .env
echo "Please edit .env file with your configuration."
exit 1
fi
cargo run

136
src/main.rs Normal file
View File

@ -0,0 +1,136 @@
mod wazuh;
mod mcp;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use axum::{
routing::get,
Router,
extract::State,
response::Json,
http::StatusCode,
};
use dotenv::dotenv;
use serde_json::{json, Value};
use tokio::sync::Mutex;
use tracing::{info, error};
use wazuh::client::WazuhApiClient;
use mcp::transform::transform_to_mcp;
// Application state shared across handlers
struct AppState {
wazuh_client: Mutex<WazuhApiClient>,
}
#[tokio::main]
async fn main() {
dotenv().ok();
tracing_subscriber::fmt::init();
let wazuh_host = env::var("WAZUH_HOST").unwrap_or_else(|_| "localhost".to_string());
let wazuh_port = env::var("WAZUH_PORT")
.unwrap_or_else(|_| "55000".to_string())
.parse::<u16>()
.expect("WAZUH_PORT must be a valid port number");
let wazuh_user = env::var("WAZUH_USER").unwrap_or_else(|_| "admin".to_string());
let wazuh_pass = env::var("WAZUH_PASS").unwrap_or_else(|_| "admin".to_string());
let verify_ssl = env::var("VERIFY_SSL")
.unwrap_or_else(|_| "false".to_string())
.to_lowercase() == "true";
let mcp_server_port = env::var("MCP_SERVER_PORT")
.unwrap_or_else(|_| "8000".to_string())
.parse::<u16>()
.expect("MCP_SERVER_PORT must be a valid port number");
let wazuh_client = WazuhApiClient::new(
wazuh_host.clone(),
wazuh_port,
wazuh_user.clone(),
wazuh_pass.clone(),
verify_ssl,
);
let app_state = Arc::new(AppState {
wazuh_client: Mutex::new(wazuh_client),
});
let app = Router::new()
.route("/mcp", get(mcp_endpoint))
.route("/health", get(health_check))
.with_state(app_state);
let addr = SocketAddr::from(([0, 0, 0, 0], mcp_server_port));
info!("Attempting to bind server to {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await.unwrap_or_else(|e| {
error!("Failed to bind to address {}: {}", addr, e);
panic!("Failed to bind to address {}: {}", addr, e);
});
info!("Wazuh MCP Server listening on {}", addr);
axum::serve(listener, app.into_make_service())
.await
.unwrap_or_else(|e| {
error!("Server error: {}", e);
panic!("Server error: {}", e);
});
}
/// MCP endpoint for Claude Desktop.
/// Retrieves the latest Wazuh alerts, converts them into MCP messages, and returns as JSON.
async fn mcp_endpoint(
State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<Value>>, (StatusCode, Json<Value>)> {
let alert_query = json!({
"query": {
"match_all": {}
}
});
let mut wazuh_client = state.wazuh_client.lock().await;
match wazuh_client.get_alerts(alert_query).await {
Ok(alerts_data) => {
let hits_array = alerts_data
.get("hits")
.and_then(|h| h.get("hits"))
.and_then(|h| h.as_array())
.cloned()
.unwrap_or_else(Vec::new);
let mcp_messages = hits_array
.iter()
.filter_map(|hit| {
hit.get("_source").map(|source| {
transform_to_mcp(source.clone(), "alert".to_string())
})
})
.collect::<Vec<_>>();
Ok(Json(mcp_messages))
}
Err(e) => {
error!("Error in /mcp endpoint: {}", e);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": e.to_string() })),
))
}
}
}
/// Health check endpoint.
/// Returns a simple JSON response to indicate the server is running.
async fn health_check() -> Json<Value> {
Json(json!({
"status": "ok",
"service": "wazuh-mcp-server",
"timestamp": chrono::Utc::now().to_rfc3339()
}))
}

1
src/mcp/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod transform;

200
src/mcp/transform.rs Normal file
View File

@ -0,0 +1,200 @@
use chrono::{DateTime, Utc, SecondsFormat};
use serde_json::{json, Value};
use tracing::warn;
pub fn transform_to_mcp(event: Value, event_type: String) -> Value {
let source_obj = event.get("_source").unwrap_or(&event);
let id = source_obj.get("id")
.and_then(|v| v.as_str())
.or_else(|| event.get("_id").and_then(|v| v.as_str()))
.unwrap_or("unknown_id")
.to_string();
let default_rule = json!({});
let rule = source_obj.get("rule").unwrap_or(&default_rule);
let category = rule.get("groups")
.and_then(|g| g.as_array())
.and_then(|arr| arr.first())
.and_then(|v| v.as_str())
.unwrap_or("unknown_category")
.to_string();
let severity = rule.get("level")
.and_then(|v| v.as_u64())
.map(|level| match level {
0..=3 => "low",
4..=7 => "medium",
8..=11 => "high",
_ => "critical",
})
.unwrap_or("unknown_severity")
.to_string();
let description = rule.get("description")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let default_data = json!({});
let data = source_obj.get("data").cloned().unwrap_or(default_data);
let default_agent = json!({});
let agent = source_obj.get("agent").cloned().unwrap_or(default_agent);
let timestamp_str = source_obj.get("timestamp")
.and_then(|v| v.as_str())
.unwrap_or("");
let timestamp = DateTime::parse_from_rfc3339(timestamp_str)
.map(|dt| dt.with_timezone(&Utc))
.or_else(|_| DateTime::parse_from_str(timestamp_str, "%Y-%m-%dT%H:%M:%S%.fZ").map(|dt| dt.with_timezone(&Utc)))
.unwrap_or_else(|_| {
warn!("Failed to parse timestamp '{}' for alert ID '{}'. Using current time.", timestamp_str, id);
Utc::now()
});
let notes = "Data fetched via Wazuh API".to_string();
json!({
"protocol_version": "1.0",
"source": "Wazuh",
"timestamp": timestamp.to_rfc3339_opts(SecondsFormat::Secs, true),
"event_type": event_type,
"context": {
"id": id,
"category": category,
"severity": severity,
"description": description,
"agent": agent,
"data": data
},
"metadata": {
"integration": "Wazuh-MCP",
"notes": notes
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
use serde_json::json;
#[test]
fn test_transform_to_mcp_basic() {
let event_time_str = "2023-10-27T10:30:00.123Z";
let event_time = Utc.datetime_from_str(event_time_str, "%Y-%m-%dT%H:%M:%S%.fZ").unwrap();
let event = json!({
"id": "alert1",
"_id": "wazuh_alert_id_1",
"timestamp": event_time_str,
"rule": {
"level": 10,
"description": "High severity rule triggered",
"id": "1002",
"groups": ["gdpr", "pci_dss", "intrusion_detection"]
},
"agent": {
"id": "001",
"name": "server-db"
},
"data": {
"srcip": "1.2.3.4",
"dstport": "22"
}
});
let result = transform_to_mcp(event.clone(), "alert".to_string());
assert_eq!(result["protocol_version"], "1.0");
assert_eq!(result["source"], "Wazuh");
assert_eq!(result["event_type"], "alert");
assert_eq!(result["timestamp"], event_time.to_rfc3339_opts(SecondsFormat::Secs, true));
let context = &result["context"];
assert_eq!(context["id"], "alert1");
assert_eq!(context["category"], "gdpr");
assert_eq!(context["severity"], "high");
assert_eq!(context["description"], "High severity rule triggered");
assert_eq!(context["agent"]["name"], "server-db");
assert_eq!(context["data"]["srcip"], "1.2.3.4");
let metadata = &result["metadata"];
assert_eq!(metadata["integration"], "Wazuh-MCP");
assert_eq!(metadata["notes"], "Data fetched via Wazuh API");
}
#[test]
fn test_transform_to_mcp_with_source_nesting() {
let event_time_str = "2023-10-27T11:00:00Z";
let event_time = DateTime::parse_from_rfc3339(event_time_str).unwrap().with_timezone(&Utc);
let event = json!({
"_index": "wazuh-alerts-4.x-2023.10.27",
"_id": "alert_source_nested",
"_source": {
"id": "nested_alert_id",
"timestamp": event_time_str,
"rule": {
"level": 5,
"description": "Medium severity rule",
"groups": ["system_audit"]
},
"agent": { "id": "002", "name": "web-server" },
"data": { "command": "useradd test" }
}
});
let result = transform_to_mcp(event.clone(), "alert".to_string());
assert_eq!(result["timestamp"], event_time.to_rfc3339_opts(SecondsFormat::Secs, true));
let context = &result["context"];
assert_eq!(context["id"], "nested_alert_id");
assert_eq!(context["category"], "system_audit");
assert_eq!(context["severity"], "medium");
assert_eq!(context["description"], "Medium severity rule");
assert_eq!(context["agent"]["name"], "web-server");
assert_eq!(context["data"]["command"], "useradd test");
}
#[test]
fn test_transform_to_mcp_with_defaults() {
let event = json!({});
let before_transform = Utc::now();
let result = transform_to_mcp(event, "alert".to_string());
let after_transform = Utc::now();
assert_eq!(result["context"]["id"], "unknown_id");
assert_eq!(result["context"]["category"], "unknown_category");
assert_eq!(result["context"]["severity"], "unknown_severity");
assert_eq!(result["context"]["description"], "");
assert!(result["context"]["data"].is_object());
assert!(result["context"]["agent"].is_object());
assert_eq!(result["metadata"]["notes"], "Data fetched via Wazuh API");
let result_ts_str = result["timestamp"].as_str().unwrap();
let result_ts = DateTime::parse_from_rfc3339(result_ts_str).unwrap().with_timezone(&Utc);
assert!(result_ts.timestamp() >= before_transform.timestamp() && result_ts.timestamp() <= after_transform.timestamp());
}
#[test]
fn test_transform_timestamp_parsing_fallback() {
let event = json!({
"id": "ts_test",
"timestamp": "invalid-timestamp-format",
"rule": { "level": 3 },
});
let before_transform = Utc::now();
let result = transform_to_mcp(event, "alert".to_string());
let after_transform = Utc::now();
let result_ts_str = result["timestamp"].as_str().unwrap();
let result_ts = DateTime::parse_from_rfc3339(result_ts_str).unwrap().with_timezone(&Utc);
assert!(result_ts.timestamp() >= before_transform.timestamp() && result_ts.timestamp() <= after_transform.timestamp());
assert_eq!(result["context"]["id"], "ts_test");
assert_eq!(result["context"]["severity"], "low");
}
}

168
src/wazuh/client.rs Normal file
View File

@ -0,0 +1,168 @@
use reqwest::{header, Client};
use serde_json::Value;
use std::time::{Duration, SystemTime};
use tracing::{info, warn};
use super::error::WazuhApiError;
pub struct WazuhApiClient {
username: String,
password: String,
base_url: String,
jwt_token: Option<String>,
jwt_expiration: Option<SystemTime>,
auth_endpoint: String,
http_client: Client,
}
impl WazuhApiClient {
pub fn new(
host: String,
port: u16,
username: String,
password: String,
verify_ssl: bool,
) -> Self {
let base_url = format!("https://{}:{}", host, port);
let http_client = Client::builder()
.danger_accept_invalid_certs(!verify_ssl)
.timeout(Duration::from_secs(10))
.build()
.expect("Failed to create HTTP client");
Self {
username,
password,
base_url,
jwt_token: None,
jwt_expiration: None,
auth_endpoint: "/security/user/authenticate".to_string(),
http_client,
}
}
fn is_jwt_valid(&self) -> bool {
match (self.jwt_token.as_ref(), self.jwt_expiration) {
(Some(_), Some(expiration)) => match expiration.duration_since(SystemTime::now()) {
Ok(remaining) => remaining.as_secs() > 60,
Err(_) => false,
},
_ => false,
}
}
pub async fn get_jwt(&mut self) -> Result<String, WazuhApiError> {
if self.is_jwt_valid() {
return Ok(self.jwt_token.clone().unwrap());
}
let auth_url = format!("{}{}", self.base_url, self.auth_endpoint);
info!("Requesting new JWT token from {}", auth_url);
let response = self
.http_client
.post(&auth_url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
return Err(WazuhApiError::AuthenticationError(format!(
"Authentication failed with status {}: {}",
status, error_text
)));
}
let data: Value = response.json().await?;
let token = data
.get("jwt")
.and_then(|t| t.as_str())
.ok_or(WazuhApiError::JwtNotFound)?
.to_string();
self.jwt_token = Some(token.clone());
self.jwt_expiration = Some(SystemTime::now() + Duration::from_secs(5 * 60));
info!("Obtained new JWT token valid for 5 minutes");
Ok(token)
}
async fn make_request(
&mut self,
method: reqwest::Method,
endpoint: &str,
body: Option<Value>,
) -> Result<Value, WazuhApiError> {
let jwt_token = self.get_jwt().await?;
let url = format!("{}{}", self.base_url, endpoint);
let mut request_builder = self
.http_client
.request(method.clone(), &url)
.header(header::AUTHORIZATION, format!("Bearer {}", jwt_token));
if let Some(json_body) = &body {
request_builder = request_builder.json(json_body);
}
let response = request_builder.send().await?;
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
warn!("JWT expired. Re-authenticating and retrying request.");
self.jwt_token = None;
let new_jwt_token = self.get_jwt().await?;
let mut retry_builder = self
.http_client
.request(method, &url)
.header(header::AUTHORIZATION, format!("Bearer {}", new_jwt_token));
if let Some(json_body) = &body {
retry_builder = retry_builder.json(json_body);
}
let retry_response = retry_builder.send().await?;
if !retry_response.status().is_success() {
let status = retry_response.status();
let error_text = retry_response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
return Err(WazuhApiError::ApiError(format!(
"API request failed with status {}: {}",
status, error_text
)));
}
Ok(retry_response.json().await?)
} else if !response.status().is_success() {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
Err(WazuhApiError::ApiError(format!(
"API request failed with status {}: {}",
status, error_text
)))
} else {
Ok(response.json().await?)
}
}
pub async fn get_alerts(&mut self, query: Value) -> Result<Value, WazuhApiError> {
let index_pattern = "wazuh-alerts-*";
let endpoint = format!("/{}_search", index_pattern);
info!("Retrieving alerts with index pattern '{}'", index_pattern);
self.make_request(reqwest::Method::GET, &endpoint, Some(query))
.await
}
}

89
src/wazuh/error.rs Normal file
View File

@ -0,0 +1,89 @@
use thiserror::Error;
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use serde_json::json;
#[derive(Error, Debug)]
#[allow(dead_code)]
pub enum WazuhApiError {
#[error("Failed to create HTTP client: {0}")]
HttpClientCreationError(reqwest::Error),
#[error("HTTP request error: {0}")]
RequestError(#[from] reqwest::Error),
#[error("JWT token not found in Wazuh API response")]
JwtNotFound,
#[error("Wazuh API Authentication failed: {0}")]
AuthenticationError(String),
#[error("Failed to decode JWT: {0}")]
JwtDecodingError(#[from] jsonwebtoken::errors::Error),
#[error("JSON parsing error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Wazuh API error: {0}")]
ApiError(String),
#[error("Alert with ID '{0}' not found")]
AlertNotFound(String),
}
impl IntoResponse for WazuhApiError {
fn into_response(self) -> Response {
let (status, error_message) = match self {
WazuhApiError::HttpClientCreationError(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Internal server error: {}", e),
),
WazuhApiError::RequestError(e) => {
if e.is_connect() || e.is_timeout() {
(
StatusCode::BAD_GATEWAY,
format!("Could not connect to Wazuh API: {}", e),
)
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Wazuh API request error: {}", e),
)
}
}
WazuhApiError::JwtNotFound => (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to retrieve JWT token from Wazuh API response".to_string(),
),
WazuhApiError::AuthenticationError(msg) => (
StatusCode::UNAUTHORIZED,
format!("Wazuh API authentication failed: {}", msg),
),
WazuhApiError::JwtDecodingError(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to decode JWT token: {}", e),
),
WazuhApiError::JsonError(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to parse Wazuh API response: {}", e),
),
WazuhApiError::ApiError(msg) => {
(StatusCode::BAD_GATEWAY, format!("Wazuh API error: {}", msg))
}
WazuhApiError::AlertNotFound(id) => (
StatusCode::NOT_FOUND,
format!("Alert with ID '{}' not found", id),
),
};
let body = Json(json!({
"error": error_message,
}));
(status, body).into_response()
}
}

2
src/wazuh/mod.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod client;
pub mod error;

60
tests/README.md Normal file
View File

@ -0,0 +1,60 @@
# Wazuh MCP Server Tests
This directory contains tests for the Wazuh MCP Server, including end-to-end tests that simulate a client interacting with the server.
## Test Files
- `e2e_client_test.rs`: End-to-end test for MCP client interacting with Wazuh MCP server
- `integration_test.rs`: Integration test for Wazuh MCP Server with a mock Wazuh API
- `mcp_client.rs`: Reusable MCP client implementation
- `mcp_client_cli.rs`: Command-line tool for interacting with the MCP server
## Running the Tests
To run all tests:
```bash
cargo test
```
To run a specific test:
```bash
cargo test --test e2e_client_test
cargo test --test integration_test
```
## Using the MCP Client CLI
The MCP Client CLI can be used to interact with the MCP server for testing purposes:
```bash
# Build the CLI
cargo build --bin mcp_client_cli
# Run the CLI
MCP_SERVER_URL=http://localhost:8000 ./target/debug/mcp_client_cli get-data
MCP_SERVER_URL=http://localhost:8000 ./target/debug/mcp_client_cli health
MCP_SERVER_URL=http://localhost:8000 ./target/debug/mcp_client_cli query '{"severity": "high"}'
```
## Test Environment Variables
The tests use the following environment variables:
- `MCP_SERVER_URL`: URL of the MCP server (default: http://localhost:8000)
- `WAZUH_HOST`: Hostname of the Wazuh API server
- `WAZUH_PORT`: Port of the Wazuh API server
- `WAZUH_USER`: Username for Wazuh API authentication
- `WAZUH_PASS`: Password for Wazuh API authentication
- `VERIFY_SSL`: Whether to verify SSL certificates (default: false)
- `RUST_LOG`: Log level for the tests (default: info)
## Mock Wazuh API Server
The tests use a mock Wazuh API server to simulate the Wazuh API. The mock server provides:
- Authentication endpoint: `/security/user/authenticate`
- Alerts endpoint: `/wazuh-alerts-*_search`
The mock server returns predefined responses for these endpoints, allowing the tests to run without a real Wazuh API server.

194
tests/e2e_client_test.rs Normal file
View File

@ -0,0 +1,194 @@
use anyhow::Result;
use httpmock::prelude::*;
use reqwest::Client;
use serde_json::{json, Value};
use std::process::{Child, Command};
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;
struct MockWazuhServer {
server: MockServer,
}
impl MockWazuhServer {
fn new() -> Self {
let server = MockServer::start();
server.mock(|when, then| {
when.method(POST)
.path("/security/user/authenticate")
.header("Authorization", "Basic YWRtaW46YWRtaW4=");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"jwt": "mock.jwt.token"
}));
});
server.mock(|when, then| {
when.method(GET)
.path("/wazuh-alerts-*_search")
.header("Authorization", "Bearer mock.jwt.token");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"hits": {
"hits": [
{
"_source": {
"id": "12345",
"category": "intrusion_detection",
"severity": "high",
"description": "Possible intrusion attempt detected",
"data": {
"source_ip": "192.168.1.100",
"destination_ip": "10.0.0.1",
"port": 22
},
"notes": "Test alert"
}
},
{
"_source": {
"id": "67890",
"category": "malware",
"severity": "critical",
"description": "Malware detected on system",
"data": {
"file_path": "/tmp/malicious.exe",
"hash": "abcdef123456",
"signature": "EICAR-Test-File"
}
}
}
]
}
}));
});
Self { server }
}
fn url(&self) -> String {
self.server.url("")
}
}
struct McpClient {
client: Client,
base_url: String,
}
impl McpClient {
fn new(base_url: String) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("Failed to create HTTP client");
Self { client, base_url }
}
async fn get_mcp_data(&self) -> Result<Vec<Value>> {
let url = format!("{}/mcp", self.base_url);
let response = self.client.get(&url).send().await?;
if !response.status().is_success() {
anyhow::bail!("MCP request failed with status: {}", response.status());
}
let data = response.json::<Vec<Value>>().await?;
Ok(data)
}
async fn check_health(&self) -> Result<Value> {
let url = format!("{}/health", self.base_url);
let response = self.client.get(&url).send().await?;
if !response.status().is_success() {
anyhow::bail!("Health check failed with status: {}", response.status());
}
let data = response.json::<Value>().await?;
Ok(data)
}
}
fn start_mcp_server(wazuh_url: &str, port: u16) -> Child {
let server_id = Uuid::new_v4().to_string();
let wazuh_host_port: Vec<&str> = wazuh_url.trim_start_matches("http://").split(':').collect();
let wazuh_host = wazuh_host_port[0];
let wazuh_port = wazuh_host_port[1];
Command::new("cargo")
.args(["run", "--"])
.env("WAZUH_HOST", wazuh_host)
.env("WAZUH_PORT", wazuh_port)
.env("WAZUH_USER", "admin")
.env("WAZUH_PASS", "admin")
.env("VERIFY_SSL", "false")
.env("MCP_SERVER_PORT", port.to_string())
.env("RUST_LOG", "info")
.env("SERVER_ID", server_id)
.spawn()
.expect("Failed to start MCP server")
}
#[tokio::test]
async fn test_mcp_client_integration() -> Result<()> {
let mock_wazuh = MockWazuhServer::new();
let wazuh_url = mock_wazuh.url();
let mcp_port = 8765;
let mut mcp_server = start_mcp_server(&wazuh_url, mcp_port);
sleep(Duration::from_secs(2)).await;
let mcp_client = McpClient::new(format!("http://localhost:{}", mcp_port));
let health_data = mcp_client.check_health().await?;
assert_eq!(health_data["status"], "ok");
assert_eq!(health_data["service"], "wazuh-mcp-server");
let mcp_data = mcp_client.get_mcp_data().await?;
assert_eq!(mcp_data.len(), 2);
let first_message = &mcp_data[0];
assert_eq!(first_message["protocol_version"], "1.0");
assert_eq!(first_message["source"], "Wazuh");
assert_eq!(first_message["event_type"], "alert");
let context = &first_message["context"];
assert_eq!(context["id"], "12345");
assert_eq!(context["category"], "intrusion_detection");
assert_eq!(context["severity"], "high");
assert_eq!(
context["description"],
"Possible intrusion attempt detected"
);
let data = &context["data"];
assert_eq!(data["source_ip"], "192.168.1.100");
assert_eq!(data["destination_ip"], "10.0.0.1");
assert_eq!(data["port"], 22);
let second_message = &mcp_data[1];
let context = &second_message["context"];
assert_eq!(context["id"], "67890");
assert_eq!(context["category"], "malware");
assert_eq!(context["severity"], "critical");
assert_eq!(context["description"], "Malware detected on system");
let data = &context["data"];
assert_eq!(data["file_path"], "/tmp/malicious.exe");
assert_eq!(data["hash"], "abcdef123456");
assert_eq!(data["signature"], "EICAR-Test-File");
mcp_server.kill().expect("Failed to kill MCP server");
Ok(())
}

420
tests/integration_test.rs Normal file
View File

@ -0,0 +1,420 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use httpmock::prelude::*;
use once_cell::sync::Lazy;
use serde_json::json;
use std::net::TcpListener;
use std::process::{Child, Command};
use std::sync::Mutex;
use std::time::Duration;
use tokio::time::sleep;
mod mcp_client;
use mcp_client::{McpClient, McpClientTrait, McpMessage};
static TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
fn find_available_port() -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind to random port");
let port = listener
.local_addr()
.expect("Failed to get local address")
.port();
drop(listener);
port
}
struct MockWazuhServer {
server: MockServer,
}
impl MockWazuhServer {
fn new() -> Self {
let server = MockServer::start();
server.mock(|when, then| {
when.method(POST).path("/security/user/authenticate");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"jwt": "mock.jwt.token"
}));
});
server.mock(|when, then| {
when.method(GET).path("/wazuh-alerts-*_search");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"hits": {
"hits": [
{
"_source": {
"id": "12345",
"category": "intrusion_detection",
"severity": "high",
"description": "Possible intrusion attempt detected",
"data": {
"source_ip": "192.168.1.100",
"destination_ip": "10.0.0.1",
"port": 22
},
"notes": "Test alert"
}
},
{
"_source": {
"id": "67890",
"category": "malware",
"severity": "critical",
"description": "Malware detected on system",
"data": {
"file_path": "/tmp/malicious.exe",
"hash": "abcdef123456",
"signature": "EICAR-Test-File"
}
}
}
]
}
}));
});
Self { server }
}
fn url(&self) -> String {
self.server.url("")
}
fn host(&self) -> String {
let url = self.url();
let parts: Vec<&str> = url.trim_start_matches("http://").split(':').collect();
parts[0].to_string()
}
fn port(&self) -> u16 {
let url = self.url();
let parts: Vec<&str> = url.trim_start_matches("http://").split(':').collect();
parts[1].parse().unwrap()
}
}
fn setup_mock_wazuh_server() -> MockServer {
let server = MockServer::start();
server.mock(|when, then| {
when.method(POST).path("/security/user/authenticate");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({ "jwt": "mock.jwt.token" }));
});
server.mock(|when, then| {
when.method(GET)
.path_matches(Regex::new(r"/wazuh-alerts-.*_search").unwrap());
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"hits": {
"hits": [
{
"_source": {
"id": "12345",
"timestamp": "2024-01-01T10:00:00.000Z",
"rule": {
"level": 9,
"description": "Possible intrusion attempt detected",
"groups": ["intrusion_detection", "pci_dss"]
},
"agent": { "id": "001", "name": "test-agent" },
"data": {
"source_ip": "192.168.1.100",
"destination_ip": "10.0.0.1",
"port": 22
}
}
},
{
"_source": {
"id": "67890",
"timestamp": "2024-01-01T11:00:00.000Z",
"rule": {
"level": 12,
"description": "Malware detected on system",
"groups": ["malware"]
},
"agent": { "id": "002", "name": "another-agent" },
"data": {
"file_path": "/tmp/malicious.exe",
"hash": "abcdef123456",
"signature": "EICAR-Test-File"
}
}
}
]
}
}));
});
server
}
fn get_host_port(server: &MockServer) -> (String, u16) {
let url = server.url("");
let parts: Vec<&str> = url.trim_start_matches("http://").split(':').collect();
let host = parts[0].to_string();
let port = parts[1].parse().unwrap();
(host, port)
}
fn start_mcp_server(wazuh_host: &str, wazuh_port: u16, mcp_port: u16) -> Child {
Command::new("cargo")
.args(["run", "--"])
.env("WAZUH_HOST", wazuh_host)
.env("WAZUH_PORT", wazuh_port.to_string())
.env("WAZUH_USER", "admin")
.env("WAZUH_PASS", "admin")
.env("VERIFY_SSL", "false")
.env("MCP_SERVER_PORT", mcp_port.to_string())
.env("RUST_LOG", "info")
.spawn()
.expect("Failed to start MCP server")
}
#[tokio::test]
async fn test_mcp_server_with_mock_wazuh() -> Result<()> {
let _guard = TEST_MUTEX.lock().unwrap();
let mock_wazuh_server = setup_mock_wazuh_server();
let (wazuh_host, wazuh_port) = get_host_port(&mock_wazuh_server);
let mcp_port = find_available_port();
let mut mcp_server = start_mcp_server(&wazuh_host, wazuh_port, mcp_port);
sleep(Duration::from_secs(2)).await;
let mcp_client = McpClient::new(format!("http://localhost:{}", mcp_port));
let health_data = mcp_client.check_health().await?;
assert_eq!(health_data["status"], "ok");
assert_eq!(health_data["service"], "wazuh-mcp-server");
let mcp_data = mcp_client.get_mcp_data().await?;
assert_eq!(mcp_data.len(), 2);
let first_message: &McpMessage = &mcp_data[0];
assert_eq!(first_message.protocol_version, "1.0");
assert_eq!(first_message.source, "Wazuh");
assert_eq!(first_message.event_type, "alert");
let context = &first_message.context;
assert_eq!(context["id"], "12345");
assert_eq!(context["category"], "intrusion_detection");
assert_eq!(context["severity"], "high");
assert_eq!(
context["description"],
"Possible intrusion attempt detected"
);
assert_eq!(context["agent"]["name"], "test-agent");
let data = &context["data"];
assert_eq!(data["source_ip"], "192.168.1.100");
assert_eq!(data["destination_ip"], "10.0.0.1");
assert_eq!(data["port"], 22);
let second_message = &mcp_data[1];
let context = &second_message.context;
assert_eq!(context["id"], "67890");
assert_eq!(context["category"], "malware");
assert_eq!(context["severity"], "critical");
assert_eq!(context["description"], "Malware detected on system");
assert_eq!(context["agent"]["name"], "another-agent");
let data = &context["data"];
assert_eq!(data["file_path"], "/tmp/malicious.exe");
assert_eq!(data["hash"], "abcdef123456");
assert_eq!(data["signature"], "EICAR-Test-File");
mcp_server.kill().expect("Failed to kill MCP server");
Ok(())
}
#[tokio::test]
async fn test_mcp_server_wazuh_api_error() -> Result<()> {
let _guard = TEST_MUTEX.lock().unwrap();
let mock_wazuh_server = setup_mock_wazuh_server();
let (wazuh_host, wazuh_port) = get_host_port(&mock_wazuh_server);
mock_wazuh_server.mock(|when, then| {
when.method(GET)
.path_matches(Regex::new(r"/wazuh-alerts-.*_search").unwrap());
then.status(500)
.header("content-type", "application/json")
.json_body(json!({"error": "Wazuh internal error"}));
});
let mcp_port = find_available_port();
let mut mcp_server = start_mcp_server(&wazuh_host, wazuh_port, mcp_port);
sleep(Duration::from_secs(2)).await;
let mcp_client = McpClient::new(format!("http://localhost:{}", mcp_port));
let result = mcp_client.get_mcp_data().await;
assert!(result.is_err());
let err_string = result.unwrap_err().to_string();
assert!(
err_string.contains("500")
|| err_string.contains("502")
|| err_string.contains("API request failed")
);
let health_result = mcp_client.check_health().await;
assert!(health_result.is_ok());
assert_eq!(health_result.unwrap()["status"], "ok");
mcp_server.kill().expect("Failed to kill MCP server");
Ok(())
}
#[tokio::test]
async fn test_mcp_client_error_handling() -> Result<()> {
let _guard = TEST_MUTEX.lock().unwrap();
let server = MockServer::start();
server.mock(|when, then| {
when.method(GET).path("/mcp");
then.status(500)
.header("content-type", "application/json")
.json_body(json!({
"error": "Internal server error"
}));
});
server.mock(|when, then| {
when.method(GET).path("/health");
then.status(503)
.header("content-type", "application/json")
.json_body(json!({
"error": "Service unavailable"
}));
});
let client = McpClient::new(server.url(""));
let result = client.get_mcp_data().await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("500") || err.to_string().contains("MCP request failed"));
let result = client.check_health().await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("503") || err.to_string().contains("Health check failed"));
Ok(())
}
#[tokio::test]
async fn test_mcp_server_missing_alert_data() -> Result<()> {
let _guard = TEST_MUTEX.lock().unwrap();
let mock_wazuh_server = setup_mock_wazuh_server();
let (wazuh_host, wazuh_port) = get_host_port(&mock_wazuh_server);
mock_wazuh_server.mock(|when, then| {
when.method(GET)
.path_matches(Regex::new(r"/wazuh-alerts-.*_search").unwrap());
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"hits": {
"hits": [
{
"_source": {
"id": "missing_all",
"timestamp": "invalid-date-format"
}
},
{
"_source": {
"id": "missing_rule_fields",
"timestamp": "2024-05-05T11:00:00.000Z",
"rule": { },
"agent": { "id": "003", "name": "agent-minimal" },
"data": {}
}
},
{
"id": "no_source_nest",
"timestamp": "2024-05-05T12:00:00.000Z",
"rule": {
"level": 2,
"description": "Low severity event",
"groups": ["low_sev"]
},
"agent": { "id": "004" },
"data": { "info": "some data" }
}
]
}
}));
});
let mcp_port = find_available_port();
let mut mcp_server = start_mcp_server(&wazuh_host, wazuh_port, mcp_port);
sleep(Duration::from_secs(2)).await;
let mcp_client = McpClient::new(format!("http://localhost:{}", mcp_port));
let mcp_data = mcp_client.get_mcp_data().await?;
assert_eq!(mcp_data.len(), 3);
let msg1 = &mcp_data[0];
assert_eq!(msg1.context["id"], "missing_all");
assert_eq!(msg1.context["category"], "unknown_category");
assert_eq!(msg1.context["severity"], "unknown_severity");
assert_eq!(msg1.context["description"], "");
assert!(
msg1.context["agent"].is_object() && msg1.context["agent"].as_object().unwrap().is_empty()
);
assert!(
msg1.context["data"].is_object() && msg1.context["data"].as_object().unwrap().is_empty()
);
let ts1 = DateTime::parse_from_rfc3339(&msg1.timestamp)
.unwrap()
.with_timezone(&Utc);
assert!((Utc::now() - ts1).num_seconds() < 5);
let msg2 = &mcp_data[1];
assert_eq!(msg2.context["id"], "missing_rule_fields");
assert_eq!(msg2.context["category"], "unknown_category");
assert_eq!(msg2.context["severity"], "unknown_severity");
assert_eq!(msg2.context["description"], "");
assert_eq!(msg2.context["agent"]["name"], "agent-minimal");
assert!(
msg2.context["data"].is_object() && msg2.context["data"].as_object().unwrap().is_empty()
);
assert_eq!(msg2.timestamp, "2024-05-05T11:00:00Z");
let msg3 = &mcp_data[2];
assert_eq!(msg3.context["id"], "no_source_nest");
assert_eq!(msg3.context["category"], "low_sev");
assert_eq!(msg3.context["severity"], "low");
assert_eq!(msg3.context["description"], "Low severity event");
assert_eq!(msg3.context["agent"]["id"], "004");
assert!(msg3.context["agent"].get("name").is_none());
assert_eq!(msg3.context["data"]["info"], "some data");
assert_eq!(msg3.timestamp, "2024-05-05T12:00:00Z");
mcp_server.kill().expect("Failed to kill MCP server");
Ok(())
}

180
tests/mcp_client.rs Normal file
View File

@ -0,0 +1,180 @@
use anyhow::Result;
use async_trait::async_trait;
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpMessage {
pub protocol_version: String,
pub source: String,
pub timestamp: String,
pub event_type: String,
pub context: Value,
pub metadata: Value,
}
#[async_trait]
pub trait McpClientTrait {
async fn get_mcp_data(&self) -> Result<Vec<McpMessage>>;
async fn check_health(&self) -> Result<Value>;
async fn query_mcp_data(&self, filters: Value) -> Result<Vec<McpMessage>>;
}
pub struct McpClient {
client: Client,
base_url: String,
}
impl McpClient {
pub fn new(base_url: String) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.expect("Failed to create HTTP client");
Self { client, base_url }
}
}
#[async_trait]
impl McpClientTrait for McpClient {
async fn get_mcp_data(&self) -> Result<Vec<McpMessage>> {
let url = format!("{}/mcp", self.base_url);
let response = self.client.get(&url).send().await?;
match response.status() {
StatusCode::OK => {
let data = response.json::<Vec<McpMessage>>().await?;
Ok(data)
}
status => {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
anyhow::bail!("MCP request failed with status {}: {}", status, error_text)
}
}
}
async fn check_health(&self) -> Result<Value> {
let url = format!("{}/health", self.base_url);
let response = self.client.get(&url).send().await?;
match response.status() {
StatusCode::OK => {
let data = response.json::<Value>().await?;
Ok(data)
}
status => {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
anyhow::bail!("Health check failed with status {}: {}", status, error_text)
}
}
}
async fn query_mcp_data(&self, filters: Value) -> Result<Vec<McpMessage>> {
let url = format!("{}/mcp", self.base_url);
let response = self.client.post(&url).json(&filters).send().await?;
match response.status() {
StatusCode::OK => {
let data = response.json::<Vec<McpMessage>>().await?;
Ok(data)
}
status => {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
anyhow::bail!("MCP query failed with status {}: {}", status, error_text)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use httpmock::prelude::*;
use serde_json::json;
use tokio;
#[tokio::test]
async fn test_mcp_client_get_data() {
let server = MockServer::start();
server.mock(|when, then| {
when.method(GET).path("/mcp");
then.status(200)
.header("content-type", "application/json")
.json_body(json!([
{
"protocol_version": "1.0",
"source": "Wazuh",
"timestamp": "2023-05-01T12:00:00Z",
"event_type": "alert",
"context": {
"id": "12345",
"category": "intrusion_detection",
"severity": "high",
"description": "Test alert",
"data": {
"source_ip": "192.168.1.100"
}
},
"metadata": {
"integration": "Wazuh-MCP",
"notes": "Test note"
}
}
]));
});
let client = McpClient::new(server.url(""));
let result = client.get_mcp_data().await.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].protocol_version, "1.0");
assert_eq!(result[0].source, "Wazuh");
assert_eq!(result[0].event_type, "alert");
let context = &result[0].context;
assert_eq!(context["id"], "12345");
assert_eq!(context["category"], "intrusion_detection");
assert_eq!(context["severity"], "high");
}
#[tokio::test]
async fn test_mcp_client_health_check() {
let server = MockServer::start();
server.mock(|when, then| {
when.method(GET).path("/health");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"status": "ok",
"service": "wazuh-mcp-server",
"timestamp": "2023-05-01T12:00:00Z"
}));
});
let client = McpClient::new(server.url(""));
let result = client.check_health().await.unwrap();
assert_eq!(result["status"], "ok");
assert_eq!(result["service"], "wazuh-mcp-server");
}
}

97
tests/mcp_client_cli.rs Normal file
View File

@ -0,0 +1,97 @@
use anyhow::Result;
use serde_json::Value;
use std::env;
use std::process;
mod mcp_client;
use mcp_client::{McpClient, McpClientTrait};
#[tokio::main]
async fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: {} <command> [options]", args[0]);
eprintln!("Commands:");
eprintln!(" get-data - Get MCP data from the server");
eprintln!(" health - Check server health");
eprintln!(" query - Query MCP data with filters");
process::exit(1);
}
let command = &args[1];
let mcp_url =
env::var("MCP_SERVER_URL").unwrap_or_else(|_| "http://localhost:8000".to_string());
println!("Connecting to MCP server at: {}", mcp_url);
let client = McpClient::new(mcp_url);
match command.as_str() {
"get-data" => {
println!("Fetching MCP data...");
let data = client.get_mcp_data().await?;
println!("Received {} MCP messages:", data.len());
for (i, message) in data.iter().enumerate() {
println!("\nMessage {}:", i + 1);
println!(" Source: {}", message.source);
println!(" Event Type: {}", message.event_type);
println!(" Timestamp: {}", message.timestamp);
let context = &message.context;
println!(" Context:");
println!(" ID: {}", context["id"]);
println!(" Category: {}", context["category"]);
println!(" Severity: {}", context["severity"]);
println!(" Description: {}", context["description"]);
if let Some(data) = context.get("data").and_then(|d| d.as_object()) {
println!(" Data:");
for (key, value) in data {
println!(" {}: {}", key, value);
}
}
}
}
"health" => {
println!("Checking server health...");
let health = client.check_health().await?;
println!("Health status: {}", health["status"]);
println!("Service: {}", health["service"]);
println!("Timestamp: {}", health["timestamp"]);
}
"query" => {
if args.len() < 3 {
eprintln!("Error: Missing query parameters");
eprintln!("Usage: {} query <json_filter>", args[0]);
process::exit(1);
}
let filter_str = &args[2];
let filters: Value = serde_json::from_str(filter_str)?;
println!("Querying MCP data with filters: {}", filters);
let data = client.query_mcp_data(filters).await?;
println!("Received {} MCP messages:", data.len());
for (i, message) in data.iter().enumerate() {
println!("\nMessage {}:", i + 1);
println!(" Source: {}", message.source);
println!(" Event Type: {}", message.event_type);
let context = &message.context;
println!(" Context:");
println!(" ID: {}", context["id"]);
println!(" Category: {}", context["category"]);
println!(" Severity: {}", context["severity"]);
}
}
_ => {
eprintln!("Error: Unknown command '{}'", command);
process::exit(1);
}
}
Ok(())
}

15
tests/run_tests.sh Executable file
View File

@ -0,0 +1,15 @@
#!/bin/bash
echo "Running all tests..."
cargo test
echo "Building MCP client CLI..."
cargo build --bin mcp_client_cli
if nc -z localhost 8000 2>/dev/null; then
echo "Testing MCP client CLI against running server..."
./target/debug/mcp_client_cli health
./target/debug/mcp_client_cli get-data
else
echo "MCP server is not running. Start it with 'cargo run' to test the CLI."
fi