From d232e6216d58ad4f995e4aaf17b3acc3309f4712 Mon Sep 17 00:00:00 2001 From: Omar BENHAMID Date: Wed, 28 May 2025 14:32:42 -0700 Subject: [PATCH] feat: google/adk-python#479 support for streamable http MCP servers for MCPToolset Copybara import of the project: -- c5b9d49d7b6d858ff0a93bd690e6d653b7c32221 by Omar BENHAMID : feat: google/adk-python#479 support for streamable http MCP servers for MCPToolset -- 9431bc19e6538c1b814aba0b24ff564acf046075 by Omar BENHAMID : feat: google/adk-python#479 streamable http added to right package -- 8b4aabed45a6f0dc828beb61f12985dc7b14f3d0 by Omar BENHAMID : feat: google/adk-python#479 streamable http : review feedbacks + sample agent COPYBARA_INTEGRATE_REVIEW=https://github.com/google/adk-python/pull/650 from omarbenhamid:feature/mcp-streamable-http 625f028784c216401d45cb1b5d4d998535ebcb00 PiperOrigin-RevId: 764419586 --- .../mcp_streamablehttp_agent/README.md | 8 ++ .../mcp_streamablehttp_agent/__init__.py | 15 ++++ .../samples/mcp_streamablehttp_agent/agent.py | 58 ++++++++++++++ .../filesystem_server.py | 80 +++++++++++++++++++ pyproject.toml | 2 +- .../adk/tools/mcp_tool/mcp_session_manager.py | 32 +++++++- src/google/adk/tools/mcp_tool/mcp_toolset.py | 13 +-- 7 files changed, 198 insertions(+), 10 deletions(-) create mode 100644 contributing/samples/mcp_streamablehttp_agent/README.md create mode 100644 contributing/samples/mcp_streamablehttp_agent/__init__.py create mode 100644 contributing/samples/mcp_streamablehttp_agent/agent.py create mode 100644 contributing/samples/mcp_streamablehttp_agent/filesystem_server.py diff --git a/contributing/samples/mcp_streamablehttp_agent/README.md b/contributing/samples/mcp_streamablehttp_agent/README.md new file mode 100644 index 0000000..1c211dd --- /dev/null +++ b/contributing/samples/mcp_streamablehttp_agent/README.md @@ -0,0 +1,8 @@ +This agent connects to a local MCP server via sse. + +To run this agent, start the local MCP server first by : + +```bash +uv run filesystem_server.py +``` + diff --git a/contributing/samples/mcp_streamablehttp_agent/__init__.py b/contributing/samples/mcp_streamablehttp_agent/__init__.py new file mode 100644 index 0000000..c48963c --- /dev/null +++ b/contributing/samples/mcp_streamablehttp_agent/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import agent diff --git a/contributing/samples/mcp_streamablehttp_agent/agent.py b/contributing/samples/mcp_streamablehttp_agent/agent.py new file mode 100644 index 0000000..61d59e0 --- /dev/null +++ b/contributing/samples/mcp_streamablehttp_agent/agent.py @@ -0,0 +1,58 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os + +from google.adk.agents.llm_agent import LlmAgent +from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPServerParams +from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset +from google.adk.tools.mcp_tool.mcp_toolset import SseServerParams + +_allowed_path = os.path.dirname(os.path.abspath(__file__)) + +root_agent = LlmAgent( + model='gemini-2.0-flash', + name='enterprise_assistant', + instruction=f"""\ +Help user accessing their file systems. + +Allowed directory: {_allowed_path} + """, + tools=[ + MCPToolset( + connection_params=StreamableHTTPServerParams( + url='http://localhost:3000/mcp', + ), + # don't want agent to do write operation + # you can also do below + # tool_filter=lambda tool, ctx=None: tool.name + # not in [ + # 'write_file', + # 'edit_file', + # 'create_directory', + # 'move_file', + # ], + tool_filter=[ + 'read_file', + 'read_multiple_files', + 'list_directory', + 'directory_tree', + 'search_files', + 'get_file_info', + 'list_allowed_directories', + ], + ) + ], +) diff --git a/contributing/samples/mcp_streamablehttp_agent/filesystem_server.py b/contributing/samples/mcp_streamablehttp_agent/filesystem_server.py new file mode 100644 index 0000000..51f8482 --- /dev/null +++ b/contributing/samples/mcp_streamablehttp_agent/filesystem_server.py @@ -0,0 +1,80 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import os +from pathlib import Path +import sys +from mcp.server.fastmcp import FastMCP + +# Create an MCP server with a name +mcp = FastMCP("Filesystem Server", host="localhost", port=3000) + + +# Add a tool to read file contents +@mcp.tool(description="Read contents of a file") +def read_file(filepath: str) -> str: + """Read and return the contents of a file.""" + with open(filepath, "r") as f: + return f.read() + + +# Add a tool to list directory contents +@mcp.tool(description="List contents of a directory") +def list_directory(dirpath: str) -> list: + """List all files and directories in the given directory.""" + return os.listdir(dirpath) + + +# Add a tool to get current working directory +@mcp.tool(description="Get current working directory") +def get_cwd() -> str: + """Return the current working directory.""" + return str(Path.cwd()) + + +# Graceful shutdown handler +async def shutdown(signal, loop): + """Cleanup tasks tied to the service's shutdown.""" + print(f"\nReceived exit signal {signal.name}...") + + # Get all running tasks + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + + # Cancel all tasks + for task in tasks: + task.cancel() + + print(f"Cancelling {len(tasks)} outstanding tasks") + await asyncio.gather(*tasks, return_exceptions=True) + + # Stop the loop + loop.stop() + print("Shutdown complete!") + + +# Main entry point with graceful shutdown handling +if __name__ == "__main__": + try: + # The MCP run function ultimately uses asyncio.run() internally + mcp.run(transport="streamable-http") + except KeyboardInterrupt: + print("\nServer shutting down gracefully...") + # The asyncio event loop has already been stopped by the KeyboardInterrupt + print("Server has been shut down.") + except Exception as e: + print(f"Unexpected error: {e}") + sys.exit(1) + finally: + print("Thank you for using the Filesystem MCP Server!") diff --git a/pyproject.toml b/pyproject.toml index f16394a..fd72b36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ "google-cloud-storage>=2.18.0, <3.0.0", # For GCS Artifact service "google-genai>=1.14.0", # Google GenAI SDK "graphviz>=0.20.2", # Graphviz for graph rendering - "mcp>=1.5.0;python_version>='3.10'", # For MCP Toolset + "mcp>=1.8.0;python_version>='3.10'", # For MCP Toolset "opentelemetry-api>=1.31.0", # OpenTelemetry "opentelemetry-exporter-gcp-trace>=1.9.0", "opentelemetry-sdk>=1.31.0", diff --git a/src/google/adk/tools/mcp_tool/mcp_session_manager.py b/src/google/adk/tools/mcp_tool/mcp_session_manager.py index cb14a90..d0f5579 100644 --- a/src/google/adk/tools/mcp_tool/mcp_session_manager.py +++ b/src/google/adk/tools/mcp_tool/mcp_session_manager.py @@ -17,10 +17,10 @@ from contextlib import AsyncExitStack import functools import logging import sys +from datetime import timedelta from typing import Any from typing import Optional from typing import TextIO - import anyio from pydantic import BaseModel @@ -29,6 +29,7 @@ try: from mcp import StdioServerParameters from mcp.client.sse import sse_client from mcp.client.stdio import stdio_client + from mcp.client.streamable_http import streamablehttp_client except ImportError as e: import sys @@ -56,6 +57,19 @@ class SseServerParams(BaseModel): sse_read_timeout: float = 60 * 5 +class StreamableHTTPServerParams(BaseModel): + """Parameters for the MCP SSE connection. + + See MCP SSE Client documentation for more details. + https://github.com/modelcontextprotocol/python-sdk/blob/main/src/mcp/client/streamable_http.py + """ + url: str + headers: dict[str, Any] | None = None + timeout: float = 5 + sse_read_timeout: float = 60 * 5 + terminate_on_close: bool = True + + def retry_on_closed_resource(async_reinit_func_name: str): """Decorator to automatically reinitialize session and retry action. @@ -123,13 +137,13 @@ class MCPSessionManager: def __init__( self, - connection_params: StdioServerParameters | SseServerParams, + connection_params: StdioServerParameters | SseServerParams | StreamableHTTPServerParams, errlog: TextIO = sys.stderr, ): """Initializes the MCP session manager. Args: - connection_params: Parameters for the MCP connection (Stdio or SSE). + connection_params: Parameters for the MCP connection (Stdio, SSE or Streamable HTTP). errlog: (Optional) TextIO stream for error logging. Use only for initializing a local stdio MCP session. """ @@ -163,6 +177,14 @@ class MCPSessionManager: timeout=self._connection_params.timeout, sse_read_timeout=self._connection_params.sse_read_timeout, ) + elif isinstance(self._connection_params, StreamableHTTPServerParams): + client = streamablehttp_client( + url=self._connection_params.url, + headers=self._connection_params.headers, + timeout=timedelta(seconds=self._connection_params.timeout), + sse_read_timeout=timedelta(seconds=self._connection_params.sse_read_timeout), + terminate_on_close=self._connection_params.terminate_on_close, + ) else: raise ValueError( 'Unable to initialize connection. Connection should be' @@ -171,8 +193,10 @@ class MCPSessionManager: ) transports = await self._exit_stack.enter_async_context(client) + # The streamable http client returns a GetSessionCallback in addition to the read/write MemoryObjectStreams + # needed to build the ClientSession, we limit then to the two first values to be compatible with all clients. session = await self._exit_stack.enter_async_context( - ClientSession(*transports) + ClientSession(*transports[:2]) ) await session.initialize() diff --git a/src/google/adk/tools/mcp_tool/mcp_toolset.py b/src/google/adk/tools/mcp_tool/mcp_toolset.py index 01e586c..1bc6b3c 100644 --- a/src/google/adk/tools/mcp_tool/mcp_toolset.py +++ b/src/google/adk/tools/mcp_tool/mcp_toolset.py @@ -26,6 +26,7 @@ from ..base_toolset import ToolPredicate from .mcp_session_manager import MCPSessionManager from .mcp_session_manager import retry_on_closed_resource from .mcp_session_manager import SseServerParams +from .mcp_session_manager import StreamableHTTPServerParams # Attempt to import MCP Tool from the MCP library, and hints user to upgrade # their Python version to 3.10 if it fails. @@ -82,17 +83,18 @@ class MCPToolset(BaseToolset): def __init__( self, *, - connection_params: StdioServerParameters | SseServerParams, + connection_params: StdioServerParameters | SseServerParams | StreamableHTTPServerParams, tool_filter: Optional[Union[ToolPredicate, List[str]]] = None, errlog: TextIO = sys.stderr, ): """Initializes the MCPToolset. Args: - connection_params: The connection parameters to the MCP server. Can be: - `StdioServerParameters` for using local mcp server (e.g. using `npx` or - `python3`); or `SseServerParams` for a local/remote SSE server. - tool_filter: Optional filter to select specific tools. Can be either: + connection_params: The connection parameters to the MCP server. Can be: + `StdioServerParameters` for using local mcp server (e.g. using `npx` or + `python3`); or `SseServerParams` for a local/remote SSE server; or + `StreamableHTTPServerParams` for local/remote Streamable http server. + tool_filter: Optional filter to select specific tools. Can be either: - A list of tool names to include - A ToolPredicate function for custom filtering logic errlog: TextIO stream for error logging. @@ -110,6 +112,7 @@ class MCPToolset(BaseToolset): connection_params=self._connection_params, errlog=self._errlog, ) + self._session = None @retry_on_closed_resource("_reinitialize_session")