From 2bac2b3824d54e7936102ee5a97d2766b8625ead Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Tue, 13 May 2025 17:50:14 -0300 Subject: [PATCH] chore: update author information and file names in multiple files --- conftest.py | 41 ++- scripts/seeders/admin_seeder.py | 2 +- scripts/seeders/client_seeder.py | 2 +- scripts/seeders/mcp_server_seeder.py | 2 +- scripts/seeders/tool_seeder.py | 2 +- src/api/a2a_routes.py | 2 +- src/api/admin_routes.py | 2 +- src/api/agent_routes.py | 2 +- src/api/auth_routes.py | 2 +- src/api/chat_routes.py | 15 +- src/api/client_routes.py | 2 +- src/api/mcp_server_routes.py | 2 +- src/api/session_routes.py | 2 +- src/api/tool_routes.py | 2 +- src/config/database.py | 2 +- src/config/redis.py | 2 +- src/config/settings.py | 2 +- src/core/exceptions.py | 2 +- src/core/jwt_middleware.py | 2 +- src/main.py | 2 +- src/models/models.py | 2 +- src/schemas/a2a_types.py | 2 +- src/schemas/agent_config.py | 2 +- src/schemas/audit.py | 2 +- src/schemas/chat.py | 2 +- src/schemas/schemas.py | 2 +- src/schemas/streaming.py | 2 +- src/schemas/user.py | 2 +- src/services/a2a_agent.py | 2 +- src/services/a2a_task_manager.py | 470 +++++++++++++++--------- src/services/agent_builder.py | 2 +- src/services/agent_runner.py | 72 +++- src/services/agent_service.py | 2 +- src/services/apikey_service.py | 2 +- src/services/audit_service.py | 2 +- src/services/auth_service.py | 2 +- src/services/client_service.py | 2 +- src/services/custom_tools.py | 2 +- src/services/email_service.py | 2 +- src/services/mcp_service.py | 2 +- src/services/service_providers.py | 2 +- src/services/session_service.py | 2 +- src/services/tool_service.py | 2 +- src/services/user_service.py | 2 +- src/services/workflow_agent.py | 512 +++++++++++++++------------ src/utils/a2a_utils.py | 2 +- src/utils/crypto.py | 2 +- src/utils/logger.py | 2 +- src/utils/otel.py | 2 +- src/utils/security.py | 2 +- src/utils/streaming.py | 2 +- 51 files changed, 724 insertions(+), 478 deletions(-) diff --git a/conftest.py b/conftest.py index 83263505..02462cce 100644 --- a/conftest.py +++ b/conftest.py @@ -1,3 +1,32 @@ +""" +┌──────────────────────────────────────────────────────────────────────────────┐ +│ @author: Davidson Gomes │ +│ @file: conftest.py │ +│ Developed by: Davidson Gomes │ +│ Creation date: May 13, 2025 │ +│ Contact: contato@evolution-api.com │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @copyright © Evolution API 2025. All rights reserved. │ +│ Licensed under the Apache License, Version 2.0 │ +│ │ +│ You may not use this file except in compliance with the License. │ +│ You may obtain a copy of the License at │ +│ │ +│ http://www.apache.org/licenses/LICENSE-2.0 │ +│ │ +│ Unless required by applicable law or agreed to in writing, software │ +│ distributed under the License is distributed on an "AS IS" BASIS, │ +│ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. │ +│ See the License for the specific language governing permissions and │ +│ limitations under the License. │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @important │ +│ For any future changes to the code in this file, it is recommended to │ +│ include, together with the modification, the information of the developer │ +│ who changed it and the date of modification. │ +└──────────────────────────────────────────────────────────────────────────────┘ +""" + import pytest from fastapi.testclient import TestClient from sqlalchemy import create_engine @@ -22,11 +51,11 @@ TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engin def db_session(): """Creates a fresh database session for each test.""" Base.metadata.create_all(bind=engine) # Create tables - + connection = engine.connect() transaction = connection.begin() session = TestingSessionLocal(bind=connection) - + # Use our test database instead of the standard one def override_get_db(): try: @@ -34,11 +63,11 @@ def db_session(): session.commit() finally: session.close() - + app.dependency_overrides[get_db] = override_get_db - + yield session # The test will run here - + # Teardown transaction.rollback() connection.close() @@ -50,4 +79,4 @@ def db_session(): def client(db_session): """Creates a FastAPI TestClient with database session fixture.""" with TestClient(app) as test_client: - yield test_client \ No newline at end of file + yield test_client diff --git a/scripts/seeders/admin_seeder.py b/scripts/seeders/admin_seeder.py index 49ae9958..7d8cd9cb 100644 --- a/scripts/seeders/admin_seeder.py +++ b/scripts/seeders/admin_seeder.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: admin_seeder.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/scripts/seeders/client_seeder.py b/scripts/seeders/client_seeder.py index 061762ec..523879ad 100644 --- a/scripts/seeders/client_seeder.py +++ b/scripts/seeders/client_seeder.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: client_seeder.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/scripts/seeders/mcp_server_seeder.py b/scripts/seeders/mcp_server_seeder.py index dc08a5cc..fdaeee80 100644 --- a/scripts/seeders/mcp_server_seeder.py +++ b/scripts/seeders/mcp_server_seeder.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: mcp_server_seeder.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/scripts/seeders/tool_seeder.py b/scripts/seeders/tool_seeder.py index 3ca709c6..4962ad89 100644 --- a/scripts/seeders/tool_seeder.py +++ b/scripts/seeders/tool_seeder.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: tool_seeder.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/api/a2a_routes.py b/src/api/a2a_routes.py index 45fb9dfc..b3d7d6d4 100644 --- a/src/api/a2a_routes.py +++ b/src/api/a2a_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: a2a_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/api/admin_routes.py b/src/api/admin_routes.py index 9506e996..3173cb4a 100644 --- a/src/api/admin_routes.py +++ b/src/api/admin_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: admin_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/api/agent_routes.py b/src/api/agent_routes.py index 7f2890b7..2b0d411a 100644 --- a/src/api/agent_routes.py +++ b/src/api/agent_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: agent_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/api/auth_routes.py b/src/api/auth_routes.py index 733e36ca..71ae7237 100644 --- a/src/api/auth_routes.py +++ b/src/api/auth_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: auth_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/api/chat_routes.py b/src/api/chat_routes.py index bdd7d048..f42d3d54 100644 --- a/src/api/chat_routes.py +++ b/src/api/chat_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: chat_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ @@ -84,8 +84,8 @@ async def websocket_chat( auth_data = await websocket.receive_json() logger.info(f"Received authentication data: {auth_data}") - if not auth_data.get("type") == "authorization" or not auth_data.get( - "token" + if not ( + auth_data.get("type") == "authorization" and auth_data.get("token") ): logger.warning("Invalid authentication message") await websocket.close(code=status.WS_1008_POLICY_VIOLATION) @@ -188,7 +188,7 @@ async def chat( await verify_user_client(payload, db, agent.client_id) try: - final_response_text = await run_agent( + final_response = await run_agent( request.agent_id, request.external_id, request.message, @@ -199,14 +199,15 @@ async def chat( ) return { - "response": final_response_text, + "response": final_response["final_response"], + "message_history": final_response["message_history"], "status": "success", "timestamp": datetime.now().isoformat(), } except AgentNotFoundError as e: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) - ) + ) from e diff --git a/src/api/client_routes.py b/src/api/client_routes.py index 3a49520b..a08ebf39 100644 --- a/src/api/client_routes.py +++ b/src/api/client_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: client_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/api/mcp_server_routes.py b/src/api/mcp_server_routes.py index af9302fe..143ae8ce 100644 --- a/src/api/mcp_server_routes.py +++ b/src/api/mcp_server_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: mcp_server_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/api/session_routes.py b/src/api/session_routes.py index ac5fee58..2961293f 100644 --- a/src/api/session_routes.py +++ b/src/api/session_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: session_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/api/tool_routes.py b/src/api/tool_routes.py index a80bd33f..35b94e2a 100644 --- a/src/api/tool_routes.py +++ b/src/api/tool_routes.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: tool_routes.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/config/database.py b/src/config/database.py index 112d4a34..80033c54 100644 --- a/src/config/database.py +++ b/src/config/database.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: database.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/config/redis.py b/src/config/redis.py index 0d499ce4..de5787bd 100644 --- a/src/config/redis.py +++ b/src/config/redis.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: redis.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/config/settings.py b/src/config/settings.py index 737b236d..eb885e73 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: settings.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/core/exceptions.py b/src/core/exceptions.py index 1b9b62f7..45c64455 100644 --- a/src/core/exceptions.py +++ b/src/core/exceptions.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: exceptions.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/core/jwt_middleware.py b/src/core/jwt_middleware.py index c55efa70..ada56072 100644 --- a/src/core/jwt_middleware.py +++ b/src/core/jwt_middleware.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: jwt_middleware.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/main.py b/src/main.py index 12ae3d98..32fadb18 100644 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: main.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/models/models.py b/src/models/models.py index 8f70856a..79ac2945 100644 --- a/src/models/models.py +++ b/src/models/models.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: models.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/schemas/a2a_types.py b/src/schemas/a2a_types.py index b6304453..c63b761f 100644 --- a/src/schemas/a2a_types.py +++ b/src/schemas/a2a_types.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: a2a_types.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/schemas/agent_config.py b/src/schemas/agent_config.py index 7ad58fbe..0b3c7ff2 100644 --- a/src/schemas/agent_config.py +++ b/src/schemas/agent_config.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: agent_config.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/schemas/audit.py b/src/schemas/audit.py index f9cdda03..47dd98f6 100644 --- a/src/schemas/audit.py +++ b/src/schemas/audit.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: audit.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/schemas/chat.py b/src/schemas/chat.py index 690487c2..696860a8 100644 --- a/src/schemas/chat.py +++ b/src/schemas/chat.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: chat.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/schemas/schemas.py b/src/schemas/schemas.py index 4ed64d36..506e1f05 100644 --- a/src/schemas/schemas.py +++ b/src/schemas/schemas.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: schemas.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/schemas/streaming.py b/src/schemas/streaming.py index 8230a6e5..ee479a83 100644 --- a/src/schemas/streaming.py +++ b/src/schemas/streaming.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: streaming.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/schemas/user.py b/src/schemas/user.py index d0e70861..21d0eb80 100644 --- a/src/schemas/user.py +++ b/src/schemas/user.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: user.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/a2a_agent.py b/src/services/a2a_agent.py index abc7e291..020179be 100644 --- a/src/services/a2a_agent.py +++ b/src/services/a2a_agent.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: a2a_agent.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/a2a_task_manager.py b/src/services/a2a_task_manager.py index d1e15870..581a2a26 100644 --- a/src/services/a2a_task_manager.py +++ b/src/services/a2a_task_manager.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: a2a_task_manager.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ @@ -266,29 +266,31 @@ class A2ATaskManager: ) -> JSONRPCResponse: """Processes a task using the specified agent.""" task_params = request.params - query = self._extract_user_query(task_params) - try: - # Process the query with the agent - result = await self._run_agent(agent, query, task_params.sessionId) + query = self._extract_user_query(task_params) + result_obj = await self._run_agent(agent, query, task_params.sessionId) - # Create the response part - text_part = {"type": "text", "text": result} - parts = [text_part] - agent_message = Message(role="agent", parts=parts) - - # Determine the task state - task_state = ( - TaskState.INPUT_REQUIRED - if "MISSING_INFO:" in result - else TaskState.COMPLETED + all_messages = await self._extract_messages_from_history( + result_obj.get("message_history", []) ) - # Update the task in the store + result = result_obj["final_response"] + agent_message = self._create_result_message(result) + + if not all_messages and result: + all_messages.append(agent_message) + + task_state = self._determine_task_state(result) + artifact = Artifact(parts=agent_message.parts, index=0) + task = await self.update_store( task_params.id, TaskStatus(state=task_state, message=agent_message), - [Artifact(parts=parts, index=0)], + [artifact], + ) + + await self._update_task_history( + task_params.id, task_params.message, all_messages ) return SendTaskResponse(id=request.id, result=task) @@ -299,12 +301,73 @@ class A2ATaskManager: error=InternalError(message=f"Error processing task: {str(e)}"), ) + async def _extract_messages_from_history(self, agent_history): + """Extracts messages from the agent history.""" + all_messages = [] + for message_event in agent_history: + try: + if ( + not isinstance(message_event, dict) + or "content" not in message_event + ): + continue + + content = message_event.get("content", {}) + if not isinstance(content, dict): + continue + + role = content.get("role", "agent") + if role not in ["user", "agent"]: + role = "agent" + + parts = content.get("parts", []) + if not parts: + continue + + if valid_parts := self._validate_message_parts(parts): + agent_message = Message(role=role, parts=valid_parts) + all_messages.append(agent_message) + except Exception as e: + logger.error(f"Error processing message history: {e}") + return all_messages + + def _validate_message_parts(self, parts): + """Validates and formats message parts.""" + valid_parts = [] + for part in parts: + if isinstance(part, dict): + if "type" not in part and "text" in part: + part["type"] = "text" + valid_parts.append(part) + elif "type" in part: + valid_parts.append(part) + return valid_parts + + def _create_result_message(self, result): + """Creates a message from the result.""" + text_part = {"type": "text", "text": result} + return Message(role="agent", parts=[text_part]) + + def _determine_task_state(self, result): + """Determines the task state based on the result.""" + return ( + TaskState.INPUT_REQUIRED + if "MISSING_INFO:" in result + else TaskState.COMPLETED + ) + + async def _update_task_history(self, task_id, user_message, agent_messages): + """Updates the task history.""" + async with self.lock: + if task_id in self.tasks: + task = self.tasks[task_id] + task.history = [user_message] + agent_messages + async def _stream_task_process( self, request: SendTaskStreamingRequest, agent: Agent ) -> AsyncIterable[SendTaskStreamingResponse]: """Processes a task in streaming mode using the specified agent.""" - task_params = request.params - query = self._extract_user_query(task_params) + query = self._extract_user_query(request.params) try: # Send initial processing status @@ -316,14 +379,14 @@ class A2ATaskManager: # Update the task with the processing message and inform the WORKING state await self.update_store( - task_params.id, + request.params.id, TaskStatus(state=TaskState.WORKING, message=processing_message), ) yield SendTaskStreamingResponse( id=request.id, result=TaskStatusUpdateEvent( - id=task_params.id, + id=request.params.id, status=TaskStatus( state=TaskState.WORKING, message=processing_message, @@ -332,11 +395,11 @@ class A2ATaskManager: ), ) - # Collect the chunks of the agent's response - external_id = task_params.sessionId + external_id = request.params.sessionId full_response = "" - # Use the same pattern as chat_routes.py: deserialize each chunk + final_message = None + async for chunk in run_agent_stream( agent_id=str(agent.id), external_id=external_id, @@ -352,29 +415,54 @@ class A2ATaskManager: logger.warning(f"Invalid chunk received: {chunk} - {e}") continue - # The chunk_data must be a dict with 'type' and 'text' (or other expected format) - update_message = Message(role="agent", parts=[chunk_data]) + if ( + isinstance(chunk_data, dict) + and "type" in chunk_data + and chunk_data["type"] + in [ + "history", + "history_update", + "history_complete", + ] + ): + continue - # Update the task with each intermediate message - await self.update_store( - task_params.id, - TaskStatus(state=TaskState.WORKING, message=update_message), - ) + if isinstance(chunk_data, dict): + if "type" not in chunk_data and "text" in chunk_data: + chunk_data["type"] = "text" - yield SendTaskStreamingResponse( - id=request.id, - result=TaskStatusUpdateEvent( - id=task_params.id, - status=TaskStatus( - state=TaskState.WORKING, - message=update_message, - ), - final=False, - ), - ) - # If it's text, accumulate for the final response - if chunk_data.get("type") == "text": - full_response += chunk_data.get("text", "") + if "type" in chunk_data: + try: + update_message = Message(role="agent", parts=[chunk_data]) + + await self.update_store( + request.params.id, + TaskStatus( + state=TaskState.WORKING, message=update_message + ), + update_history=False, + ) + + yield SendTaskStreamingResponse( + id=request.id, + result=TaskStatusUpdateEvent( + id=request.params.id, + status=TaskStatus( + state=TaskState.WORKING, + message=update_message, + ), + final=False, + ), + ) + + if chunk_data.get("type") == "text": + full_response += chunk_data.get("text", "") + final_message = update_message + + except Exception as e: + logger.error( + f"Error processing chunk: {e}, chunk: {chunk_data}" + ) # Determine the final state of the task task_state = ( @@ -383,15 +471,16 @@ class A2ATaskManager: else TaskState.COMPLETED ) - # Create the final response - final_text_part = {"type": "text", "text": full_response} - parts = [final_text_part] - final_message = Message(role="agent", parts=parts) - final_artifact = Artifact(parts=parts, index=0) + # Create the final response if we don't have one yet + if not final_message: + final_text_part = {"type": "text", "text": full_response} + parts = [final_text_part] + final_message = Message(role="agent", parts=parts) - # Update the task with the final response - await self.update_store( - task_params.id, + final_artifact = Artifact(parts=final_message.parts, index=0) + + task = await self.update_store( + request.params.id, TaskStatus(state=task_state, message=final_message), [final_artifact], ) @@ -400,7 +489,7 @@ class A2ATaskManager: yield SendTaskStreamingResponse( id=request.id, result=TaskArtifactUpdateEvent( - id=task_params.id, artifact=final_artifact + id=request.params.id, artifact=final_artifact ), ) @@ -408,7 +497,7 @@ class A2ATaskManager: yield SendTaskStreamingResponse( id=request.id, result=TaskStatusUpdateEvent( - id=task_params.id, + id=request.params.id, status=TaskStatus(state=task_state), final=True, ), @@ -425,6 +514,7 @@ class A2ATaskManager: task_id: str, status: TaskStatus, artifacts: Optional[list[Artifact]] = None, + update_history: bool = True, ) -> Task: """Updates the status and artifacts of a task.""" async with self.lock: @@ -434,8 +524,8 @@ class A2ATaskManager: task = self.tasks[task_id] task.status = status - # Add message to history if it exists - if status.message is not None: + # Add message to history if it exists and update_history is True + if status.message is not None and update_history: if task.history is None: task.history = [] task.history.append(status.message) @@ -458,27 +548,22 @@ class A2ATaskManager: return part.text - async def _run_agent(self, agent: Agent, query: str, session_id: str) -> str: + async def _run_agent(self, agent: Agent, query: str, session_id: str) -> dict: """Executes the agent to process the user query.""" try: - # We use the session_id as external_id to maintain the conversation continuity - external_id = session_id - # We call the same function used in the chat API - final_response = await run_agent( + return await run_agent( agent_id=str(agent.id), - external_id=external_id, + external_id=session_id, message=query, session_service=session_service, artifacts_service=artifacts_service, memory_service=memory_service, db=self.db, ) - - return final_response except Exception as e: logger.error(f"Error running agent: {e}") - raise ValueError(f"Error running agent: {str(e)}") + raise ValueError(f"Error running agent: {str(e)}") from e def append_task_history(self, task: Task, history_length: int | None) -> Task: """Returns a copy of the task with the history limited to the specified size.""" @@ -486,11 +571,16 @@ class A2ATaskManager: new_task = task.model_copy() # Limit the history if requested - if history_length is not None: + if history_length is not None and new_task.history: if history_length > 0: - new_task.history = ( - new_task.history[-history_length:] if new_task.history else [] - ) + if len(new_task.history) > history_length: + user_message = new_task.history[0] + recent_messages = ( + new_task.history[-(history_length - 1) :] + if history_length > 1 + else [] + ) + new_task.history = [user_message] + recent_messages else: new_task.history = [] @@ -546,112 +636,11 @@ class A2AService: if not agent: raise ValueError(f"Agent {agent_id} not found") - # Build the agent card based on the agent's information - capabilities = AgentCapabilities(streaming=True) + capabilities = AgentCapabilities( + streaming=True, pushNotifications=False, stateTransitionHistory=True + ) - # List to store all skills - skills = [] - - # Check if the agent has MCP servers configured - if ( - agent.config - and "mcp_servers" in agent.config - and agent.config["mcp_servers"] - ): - logger.info( - f"Agent {agent_id} has {len(agent.config['mcp_servers'])} MCP servers configured" - ) - - for mcp_config in agent.config["mcp_servers"]: - # Get the MCP server - mcp_server_id = mcp_config.get("id") - if not mcp_server_id: - logger.warning("MCP server configuration missing ID") - continue - - logger.info(f"Processing MCP server: {mcp_server_id}") - mcp_server = get_mcp_server(self.db, mcp_server_id) - if not mcp_server: - logger.warning(f"MCP server {mcp_server_id} not found") - continue - - # Get the available tools in the MCP server - mcp_tools = mcp_config.get("tools", []) - logger.info(f"MCP server {mcp_server.name} has tools: {mcp_tools}") - - # Add server tools as skills - for tool_name in mcp_tools: - logger.info(f"Processing tool: {tool_name}") - - tool_info = None - if hasattr(mcp_server, "tools") and isinstance( - mcp_server.tools, list - ): - for tool in mcp_server.tools: - if isinstance(tool, dict) and tool.get("id") == tool_name: - tool_info = tool - logger.info( - f"Found tool info for {tool_name}: {tool_info}" - ) - break - - if tool_info: - # Use the information from the tool - skill = AgentSkill( - id=tool_info.get("id", f"{agent.id}_{tool_name}"), - name=tool_info.get("name", tool_name), - description=tool_info.get( - "description", f"Tool: {tool_name}" - ), - tags=tool_info.get( - "tags", [mcp_server.name, "tool", tool_name] - ), - examples=tool_info.get("examples", []), - inputModes=tool_info.get("inputModes", ["text"]), - outputModes=tool_info.get("outputModes", ["text"]), - ) - else: - # Default skill if tool info not found - skill = AgentSkill( - id=f"{agent.id}_{tool_name}", - name=tool_name, - description=f"Tool: {tool_name}", - tags=[mcp_server.name, "tool", tool_name], - examples=[], - inputModes=["text"], - outputModes=["text"], - ) - - skills.append(skill) - logger.info(f"Added skill for tool: {tool_name}") - - # Check custom tools - if ( - agent.config - and "custom_tools" in agent.config - and agent.config["custom_tools"] - ): - custom_tools = agent.config["custom_tools"] - - # Check HTTP tools - if "http_tools" in custom_tools and custom_tools["http_tools"]: - logger.info(f"Agent has {len(custom_tools['http_tools'])} HTTP tools") - for http_tool in custom_tools["http_tools"]: - skill = AgentSkill( - id=f"{agent.id}_http_{http_tool['name']}", - name=http_tool["name"], - description=http_tool.get( - "description", f"HTTP Tool: {http_tool['name']}" - ), - tags=http_tool.get( - "tags", ["http", "custom_tool", http_tool["method"]] - ), - examples=http_tool.get("examples", []), - inputModes=http_tool.get("inputModes", ["text"]), - outputModes=http_tool.get("outputModes", ["text"]), - ) - skills.append(skill) - logger.info(f"Added skill for HTTP tool: {http_tool['name']}") + skills = self._get_agent_skills(agent) card = AgentCard( name=agent.name, @@ -674,3 +663,134 @@ class A2AService: logger.info(f"Generated agent card with {len(skills)} skills") return card + + def _get_agent_skills(self, agent: Agent) -> list[AgentSkill]: + """Extracts the skills of an agent based on its configuration.""" + skills = [] + + if self._has_mcp_servers(agent): + skills.extend(self._get_mcp_server_skills(agent)) + + if self._has_custom_tools(agent): + skills.extend(self._get_custom_tool_skills(agent)) + + return skills + + def _has_mcp_servers(self, agent: Agent) -> bool: + """Checks if the agent has MCP servers configured.""" + return ( + agent.config + and "mcp_servers" in agent.config + and agent.config["mcp_servers"] + ) + + def _has_custom_tools(self, agent: Agent) -> bool: + """Checks if the agent has custom tools configured.""" + return ( + agent.config + and "custom_tools" in agent.config + and agent.config["custom_tools"] + ) + + def _get_mcp_server_skills(self, agent: Agent) -> list[AgentSkill]: + """Gets the skills of the MCP servers configured for the agent.""" + skills = [] + logger.info( + f"Agent {agent.id} has {len(agent.config['mcp_servers'])} MCP servers configured" + ) + + for mcp_config in agent.config["mcp_servers"]: + mcp_server_id = mcp_config.get("id") + if not mcp_server_id: + logger.warning("MCP server configuration missing ID") + continue + + mcp_server = get_mcp_server(self.db, mcp_server_id) + if not mcp_server: + logger.warning(f"MCP server {mcp_server_id} not found") + continue + + skills.extend(self._extract_mcp_tool_skills(agent, mcp_server, mcp_config)) + + return skills + + def _extract_mcp_tool_skills( + self, agent: Agent, mcp_server, mcp_config + ) -> list[AgentSkill]: + """Extracts skills from MCP tools.""" + skills = [] + mcp_tools = mcp_config.get("tools", []) + logger.info(f"MCP server {mcp_server.name} has tools: {mcp_tools}") + + for tool_name in mcp_tools: + tool_info = self._find_tool_info(mcp_server, tool_name) + skill = self._create_tool_skill( + agent, tool_name, tool_info, mcp_server.name + ) + skills.append(skill) + logger.info(f"Added skill for tool: {tool_name}") + + return skills + + def _find_tool_info(self, mcp_server, tool_name) -> dict: + """Finds information about a tool in an MCP server.""" + if not hasattr(mcp_server, "tools") or not isinstance(mcp_server.tools, list): + return None + + for tool in mcp_server.tools: + if isinstance(tool, dict) and tool.get("id") == tool_name: + logger.info(f"Found tool info for {tool_name}: {tool}") + return tool + + return None + + def _create_tool_skill( + self, agent: Agent, tool_name: str, tool_info: dict, server_name: str + ) -> AgentSkill: + """Creates an AgentSkill object based on the tool information.""" + if tool_info: + return AgentSkill( + id=tool_info.get("id", f"{agent.id}_{tool_name}"), + name=tool_info.get("name", tool_name), + description=tool_info.get("description", f"Tool: {tool_name}"), + tags=tool_info.get("tags", [server_name, "tool", tool_name]), + examples=tool_info.get("examples", []), + inputModes=tool_info.get("inputModes", ["text"]), + outputModes=tool_info.get("outputModes", ["text"]), + ) + else: + return AgentSkill( + id=f"{agent.id}_{tool_name}", + name=tool_name, + description=f"Tool: {tool_name}", + tags=[server_name, "tool", tool_name], + examples=[], + inputModes=["text"], + outputModes=["text"], + ) + + def _get_custom_tool_skills(self, agent: Agent) -> list[AgentSkill]: + """Gets the skills of the custom tools of the agent.""" + skills = [] + custom_tools = agent.config["custom_tools"] + + if "http_tools" in custom_tools and custom_tools["http_tools"]: + logger.info(f"Agent has {len(custom_tools['http_tools'])} HTTP tools") + for http_tool in custom_tools["http_tools"]: + skill = AgentSkill( + id=f"{agent.id}_http_{http_tool['name']}", + name=http_tool["name"], + description=http_tool.get( + "description", f"HTTP Tool: {http_tool['name']}" + ), + tags=http_tool.get( + "tags", ["http", "custom_tool", http_tool["method"]] + ), + examples=http_tool.get("examples", []), + inputModes=http_tool.get("inputModes", ["text"]), + outputModes=http_tool.get("outputModes", ["text"]), + ) + skills.append(skill) + logger.info(f"Added skill for HTTP tool: {http_tool['name']}") + + return skills diff --git a/src/services/agent_builder.py b/src/services/agent_builder.py index b5272a3e..04a2dacb 100644 --- a/src/services/agent_builder.py +++ b/src/services/agent_builder.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: agent_builder.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/agent_runner.py b/src/services/agent_runner.py index 4a87eac9..1d330754 100644 --- a/src/services/agent_runner.py +++ b/src/services/agent_runner.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: agent_runner.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ @@ -94,7 +94,7 @@ async def run_agent( artifact_service=artifacts_service, memory_service=memory_service, ) - adk_session_id = external_id + "_" + agent_id + adk_session_id = f"{external_id}_{agent_id}" if session_id is None: session_id = adk_session_id @@ -117,6 +117,8 @@ async def run_agent( logger.info("Starting agent execution") final_response_text = "No final response captured." + message_history = [] + try: response_queue = asyncio.Queue() execution_completed = asyncio.Event() @@ -133,6 +135,11 @@ async def run_agent( all_responses = [] async for event in events_async: + if event.content and event.content.parts: + event_dict = event.dict() + event_dict = convert_sets(event_dict) + message_history.append(event_dict) + if ( event.content and event.content.parts @@ -205,10 +212,13 @@ async def run_agent( except Exception as e: logger.error(f"Error processing request: {str(e)}") - raise e + raise InternalServerError(str(e)) from e logger.info("Agent execution completed successfully") - return final_response_text + return { + "final_response": final_response_text, + "message_history": message_history, + } except AgentNotFoundError as e: logger.error(f"Error processing request: {str(e)}") raise e @@ -285,7 +295,7 @@ async def run_agent_stream( artifact_service=artifacts_service, memory_service=memory_service, ) - adk_session_id = external_id + "_" + agent_id + adk_session_id = f"{external_id}_{agent_id}" if session_id is None: session_id = adk_session_id @@ -315,9 +325,51 @@ async def run_agent_stream( ) async for event in events_async: - event_dict = event.dict() - event_dict = convert_sets(event_dict) - yield json.dumps(event_dict) + try: + event_dict = event.dict() + event_dict = convert_sets(event_dict) + + if "content" in event_dict and event_dict["content"]: + content = event_dict["content"] + + if "role" not in content or content["role"] not in [ + "user", + "agent", + ]: + content["role"] = "agent" + + if "parts" in content and content["parts"]: + valid_parts = [] + for part in content["parts"]: + if isinstance(part, dict): + if "type" not in part and "text" in part: + part["type"] = "text" + valid_parts.append(part) + elif "type" in part: + valid_parts.append(part) + + if valid_parts: + content["parts"] = valid_parts + else: + content["parts"] = [ + { + "type": "text", + "text": "Content without valid format", + } + ] + else: + content["parts"] = [ + { + "type": "text", + "text": "Content without parts", + } + ] + + # Send the individual event + yield json.dumps(event_dict) + except Exception as e: + logger.error(f"Error processing event: {e}") + continue completed_session = session_service.get_session( app_name=agent_id, @@ -328,7 +380,7 @@ async def run_agent_stream( memory_service.add_session_to_memory(completed_session) except Exception as e: logger.error(f"Error processing request: {str(e)}") - raise e + raise InternalServerError(str(e)) from e finally: # Clean up MCP connection if exit_stack: @@ -341,7 +393,7 @@ async def run_agent_stream( logger.info("Agent streaming execution completed successfully") except AgentNotFoundError as e: logger.error(f"Error processing request: {str(e)}") - raise e + raise InternalServerError(str(e)) from e except Exception as e: logger.error( f"Internal error processing request: {str(e)}", exc_info=True diff --git a/src/services/agent_service.py b/src/services/agent_service.py index 1ac8db10..927abaf4 100644 --- a/src/services/agent_service.py +++ b/src/services/agent_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: agent_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/apikey_service.py b/src/services/apikey_service.py index 20912ad4..60969523 100644 --- a/src/services/apikey_service.py +++ b/src/services/apikey_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: apikey_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/audit_service.py b/src/services/audit_service.py index 1c87cb97..bc6e60e2 100644 --- a/src/services/audit_service.py +++ b/src/services/audit_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: audit_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/auth_service.py b/src/services/auth_service.py index 9e03db16..4b80136d 100644 --- a/src/services/auth_service.py +++ b/src/services/auth_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: auth_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/client_service.py b/src/services/client_service.py index e945ac07..ad4bed08 100644 --- a/src/services/client_service.py +++ b/src/services/client_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: client_service.p │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/custom_tools.py b/src/services/custom_tools.py index 9102a315..f8ac96b8 100644 --- a/src/services/custom_tools.py +++ b/src/services/custom_tools.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: custom_tools.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/email_service.py b/src/services/email_service.py index 4c9caa33..543978c8 100644 --- a/src/services/email_service.py +++ b/src/services/email_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: email_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/mcp_service.py b/src/services/mcp_service.py index 645315d9..44fbd611 100644 --- a/src/services/mcp_service.py +++ b/src/services/mcp_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: mcp_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/service_providers.py b/src/services/service_providers.py index 3cde7da0..07d2d8d3 100644 --- a/src/services/service_providers.py +++ b/src/services/service_providers.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: service_providers.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/session_service.py b/src/services/session_service.py index b4ef7959..db08d18a 100644 --- a/src/services/session_service.py +++ b/src/services/session_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: session_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/tool_service.py b/src/services/tool_service.py index a0962deb..87096b41 100644 --- a/src/services/tool_service.py +++ b/src/services/tool_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: tool_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/user_service.py b/src/services/user_service.py index 1c398e1b..23f66e88 100644 --- a/src/services/user_service.py +++ b/src/services/user_service.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: user_service.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/services/workflow_agent.py b/src/services/workflow_agent.py index fc2de4a2..f9ad44e7 100644 --- a/src/services/workflow_agent.py +++ b/src/services/workflow_agent.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: workflow_agent.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ @@ -116,7 +116,7 @@ class WorkflowAgent(BaseAgent): if not content: content = [ Event( - author="agent", + author="workflow_agent", content=Content(parts=[Part(text="Content not found")]), ) ] @@ -168,7 +168,7 @@ class WorkflowAgent(BaseAgent): yield { "content": [ Event( - author="agent", + author="workflow_agent", content=Content(parts=[Part(text="Agent not found")]), ) ], @@ -191,7 +191,7 @@ class WorkflowAgent(BaseAgent): conversation_history.append(event) new_content.append(event) - print(f"New content: {str(new_content)}") + print(f"New content: {new_content}") node_outputs = state.get("node_outputs", {}) node_outputs[node_id] = { @@ -281,7 +281,7 @@ class WorkflowAgent(BaseAgent): condition_content = [ Event( - author="agent", + author="workflow_agent", content=Content(parts=[Part(text="Cycle limit reached")]), ) ] @@ -312,7 +312,7 @@ class WorkflowAgent(BaseAgent): condition_content = [ Event( - author="agent", + author=label, content=Content( parts=[ Part( @@ -346,8 +346,10 @@ class WorkflowAgent(BaseAgent): session_id = state.get("session_id", "") conversation_history = state.get("conversation_history", []) + label = node_data.get("label", "message_node") + new_event = Event( - author="agent", + author=label, content=Content(parts=[Part(text=message_content)]), ) content = content + [new_event] @@ -380,139 +382,160 @@ class WorkflowAgent(BaseAgent): condition_data = condition.get("data", {}) if condition_type == "previous-output": - field = condition_data.get("field") - operator = condition_data.get("operator") - expected_value = condition_data.get("value") + field, operator, expected_value, actual_value = ( + self._extract_condition_values(condition_data, state) + ) - actual_value = state.get(field, "") - - # Special treatment for when content is a list of Events if field == "content" and isinstance(actual_value, list) and actual_value: - # Extract text from each event for comparison - extracted_texts = [] - for event in actual_value: - if hasattr(event, "content") and hasattr(event.content, "parts"): - for part in event.content.parts: - if hasattr(part, "text") and part.text: - extracted_texts.append(part.text) + actual_value = self._extract_text_from_events(actual_value) - if extracted_texts: - actual_value = " ".join(extracted_texts) - print(f" Extracted text from events: '{actual_value[:100]}...'") + result = self._process_condition(operator, actual_value, expected_value) - # Convert values to string for easier comparisons - if actual_value is not None: - actual_str = str(actual_value) - else: - actual_str = "" + print(f" Check '{operator}': {result}") + return result - if expected_value is not None: - expected_str = str(expected_value) - else: - expected_str = "" + return False - # Checks for definition - if operator == "is_defined": - result = actual_value is not None and actual_value != "" - print(f" Check '{operator}': {result}") - return result - elif operator == "is_not_defined": - result = actual_value is None or actual_value == "" - print(f" Check '{operator}': {result}") - return result + def _process_condition(self, operator, actual_value, expected_value): + """Converts values to strings and processes the condition using the appropriate operator.""" + actual_str = str(actual_value) if actual_value is not None else "" + expected_str = str(expected_value) if expected_value is not None else "" - # Checks for equality - elif operator == "equals": - result = actual_str == expected_str - print(f" Check '{operator}': {result}") - return result - elif operator == "not_equals": - result = actual_str != expected_str - print(f" Check '{operator}': {result}") - return result + return self._process_operator(operator, actual_value, actual_str, expected_str) - # Checks for content - elif operator == "contains": - # Convert both to lowercase for case-insensitive comparison - expected_lower = expected_str.lower() - actual_lower = actual_str.lower() - print( - f" Comparison 'contains' without case distinction: '{expected_lower}' in '{actual_lower[:100]}...'" + def _extract_condition_values(self, condition_data, state): + """Extracts field, operator, expected value and actual value from condition data.""" + field = condition_data.get("field") + operator = condition_data.get("operator") + expected_value = condition_data.get("value") + actual_value = state.get(field, "") + + return field, operator, expected_value, actual_value + + def _extract_text_from_events(self, events): + """Extracts text content from a list of events for comparison.""" + extracted_texts = [] + for event in events: + if hasattr(event, "content") and hasattr(event.content, "parts"): + extracted_texts.extend( + [ + part.text + for part in event.content.parts + if hasattr(part, "text") and part.text + ] ) - result = expected_lower in actual_lower - print(f" Check '{operator}': {result}") - return result - elif operator == "not_contains": - expected_lower = expected_str.lower() - actual_lower = actual_str.lower() - print( - f" Comparison 'not_contains' without case distinction: '{expected_lower}' in '{actual_lower[:100]}...'" - ) - result = expected_lower not in actual_lower - print(f" Check '{operator}': {result}") - return result - # Checks for start and end - elif operator == "starts_with": - result = actual_str.lower().startswith(expected_str.lower()) - print(f" Check '{operator}': {result}") - return result - elif operator == "ends_with": - result = actual_str.lower().endswith(expected_str.lower()) - print(f" Check '{operator}': {result}") - return result + if extracted_texts: + joined_text = " ".join(extracted_texts) + print(f" Extracted text from events: '{joined_text[:100]}...'") + return joined_text - # Numeric checks (attempting to convert to number) - elif operator in [ - "greater_than", - "greater_than_or_equal", - "less_than", - "less_than_or_equal", - ]: - try: - actual_num = float(actual_str) if actual_str else 0 - expected_num = float(expected_str) if expected_str else 0 + return "" - if operator == "greater_than": - result = actual_num > expected_num - elif operator == "greater_than_or_equal": - result = actual_num >= expected_num - elif operator == "less_than": - result = actual_num < expected_num - elif operator == "less_than_or_equal": - result = actual_num <= expected_num - print(f" Numeric check '{operator}': {result}") - return result - except (ValueError, TypeError): - # If it's not possible to convert to number, return false - print( - f" Error converting values for numeric comparison: '{actual_str[:100]}...' and '{expected_str}'" - ) - return False + def _process_operator(self, operator, actual_value, actual_str, expected_str): + """Process the operator and return the result of the comparison.""" + # Definition checks + if operator in ["is_defined", "is_not_defined"]: + return self._check_definition(operator, actual_value) - # Checks with regular expressions - elif operator == "matches": - import re + # Equality checks + elif operator in ["equals", "not_equals"]: + return self._check_equality(operator, actual_str, expected_str) - try: - pattern = re.compile(expected_str, re.IGNORECASE) - result = bool(pattern.search(actual_str)) - print(f" Check '{operator}': {result}") - return result - except re.error: - print(f" Error in regular expression: '{expected_str}'") - return False - elif operator == "not_matches": - import re + # Content checks + elif operator in ["contains", "not_contains"]: + return self._case_insensitive_comparison(expected_str, actual_str, operator) - try: - pattern = re.compile(expected_str, re.IGNORECASE) - result = not bool(pattern.search(actual_str)) - print(f" Check '{operator}': {result}") - return result - except re.error: - print(f" Error in regular expression: '{expected_str}'") - return True # If the regex is invalid, we consider that there was no match + # String pattern checks + elif operator in ["starts_with", "ends_with"]: + return self._check_string_pattern(operator, actual_str, expected_str) + + # Numeric checks + elif operator in [ + "greater_than", + "greater_than_or_equal", + "less_than", + "less_than_or_equal", + ]: + return self._check_numeric(operator, actual_str, expected_str) + + # Regex checks + elif operator in ["matches", "not_matches"]: + return self._check_regex(operator, actual_str, expected_str) + + return False + + def _check_definition(self, operator, actual_value): + """Check if a value is defined or not.""" + if operator == "is_defined": + return actual_value is not None and actual_value != "" + else: # is_not_defined + return actual_value is None or actual_value == "" + + def _check_equality(self, operator, actual_str, expected_str): + """Check if two strings are equal or not.""" + return ( + (actual_str == expected_str) + if operator == "equals" + else (actual_str != expected_str) + ) + + def _check_string_pattern(self, operator, actual_str, expected_str): + """Check if a string starts or ends with another string.""" + if operator == "starts_with": + return actual_str.lower().startswith(expected_str.lower()) + else: # ends_with + return actual_str.lower().endswith(expected_str.lower()) + + def _check_numeric(self, operator, actual_str, expected_str): + """Compare numeric values.""" + try: + actual_num = float(actual_str) if actual_str else 0 + expected_num = float(expected_str) if expected_str else 0 + + if operator == "greater_than": + return actual_num > expected_num + elif operator == "greater_than_or_equal": + return actual_num >= expected_num + elif operator == "less_than": + return actual_num < expected_num + else: # less_than_or_equal + return actual_num <= expected_num + except (ValueError, TypeError): + print( + f" Error converting values for numeric comparison: '{actual_str[:100]}...' and '{expected_str}'" + ) + return False + + def _check_regex(self, operator, actual_str, expected_str): + """Check if a string matches a regex pattern.""" + import re + + try: + pattern = re.compile(expected_str, re.IGNORECASE) + if operator == "matches": + return bool(pattern.search(actual_str)) + else: # not_matches + return not bool(pattern.search(actual_str)) + except re.error: + print(f" Error in regular expression: '{expected_str}'") + return ( + operator == "not_matches" + ) # Return True for not_matches, False for matches + + def _case_insensitive_comparison(self, expected_str, actual_str, operator): + """Performs case-insensitive string comparison based on the specified operator.""" + expected_lower = expected_str.lower() + actual_lower = actual_str.lower() + + print( + f" Comparison '{operator}' without case distinction: '{expected_lower}' in '{actual_lower[:100]}...'" + ) + + if operator == "contains": + return expected_lower in actual_lower + elif operator == "not_contains": + return expected_lower not in actual_lower return False @@ -558,38 +581,15 @@ class WorkflowAgent(BaseAgent): conditions = condition_nodes[node_id] any_condition_met = False - for condition in conditions: - condition_id = condition.get("id") - - # Get latest event for evaluation, ignoring condition node informational events - content = state.get("content", []) - latest_event = None - for event in reversed(content): - # Skip events generated by condition nodes - if ( - event.author != "agent" - or not hasattr(event.content, "parts") - or not event.content.parts - ): - latest_event = event - break - - evaluation_state = state.copy() - if latest_event: - evaluation_state["content"] = [latest_event] - - # Check if the condition is met - is_condition_met = self._evaluate_condition( - condition, evaluation_state - ) - - if is_condition_met: + node_outputs = state.get("node_outputs", {}) + if node_id in node_outputs: + conditions_met = node_outputs[node_id].get("conditions_met", []) + if conditions_met: any_condition_met = True + condition_id = conditions_met[0] print( - f"Condition {condition_id} met. Moving to the next node." + f"Using stored condition evaluation result: Condition {condition_id} met." ) - - # Find the connection that uses this condition_id as a handle if ( node_id in edges_map and condition_id in edges_map[node_id] @@ -597,8 +597,49 @@ class WorkflowAgent(BaseAgent): return edges_map[node_id][condition_id] else: print( - f"Condition {condition_id} not met. Continuing evaluation or using default path." + "Using stored condition evaluation result: No conditions met." ) + else: + for condition in conditions: + condition_id = condition.get("id") + + # Get latest event for evaluation, ignoring condition node informational events + content = state.get("content", []) + + # Filter out events generated by condition nodes or informational messages + filtered_content = [] + for event in content: + # Ignore events from condition nodes or that contain evaluation results + if not hasattr(event, "author") or not ( + event.author.startswith("Condition") + or "Condition evaluated:" in str(event) + ): + filtered_content.append(event) + + evaluation_state = state.copy() + evaluation_state["content"] = filtered_content + + # Check if the condition is met + is_condition_met = self._evaluate_condition( + condition, evaluation_state + ) + + if is_condition_met: + any_condition_met = True + print( + f"Condition {condition_id} met. Moving to the next node." + ) + + # Find the connection that uses this condition_id as a handle + if ( + node_id in edges_map + and condition_id in edges_map[node_id] + ): + return edges_map[node_id][condition_id] + else: + print( + f"Condition {condition_id} not met. Continuing evaluation or using default path." + ) # If no condition is met, use the bottom-handle if available if not any_condition_met: @@ -735,91 +776,94 @@ class WorkflowAgent(BaseAgent): async def _run_async_impl( self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: - """ - Implementation of the workflow agent. - - This method follows the pattern of custom agent implementation, - executing the defined workflow and returning the results. - """ - + """Implementation of the workflow agent executing the defined workflow and returning results.""" try: - # 1. Extract the user message from the context - user_message = None - - # Search for the user message in the session events - if ctx.session and hasattr(ctx.session, "events") and ctx.session.events: - for event in reversed(ctx.session.events): - if event.author == "user" and event.content and event.content.parts: - user_message = event.content.parts[0].text - print("Message found in session events") - break - - # Check in the session state if the message was not found in the events - if not user_message and ctx.session and ctx.session.state: - if "user_message" in ctx.session.state: - user_message = ctx.session.state["user_message"] - elif "message" in ctx.session.state: - user_message = ctx.session.state["message"] - - # 2. Use the session ID as a stable identifier - session_id = ( - str(ctx.session.id) - if ctx.session and hasattr(ctx.session, "id") - else str(uuid.uuid4()) - ) - - # 3. Create the workflow graph from the provided JSON + user_message = await self._extract_user_message(ctx) + session_id = self._get_session_id(ctx) graph = await self._create_graph(ctx, self.flow_json) - - # 4. Prepare the initial state - user_event = Event( - author="user", - content=Content(parts=[Part(text=user_message)]), + initial_state = await self._prepare_initial_state( + ctx, user_message, session_id ) - # If the conversation history is empty, add the user message - conversation_history = ctx.session.events or [] - if not conversation_history or (len(conversation_history) == 0): - conversation_history = [user_event] - - initial_state = State( - content=[user_event], - status="started", - session_id=session_id, - cycle_count=0, - node_outputs={}, - conversation_history=conversation_history, - ) - - # 5. Execute the graph print("\n🚀 Starting workflow execution:") print(f"Initial content: {user_message[:100]}...") - sent_events = 0 # Count of events already sent - - async for state in graph.astream(initial_state, {"recursion_limit": 100}): - # The state can be a dict with the node name as a key - for node_state in state.values(): - content = node_state.get("content", []) - # Only send new events - for event in content[sent_events:]: - if event.author != "user": - yield event - sent_events = len(content) - - # Execute sub-agents - for sub_agent in self.sub_agents: - async for event in sub_agent.run_async(ctx): - yield event + await self._execute_workflow(ctx, graph, initial_state) except Exception as e: - # Handle any uncaught errors - error_msg = f"Error executing the workflow agent: {str(e)}" - print(error_msg) - yield Event( - author=self.name, - content=Content( - role="agent", - parts=[Part(text=error_msg)], - ), - ) + yield await self._handle_workflow_error(e) + + async def _extract_user_message(self, ctx: InvocationContext) -> str: + """Extracts the user message from context session events or state.""" + # Try to find message in session events + if ctx.session and hasattr(ctx.session, "events") and ctx.session.events: + for event in reversed(ctx.session.events): + if event.author == "user" and event.content and event.content.parts: + print("Message found in session events") + return event.content.parts[0].text + + # Try to find message in session state + if ctx.session and ctx.session.state: + if "user_message" in ctx.session.state: + return ctx.session.state["user_message"] + elif "message" in ctx.session.state: + return ctx.session.state["message"] + + return "" + + def _get_session_id(self, ctx: InvocationContext) -> str: + """Gets or generates a session ID.""" + if ctx.session and hasattr(ctx.session, "id"): + return str(ctx.session.id) + return str(uuid.uuid4()) + + async def _prepare_initial_state( + self, ctx: InvocationContext, user_message: str, session_id: str + ) -> State: + """Prepares the initial state for workflow execution.""" + user_event = Event( + author="user", + content=Content(parts=[Part(text=user_message)]), + ) + + conversation_history = ctx.session.events or [user_event] + + return State( + content=[user_event], + status="started", + session_id=session_id, + cycle_count=0, + node_outputs={}, + conversation_history=conversation_history, + ) + + async def _execute_workflow( + self, ctx: InvocationContext, graph: StateGraph, initial_state: State + ) -> AsyncGenerator[Event, None]: + """Executes the workflow graph and yields events.""" + sent_events = 0 + + async for state in graph.astream(initial_state, {"recursion_limit": 100}): + for node_state in state.values(): + content = node_state.get("content", []) + for event in content[sent_events:]: + if event.author != "user": + yield event + sent_events = len(content) + + # Execute sub-agents if any + for sub_agent in self.sub_agents: + async for event in sub_agent.run_async(ctx): + yield event + + async def _handle_workflow_error(self, error: Exception) -> Event: + """Creates an error event for workflow execution errors.""" + error_msg = f"Error executing the workflow agent: {str(error)}" + print(error_msg) + return Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text=error_msg)], + ), + ) diff --git a/src/utils/a2a_utils.py b/src/utils/a2a_utils.py index ab44bc65..e58b968c 100644 --- a/src/utils/a2a_utils.py +++ b/src/utils/a2a_utils.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: a2a_utils.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/utils/crypto.py b/src/utils/crypto.py index 372e3e79..02886c7e 100644 --- a/src/utils/crypto.py +++ b/src/utils/crypto.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: crypto.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/utils/logger.py b/src/utils/logger.py index f603c376..fd4b455e 100644 --- a/src/utils/logger.py +++ b/src/utils/logger.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: logger.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/utils/otel.py b/src/utils/otel.py index f546ef25..3d5d9a56 100644 --- a/src/utils/otel.py +++ b/src/utils/otel.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: otel.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/utils/security.py b/src/utils/security.py index 3a0f266b..8614e859 100644 --- a/src/utils/security.py +++ b/src/utils/security.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: security.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ diff --git a/src/utils/streaming.py b/src/utils/streaming.py index 5fbbec7e..61fde101 100644 --- a/src/utils/streaming.py +++ b/src/utils/streaming.py @@ -1,7 +1,7 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: run_seeders.py │ +│ @file: streaming.py │ │ Developed by: Davidson Gomes │ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │