feat: google/adk-python#479 support for streamable http MCP servers for MCPToolset

Copybara import of the project:

--
c5b9d49d7b6d858ff0a93bd690e6d653b7c32221 by Omar BENHAMID <omar.benhamid@smart-gts.com>:

feat: google/adk-python#479 support for streamable http MCP servers for MCPToolset

--
9431bc19e6538c1b814aba0b24ff564acf046075 by Omar BENHAMID <omar.benhamid@smart-gts.com>:

feat: google/adk-python#479 streamable http added to right package

--
8b4aabed45a6f0dc828beb61f12985dc7b14f3d0 by Omar BENHAMID <omar.benhamid@smart-gts.com>:

feat: google/adk-python#479 streamable http : review feedbacks + sample agent
COPYBARA_INTEGRATE_REVIEW=https://github.com/google/adk-python/pull/650 from omarbenhamid:feature/mcp-streamable-http 625f028784c216401d45cb1b5d4d998535ebcb00
PiperOrigin-RevId: 764419586
This commit is contained in:
Omar BENHAMID 2025-05-28 14:32:42 -07:00 committed by Copybara-Service
parent c7ce987676
commit d232e6216d
7 changed files with 198 additions and 10 deletions

View File

@ -0,0 +1,8 @@
This agent connects to a local MCP server via sse.
To run this agent, start the local MCP server first by :
```bash
uv run filesystem_server.py
```

View File

@ -0,0 +1,15 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import agent

View File

@ -0,0 +1,58 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPServerParams
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
from google.adk.tools.mcp_tool.mcp_toolset import SseServerParams
_allowed_path = os.path.dirname(os.path.abspath(__file__))
root_agent = LlmAgent(
model='gemini-2.0-flash',
name='enterprise_assistant',
instruction=f"""\
Help user accessing their file systems.
Allowed directory: {_allowed_path}
""",
tools=[
MCPToolset(
connection_params=StreamableHTTPServerParams(
url='http://localhost:3000/mcp',
),
# don't want agent to do write operation
# you can also do below
# tool_filter=lambda tool, ctx=None: tool.name
# not in [
# 'write_file',
# 'edit_file',
# 'create_directory',
# 'move_file',
# ],
tool_filter=[
'read_file',
'read_multiple_files',
'list_directory',
'directory_tree',
'search_files',
'get_file_info',
'list_allowed_directories',
],
)
],
)

View File

@ -0,0 +1,80 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
from pathlib import Path
import sys
from mcp.server.fastmcp import FastMCP
# Create an MCP server with a name
mcp = FastMCP("Filesystem Server", host="localhost", port=3000)
# Add a tool to read file contents
@mcp.tool(description="Read contents of a file")
def read_file(filepath: str) -> str:
"""Read and return the contents of a file."""
with open(filepath, "r") as f:
return f.read()
# Add a tool to list directory contents
@mcp.tool(description="List contents of a directory")
def list_directory(dirpath: str) -> list:
"""List all files and directories in the given directory."""
return os.listdir(dirpath)
# Add a tool to get current working directory
@mcp.tool(description="Get current working directory")
def get_cwd() -> str:
"""Return the current working directory."""
return str(Path.cwd())
# Graceful shutdown handler
async def shutdown(signal, loop):
"""Cleanup tasks tied to the service's shutdown."""
print(f"\nReceived exit signal {signal.name}...")
# Get all running tasks
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
# Cancel all tasks
for task in tasks:
task.cancel()
print(f"Cancelling {len(tasks)} outstanding tasks")
await asyncio.gather(*tasks, return_exceptions=True)
# Stop the loop
loop.stop()
print("Shutdown complete!")
# Main entry point with graceful shutdown handling
if __name__ == "__main__":
try:
# The MCP run function ultimately uses asyncio.run() internally
mcp.run(transport="streamable-http")
except KeyboardInterrupt:
print("\nServer shutting down gracefully...")
# The asyncio event loop has already been stopped by the KeyboardInterrupt
print("Server has been shut down.")
except Exception as e:
print(f"Unexpected error: {e}")
sys.exit(1)
finally:
print("Thank you for using the Filesystem MCP Server!")

View File

