feat(mcp_service): enhance MCP server handling to support custom MCP servers
This commit is contained in:
parent
442369505a
commit
f98a6c405e
@ -96,7 +96,7 @@ class AgentBuilder:
|
|||||||
# Get MCP tools from the configuration
|
# Get MCP tools from the configuration
|
||||||
mcp_tools = []
|
mcp_tools = []
|
||||||
mcp_exit_stack = None
|
mcp_exit_stack = None
|
||||||
if agent.config.get("mcp_servers"):
|
if agent.config.get("mcp_servers") or agent.config.get("custom_mcp_servers"):
|
||||||
mcp_tools, mcp_exit_stack = await self.mcp_service.build_tools(
|
mcp_tools, mcp_exit_stack = await self.mcp_service.build_tools(
|
||||||
agent.config, self.db
|
agent.config, self.db
|
||||||
)
|
)
|
||||||
|
@ -47,6 +47,7 @@ class MCPService:
|
|||||||
tools, exit_stack = await MCPToolset.from_server(
|
tools, exit_stack = await MCPToolset.from_server(
|
||||||
connection_params=connection_params
|
connection_params=connection_params
|
||||||
)
|
)
|
||||||
|
|
||||||
return tools, exit_stack
|
return tools, exit_stack
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -93,71 +94,106 @@ class MCPService:
|
|||||||
self.exit_stack = AsyncExitStack()
|
self.exit_stack = AsyncExitStack()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Process each MCP server in the configuration
|
mcp_servers = mcp_config.get("mcp_servers", [])
|
||||||
for server in mcp_config.get("mcp_servers", []):
|
if mcp_servers is not None:
|
||||||
try:
|
# Process each MCP server in the configuration
|
||||||
# Search for the MCP server in the database
|
for server in mcp_servers:
|
||||||
mcp_server = get_mcp_server(db, server["id"])
|
try:
|
||||||
if not mcp_server:
|
# Search for the MCP server in the database
|
||||||
logger.warning(f"MCP Server not found: {server['id']}")
|
mcp_server = get_mcp_server(db, server["id"])
|
||||||
|
if not mcp_server:
|
||||||
|
logger.warning(f"MCP Server not found: {server['id']}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Prepares the server configuration
|
||||||
|
server_config = mcp_server.config_json.copy()
|
||||||
|
|
||||||
|
# Replaces the environment variables in the config_json
|
||||||
|
if "env" in server_config and server_config["env"] is not None:
|
||||||
|
for key, value in server_config["env"].items():
|
||||||
|
if value and value.startswith("env@@"):
|
||||||
|
env_key = value.replace("env@@", "")
|
||||||
|
if server.get("envs") and env_key in server.get(
|
||||||
|
"envs", {}
|
||||||
|
):
|
||||||
|
server_config["env"][key] = server["envs"][
|
||||||
|
env_key
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
f"Environment variable '{env_key}' not provided for the MCP server {mcp_server.name}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.info(f"Connecting to MCP server: {mcp_server.name}")
|
||||||
|
tools, exit_stack = await self._connect_to_mcp_server(
|
||||||
|
server_config
|
||||||
|
)
|
||||||
|
|
||||||
|
if tools and exit_stack:
|
||||||
|
# Filters incompatible tools
|
||||||
|
filtered_tools = self._filter_incompatible_tools(tools)
|
||||||
|
|
||||||
|
# Filters tools compatible with the agent
|
||||||
|
agent_tools = server.get("tools", [])
|
||||||
|
if agent_tools:
|
||||||
|
filtered_tools = self._filter_tools_by_agent(
|
||||||
|
filtered_tools, agent_tools
|
||||||
|
)
|
||||||
|
self.tools.extend(filtered_tools)
|
||||||
|
|
||||||
|
# Registers the exit_stack with the AsyncExitStack
|
||||||
|
await self.exit_stack.enter_async_context(exit_stack)
|
||||||
|
logger.info(
|
||||||
|
f"MCP Server {mcp_server.name} connected successfully. Added {len(filtered_tools)} tools."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
f"Failed to connect or no tools available for {mcp_server.name}"
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
f"Error connecting to MCP server {server.get('id', 'unknown')}: {e}"
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Prepares the server configuration
|
custom_mcp_servers = mcp_config.get("custom_mcp_servers", [])
|
||||||
server_config = mcp_server.config_json.copy()
|
if custom_mcp_servers is not None:
|
||||||
|
# Process custom MCP servers
|
||||||
# Replaces the environment variables in the config_json
|
for server in custom_mcp_servers:
|
||||||
if "env" in server_config:
|
if not server:
|
||||||
for key, value in server_config["env"].items():
|
|
||||||
if value.startswith("env@@"):
|
|
||||||
env_key = value.replace("env@@", "")
|
|
||||||
if env_key in server.get("envs", {}):
|
|
||||||
server_config["env"][key] = server["envs"][env_key]
|
|
||||||
else:
|
|
||||||
logger.warning(
|
|
||||||
f"Environment variable '{env_key}' not provided for the MCP server {mcp_server.name}"
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
logger.info(f"Connecting to MCP server: {mcp_server.name}")
|
|
||||||
tools, exit_stack = await self._connect_to_mcp_server(server_config)
|
|
||||||
|
|
||||||
if tools and exit_stack:
|
|
||||||
# Filters incompatible tools
|
|
||||||
filtered_tools = self._filter_incompatible_tools(tools)
|
|
||||||
|
|
||||||
# Filters tools compatible with the agent
|
|
||||||
agent_tools = server.get("tools", [])
|
|
||||||
filtered_tools = self._filter_tools_by_agent(
|
|
||||||
filtered_tools, agent_tools
|
|
||||||
)
|
|
||||||
self.tools.extend(filtered_tools)
|
|
||||||
|
|
||||||
# Registers the exit_stack with the AsyncExitStack
|
|
||||||
await self.exit_stack.enter_async_context(exit_stack)
|
|
||||||
logger.info(
|
|
||||||
f"MCP Server {mcp_server.name} connected successfully. Added {len(filtered_tools)} tools."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Failed to connect or no tools available for {mcp_server.name}"
|
"Empty server configuration found in custom_mcp_servers"
|
||||||
)
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
except Exception as e:
|
try:
|
||||||
logger.error(f"Error connecting to MCP server {server['id']}: {e}")
|
logger.info(
|
||||||
continue
|
f"Connecting to custom MCP server: {server.get('url', 'unknown')}"
|
||||||
|
)
|
||||||
|
tools, exit_stack = await self._connect_to_mcp_server(server)
|
||||||
|
|
||||||
# Process custom MCP servers
|
if tools:
|
||||||
for server in mcp_config.get("custom_mcp_servers", []):
|
self.tools.extend(tools)
|
||||||
try:
|
else:
|
||||||
tools, exit_stack = await self._connect_to_mcp_server(server)
|
logger.warning("No tools returned from custom MCP server")
|
||||||
self.tools.extend(tools)
|
continue
|
||||||
await self.exit_stack.enter_async_context(exit_stack)
|
|
||||||
logger.info(
|
if exit_stack:
|
||||||
f"Custom MCP server connected successfully. Added {len(tools)} tools."
|
await self.exit_stack.enter_async_context(exit_stack)
|
||||||
)
|
logger.info(
|
||||||
except Exception as e:
|
f"Custom MCP server connected successfully. Added {len(tools)} tools."
|
||||||
logger.error(f"Error connecting to MCP server {server['id']}: {e}")
|
)
|
||||||
continue
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"No exit_stack returned from custom MCP server"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
f"Error connecting to custom MCP server {server.get('url', 'unknown')}: {e}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"MCP Toolset created successfully. Total of {len(self.tools)} tools."
|
f"MCP Toolset created successfully. Total of {len(self.tools)} tools."
|
||||||
|
Loading…
Reference in New Issue
Block a user