structure saas with tools and mcps

This commit is contained in:
Davidson Gomes 2025-04-25 17:20:55 -03:00
parent 18a0676af1
commit d413984c5b
6 changed files with 152 additions and 27 deletions

View File

@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List, Optional, Tuple
from google.adk.agents.llm_agent import LlmAgent
from google.adk.agents import SequentialAgent, ParallelAgent, LoopAgent
from google.adk.models.lite_llm import LiteLlm
@ -6,7 +6,9 @@ from src.utils.logger import setup_logger
from src.core.exceptions import AgentNotFoundError
from src.services.agent_service import get_agent
from src.services.custom_tools import CustomToolBuilder
from src.services.mcp_service import MCPService
from sqlalchemy.orm import Session
from contextlib import AsyncExitStack
logger = setup_logger(__name__)
@ -14,23 +16,33 @@ class AgentBuilder:
def __init__(self, db: Session):
self.db = db
self.custom_tool_builder = CustomToolBuilder()
self.mcp_service = MCPService()
def _create_llm_agent(self, agent) -> LlmAgent:
async def _create_llm_agent(self, agent) -> Tuple[LlmAgent, Optional[AsyncExitStack]]:
"""Cria um agente LLM a partir dos dados do agente."""
# Obtém ferramentas personalizadas da configuração
custom_tools = []
if agent.config.get("tools"):
custom_tools = self.custom_tool_builder.build_tools(agent.config["tools"])
# Obtém ferramentas MCP da configuração
mcp_tools = []
mcp_exit_stack = None
if agent.config.get("mcpServers"):
mcp_tools, mcp_exit_stack = await self.mcp_service.build_tools(agent.config)
# Combina todas as ferramentas
all_tools = custom_tools + mcp_tools
return LlmAgent(
name=agent.name,
model=LiteLlm(model=agent.model, api_key=agent.api_key),
instruction=agent.instruction,
description=agent.config.get("description", ""),
tools=custom_tools,
)
tools=all_tools,
), mcp_exit_stack
def _get_sub_agents(self, sub_agent_ids: List[str]) -> List[LlmAgent]:
async def _get_sub_agents(self, sub_agent_ids: List[str]) -> List[Tuple[LlmAgent, Optional[AsyncExitStack]]]:
"""Obtém e cria os sub-agentes LLM."""
sub_agents = []
for sub_agent_id in sub_agent_ids:
@ -42,29 +54,32 @@ class AgentBuilder:
if agent.type != "llm":
raise ValueError(f"Agente {agent.name} (ID: {agent.id}) não é um agente LLM")
sub_agents.append(self._create_llm_agent(agent))
sub_agent, exit_stack = await self._create_llm_agent(agent)
sub_agents.append((sub_agent, exit_stack))
return sub_agents
def build_llm_agent(self, root_agent) -> LlmAgent:
async def build_llm_agent(self, root_agent) -> Tuple[LlmAgent, Optional[AsyncExitStack]]:
"""Constrói um agente LLM com seus sub-agentes."""
logger.info("Criando agente LLM")
sub_agents = []
if root_agent.config.get("sub_agents"):
sub_agents = self._get_sub_agents(root_agent.config.get("sub_agents"))
sub_agents_with_stacks = await self._get_sub_agents(root_agent.config.get("sub_agents"))
sub_agents = [agent for agent, _ in sub_agents_with_stacks]
root_llm_agent = self._create_llm_agent(root_agent)
root_llm_agent, exit_stack = await self._create_llm_agent(root_agent)
if sub_agents:
root_llm_agent.sub_agents = sub_agents
return root_llm_agent
return root_llm_agent, exit_stack
def build_composite_agent(self, root_agent) -> SequentialAgent | ParallelAgent | LoopAgent:
async def build_composite_agent(self, root_agent) -> Tuple[SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]]:
"""Constrói um agente composto (Sequential, Parallel ou Loop) com seus sub-agentes."""
logger.info(f"Processando sub-agentes para agente {root_agent.type}")
sub_agents = self._get_sub_agents(root_agent.config.get("sub_agents", []))
sub_agents_with_stacks = await self._get_sub_agents(root_agent.config.get("sub_agents", []))
sub_agents = [agent for agent, _ in sub_agents_with_stacks]
if root_agent.type == "sequential":
logger.info("Criando SequentialAgent")
@ -72,14 +87,14 @@ class AgentBuilder:
name=root_agent.name,
sub_agents=sub_agents,
description=root_agent.config.get("description", ""),
)
), None
elif root_agent.type == "parallel":
logger.info("Criando ParallelAgent")
return ParallelAgent(
name=root_agent.name,
sub_agents=sub_agents,
description=root_agent.config.get("description", ""),
)
), None
elif root_agent.type == "loop":
logger.info("Criando LoopAgent")
return LoopAgent(
@ -87,13 +102,13 @@ class AgentBuilder:
sub_agents=sub_agents,
description=root_agent.config.get("description", ""),
max_iterations=root_agent.config.get("max_iterations", 5),
)
), None
else:
raise ValueError(f"Tipo de agente inválido: {root_agent.type}")
def build_agent(self, root_agent) -> LlmAgent | SequentialAgent | ParallelAgent | LoopAgent:
async def build_agent(self, root_agent) -> Tuple[LlmAgent | SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]]:
"""Constrói o agente apropriado baseado no tipo do agente root."""
if root_agent.type == "llm":
return self.build_llm_agent(root_agent)
return await self.build_llm_agent(root_agent)
else:
return self.build_composite_agent(root_agent)
return await self.build_composite_agent(root_agent)

