From f98a6c405e9acd02c76cdf68b440ddcfeaf3a496 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 7 May 2025 19:19:53 -0300 Subject: [PATCH] feat(mcp_service): enhance MCP server handling to support custom MCP servers --- src/services/agent_builder.py | 2 +- src/services/mcp_service.py | 154 +++++++++++++++++++++------------- 2 files changed, 96 insertions(+), 60 deletions(-) diff --git a/src/services/agent_builder.py b/src/services/agent_builder.py index 5b186ca9..aab800d4 100644 --- a/src/services/agent_builder.py +++ b/src/services/agent_builder.py @@ -96,7 +96,7 @@ class AgentBuilder: # Get MCP tools from the configuration mcp_tools = [] mcp_exit_stack = None - if agent.config.get("mcp_servers"): + if agent.config.get("mcp_servers") or agent.config.get("custom_mcp_servers"): mcp_tools, mcp_exit_stack = await self.mcp_service.build_tools( agent.config, self.db ) diff --git a/src/services/mcp_service.py b/src/services/mcp_service.py index c8146e85..a064e590 100644 --- a/src/services/mcp_service.py +++ b/src/services/mcp_service.py @@ -47,6 +47,7 @@ class MCPService: tools, exit_stack = await MCPToolset.from_server( connection_params=connection_params ) + return tools, exit_stack except Exception as e: @@ -93,71 +94,106 @@ class MCPService: self.exit_stack = AsyncExitStack() try: - # Process each MCP server in the configuration - for server in mcp_config.get("mcp_servers", []): - try: - # Search for the MCP server in the database - mcp_server = get_mcp_server(db, server["id"]) - if not mcp_server: - logger.warning(f"MCP Server not found: {server['id']}") + mcp_servers = mcp_config.get("mcp_servers", []) + if mcp_servers is not None: + # Process each MCP server in the configuration + for server in mcp_servers: + try: + # Search for the MCP server in the database + mcp_server = get_mcp_server(db, server["id"]) + if not mcp_server: + logger.warning(f"MCP Server not found: {server['id']}") + continue + + # Prepares the server configuration + server_config = mcp_server.config_json.copy() + + # Replaces the environment variables in the config_json + if "env" in server_config and server_config["env"] is not None: + for key, value in server_config["env"].items(): + if value and value.startswith("env@@"): + env_key = value.replace("env@@", "") + if server.get("envs") and env_key in server.get( + "envs", {} + ): + server_config["env"][key] = server["envs"][ + env_key + ] + else: + logger.warning( + f"Environment variable '{env_key}' not provided for the MCP server {mcp_server.name}" + ) + continue + + logger.info(f"Connecting to MCP server: {mcp_server.name}") + tools, exit_stack = await self._connect_to_mcp_server( + server_config + ) + + if tools and exit_stack: + # Filters incompatible tools + filtered_tools = self._filter_incompatible_tools(tools) + + # Filters tools compatible with the agent + agent_tools = server.get("tools", []) + if agent_tools: + filtered_tools = self._filter_tools_by_agent( + filtered_tools, agent_tools + ) + self.tools.extend(filtered_tools) + + # Registers the exit_stack with the AsyncExitStack + await self.exit_stack.enter_async_context(exit_stack) + logger.info( + f"MCP Server {mcp_server.name} connected successfully. Added {len(filtered_tools)} tools." + ) + else: + logger.warning( + f"Failed to connect or no tools available for {mcp_server.name}" + ) + + except Exception as e: + logger.error( + f"Error connecting to MCP server {server.get('id', 'unknown')}: {e}" + ) continue - # Prepares the server configuration - server_config = mcp_server.config_json.copy() - - # Replaces the environment variables in the config_json - if "env" in server_config: - for key, value in server_config["env"].items(): - if value.startswith("env@@"): - env_key = value.replace("env@@", "") - if env_key in server.get("envs", {}): - server_config["env"][key] = server["envs"][env_key] - else: - logger.warning( - f"Environment variable '{env_key}' not provided for the MCP server {mcp_server.name}" - ) - continue - - logger.info(f"Connecting to MCP server: {mcp_server.name}") - tools, exit_stack = await self._connect_to_mcp_server(server_config) - - if tools and exit_stack: - # Filters incompatible tools - filtered_tools = self._filter_incompatible_tools(tools) - - # Filters tools compatible with the agent - agent_tools = server.get("tools", []) - filtered_tools = self._filter_tools_by_agent( - filtered_tools, agent_tools - ) - self.tools.extend(filtered_tools) - - # Registers the exit_stack with the AsyncExitStack - await self.exit_stack.enter_async_context(exit_stack) - logger.info( - f"MCP Server {mcp_server.name} connected successfully. Added {len(filtered_tools)} tools." - ) - else: + custom_mcp_servers = mcp_config.get("custom_mcp_servers", []) + if custom_mcp_servers is not None: + # Process custom MCP servers + for server in custom_mcp_servers: + if not server: logger.warning( - f"Failed to connect or no tools available for {mcp_server.name}" + "Empty server configuration found in custom_mcp_servers" ) + continue - except Exception as e: - logger.error(f"Error connecting to MCP server {server['id']}: {e}") - continue + try: + logger.info( + f"Connecting to custom MCP server: {server.get('url', 'unknown')}" + ) + tools, exit_stack = await self._connect_to_mcp_server(server) - # Process custom MCP servers - for server in mcp_config.get("custom_mcp_servers", []): - try: - tools, exit_stack = await self._connect_to_mcp_server(server) - self.tools.extend(tools) - await self.exit_stack.enter_async_context(exit_stack) - logger.info( - f"Custom MCP server connected successfully. Added {len(tools)} tools." - ) - except Exception as e: - logger.error(f"Error connecting to MCP server {server['id']}: {e}") - continue + if tools: + self.tools.extend(tools) + else: + logger.warning("No tools returned from custom MCP server") + continue + + if exit_stack: + await self.exit_stack.enter_async_context(exit_stack) + logger.info( + f"Custom MCP server connected successfully. Added {len(tools)} tools." + ) + else: + logger.warning( + "No exit_stack returned from custom MCP server" + ) + except Exception as e: + logger.error( + f"Error connecting to custom MCP server {server.get('url', 'unknown')}: {e}" + ) + continue logger.info( f"MCP Toolset created successfully. Total of {len(self.tools)} tools."