diff --git a/docker-compose.yml b/docker-compose.yml index 07334f1b..8dd9a27b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: "3.8" services: api: - image: evo-ai-api:latest + image: evoapicloud/evo-ai:latest depends_on: - postgres - redis diff --git a/src/api/mcp_server_routes.py b/src/api/mcp_server_routes.py index 143ae8ce..4f3a7447 100644 --- a/src/api/mcp_server_routes.py +++ b/src/api/mcp_server_routes.py @@ -28,6 +28,7 @@ """ from fastapi import APIRouter, Depends, HTTPException, status +from starlette.concurrency import run_in_threadpool from sqlalchemy.orm import Session from src.config.database import get_db from typing import List @@ -54,7 +55,7 @@ router = APIRouter( responses={404: {"description": "Not found"}}, ) - +# Last edited by Arley Peter on 2025-05-17 @router.post("/", response_model=MCPServer, status_code=status.HTTP_201_CREATED) async def create_mcp_server( server: MCPServerCreate, @@ -64,7 +65,7 @@ async def create_mcp_server( # Only administrators can create MCP servers await verify_admin(payload) - return mcp_server_service.create_mcp_server(db, server) + return await run_in_threadpool(mcp_server_service.create_mcp_server, db, server) @router.get("/", response_model=List[MCPServer]) diff --git a/src/schemas/schemas.py b/src/schemas/schemas.py index fcef62d2..5a3945c7 100644 --- a/src/schemas/schemas.py +++ b/src/schemas/schemas.py @@ -262,14 +262,14 @@ class ToolConfig(BaseModel): inputModes: List[str] = Field(default_factory=list) outputModes: List[str] = Field(default_factory=list) - +# Last edited by Arley Peter on 2025-05-17 class MCPServerBase(BaseModel): name: str description: Optional[str] = None config_type: str = Field(default="studio") config_json: Dict[str, Any] = Field(default_factory=dict) environments: Dict[str, Any] = Field(default_factory=dict) - tools: List[ToolConfig] = Field(default_factory=list) + tools: Optional[List[ToolConfig]] = Field(default_factory=list) type: str = Field(default="official") diff --git a/src/services/adk/custom_agents/workflow_agent.py b/src/services/adk/custom_agents/workflow_agent.py index 04256eac..9227f66e 100644 --- a/src/services/adk/custom_agents/workflow_agent.py +++ b/src/services/adk/custom_agents/workflow_agent.py @@ -6,6 +6,9 @@ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ ├──────────────────────────────────────────────────────────────────────────────┤ +│ @contributors: │ +│ Victor Calazans - delay node implementation (May 17, 2025) │ +├──────────────────────────────────────────────────────────────────────────────┤ │ @copyright © Evolution API 2025. All rights reserved. │ │ Licensed under the Apache License, Version 2.0 │ │ │ @@ -320,10 +323,9 @@ class WorkflowAgent(BaseAgent): ) ] ), - ) - ] + ) ] content = content + condition_content - + yield { "content": content, "status": "condition_evaluated", @@ -332,7 +334,7 @@ class WorkflowAgent(BaseAgent): "conversation_history": conversation_history, "session_id": session_id, } - + async def message_node_function( state: State, node_id: str, node_data: Dict[str, Any] ) -> AsyncGenerator[State, None]: @@ -365,15 +367,63 @@ class WorkflowAgent(BaseAgent): "status": "message_added", "node_outputs": node_outputs, "cycle_count": state.get("cycle_count", 0), + "conversation_history": conversation_history, "session_id": session_id, + } + + async def delay_node_function( + state: State, node_id: str, node_data: Dict[str, Any] + ) -> AsyncGenerator[State, None]: + delay_data = node_data.get("delay", {}) + delay_value = delay_data.get("value", 0) + delay_unit = delay_data.get("unit", "seconds") + delay_description = delay_data.get("description", "") + + # Convert to seconds based on unit + delay_seconds = delay_value + if delay_unit == "minutes": + delay_seconds = delay_value * 60 + elif delay_unit == "hours": + delay_seconds = delay_value * 3600 + + label = node_data.get("label", "delay_node") + print(f"\n⏱️ DELAY-NODE: {delay_value} {delay_unit} - {delay_description}") + + content = state.get("content", []) + session_id = state.get("session_id", "") + conversation_history = state.get("conversation_history", []) + + # Store node output information + node_outputs = state.get("node_outputs", {}) + node_outputs[node_id] = { + "delay_value": delay_value, + "delay_unit": delay_unit, + "delay_seconds": delay_seconds, + "delay_start_time": datetime.now().isoformat(), + } + + # Actually perform the delay + import asyncio + await asyncio.sleep(delay_seconds) + + + # Update node outputs with completion information + node_outputs[node_id]["delay_end_time"] = datetime.now().isoformat() + node_outputs[node_id]["delay_completed"] = True + + yield { + "content": content, + "status": "delay_completed", + "node_outputs": node_outputs, "cycle_count": state.get("cycle_count", 0), "conversation_history": conversation_history, "session_id": session_id, } - + return { "start-node": start_node_function, "agent-node": agent_node_function, "condition-node": condition_node_function, "message-node": message_node_function, + "delay-node": delay_node_function, } def _evaluate_condition(self, condition: Dict[str, Any], state: State) -> bool: diff --git a/src/services/adk/custom_tools.py b/src/services/adk/custom_tools.py index f8ac96b8..0adde66c 100644 --- a/src/services/adk/custom_tools.py +++ b/src/services/adk/custom_tools.py @@ -31,6 +31,7 @@ from typing import Any, Dict, List from google.adk.tools import FunctionTool import requests import json +import urllib.parse from src.utils.logger import setup_logger logger = setup_logger(__name__) @@ -70,7 +71,9 @@ class CustomToolBuilder: url = endpoint for param, value in path_params.items(): if param in all_values: - url = url.replace(f"{{{param}}}", str(all_values[param])) + # URL encode the value for URL safe characters + replacement_value = urllib.parse.quote(str(all_values[param]), safe='') + url = url.replace(f"{{{param}}}", replacement_value) # Process query parameters query_params_dict = {} @@ -119,8 +122,12 @@ class CustomToolBuilder: f"Error in the request: {response.status_code} - {response.text}" ) - # Always returns the response as a string - return json.dumps(response.json()) + # Try to parse the response as JSON, if it fails, return the text content + try: + return json.dumps(response.json()) + except ValueError: + # Response is not JSON, return the text content + return json.dumps({"content": response.text}) except Exception as e: logger.error(f"Error executing tool {name}: {str(e)}") diff --git a/src/services/mcp_server_service.py b/src/services/mcp_server_service.py index 52c1703d..8fea426b 100644 --- a/src/services/mcp_server_service.py +++ b/src/services/mcp_server_service.py @@ -32,6 +32,7 @@ from sqlalchemy.exc import SQLAlchemyError from fastapi import HTTPException, status from src.models.models import MCPServer from src.schemas.schemas import MCPServerCreate +from src.utils.mcp_discovery import discover_mcp_tools from typing import List, Optional import uuid import logging @@ -72,8 +73,16 @@ def create_mcp_server(db: Session, server: MCPServerCreate) -> MCPServer: try: # Convert tools to JSON serializable format server_data = server.model_dump() - server_data["tools"] = [tool.model_dump() for tool in server.tools] + # Last edited by Arley Peter on 2025-05-17 + supplied_tools = server_data.pop("tools", []) + if not supplied_tools: + discovered = discover_mcp_tools(server_data["config_json"]) + print(f"🔍 Found {len(discovered)} tools.") + server_data["tools"] = discovered + + else: + server_data["tools"] = [tool.model_dump() for tool in supplied_tools] db_server = MCPServer(**server_data) db.add(db_server) db.commit() diff --git a/src/utils/mcp_discovery.py b/src/utils/mcp_discovery.py new file mode 100644 index 00000000..b45bbb78 --- /dev/null +++ b/src/utils/mcp_discovery.py @@ -0,0 +1,55 @@ +""" +┌──────────────────────────────────────────────────────────────────────────────┐ +│ @author: Arley Peter │ +│ @file: mcp_discovery.py │ +│ Developed by: Arley Peter │ +│ Creation date: May 05, 2025 │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @copyright © Evolution API 2025. All rights reserved. │ +│ Licensed under the Apache License, Version 2.0 │ +│ │ +│ You may not use this file except in compliance with the License. │ +│ You may obtain a copy of the License at │ +│ │ +│ http://www.apache.org/licenses/LICENSE-2.0 │ +│ │ +│ Unless required by applicable law or agreed to in writing, software │ +│ distributed under the License is distributed on an "AS IS" BASIS, │ +│ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. │ +│ See the License for the specific language governing permissions and │ +│ limitations under the License. │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @important │ +│ For any future changes to the code in this file, it is recommended to │ +│ include, together with the modification, the information of the developer │ +│ who changed it and the date of modification. │ +└──────────────────────────────────────────────────────────────────────────────┘ +""" + +from typing import List, Dict, Any +import asyncio + +async def _discover_async(config_json: Dict[str, Any]) -> List[Dict[str, Any]]: + """Return a list[dict] with the tool metadata advertised by the MCP server.""" + + from src.services.mcp_service import MCPService + + service = MCPService() + tools, exit_stack = await service._connect_to_mcp_server(config_json) + serialised = [t.to_dict() if hasattr(t, "to_dict") else { + "id": t.name, + "name": t.name, + "description": getattr(t, "description", t.name), + "tags": getattr(t, "tags", []), + "examples": getattr(t, "examples", []), + "inputModes": getattr(t, "input_modes", ["text"]), + "outputModes": getattr(t, "output_modes", ["text"]), + } for t in tools] + if exit_stack: + await exit_stack.aclose() + return serialised + + +def discover_mcp_tools(config_json: Dict[str, Any]) -> List[Dict[str, Any]]: + """Sync wrapper so we can call it from a sync service function.""" + return asyncio.run(_discover_async(config_json))