View File

@ -12,6 +12,7 @@ from src.core.exceptions import AgentNotFoundError, InternalServerError
from src.services.agent_service import get_agent
from src.services.agent_builder import AgentBuilder
from sqlalchemy.orm import Session
from contextlib import AsyncExitStack
logger = setup_logger(__name__)
@ -40,7 +41,7 @@ async def run_agent(
# Usando o AgentBuilder para criar o agente
agent_builder = AgentBuilder(db)
root_agent = agent_builder.build_agent(get_root_agent)
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
logger.info("Configurando Runner")
agent_runner = Runner(
@ -70,14 +71,19 @@ async def run_agent(
logger.info("Iniciando execução do agente")
final_response_text = None
for event in agent_runner.run(
user_id=contact_id,
session_id=session_id,
new_message=content,
):
if event.is_final_response() and event.content and event.content.parts:
final_response_text = event.content.parts[0].text
logger.info(f"Resposta final recebida: {final_response_text}")
try:
for event in agent_runner.run(
user_id=contact_id,
session_id=session_id,
new_message=content,
):
if event.is_final_response() and event.content and event.content.parts:
final_response_text = event.content.parts[0].text
logger.info(f"Resposta final recebida: {final_response_text}")
finally:
# Garante que o exit_stack seja fechado corretamente
if exit_stack:
await exit_stack.aclose()
logger.info("Execução do agente concluída com sucesso")
return final_response_text

104
src/services/mcp_service.py Normal file
View File

@ -0,0 +1,104 @@
from typing import Any, Dict, List, Optional, Tuple
from google.adk.tools.mcp_tool.mcp_toolset import (
MCPToolset,
StdioServerParameters,
SseServerParams,
)
from contextlib import AsyncExitStack
import os
import logging
from src.utils.logger import setup_logger
logger = setup_logger(__name__)
class MCPService:
def __init__(self):
self.tools = []
self.exit_stack = AsyncExitStack()
async def _connect_to_mcp_server(self, server_config: Dict[str, Any]) -> Tuple[List[Any], Optional[AsyncExitStack]]:
"""Conecta a um servidor MCP específico e retorna suas ferramentas."""
try:
# Determina o tipo de servidor (local ou remoto)
if "url" in server_config:
# Servidor remoto (SSE)
connection_params = SseServerParams(
url=server_config["url"],
headers=server_config.get("headers", {})
)
else:
# Servidor local (Stdio)
command = server_config.get("command", "npx")
args = server_config.get("args", [])
# Adiciona variáveis de ambiente se especificadas
env = server_config.get("env", {})
if env:
for key, value in env.items():
os.environ[key] = value
connection_params = StdioServerParameters(
command=command,
args=args,
env=env
)
tools, exit_stack = await MCPToolset.from_server(
connection_params=connection_params
)
return tools, exit_stack
except Exception as e:
logger.error(f"Erro ao conectar ao servidor MCP: {e}")
return [], None
def _filter_incompatible_tools(self, tools: List[Any]) -> List[Any]:
"""Filtra ferramentas incompatíveis com o modelo."""
problematic_tools = [
"create_pull_request_review", # Esta ferramenta causa o erro 400 INVALID_ARGUMENT
]
filtered_tools = []
removed_count = 0
for tool in tools:
if tool.name in problematic_tools:
logger.warning(f"Removendo ferramenta incompatível: {tool.name}")
removed_count += 1
else:
filtered_tools.append(tool)
if removed_count > 0:
logger.warning(f"Removidas {removed_count} ferramentas incompatíveis.")
return filtered_tools
async def build_tools(self, mcp_config: Dict[str, Any]) -> Tuple[List[Any], AsyncExitStack]:
"""Constrói uma lista de ferramentas a partir de múltiplos servidores MCP."""
self.tools = []
self.exit_stack = AsyncExitStack()
# Processa cada servidor MCP da configuração
for server_name, server_config in mcp_config.get("mcpServers", {}).items():
logger.info(f"Conectando ao servidor MCP: {server_name}")
try:
tools, exit_stack = await self._connect_to_mcp_server(server_config)
if tools and exit_stack:
# Filtra ferramentas incompatíveis
filtered_tools = self._filter_incompatible_tools(tools)
self.tools.extend(filtered_tools)
# Registra o exit_stack com o AsyncExitStack
await self.exit_stack.enter_async_context(exit_stack)
logger.info(f"Conectado com sucesso. Adicionadas {len(filtered_tools)} ferramentas.")
else:
logger.warning(f"Falha na conexão ou nenhuma ferramenta disponível para {server_name}")
except Exception as e:
logger.error(f"Erro ao conectar ao servidor MCP {server_name}: {e}")
continue
logger.info(f"MCP Toolset criado com sucesso. Total de {len(self.tools)} ferramentas.")
return self.tools, self.exit_stack