@ -35,7 +35,7 @@ dependencies = [
"google-cloud-storage>=2.18.0, <3.0.0", # For GCS Artifact service
"google-genai>=1.14.0", # Google GenAI SDK
"graphviz>=0.20.2", # Graphviz for graph rendering
"mcp>=1.5.0;python_version>='3.10'", # For MCP Toolset
"mcp>=1.8.0;python_version>='3.10'", # For MCP Toolset
"opentelemetry-api>=1.31.0", # OpenTelemetry
"opentelemetry-exporter-gcp-trace>=1.9.0",
"opentelemetry-sdk>=1.31.0",

View File

@ -17,10 +17,10 @@ from contextlib import AsyncExitStack
import functools
import logging
import sys
from datetime import timedelta
from typing import Any
from typing import Optional
from typing import TextIO
import anyio
from pydantic import BaseModel
@ -29,6 +29,7 @@ try:
from mcp import StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import streamablehttp_client
except ImportError as e:
import sys
@ -56,6 +57,19 @@ class SseServerParams(BaseModel):
sse_read_timeout: float = 60 * 5
class StreamableHTTPServerParams(BaseModel):
"""Parameters for the MCP SSE connection.
See MCP SSE Client documentation for more details.
https://github.com/modelcontextprotocol/python-sdk/blob/main/src/mcp/client/streamable_http.py
"""
url: str
headers: dict[str, Any] | None = None
timeout: float = 5
sse_read_timeout: float = 60 * 5
terminate_on_close: bool = True
def retry_on_closed_resource(async_reinit_func_name: str):
"""Decorator to automatically reinitialize session and retry action.
@ -123,13 +137,13 @@ class MCPSessionManager:
def __init__(
self,
connection_params: StdioServerParameters | SseServerParams,
connection_params: StdioServerParameters | SseServerParams | StreamableHTTPServerParams,
errlog: TextIO = sys.stderr,
):
"""Initializes the MCP session manager.
Args:
connection_params: Parameters for the MCP connection (Stdio or SSE).
connection_params: Parameters for the MCP connection (Stdio, SSE or Streamable HTTP).
errlog: (Optional) TextIO stream for error logging. Use only for
initializing a local stdio MCP session.
"""
@ -163,6 +177,14 @@ class MCPSessionManager:
timeout=self._connection_params.timeout,
sse_read_timeout=self._connection_params.sse_read_timeout,
)
elif isinstance(self._connection_params, StreamableHTTPServerParams):
client = streamablehttp_client(
url=self._connection_params.url,
headers=self._connection_params.headers,
timeout=timedelta(seconds=self._connection_params.timeout),
sse_read_timeout=timedelta(seconds=self._connection_params.sse_read_timeout),
terminate_on_close=self._connection_params.terminate_on_close,
)
else:
raise ValueError(
'Unable to initialize connection. Connection should be'
@ -171,8 +193,10 @@ class MCPSessionManager:
)
transports = await self._exit_stack.enter_async_context(client)
# The streamable http client returns a GetSessionCallback in addition to the read/write MemoryObjectStreams
# needed to build the ClientSession, we limit then to the two first values to be compatible with all clients.
session = await self._exit_stack.enter_async_context(
ClientSession(*transports)
ClientSession(*transports[:2])
)
await session.initialize()

View File

@ -26,6 +26,7 @@ from ..base_toolset import ToolPredicate
from .mcp_session_manager import MCPSessionManager
from .mcp_session_manager import retry_on_closed_resource
from .mcp_session_manager import SseServerParams
from .mcp_session_manager import StreamableHTTPServerParams
# Attempt to import MCP Tool from the MCP library, and hints user to upgrade
# their Python version to 3.10 if it fails.
@ -82,17 +83,18 @@ class MCPToolset(BaseToolset):
def __init__(
self,
*,
connection_params: StdioServerParameters | SseServerParams,
connection_params: StdioServerParameters | SseServerParams | StreamableHTTPServerParams,
tool_filter: Optional[Union[ToolPredicate, List[str]]] = None,
errlog: TextIO = sys.stderr,
):
"""Initializes the MCPToolset.
Args:
connection_params: The connection parameters to the MCP server. Can be:
`StdioServerParameters` for using local mcp server (e.g. using `npx` or
`python3`); or `SseServerParams` for a local/remote SSE server.
tool_filter: Optional filter to select specific tools. Can be either:
connection_params: The connection parameters to the MCP server. Can be:
`StdioServerParameters` for using local mcp server (e.g. using `npx` or
`python3`); or `SseServerParams` for a local/remote SSE server; or
`StreamableHTTPServerParams` for local/remote Streamable http server.
tool_filter: Optional filter to select specific tools. Can be either:
- A list of tool names to include
- A ToolPredicate function for custom filtering logic
errlog: TextIO stream for error logging.
@ -110,6 +112,7 @@ class MCPToolset(BaseToolset):
connection_params=self._connection_params,
errlog=self._errlog,
)
self._session = None
@retry_on_closed_resource("_reinitialize_session")