From 0dbf6d1c1311668d451e6eae2b69b52040ab280e Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 08:23:59 -0300 Subject: [PATCH 01/11] feat(agent): add support for CrewAI agents and update related configurations --- CHANGELOG.md | 6 + README.md | 43 ++- ...4e70bb2_add_crew_ai_coluns_agents_table.py | 34 +++ ...2e1_add_crew_ai_agent_type_agents_table.py | 43 +++ pyproject.toml | 1 + src/api/agent_routes.py | 2 + src/models/models.py | 4 +- src/schemas/agent_config.py | 41 +++ src/schemas/schemas.py | 46 ++- src/services/agent_builder.py | 75 ++++- src/services/agent_service.py | 88 ++++++ src/services/custom_agents/__init__.py | 0 src/services/{ => custom_agents}/a2a_agent.py | 0 src/services/custom_agents/crew_ai_agent.py | 266 ++++++++++++++++++ .../{ => custom_agents}/workflow_agent.py | 0 15 files changed, 638 insertions(+), 11 deletions(-) create mode 100644 migrations/versions/611d84e70bb2_add_crew_ai_coluns_agents_table.py create mode 100644 migrations/versions/bdc5d363e2e1_add_crew_ai_agent_type_agents_table.py create mode 100644 src/services/custom_agents/__init__.py rename src/services/{ => custom_agents}/a2a_agent.py (100%) create mode 100644 src/services/custom_agents/crew_ai_agent.py rename src/services/{ => custom_agents}/workflow_agent.py (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95abcbb0..20deee8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [0.0.10] - develop + +### Added + +- Add CrewAI agents + ## [0.0.9] - 2025-05-13 ### Added diff --git a/README.md b/README.md index 1e2bc597..76a545f5 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,11 @@ The Evo AI platform allows: - Client management - MCP server configuration - Custom tools management +- **[Google Agent Development Kit (ADK)](https://google.github.io/adk-docs/)**: Base framework for agent development, providing support for LLM Agents, Sequential Agents, Loop Agents, Parallel Agents and Custom Agents - JWT authentication with email verification -- **Agent 2 Agent (A2A) Protocol Support**: Interoperability between AI agents following Google's A2A specification -- **Workflow Agent with LangGraph**: Building complex agent workflows with LangGraph and ReactFlow +- **[Agent 2 Agent (A2A) Protocol Support](https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/)**: Interoperability between AI agents following Google's A2A specification +- **[Workflow Agent with LangGraph](https://www.langchain.com/langgraph)**: Building complex agent workflows with LangGraph and ReactFlow +- **[CrewAI Agent Support](https://www.crewai.com/)**: Organizing agents into specialized crews with assigned tasks - **Secure API Key Management**: Encrypted storage of API keys with Fernet encryption - **Agent Organization**: Folder structure for organizing agents by categories @@ -30,6 +32,8 @@ Agent based on language models like GPT-4, Claude, etc. Can be configured with t "client_id": "{{client_id}}", "name": "personal_assistant", "description": "Specialized personal assistant", + "role": "Personal Assistant", + "goal": "Help users with daily tasks and provide relevant information", "type": "llm", "model": "gpt-4", "api_key_id": "stored-api-key-uuid", @@ -150,6 +154,39 @@ Executes sub-agents in a custom workflow defined by a graph structure. 
This agen The workflow structure is built using ReactFlow in the frontend, allowing visual creation and editing of complex agent workflows with nodes (representing agents or decision points) and edges (representing flow connections). +### 7. CrewAI Agent + +Allows organizing agents into a "crew" with specific tasks assigned to each agent. Based on the CrewAI concept, where each agent has a specific responsibility to perform a more complex task collaboratively. + +```json +{ + "client_id": "{{client_id}}", + "name": "research_crew", + "type": "crew_ai", + "folder_id": "folder_id (optional)", + "config": { + "tasks": [ + { + "agent_id": "agent-uuid-1", + "description": "Search for recent information on the topic", + "expected_output": "Search report in JSON format" + }, + { + "agent_id": "agent-uuid-2", + "description": "Analyze data and create visualizations", + "expected_output": "Charts and analyses in HTML format" + }, + { + "agent_id": "agent-uuid-3", + "description": "Write final report combining results", + "expected_output": "Markdown document with complete analysis" + } + ], + "sub_agents": ["agent-uuid-4", "agent-uuid-5"] + } +} +``` + ### Common Characteristics - All agent types can have sub-agents @@ -355,7 +392,7 @@ Evo AI implements the Google's Agent 2 Agent (A2A) protocol, enabling seamless c - **Standardized Communication**: Agents can communicate using a common protocol regardless of their underlying implementation - **Interoperability**: Support for agents built with different frameworks and technologies - **Well-Known Endpoints**: Standardized endpoints for agent discovery and interaction -- **Task Management**: Support for task-based interactions between agents +- **Task Management**: Support for task creation, execution, and status tracking - **State Management**: Tracking of agent states and conversation history - **Authentication**: Secure API key-based authentication for agent interactions diff --git 
a/migrations/versions/611d84e70bb2_add_crew_ai_coluns_agents_table.py b/migrations/versions/611d84e70bb2_add_crew_ai_coluns_agents_table.py new file mode 100644 index 00000000..f0462d69 --- /dev/null +++ b/migrations/versions/611d84e70bb2_add_crew_ai_coluns_agents_table.py @@ -0,0 +1,34 @@ +"""add_crew_ai_coluns_agents_table + +Revision ID: 611d84e70bb2 +Revises: bdc5d363e2e1 +Create Date: 2025-05-14 07:31:08.741620 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '611d84e70bb2' +down_revision: Union[str, None] = 'bdc5d363e2e1' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('agents', sa.Column('role', sa.String(), nullable=True)) + op.add_column('agents', sa.Column('goal', sa.Text(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('agents', 'goal') + op.drop_column('agents', 'role') + # ### end Alembic commands ### diff --git a/migrations/versions/bdc5d363e2e1_add_crew_ai_agent_type_agents_table.py b/migrations/versions/bdc5d363e2e1_add_crew_ai_agent_type_agents_table.py new file mode 100644 index 00000000..daeea454 --- /dev/null +++ b/migrations/versions/bdc5d363e2e1_add_crew_ai_agent_type_agents_table.py @@ -0,0 +1,43 @@ +"""add_crew_ai_agent_type_agents_table + +Revision ID: bdc5d363e2e1 +Revises: 6db4a526335b +Create Date: 2025-05-14 06:23:14.701878 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision: str = "bdc5d363e2e1" +down_revision: Union[str, None] = "6db4a526335b" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint("check_agent_type", "agents", type_="check") + op.create_check_constraint( + "check_agent_type", + "agents", + "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai')", + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint("check_agent_type", "agents", type_="check") + op.create_check_constraint( + "check_agent_type", + "agents", + "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow')", + ) + # ### end Alembic commands ### diff --git a/pyproject.toml b/pyproject.toml index 6109d1f0..811ca6ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ dependencies = [ "langgraph==0.4.1", "opentelemetry-sdk==1.33.0", "opentelemetry-exporter-otlp==1.33.0", + "crewai==0.119.0", ] [project.optional-dependencies] diff --git a/src/api/agent_routes.py b/src/api/agent_routes.py index 4369f209..07bcba07 100644 --- a/src/api/agent_routes.py +++ b/src/api/agent_routes.py @@ -24,6 +24,8 @@ │ For any future changes to the code in this file, it is recommended to │ │ include, together with the modification, the information of the developer │ │ who changed it and the date of modification. 
│ +│ │ +│ @update: May 14, 2025 - Added support for crew_ai agent type │ └──────────────────────────────────────────────────────────────────────────────┘ """ diff --git a/src/models/models.py b/src/models/models.py index 79ac2945..5693b254 100644 --- a/src/models/models.py +++ b/src/models/models.py @@ -100,6 +100,8 @@ class Agent(Base): id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) client_id = Column(UUID(as_uuid=True), ForeignKey("clients.id", ondelete="CASCADE")) name = Column(String, nullable=False) + role = Column(String, nullable=True) + goal = Column(Text, nullable=True) description = Column(Text, nullable=True) type = Column(String, nullable=False) model = Column(String, nullable=True, default="") @@ -121,7 +123,7 @@ class Agent(Base): __table_args__ = ( CheckConstraint( - "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow')", + "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai')", name="check_agent_type", ), ) diff --git a/src/schemas/agent_config.py b/src/schemas/agent_config.py index 0b3c7ff2..50c95137 100644 --- a/src/schemas/agent_config.py +++ b/src/schemas/agent_config.py @@ -32,6 +32,8 @@ from pydantic import BaseModel, Field from uuid import UUID import secrets import string +import uuid +from pydantic import validator class ToolConfig(BaseModel): @@ -234,3 +236,42 @@ class WorkflowConfig(BaseModel): class Config: from_attributes = True + + +class CrewAITask(BaseModel): + """Task configuration for Crew AI agents""" + + agent_id: Union[UUID, str] = Field( + ..., description="ID of the agent assigned to this task" + ) + description: str = Field(..., description="Description of the task to be performed") + expected_output: str = Field(..., description="Expected output from this task") + + @validator("agent_id") + def validate_agent_id(cls, v): + if isinstance(v, str): + try: + return uuid.UUID(v) + except ValueError: + raise ValueError(f"Invalid UUID format for agent_id: {v}") + 
return v + + class Config: + from_attributes = True + + +class CrewAIConfig(BaseModel): + """Configuration for Crew AI agents""" + + tasks: List[CrewAITask] = Field( + ..., description="List of tasks to be performed by the crew" + ) + api_key: Optional[str] = Field( + default_factory=generate_api_key, description="API key for the Crew AI agent" + ) + sub_agents: Optional[List[UUID]] = Field( + default_factory=list, description="List of IDs of sub-agents used in crew" + ) + + class Config: + from_attributes = True diff --git a/src/schemas/schemas.py b/src/schemas/schemas.py index 506e1f05..4710e58c 100644 --- a/src/schemas/schemas.py +++ b/src/schemas/schemas.py @@ -33,7 +33,7 @@ from datetime import datetime from uuid import UUID import uuid import re -from src.schemas.agent_config import LLMConfig +from src.schemas.agent_config import LLMConfig, CrewAIConfig class ClientBase(BaseModel): @@ -94,8 +94,11 @@ class AgentBase(BaseModel): None, description="Agent name (no spaces or special characters)" ) description: Optional[str] = Field(None, description="Agent description") + role: Optional[str] = Field(None, description="Agent role in the system") + goal: Optional[str] = Field(None, description="Agent goal or objective") type: str = Field( - ..., description="Agent type (llm, sequential, parallel, loop, a2a, workflow)" + ..., + description="Agent type (llm, sequential, parallel, loop, a2a, workflow, crew_ai)", ) model: Optional[str] = Field( None, description="Agent model (required only for llm type)" @@ -126,9 +129,17 @@ class AgentBase(BaseModel): @validator("type") def validate_type(cls, v): - if v not in ["llm", "sequential", "parallel", "loop", "a2a", "workflow"]: + if v not in [ + "llm", + "sequential", + "parallel", + "loop", + "a2a", + "workflow", + "crew_ai", + ]: raise ValueError( - "Invalid agent type. Must be: llm, sequential, parallel, loop, a2a or workflow" + "Invalid agent type. 
Must be: llm, sequential, parallel, loop, a2a, workflow or crew_ai" ) return v @@ -188,6 +199,33 @@ class AgentBase(BaseModel): raise ValueError( f'Agent {values["type"]} must have at least one sub-agent' ) + elif values["type"] == "crew_ai": + if not isinstance(v, dict): + raise ValueError(f'Invalid configuration for agent {values["type"]}') + if "tasks" not in v: + raise ValueError(f'Agent {values["type"]} must have tasks') + if not isinstance(v["tasks"], list): + raise ValueError("tasks must be a list") + if not v["tasks"]: + raise ValueError(f'Agent {values["type"]} must have at least one task') + for task in v["tasks"]: + if not isinstance(task, dict): + raise ValueError("Each task must be a dictionary") + required_fields = ["agent_id", "description", "expected_output"] + for field in required_fields: + if field not in task: + raise ValueError(f"Task missing required field: {field}") + + # Validar sub_agents, se existir + if "sub_agents" in v and v["sub_agents"] is not None: + if not isinstance(v["sub_agents"], list): + raise ValueError("sub_agents must be a list") + + try: + # Convert the dictionary to CrewAIConfig + v = CrewAIConfig(**v) + except Exception as e: + raise ValueError(f"Invalid Crew AI configuration for agent: {str(e)}") return v diff --git a/src/services/agent_builder.py b/src/services/agent_builder.py index 04a2dacb..321a2a1a 100644 --- a/src/services/agent_builder.py +++ b/src/services/agent_builder.py @@ -32,13 +32,15 @@ from google.adk.agents.llm_agent import LlmAgent from google.adk.agents import SequentialAgent, ParallelAgent, LoopAgent, BaseAgent from google.adk.models.lite_llm import LiteLlm from google.adk.tools.agent_tool import AgentTool +from src.schemas.schemas import Agent from src.utils.logger import setup_logger from src.core.exceptions import AgentNotFoundError from src.services.agent_service import get_agent from src.services.custom_tools import CustomToolBuilder from src.services.mcp_service import MCPService -from 
src.services.a2a_agent import A2ACustomAgent -from src.services.workflow_agent import WorkflowAgent +from src.services.custom_agents.a2a_agent import A2ACustomAgent +from src.services.custom_agents.workflow_agent import WorkflowAgent +from src.services.custom_agents.crew_ai_agent import CrewAIAgent from src.services.apikey_service import get_decrypted_api_key from sqlalchemy.orm import Session from contextlib import AsyncExitStack @@ -47,6 +49,8 @@ from google.adk.tools import load_memory from datetime import datetime import uuid +from src.schemas.agent_config import CrewAITask + logger = setup_logger(__name__) @@ -104,6 +108,18 @@ class AgentBuilder: current_time=current_time, ) + # add role on beginning of the prompt + if agent.role: + formatted_prompt = ( + f"{agent.role}\n\n{formatted_prompt}" + ) + + # add goal on beginning of the prompt + if agent.goal: + formatted_prompt = ( + f"{agent.goal}\n\n{formatted_prompt}" + ) + # Check if load_memory is enabled if agent.config.get("load_memory"): all_tools.append(load_memory) @@ -298,6 +314,56 @@ class AgentBuilder: logger.error(f"Error building Workflow agent: {str(e)}") raise ValueError(f"Error building Workflow agent: {str(e)}") + async def build_crew_ai_agent( + self, root_agent: Agent + ) -> Tuple[CrewAIAgent, Optional[AsyncExitStack]]: + """Build a CrewAI agent with its sub-agents.""" + logger.info(f"Creating CrewAI agent: {root_agent.name}") + + agent_config = root_agent.config or {} + + # Verify if we have tasks configured + if not agent_config.get("tasks"): + raise ValueError("tasks are required for CrewAI agents") + + try: + # Get sub-agents if there are any + sub_agents = [] + if root_agent.config.get("sub_agents"): + sub_agents_with_stacks = await self._get_sub_agents( + root_agent.config.get("sub_agents") + ) + sub_agents = [agent for agent, _ in sub_agents_with_stacks] + + # Additional configurations + config = root_agent.config or {} + + # Convert tasks to the expected format by CrewAIAgent + tasks = 
[] + for task_config in config.get("tasks", []): + task = CrewAITask( + agent_id=task_config.get("agent_id"), + description=task_config.get("description", ""), + expected_output=task_config.get("expected_output", ""), + ) + tasks.append(task) + + # Create the CrewAI agent + crew_ai_agent = CrewAIAgent( + name=root_agent.name, + tasks=tasks, + db=self.db, + sub_agents=sub_agents, + ) + + logger.info(f"CrewAI agent created successfully: {root_agent.name}") + + return crew_ai_agent, None + + except Exception as e: + logger.error(f"Error building CrewAI agent: {str(e)}") + raise ValueError(f"Error building CrewAI agent: {str(e)}") + async def build_composite_agent( self, root_agent ) -> Tuple[SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]]: @@ -367,7 +433,8 @@ class AgentBuilder: | ParallelAgent | LoopAgent | A2ACustomAgent - | WorkflowAgent, + | WorkflowAgent + | CrewAIAgent, Optional[AsyncExitStack], ]: """Build the appropriate agent based on the type of the root agent.""" @@ -377,5 +444,7 @@ class AgentBuilder: return await self.build_a2a_agent(root_agent) elif root_agent.type == "workflow": return await self.build_workflow_agent(root_agent) + elif root_agent.type == "crew_ai": + return await self.build_crew_ai_agent(root_agent) else: return await self.build_composite_agent(root_agent) diff --git a/src/services/agent_service.py b/src/services/agent_service.py index 927abaf4..eb6b6ec4 100644 --- a/src/services/agent_service.py +++ b/src/services/agent_service.py @@ -202,6 +202,53 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: if "api_key" not in agent.config or not agent.config["api_key"]: agent.config["api_key"] = generate_api_key() + elif agent.type == "crew_ai": + if not isinstance(agent.config, dict): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid configuration: must be an object with tasks", + ) + + if "tasks" not in agent.config: + raise HTTPException( + 
status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid configuration: tasks is required for crew_ai agents", + ) + + if not agent.config["tasks"]: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid configuration: tasks cannot be empty", + ) + + # Validar se todos os agent_id nas tasks existem + for task in agent.config["tasks"]: + if "agent_id" not in task: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Each task must have an agent_id", + ) + + agent_id = task["agent_id"] + task_agent = get_agent(db, agent_id) + if not task_agent: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Agent not found for task: {agent_id}", + ) + + # Validar sub_agents se existir + if "sub_agents" in agent.config and agent.config["sub_agents"]: + if not validate_sub_agents(db, agent.config["sub_agents"]): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="One or more sub-agents do not exist", + ) + + # Gerar API key se não existir + if "api_key" not in agent.config or not agent.config["api_key"]: + agent.config["api_key"] = generate_api_key() + # Additional sub-agent validation (for non-llm and non-a2a types) elif agent.type != "llm": if not isinstance(agent.config, dict): @@ -637,6 +684,47 @@ async def update_agent( if "config" not in agent_data: agent_data["config"] = agent_config + # Validar configuração de crew_ai, se aplicável + if ("type" in agent_data and agent_data["type"] == "crew_ai") or ( + agent.type == "crew_ai" and "config" in agent_data + ): + config = agent_data.get("config", {}) + if "tasks" not in config: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid configuration: tasks is required for crew_ai agents", + ) + + if not config["tasks"]: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid configuration: tasks cannot be empty", + ) + + # Validar se todos os agent_id nas tasks existem + for 
task in config["tasks"]: + if "agent_id" not in task: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Each task must have an agent_id", + ) + + agent_id = task["agent_id"] + task_agent = get_agent(db, agent_id) + if not task_agent: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Agent not found for task: {agent_id}", + ) + + # Validar sub_agents se existir + if "sub_agents" in config and config["sub_agents"]: + if not validate_sub_agents(db, config["sub_agents"]): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="One or more sub-agents do not exist", + ) + if not agent_config.get("api_key") and ( "config" not in agent_data or not agent_data["config"].get("api_key") ): diff --git a/src/services/custom_agents/__init__.py b/src/services/custom_agents/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/services/a2a_agent.py b/src/services/custom_agents/a2a_agent.py similarity index 100% rename from src/services/a2a_agent.py rename to src/services/custom_agents/a2a_agent.py diff --git a/src/services/custom_agents/crew_ai_agent.py b/src/services/custom_agents/crew_ai_agent.py new file mode 100644 index 00000000..1afffeb2 --- /dev/null +++ b/src/services/custom_agents/crew_ai_agent.py @@ -0,0 +1,266 @@ +""" +┌──────────────────────────────────────────────────────────────────────────────┐ +│ @author: Davidson Gomes │ +│ @file: a2a_agent.py │ +│ Developed by: Davidson Gomes │ +│ Creation date: May 13, 2025 │ +│ Contact: contato@evolution-api.com │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @copyright © Evolution API 2025. All rights reserved. │ +│ Licensed under the Apache License, Version 2.0 │ +│ │ +│ You may not use this file except in compliance with the License. 
│ +│ You may obtain a copy of the License at │ +│ │ +│ http://www.apache.org/licenses/LICENSE-2.0 │ +│ │ +│ Unless required by applicable law or agreed to in writing, software │ +│ distributed under the License is distributed on an "AS IS" BASIS, │ +│ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. │ +│ See the License for the specific language governing permissions and │ +│ limitations under the License. │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @important │ +│ For any future changes to the code in this file, it is recommended to │ +│ include, together with the modification, the information of the developer │ +│ who changed it and the date of modification. │ +└──────────────────────────────────────────────────────────────────────────────┘ +""" + +from attr import Factory +from google.adk.agents import BaseAgent +from google.adk.agents.invocation_context import InvocationContext +from google.adk.events import Event +from google.genai.types import Content, Part +from src.services.agent_service import get_agent +from src.services.apikey_service import get_decrypted_api_key + +from sqlalchemy.orm import Session + +from typing import AsyncGenerator, List + +from src.schemas.agent_config import CrewAITask + +from crewai import Agent, Task, Crew, LLM + + +class CrewAIAgent(BaseAgent): + """ + Custom agent that implements the CrewAI protocol directly. + + This agent implements the interaction with an external CrewAI service. + """ + + # Field declarations for Pydantic + tasks: List[CrewAITask] + db: Session + + def __init__( + self, + name: str, + tasks: List[CrewAITask], + db: Session, + sub_agents: List[BaseAgent] = [], + **kwargs, + ): + """ + Initialize the CrewAI agent. 
+ + Args: + name: Agent name + tasks: List of tasks to be executed + db: Database session + sub_agents: List of sub-agents to be executed after the CrewAI agent + """ + # Initialize base class + super().__init__( + name=name, + tasks=tasks, + db=db, + sub_agents=sub_agents, + **kwargs, + ) + + def _generate_llm(self, model: str, api_key: str): + """ + Generate the LLM for the CrewAI agent. + """ + + return LLM(model=model, api_key=api_key) + + def _agent_builder(self, agent_id: str): + """ + Build the CrewAI agent. + """ + agent = get_agent(self.db, agent_id) + + if not agent: + raise ValueError(f"Agent with id {agent_id} not found") + + api_key = None + + decrypted_key = get_decrypted_api_key(self.db, agent.api_key_id) + if decrypted_key: + api_key = decrypted_key + else: + raise ValueError( + f"API key with ID {agent.api_key_id} not found or inactive" + ) + + if not api_key: + raise ValueError(f"API key for agent {agent.name} not found") + + return Agent( + role=agent.role, + goal=agent.goal, + backstory=agent.instruction, + llm=self._generate_llm(agent.model, api_key), + verbose=True, + ) + + def _tasks_and_agents_builder(self): + """ + Build the CrewAI tasks. + """ + tasks = [] + agents = [] + for task in self.tasks: + agent = self._agent_builder(task.agent_id) + agents.append(agent) + tasks.append( + Task( + description=task.description, + expected_output=task.expected_output, + agent=agent, + ) + ) + return tasks, agents + + def _crew_builder(self): + """ + Build the CrewAI crew. + """ + tasks, agents = self._tasks_and_agents_builder() + return Crew( + agents=agents, + tasks=tasks, + verbose=True, + ) + + async def _run_async_impl( + self, ctx: InvocationContext + ) -> AsyncGenerator[Event, None]: + """ + Implementation of the CrewAI. + + This method follows the pattern of implementing custom agents, + sending the user's message to the CrewAI service and monitoring the response. 
+ """ + + try: + # Extract the user's message from the context + user_message = None + + # Search for the user's message in the session events + if ctx.session and hasattr(ctx.session, "events") and ctx.session.events: + for event in reversed(ctx.session.events): + if event.author == "user" and event.content and event.content.parts: + user_message = event.content.parts[0].text + print("Message found in session events") + break + + # Check in the session state if the message was not found in the events + if not user_message and ctx.session and ctx.session.state: + if "user_message" in ctx.session.state: + user_message = ctx.session.state["user_message"] + elif "message" in ctx.session.state: + user_message = ctx.session.state["message"] + + if not user_message: + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text="User message not found")], + ), + ) + return + + try: + # Replace any {content} in the task descriptions with the user's input + for task in self.tasks: + task.description = task.description.replace( + "{content}", user_message + ) + + # Build the Crew + crew = self._crew_builder() + + # Start the agent status + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text=f"Starting CrewAI processing...")], + ), + ) + + # Prepare inputs (if there are placeholders to replace) + inputs = {"user_message": user_message} + + # Notify the user that the processing is in progress + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text=f"Processing your request...")], + ), + ) + + # Try first with kickoff() normally + try: + # If it fails, try with kickoff_async + result = await crew.kickoff_async(inputs=inputs) + print(f"Result of crew.kickoff_async(): {result}") + except Exception as e: + print(f"Error executing crew.kickoff_async(): {str(e)}") + print("Trying alternative with crew.kickoff()") + result = crew.kickoff(inputs=inputs) + print(f"Result of crew.kickoff(): 
{result}") + + # Create an event for the final result + final_event = Event( + author=self.name, + content=Content(role="agent", parts=[Part(text=str(result))]), + ) + + # Transmit the event to the client + yield final_event + + # Execute sub-agents + for sub_agent in self.sub_agents: + async for event in sub_agent.run_async(ctx): + yield event + + except Exception as e: + error_msg = f"Error sending request: {str(e)}" + print(error_msg) + print(f"Error type: {type(e).__name__}") + print(f"Error details: {str(e)}") + + yield Event( + author=self.name, + content=Content(role="agent", parts=[Part(text=error_msg)]), + ) + return + + except Exception as e: + # Handle any uncaught error + print(f"Error executing CrewAI agent: {str(e)}") + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text=f"Error interacting with CrewAI agent: {str(e)}")], + ), + ) diff --git a/src/services/workflow_agent.py b/src/services/custom_agents/workflow_agent.py similarity index 100% rename from src/services/workflow_agent.py rename to src/services/custom_agents/workflow_agent.py From 9ab001c35e1c20c93cb9b3bc28d68c993eba2499 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 08:33:15 -0300 Subject: [PATCH 02/11] chore(dependencies): update litellm version constraint to allow minor updates --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 811ca6ff..5fa5d39d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "google-cloud-aiplatform==1.90.0", "python-dotenv==1.1.0", "google-adk==0.3.0", - "litellm==1.68.1", + "litellm>=1.68.0,<1.69.0", "python-multipart==0.0.20", "alembic==1.15.2", "asyncpg==0.30.0", From 198eb5703273b8898b250d3d47469ae8825d1a03 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 08:58:00 -0300 Subject: [PATCH 03/11] refactor(agent_service): simplify agent configuration validation and remove unnecessary 
comments --- src/schemas/schemas.py | 6 +----- src/services/agent_service.py | 6 +----- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/src/schemas/schemas.py b/src/schemas/schemas.py index 4710e58c..a955fe63 100644 --- a/src/schemas/schemas.py +++ b/src/schemas/schemas.py @@ -221,11 +221,7 @@ class AgentBase(BaseModel): if not isinstance(v["sub_agents"], list): raise ValueError("sub_agents must be a list") - try: - # Convert the dictionary to CrewAIConfig - v = CrewAIConfig(**v) - except Exception as e: - raise ValueError(f"Invalid Crew AI configuration for agent: {str(e)}") + return v return v diff --git a/src/services/agent_service.py b/src/services/agent_service.py index eb6b6ec4..f271ad26 100644 --- a/src/services/agent_service.py +++ b/src/services/agent_service.py @@ -204,6 +204,7 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: elif agent.type == "crew_ai": if not isinstance(agent.config, dict): + agent.config = {} raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid configuration: must be an object with tasks", @@ -221,7 +222,6 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: detail="Invalid configuration: tasks cannot be empty", ) - # Validar se todos os agent_id nas tasks existem for task in agent.config["tasks"]: if "agent_id" not in task: raise HTTPException( @@ -237,7 +237,6 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: detail=f"Agent not found for task: {agent_id}", ) - # Validar sub_agents se existir if "sub_agents" in agent.config and agent.config["sub_agents"]: if not validate_sub_agents(db, agent.config["sub_agents"]): raise HTTPException( @@ -245,7 +244,6 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: detail="One or more sub-agents do not exist", ) - # Gerar API key se não existir if "api_key" not in agent.config or not agent.config["api_key"]: agent.config["api_key"] = generate_api_key() @@ -684,7 +682,6 @@ async def 
update_agent( if "config" not in agent_data: agent_data["config"] = agent_config - # Validar configuração de crew_ai, se aplicável if ("type" in agent_data and agent_data["type"] == "crew_ai") or ( agent.type == "crew_ai" and "config" in agent_data ): @@ -701,7 +698,6 @@ async def update_agent( detail="Invalid configuration: tasks cannot be empty", ) - # Validar se todos os agent_id nas tasks existem for task in config["tasks"]: if "agent_id" not in task: raise HTTPException( From 2a80bdf7a3af06b944247ee0636691f2941999f5 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 12:36:34 -0300 Subject: [PATCH 04/11] feat(agent): add Task Agent for structured task execution and improve context management --- CHANGELOG.md | 2 + README.md | 33 +++ ...c7b564_add_task_agent_type_agents_table.py | 43 ++++ src/models/models.py | 2 +- src/schemas/schemas.py | 7 +- src/services/agent_builder.py | 55 ++++- src/services/agent_service.py | 10 +- src/services/custom_agents/crew_ai_agent.py | 4 +- src/services/custom_agents/task_agent.py | 231 ++++++++++++++++++ 9 files changed, 375 insertions(+), 12 deletions(-) create mode 100644 migrations/versions/2df073c7b564_add_task_agent_type_agents_table.py create mode 100644 src/services/custom_agents/task_agent.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 20deee8f..d0123340 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add CrewAI agents +- Add Task Agent for structured single-task execution +- Improve context management in agent execution ## [0.0.9] - 2025-05-13 diff --git a/README.md b/README.md index 76a545f5..ee50710d 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,39 @@ Allows organizing agents into a "crew" with specific tasks assigned to each agen } ``` +### 8. Task Agent + +Executes a specific task using a target agent. 
Task Agent provides a streamlined approach for structured task execution, where the agent_id specifies which agent will process the task, and the task description can include dynamic content placeholders. + +```json +{ + "client_id": "{{client_id}}", + "name": "web_search_task", + "type": "task", + "folder_id": "folder_id (optional)", + "config": { + "tasks": [ + { + "agent_id": "search-agent-uuid", + "description": "Search the web for information about {content}", + "expected_output": "Comprehensive search results with relevant information" + } + ], + "sub_agents": ["post-processing-agent-uuid"] + } +} +``` + +Key features of Task Agent: + +- Passes structured task instructions to the designated agent +- Supports variable content using {content} placeholder in the task description +- Provides clear task definition with instructions and expected output format +- Can execute sub-agents after the main task is completed +- Simplifies orchestration for single-focused task execution + +Task Agent is ideal for scenarios where you need to execute a specific, well-defined task with clear instructions and expectations. + ### Common Characteristics - All agent types can have sub-agents diff --git a/migrations/versions/2df073c7b564_add_task_agent_type_agents_table.py b/migrations/versions/2df073c7b564_add_task_agent_type_agents_table.py new file mode 100644 index 00000000..c0e579b8 --- /dev/null +++ b/migrations/versions/2df073c7b564_add_task_agent_type_agents_table.py @@ -0,0 +1,43 @@ +"""add_task_agent_type_agents_table + +Revision ID: 2df073c7b564 +Revises: 611d84e70bb2 +Create Date: 2025-05-14 11:46:39.573247 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
# Revision identifiers, used by Alembic.
revision: str = "2df073c7b564"
down_revision: Union[str, None] = "611d84e70bb2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Name and table of the CHECK constraint that restricts ``agents.type``.
_CONSTRAINT_NAME = "check_agent_type"
_TABLE_NAME = "agents"

# Constraint condition after this revision (adds the 'task' agent type).
_CONDITION_WITH_TASK = (
    "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai', 'task')"
)
# Constraint condition before this revision.
_CONDITION_WITHOUT_TASK = (
    "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai')"
)


def _replace_agent_type_constraint(condition: str) -> None:
    """Drop the agent-type CHECK constraint and recreate it with *condition*."""
    op.drop_constraint(_CONSTRAINT_NAME, _TABLE_NAME, type_="check")
    op.create_check_constraint(_CONSTRAINT_NAME, _TABLE_NAME, condition)


def upgrade() -> None:
    """Upgrade schema: allow 'task' as a value of ``agents.type``."""
    _replace_agent_type_constraint(_CONDITION_WITH_TASK)


def downgrade() -> None:
    """Downgrade schema: restore the constraint without the 'task' type."""
    # NOTE(review): presumably this fails while rows with type 'task' still
    # exist, since the recreated constraint would reject them -- confirm.
    _replace_agent_type_constraint(_CONDITION_WITHOUT_TASK)
async def build_task_agent(
    self, root_agent
) -> Tuple[TaskAgent, Optional[AsyncExitStack]]:
    """Build a Task agent together with its optional sub-agents.

    Args:
        root_agent: Agent record whose ``config`` holds a non-empty ``tasks``
            list and, optionally, a ``sub_agents`` list of agent ids.

    Returns:
        A ``(task_agent, None)`` tuple; no exit stack is created here.

    Raises:
        ValueError: If the configuration has no tasks, or if building the
            sub-agents or the ``TaskAgent`` itself fails (the original error
            is chained as ``__cause__``).
    """
    logger.info(f"Creating Task agent: {root_agent.name}")

    # ``config`` may be empty/None; normalise it once and reuse it below.
    agent_config = root_agent.config or {}

    if not agent_config.get("tasks"):
        raise ValueError("tasks are required for Task agents")

    try:
        # Resolve sub-agents, if any are configured.
        sub_agents = []
        sub_agent_ids = agent_config.get("sub_agents")
        if sub_agent_ids:
            sub_agents_with_stacks = await self._get_sub_agents(sub_agent_ids)
            # NOTE(review): the exit stacks paired with each sub-agent are
            # dropped here, as in the original -- confirm nothing needs closing.
            sub_agents = [agent for agent, _ in sub_agents_with_stacks]

        # Convert the raw task dicts to the model expected by TaskAgent.
        tasks = [
            CrewAITask(
                agent_id=task_config.get("agent_id"),
                description=task_config.get("description", ""),
                expected_output=task_config.get("expected_output", ""),
            )
            for task_config in agent_config.get("tasks", [])
        ]

        # Create the Task agent
        task_agent = TaskAgent(
            name=root_agent.name,
            tasks=tasks,
            db=self.db,
            sub_agents=sub_agents,
        )

        logger.info(f"Task agent created successfully: {root_agent.name}")

        return task_agent, None

    except Exception as e:
        logger.error(f"Error building Task agent: {str(e)}")
        # The message previously said "CrewAI agent", which pointed
        # operators at the wrong builder.
        raise ValueError(f"Error building Task agent: {str(e)}") from e
Session, agent: AgentCreate) -> Agent: if "tasks" not in agent.config: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid configuration: tasks is required for crew_ai agents", + detail=f"Invalid configuration: tasks is required for {agent.type} agents", ) if not agent.config["tasks"]: @@ -682,14 +682,14 @@ async def update_agent( if "config" not in agent_data: agent_data["config"] = agent_config - if ("type" in agent_data and agent_data["type"] == "crew_ai") or ( - agent.type == "crew_ai" and "config" in agent_data + if ("type" in agent_data and agent_data["type"] in ["crew_ai", "task"]) or ( + agent.type in ["crew_ai", "task"] and "config" in agent_data ): config = agent_data.get("config", {}) if "tasks" not in config: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid configuration: tasks is required for crew_ai agents", + detail=f"Invalid configuration: tasks is required for {agent_data.get('type', agent.type)} agents", ) if not config["tasks"]: diff --git a/src/services/custom_agents/crew_ai_agent.py b/src/services/custom_agents/crew_ai_agent.py index 1afffeb2..65fc002e 100644 --- a/src/services/custom_agents/crew_ai_agent.py +++ b/src/services/custom_agents/crew_ai_agent.py @@ -1,9 +1,9 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: a2a_agent.py │ +│ @file: crew_ai_agent.py │ │ Developed by: Davidson Gomes │ -│ Creation date: May 13, 2025 │ +│ Creation date: May 14, 2025 │ │ Contact: contato@evolution-api.com │ ├──────────────────────────────────────────────────────────────────────────────┤ │ @copyright © Evolution API 2025. All rights reserved. 
│ diff --git a/src/services/custom_agents/task_agent.py b/src/services/custom_agents/task_agent.py new file mode 100644 index 00000000..a93611bd --- /dev/null +++ b/src/services/custom_agents/task_agent.py @@ -0,0 +1,231 @@ +""" +┌──────────────────────────────────────────────────────────────────────────────┐ +│ @author: Davidson Gomes │ +│ @file: task_agent.py │ +│ Developed by: Davidson Gomes │ +│ Creation date: May 14, 2025 │ +│ Contact: contato@evolution-api.com │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @copyright © Evolution API 2025. All rights reserved. │ +│ Licensed under the Apache License, Version 2.0 │ +│ │ +│ You may not use this file except in compliance with the License. │ +│ You may obtain a copy of the License at │ +│ │ +│ http://www.apache.org/licenses/LICENSE-2.0 │ +│ │ +│ Unless required by applicable law or agreed to in writing, software │ +│ distributed under the License is distributed on an "AS IS" BASIS, │ +│ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. │ +│ See the License for the specific language governing permissions and │ +│ limitations under the License. │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @important │ +│ For any future changes to the code in this file, it is recommended to │ +│ include, together with the modification, the information of the developer │ +│ who changed it and the date of modification. 
class TaskAgent(BaseAgent):
    """
    Custom agent that runs one configured task through another agent.

    The first entry of ``tasks`` names a target agent via ``agent_id``. On each
    invocation that agent is loaded with ``get_agent``, built with
    ``AgentBuilder`` and run against the current invocation context; the events
    it produces are passed through to the caller. Sub-agents, if any, are run
    afterwards, but only when the target agent finished without an error.
    """

    # Field declarations for Pydantic
    # Task definitions; only the first entry is executed.
    tasks: List[CrewAITask]
    # Database session used to look up and build the target agent.
    db: Session

    def __init__(
        self,
        name: str,
        tasks: List[CrewAITask],
        db: Session,
        sub_agents: "List[BaseAgent] | None" = None,
        **kwargs,
    ):
        """
        Initialize the Task agent.

        Args:
            name: Agent name
            tasks: List of tasks to be executed
            db: Database session
            sub_agents: List of sub-agents to be executed after the Task agent;
                ``None`` (the default) means no sub-agents
        """
        # A fresh list per instance avoids sharing one mutable default.
        super().__init__(
            name=name,
            tasks=tasks,
            db=db,
            sub_agents=sub_agents if sub_agents is not None else [],
            **kwargs,
        )

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        """
        Run the configured task and stream the resulting events.

        Steps:
            1. Find the latest user message (session events first, then the
               ``user_message`` / ``message`` keys of the session state).
            2. Substitute it for ``{content}`` in the task description.
            3. Load and build the target agent, then forward its events.
            4. Close the exit stack returned by the builder.
            5. Run the sub-agents, only if step 3 completed without error.

        Errors are reported as agent events rather than raised, except for
        ``GeneratorExit``, which is re-raised so the generator can be closed.
        """
        exit_stack = None
        # Set only once the target agent has run to completion; gates step 5.
        task_completed = False

        try:
            # Extract the user's message from the context
            user_message = None

            # Search for the user's message in the session events
            if ctx.session and hasattr(ctx.session, "events") and ctx.session.events:
                for event in reversed(ctx.session.events):
                    if event.author == "user" and event.content and event.content.parts:
                        user_message = event.content.parts[0].text
                        print("Message found in session events")
                        break

            # Check in the session state if the message was not found in the events
            if not user_message and ctx.session and ctx.session.state:
                if "user_message" in ctx.session.state:
                    user_message = ctx.session.state["user_message"]
                elif "message" in ctx.session.state:
                    user_message = ctx.session.state["message"]

            if not user_message:
                yield Event(
                    author=self.name,
                    content=Content(
                        role="agent",
                        parts=[Part(text="User message not found")],
                    ),
                )
                return

            # Start the agent status
            yield Event(
                author=self.name,
                content=Content(
                    role="agent",
                    parts=[Part(text=f"Starting {self.name} task processing...")],
                ),
            )

            try:
                task = self.tasks[0]
                # Substitute into a local copy. Writing the result back to
                # task.description would consume the {content} placeholder,
                # so every later run would reuse the first user's message.
                task_description = task.description.replace(
                    "{content}", user_message
                )

                agent = get_agent(self.db, task.agent_id)

                if not agent:
                    yield Event(
                        author=self.name,
                        content=Content(
                            role="agent",
                            parts=[Part(text="Agent not found")],
                        ),
                    )
                    return

                # Prepare task instructions
                # NOTE(review): layout rebuilt from whitespace-collapsed
                # source -- confirm exact whitespace/markup against the repo.
                task_message_instructions = f"""


                        Execute the following task:

                    {task_description}
                    {task.expected_output}

                """

                # Send task instructions as an event
                yield Event(
                    author=f"{self.name} - Task executor",
                    content=Content(
                        role="agent",
                        parts=[Part(text=task_message_instructions)],
                    ),
                )

                # Imported here because agent_builder imports this module.
                from src.services.agent_builder import AgentBuilder

                print(f"Building agent in Task agent: {agent.name}")
                agent_builder = AgentBuilder(self.db)
                root_agent, exit_stack = await agent_builder.build_agent(agent)

                # Store task instructions in context for reference by sub-agents
                ctx.session.state["task_instructions"] = task_message_instructions

                # Process the agent responses
                try:
                    async for event in root_agent.run_async(ctx):
                        yield event
                    task_completed = True
                except GeneratorExit:
                    print("Generator was closed prematurely, handling cleanup...")
                    # Allow the exception to propagate after cleanup
                    raise
                except Exception as e:
                    error_msg = f"Error during agent execution: {str(e)}"
                    print(error_msg)
                    yield Event(
                        author=self.name,
                        content=Content(
                            role="agent",
                            parts=[Part(text=error_msg)],
                        ),
                    )

            except Exception as e:
                error_msg = f"Error sending request: {str(e)}"
                print(error_msg)
                print(f"Error type: {type(e).__name__}")
                print(f"Error details: {str(e)}")

                yield Event(
                    author=self.name,
                    content=Content(role="agent", parts=[Part(text=error_msg)]),
                )

        except Exception as e:
            # Handle any uncaught error
            print(f"Error executing Task agent: {str(e)}")
            yield Event(
                author=self.name,
                content=Content(
                    role="agent",
                    parts=[Part(text=f"Error interacting with Task agent: {str(e)}")],
                ),
            )
        finally:
            # Ensure we close the exit_stack in the same task context where it was created
            if exit_stack:
                try:
                    await exit_stack.aclose()
                    print("Exit stack closed successfully")
                except Exception as e:
                    print(f"Error closing exit_stack: {str(e)}")

        # Execute sub-agents only if the task ran without error. An explicit
        # flag is needed: names bound by ``except ... as e`` are unbound when
        # the handler exits, so probing locals() for "e" never detects a
        # failure. Running after ``finally`` avoids yielding during close.
        if not task_completed:
            return

        try:
            for sub_agent in self.sub_agents:
                async for event in sub_agent.run_async(ctx):
                    yield event
        except Exception as sub_e:
            print(f"Error executing sub-agents: {str(sub_e)}")
CHANGELOG.md | 1 - README.md | 36 +-- pyproject.toml | 1 - src/api/agent_routes.py | 2 - src/schemas/agent_config.py | 19 +- src/schemas/schemas.py | 10 +- src/services/agent_builder.py | 79 ++---- src/services/agent_service.py | 6 +- src/services/custom_agents/crew_ai_agent.py | 266 -------------------- src/services/custom_agents/task_agent.py | 12 +- 10 files changed, 43 insertions(+), 389 deletions(-) delete mode 100644 src/services/custom_agents/crew_ai_agent.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d0123340..389534c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Add CrewAI agents - Add Task Agent for structured single-task execution - Improve context management in agent execution diff --git a/README.md b/README.md index ee50710d..f1892dba 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,6 @@ The Evo AI platform allows: - JWT authentication with email verification - **[Agent 2 Agent (A2A) Protocol Support](https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/)**: Interoperability between AI agents following Google's A2A specification - **[Workflow Agent with LangGraph](https://www.langchain.com/langgraph)**: Building complex agent workflows with LangGraph and ReactFlow -- **[CrewAI Agent Support](https://www.crewai.com/)**: Organizing agents into specialized crews with assigned tasks - **Secure API Key Management**: Encrypted storage of API keys with Fernet encryption - **Agent Organization**: Folder structure for organizing agents by categories @@ -154,40 +153,7 @@ Executes sub-agents in a custom workflow defined by a graph structure. This agen The workflow structure is built using ReactFlow in the frontend, allowing visual creation and editing of complex agent workflows with nodes (representing agents or decision points) and edges (representing flow connections). -### 7. 
CrewAI Agent - -Allows organizing agents into a "crew" with specific tasks assigned to each agent. Based on the CrewAI concept, where each agent has a specific responsibility to perform a more complex task collaboratively. - -```json -{ - "client_id": "{{client_id}}", - "name": "research_crew", - "type": "crew_ai", - "folder_id": "folder_id (optional)", - "config": { - "tasks": [ - { - "agent_id": "agent-uuid-1", - "description": "Search for recent information on the topic", - "expected_output": "Search report in JSON format" - }, - { - "agent_id": "agent-uuid-2", - "description": "Analyze data and create visualizations", - "expected_output": "Charts and analyses in HTML format" - }, - { - "agent_id": "agent-uuid-3", - "description": "Write final report combining results", - "expected_output": "Markdown document with complete analysis" - } - ], - "sub_agents": ["agent-uuid-4", "agent-uuid-5"] - } -} -``` - -### 8. Task Agent +### 7. Task Agent Executes a specific task using a target agent. Task Agent provides a streamlined approach for structured task execution, where the agent_id specifies which agent will process the task, and the task description can include dynamic content placeholders. diff --git a/pyproject.toml b/pyproject.toml index 5fa5d39d..ada85ccd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,6 @@ dependencies = [ "langgraph==0.4.1", "opentelemetry-sdk==1.33.0", "opentelemetry-exporter-otlp==1.33.0", - "crewai==0.119.0", ] [project.optional-dependencies] diff --git a/src/api/agent_routes.py b/src/api/agent_routes.py index 07bcba07..4369f209 100644 --- a/src/api/agent_routes.py +++ b/src/api/agent_routes.py @@ -24,8 +24,6 @@ │ For any future changes to the code in this file, it is recommended to │ │ include, together with the modification, the information of the developer │ │ who changed it and the date of modification. 
│ -│ │ -│ @update: May 14, 2025 - Added support for crew_ai agent type │ └──────────────────────────────────────────────────────────────────────────────┘ """ diff --git a/src/schemas/agent_config.py b/src/schemas/agent_config.py index 50c95137..4c12d1b7 100644 --- a/src/schemas/agent_config.py +++ b/src/schemas/agent_config.py @@ -238,12 +238,15 @@ class WorkflowConfig(BaseModel): from_attributes = True -class CrewAITask(BaseModel): - """Task configuration for Crew AI agents""" +class AgentTask(BaseModel): + """Task configuration for agents""" agent_id: Union[UUID, str] = Field( ..., description="ID of the agent assigned to this task" ) + enabled_tools: Optional[List[str]] = Field( + default_factory=list, description="List of tool names to be used in the task" + ) description: str = Field(..., description="Description of the task to be performed") expected_output: str = Field(..., description="Expected output from this task") @@ -260,17 +263,17 @@ class CrewAITask(BaseModel): from_attributes = True -class CrewAIConfig(BaseModel): - """Configuration for Crew AI agents""" +class AgentConfig(BaseModel): + """Configuration for agents""" - tasks: List[CrewAITask] = Field( - ..., description="List of tasks to be performed by the crew" + tasks: List[AgentTask] = Field( + ..., description="List of tasks to be performed by the agent" ) api_key: Optional[str] = Field( - default_factory=generate_api_key, description="API key for the Crew AI agent" + default_factory=generate_api_key, description="API key for the agent" ) sub_agents: Optional[List[UUID]] = Field( - default_factory=list, description="List of IDs of sub-agents used in crew" + default_factory=list, description="List of IDs of sub-agents used in agent" ) class Config: diff --git a/src/schemas/schemas.py b/src/schemas/schemas.py index 6bbdc22d..fcef62d2 100644 --- a/src/schemas/schemas.py +++ b/src/schemas/schemas.py @@ -33,7 +33,7 @@ from datetime import datetime from uuid import UUID import uuid import re -from 
src.schemas.agent_config import LLMConfig, CrewAIConfig +from src.schemas.agent_config import LLMConfig, AgentConfig class ClientBase(BaseModel): @@ -98,7 +98,7 @@ class AgentBase(BaseModel): goal: Optional[str] = Field(None, description="Agent goal or objective") type: str = Field( ..., - description="Agent type (llm, sequential, parallel, loop, a2a, workflow, crew_ai, task)", + description="Agent type (llm, sequential, parallel, loop, a2a, workflow, task)", ) model: Optional[str] = Field( None, description="Agent model (required only for llm type)" @@ -136,11 +136,10 @@ class AgentBase(BaseModel): "loop", "a2a", "workflow", - "crew_ai", "task", ]: raise ValueError( - "Invalid agent type. Must be: llm, sequential, parallel, loop, a2a, workflow, crew_ai or task" + "Invalid agent type. Must be: llm, sequential, parallel, loop, a2a, workflow or task" ) return v @@ -200,7 +199,7 @@ class AgentBase(BaseModel): raise ValueError( f'Agent {values["type"]} must have at least one sub-agent' ) - elif values["type"] == "crew_ai" or values["type"] == "task": + elif values["type"] == "task": if not isinstance(v, dict): raise ValueError(f'Invalid configuration for agent {values["type"]}') if "tasks" not in v: @@ -217,7 +216,6 @@ class AgentBase(BaseModel): if field not in task: raise ValueError(f"Task missing required field: {field}") - # Validar sub_agents, se existir if "sub_agents" in v and v["sub_agents"] is not None: if not isinstance(v["sub_agents"], list): raise ValueError("sub_agents must be a list") diff --git a/src/services/agent_builder.py b/src/services/agent_builder.py index a615e221..00b1b368 100644 --- a/src/services/agent_builder.py +++ b/src/services/agent_builder.py @@ -40,7 +40,6 @@ from src.services.custom_tools import CustomToolBuilder from src.services.mcp_service import MCPService from src.services.custom_agents.a2a_agent import A2ACustomAgent from src.services.custom_agents.workflow_agent import WorkflowAgent -from src.services.custom_agents.crew_ai_agent 
import CrewAIAgent from src.services.custom_agents.task_agent import TaskAgent from src.services.apikey_service import get_decrypted_api_key from sqlalchemy.orm import Session @@ -50,7 +49,7 @@ from google.adk.tools import load_memory from datetime import datetime import uuid -from src.schemas.agent_config import CrewAITask +from src.schemas.agent_config import AgentTask logger = setup_logger(__name__) @@ -74,7 +73,7 @@ class AgentBuilder: return agent_tools async def _create_llm_agent( - self, agent + self, agent, enabled_tools: List[str] = [] ) -> Tuple[LlmAgent, Optional[AsyncExitStack]]: """Create an LLM agent from the agent data.""" # Get custom tools from the configuration @@ -95,6 +94,10 @@ class AgentBuilder: # Combine all tools all_tools = custom_tools + mcp_tools + agent_tools + if enabled_tools: + all_tools = [tool for tool in all_tools if tool.name in enabled_tools] + logger.info(f"Enabled tools enabled. Total tools: {len(all_tools)}") + now = datetime.now() current_datetime = now.strftime("%d/%m/%Y %H:%M") current_day_of_week = now.strftime("%A") @@ -200,6 +203,8 @@ class AgentBuilder: sub_agent, exit_stack = await self.build_a2a_agent(agent) elif agent.type == "workflow": sub_agent, exit_stack = await self.build_workflow_agent(agent) + elif agent.type == "task": + sub_agent, exit_stack = await self.build_task_agent(agent) elif agent.type == "sequential": sub_agent, exit_stack = await self.build_composite_agent(agent) elif agent.type == "parallel": @@ -218,7 +223,7 @@ class AgentBuilder: return sub_agents async def build_llm_agent( - self, root_agent + self, root_agent, enabled_tools: List[str] = [] ) -> Tuple[LlmAgent, Optional[AsyncExitStack]]: """Build an LLM agent with its sub-agents.""" logger.info("Creating LLM agent") @@ -230,7 +235,9 @@ class AgentBuilder: ) sub_agents = [agent for agent, _ in sub_agents_with_stacks] - root_llm_agent, exit_stack = await self._create_llm_agent(root_agent) + root_llm_agent, exit_stack = await 
self._create_llm_agent( + root_agent, enabled_tools + ) if sub_agents: root_llm_agent.sub_agents = sub_agents @@ -341,10 +348,11 @@ class AgentBuilder: # Convert tasks to the expected format by TaskAgent tasks = [] for task_config in config.get("tasks", []): - task = CrewAITask( + task = AgentTask( agent_id=task_config.get("agent_id"), description=task_config.get("description", ""), expected_output=task_config.get("expected_output", ""), + enabled_tools=task_config.get("enabled_tools", []), ) tasks.append(task) @@ -362,57 +370,7 @@ class AgentBuilder: except Exception as e: logger.error(f"Error building Task agent: {str(e)}") - raise ValueError(f"Error building CrewAI agent: {str(e)}") - - async def build_crew_ai_agent( - self, root_agent: Agent - ) -> Tuple[CrewAIAgent, Optional[AsyncExitStack]]: - """Build a CrewAI agent with its sub-agents.""" - logger.info(f"Creating CrewAI agent: {root_agent.name}") - - agent_config = root_agent.config or {} - - # Verify if we have tasks configured - if not agent_config.get("tasks"): - raise ValueError("tasks are required for CrewAI agents") - - try: - # Get sub-agents if there are any - sub_agents = [] - if root_agent.config.get("sub_agents"): - sub_agents_with_stacks = await self._get_sub_agents( - root_agent.config.get("sub_agents") - ) - sub_agents = [agent for agent, _ in sub_agents_with_stacks] - - # Additional configurations - config = root_agent.config or {} - - # Convert tasks to the expected format by CrewAIAgent - tasks = [] - for task_config in config.get("tasks", []): - task = CrewAITask( - agent_id=task_config.get("agent_id"), - description=task_config.get("description", ""), - expected_output=task_config.get("expected_output", ""), - ) - tasks.append(task) - - # Create the CrewAI agent - crew_ai_agent = CrewAIAgent( - name=root_agent.name, - tasks=tasks, - db=self.db, - sub_agents=sub_agents, - ) - - logger.info(f"CrewAI agent created successfully: {root_agent.name}") - - return crew_ai_agent, None - - except 
Exception as e: - logger.error(f"Error building CrewAI agent: {str(e)}") - raise ValueError(f"Error building CrewAI agent: {str(e)}") + raise ValueError(f"Error building Task agent: {str(e)}") async def build_composite_agent( self, root_agent @@ -477,26 +435,23 @@ class AgentBuilder: else: raise ValueError(f"Invalid agent type: {root_agent.type}") - async def build_agent(self, root_agent) -> Tuple[ + async def build_agent(self, root_agent, enabled_tools: List[str] = []) -> Tuple[ LlmAgent | SequentialAgent | ParallelAgent | LoopAgent | A2ACustomAgent | WorkflowAgent - | CrewAIAgent | TaskAgent, Optional[AsyncExitStack], ]: """Build the appropriate agent based on the type of the root agent.""" if root_agent.type == "llm": - return await self.build_llm_agent(root_agent) + return await self.build_llm_agent(root_agent, enabled_tools) elif root_agent.type == "a2a": return await self.build_a2a_agent(root_agent) elif root_agent.type == "workflow": return await self.build_workflow_agent(root_agent) - elif root_agent.type == "crew_ai": - return await self.build_crew_ai_agent(root_agent) elif root_agent.type == "task": return await self.build_task_agent(root_agent) else: diff --git a/src/services/agent_service.py b/src/services/agent_service.py index c079cf0b..bb3fd57c 100644 --- a/src/services/agent_service.py +++ b/src/services/agent_service.py @@ -202,7 +202,7 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: if "api_key" not in agent.config or not agent.config["api_key"]: agent.config["api_key"] = generate_api_key() - elif agent.type == "crew_ai" or agent.type == "task": + elif agent.type == "task": if not isinstance(agent.config, dict): agent.config = {} raise HTTPException( @@ -682,8 +682,8 @@ async def update_agent( if "config" not in agent_data: agent_data["config"] = agent_config - if ("type" in agent_data and agent_data["type"] in ["crew_ai", "task"]) or ( - agent.type in ["crew_ai", "task"] and "config" in agent_data + if ("type" in agent_data 
and agent_data["type"] in ["task"]) or ( + agent.type in ["task"] and "config" in agent_data ): config = agent_data.get("config", {}) if "tasks" not in config: diff --git a/src/services/custom_agents/crew_ai_agent.py b/src/services/custom_agents/crew_ai_agent.py deleted file mode 100644 index 65fc002e..00000000 --- a/src/services/custom_agents/crew_ai_agent.py +++ /dev/null @@ -1,266 +0,0 @@ -""" -┌──────────────────────────────────────────────────────────────────────────────┐ -│ @author: Davidson Gomes │ -│ @file: crew_ai_agent.py │ -│ Developed by: Davidson Gomes │ -│ Creation date: May 14, 2025 │ -│ Contact: contato@evolution-api.com │ -├──────────────────────────────────────────────────────────────────────────────┤ -│ @copyright © Evolution API 2025. All rights reserved. │ -│ Licensed under the Apache License, Version 2.0 │ -│ │ -│ You may not use this file except in compliance with the License. │ -│ You may obtain a copy of the License at │ -│ │ -│ http://www.apache.org/licenses/LICENSE-2.0 │ -│ │ -│ Unless required by applicable law or agreed to in writing, software │ -│ distributed under the License is distributed on an "AS IS" BASIS, │ -│ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. │ -│ See the License for the specific language governing permissions and │ -│ limitations under the License. │ -├──────────────────────────────────────────────────────────────────────────────┤ -│ @important │ -│ For any future changes to the code in this file, it is recommended to │ -│ include, together with the modification, the information of the developer │ -│ who changed it and the date of modification. 
│ -└──────────────────────────────────────────────────────────────────────────────┘ -""" - -from attr import Factory -from google.adk.agents import BaseAgent -from google.adk.agents.invocation_context import InvocationContext -from google.adk.events import Event -from google.genai.types import Content, Part -from src.services.agent_service import get_agent -from src.services.apikey_service import get_decrypted_api_key - -from sqlalchemy.orm import Session - -from typing import AsyncGenerator, List - -from src.schemas.agent_config import CrewAITask - -from crewai import Agent, Task, Crew, LLM - - -class CrewAIAgent(BaseAgent): - """ - Custom agent that implements the CrewAI protocol directly. - - This agent implements the interaction with an external CrewAI service. - """ - - # Field declarations for Pydantic - tasks: List[CrewAITask] - db: Session - - def __init__( - self, - name: str, - tasks: List[CrewAITask], - db: Session, - sub_agents: List[BaseAgent] = [], - **kwargs, - ): - """ - Initialize the CrewAI agent. - - Args: - name: Agent name - tasks: List of tasks to be executed - db: Database session - sub_agents: List of sub-agents to be executed after the CrewAI agent - """ - # Initialize base class - super().__init__( - name=name, - tasks=tasks, - db=db, - sub_agents=sub_agents, - **kwargs, - ) - - def _generate_llm(self, model: str, api_key: str): - """ - Generate the LLM for the CrewAI agent. - """ - - return LLM(model=model, api_key=api_key) - - def _agent_builder(self, agent_id: str): - """ - Build the CrewAI agent. 
- """ - agent = get_agent(self.db, agent_id) - - if not agent: - raise ValueError(f"Agent with id {agent_id} not found") - - api_key = None - - decrypted_key = get_decrypted_api_key(self.db, agent.api_key_id) - if decrypted_key: - api_key = decrypted_key - else: - raise ValueError( - f"API key with ID {agent.api_key_id} not found or inactive" - ) - - if not api_key: - raise ValueError(f"API key for agent {agent.name} not found") - - return Agent( - role=agent.role, - goal=agent.goal, - backstory=agent.instruction, - llm=self._generate_llm(agent.model, api_key), - verbose=True, - ) - - def _tasks_and_agents_builder(self): - """ - Build the CrewAI tasks. - """ - tasks = [] - agents = [] - for task in self.tasks: - agent = self._agent_builder(task.agent_id) - agents.append(agent) - tasks.append( - Task( - description=task.description, - expected_output=task.expected_output, - agent=agent, - ) - ) - return tasks, agents - - def _crew_builder(self): - """ - Build the CrewAI crew. - """ - tasks, agents = self._tasks_and_agents_builder() - return Crew( - agents=agents, - tasks=tasks, - verbose=True, - ) - - async def _run_async_impl( - self, ctx: InvocationContext - ) -> AsyncGenerator[Event, None]: - """ - Implementation of the CrewAI. - - This method follows the pattern of implementing custom agents, - sending the user's message to the CrewAI service and monitoring the response. 
- """ - - try: - # Extract the user's message from the context - user_message = None - - # Search for the user's message in the session events - if ctx.session and hasattr(ctx.session, "events") and ctx.session.events: - for event in reversed(ctx.session.events): - if event.author == "user" and event.content and event.content.parts: - user_message = event.content.parts[0].text - print("Message found in session events") - break - - # Check in the session state if the message was not found in the events - if not user_message and ctx.session and ctx.session.state: - if "user_message" in ctx.session.state: - user_message = ctx.session.state["user_message"] - elif "message" in ctx.session.state: - user_message = ctx.session.state["message"] - - if not user_message: - yield Event( - author=self.name, - content=Content( - role="agent", - parts=[Part(text="User message not found")], - ), - ) - return - - try: - # Replace any {content} in the task descriptions with the user's input - for task in self.tasks: - task.description = task.description.replace( - "{content}", user_message - ) - - # Build the Crew - crew = self._crew_builder() - - # Start the agent status - yield Event( - author=self.name, - content=Content( - role="agent", - parts=[Part(text=f"Starting CrewAI processing...")], - ), - ) - - # Prepare inputs (if there are placeholders to replace) - inputs = {"user_message": user_message} - - # Notify the user that the processing is in progress - yield Event( - author=self.name, - content=Content( - role="agent", - parts=[Part(text=f"Processing your request...")], - ), - ) - - # Try first with kickoff() normally - try: - # If it fails, try with kickoff_async - result = await crew.kickoff_async(inputs=inputs) - print(f"Result of crew.kickoff_async(): {result}") - except Exception as e: - print(f"Error executing crew.kickoff_async(): {str(e)}") - print("Trying alternative with crew.kickoff()") - result = crew.kickoff(inputs=inputs) - print(f"Result of crew.kickoff(): 
{result}") - - # Create an event for the final result - final_event = Event( - author=self.name, - content=Content(role="agent", parts=[Part(text=str(result))]), - ) - - # Transmit the event to the client - yield final_event - - # Execute sub-agents - for sub_agent in self.sub_agents: - async for event in sub_agent.run_async(ctx): - yield event - - except Exception as e: - error_msg = f"Error sending request: {str(e)}" - print(error_msg) - print(f"Error type: {type(e).__name__}") - print(f"Error details: {str(e)}") - - yield Event( - author=self.name, - content=Content(role="agent", parts=[Part(text=error_msg)]), - ) - return - - except Exception as e: - # Handle any uncaught error - print(f"Error executing CrewAI agent: {str(e)}") - yield Event( - author=self.name, - content=Content( - role="agent", - parts=[Part(text=f"Error interacting with CrewAI agent: {str(e)}")], - ), - ) diff --git a/src/services/custom_agents/task_agent.py b/src/services/custom_agents/task_agent.py index a93611bd..b2247b4b 100644 --- a/src/services/custom_agents/task_agent.py +++ b/src/services/custom_agents/task_agent.py @@ -33,13 +33,12 @@ from google.adk.agents.invocation_context import InvocationContext from google.adk.events import Event from google.genai.types import Content, Part from src.services.agent_service import get_agent -from src.services.apikey_service import get_decrypted_api_key from sqlalchemy.orm import Session from typing import AsyncGenerator, List -from src.schemas.agent_config import CrewAITask +from src.schemas.agent_config import AgentTask class TaskAgent(BaseAgent): @@ -50,13 +49,13 @@ class TaskAgent(BaseAgent): """ # Field declarations for Pydantic - tasks: List[CrewAITask] + tasks: List[AgentTask] db: Session def __init__( self, name: str, - tasks: List[CrewAITask], + tasks: List[AgentTask], db: Session, sub_agents: List[BaseAgent] = [], **kwargs, @@ -132,6 +131,7 @@ class TaskAgent(BaseAgent): # Replace any {content} in the task descriptions with the user's 
input task = self.tasks[0] task.description = task.description.replace("{content}", user_message) + task.enabled_tools = task.enabled_tools or [] agent = get_agent(self.db, task.agent_id) @@ -166,7 +166,9 @@ class TaskAgent(BaseAgent): print(f"Building agent in Task agent: {agent.name}") agent_builder = AgentBuilder(self.db) - root_agent, exit_stack = await agent_builder.build_agent(agent) + root_agent, exit_stack = await agent_builder.build_agent( + agent, task.enabled_tools + ) # Store task instructions in context for reference by sub-agents ctx.session.state["task_instructions"] = task_message_instructions From 3622260c119ec222d989d9b4ea0ca466f5e9daa4 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 15:10:48 -0300 Subject: [PATCH 06/11] refactor(agent_service): sanitize agent names and improve agent card fetching --- src/schemas/a2a_types.py | 2 +- src/services/agent_service.py | 47 +++- src/services/custom_agents/a2a_agent.py | 337 ++++++++++++++++++++---- 3 files changed, 324 insertions(+), 62 deletions(-) diff --git a/src/schemas/a2a_types.py b/src/schemas/a2a_types.py index c63b761f..e5c2c9bb 100644 --- a/src/schemas/a2a_types.py +++ b/src/schemas/a2a_types.py @@ -29,7 +29,7 @@ from datetime import datetime from enum import Enum -from typing import Annotated, Any, Literal +from typing import Annotated, Any, Literal, Union, Dict, List, Optional from uuid import uuid4 from typing_extensions import Self diff --git a/src/services/agent_service.py b/src/services/agent_service.py index bb3fd57c..b699daf8 100644 --- a/src/services/agent_service.py +++ b/src/services/agent_service.py @@ -103,6 +103,14 @@ def get_agent(db: Session, agent_id: Union[uuid.UUID, str]) -> Optional[Agent]: logger.warning(f"Agent not found: {agent_id}") return None + # Sanitize agent name if it contains spaces or special characters + if agent.name and any(c for c in agent.name if not (c.isalnum() or c == "_")): + agent.name = "".join( + c if c.isalnum() or c == "_" else 
"_" for c in agent.name + ) + # Update in database + db.commit() + return agent except SQLAlchemyError as e: logger.error(f"Error searching for agent {agent_id}: {str(e)}") @@ -144,6 +152,17 @@ def get_agents_by_client( agents = query.offset(skip).limit(limit).all() + # Sanitize agent names if they contain spaces or special characters + for agent in agents: + if agent.name and any( + c for c in agent.name if not (c.isalnum() or c == "_") + ): + agent.name = "".join( + c if c.isalnum() or c == "_" else "_" for c in agent.name + ) + # Update in database + db.commit() + return agents except SQLAlchemyError as e: logger.error(f"Error searching for client agents {client_id}: {str(e)}") @@ -176,7 +195,15 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: agent_card = response.json() # Update agent with information from agent card - agent.name = agent_card.get("name", "Unknown Agent") + # Only update name if not provided or empty, or sanitize it + if not agent.name or agent.name.strip() == "": + # Sanitize name: remove spaces and special characters + card_name = agent_card.get("name", "Unknown Agent") + sanitized_name = "".join( + c if c.isalnum() or c == "_" else "_" for c in card_name + ) + agent.name = sanitized_name + agent.description = agent_card.get("description", "") if agent.config is None: @@ -499,7 +526,14 @@ async def update_agent( ) agent_card = response.json() - agent_data["name"] = agent_card.get("name", "Unknown Agent") + # Only update name if the original update doesn't specify a name + if "name" not in agent_data or not agent_data["name"].strip(): + # Sanitize name: remove spaces and special characters + card_name = agent_card.get("name", "Unknown Agent") + sanitized_name = "".join( + c if c.isalnum() or c == "_" else "_" for c in card_name + ) + agent_data["name"] = sanitized_name agent_data["description"] = agent_card.get("description", "") if "config" not in agent_data or agent_data["config"] is None: @@ -537,7 +571,14 @@ async def 
update_agent( ) agent_card = response.json() - agent_data["name"] = agent_card.get("name", "Unknown Agent") + # Only update name if the original update doesn't specify a name + if "name" not in agent_data or not agent_data["name"].strip(): + # Sanitize name: remove spaces and special characters + card_name = agent_card.get("name", "Unknown Agent") + sanitized_name = "".join( + c if c.isalnum() or c == "_" else "_" for c in card_name + ) + agent_data["name"] = sanitized_name agent_data["description"] = agent_card.get("description", "") if "config" not in agent_data or agent_data["config"] is None: diff --git a/src/services/custom_agents/a2a_agent.py b/src/services/custom_agents/a2a_agent.py index 020179be..ed2588a7 100644 --- a/src/services/custom_agents/a2a_agent.py +++ b/src/services/custom_agents/a2a_agent.py @@ -32,15 +32,21 @@ from google.adk.agents.invocation_context import InvocationContext from google.adk.events import Event from google.genai.types import Content, Part -from typing import AsyncGenerator, List - -from src.schemas.a2a_types import ( - SendTaskRequest, - Message, - TextPart, -) +from typing import AsyncGenerator, List, Dict, Any, Optional +import json import httpx +from httpx_sse import connect_sse + +from src.schemas.a2a_types import ( + AgentCard, + Message, + TextPart, + TaskSendParams, + SendTaskRequest, + SendTaskStreamingRequest, + TaskState, +) from uuid import uuid4 @@ -54,7 +60,9 @@ class A2ACustomAgent(BaseAgent): # Field declarations for Pydantic agent_card_url: str + agent_card: Optional[AgentCard] timeout: int + base_url: str def __init__( self, @@ -73,16 +81,41 @@ class A2ACustomAgent(BaseAgent): timeout: Maximum execution time (seconds) sub_agents: List of sub-agents to be executed after the A2A agent """ + # Create base_url from agent_card_url + base_url = agent_card_url + if "/.well-known/agent.json" in base_url: + base_url = base_url.split("/.well-known/agent.json")[0] + + print(f"A2A agent initialized for URL: 
{agent_card_url}") + # Initialize base class super().__init__( name=name, agent_card_url=agent_card_url, + base_url=base_url, # Pass base_url here + agent_card=None, timeout=timeout, sub_agents=sub_agents, **kwargs, ) - print(f"A2A agent initialized for URL: {agent_card_url}") + async def fetch_agent_card(self) -> AgentCard: + """Fetch the agent card from the A2A service.""" + if self.agent_card: + return self.agent_card + + card_url = f"{self.base_url}/.well-known/agent.json" + print(f"Fetching agent card from: {card_url}") + + async with httpx.AsyncClient() as client: + response = await client.get(card_url) + response.raise_for_status() + try: + card_data = response.json() + self.agent_card = AgentCard(**card_data) + return self.agent_card + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse agent card: {str(e)}") async def _run_async_impl( self, ctx: InvocationContext @@ -95,12 +128,18 @@ class A2ACustomAgent(BaseAgent): """ try: - # Prepare the base URL for the A2A - url = self.agent_card_url - - # Ensure that there is no /.well-known/agent.json in the url - if "/.well-known/agent.json" in url: - url = url.split("/.well-known/agent.json")[0] + # 1. First, fetch the agent card if we haven't already + try: + agent_card = await self.fetch_agent_card() + print(f"Agent card fetched: {agent_card.name}") + except Exception as e: + error_msg = f"Failed to fetch agent card: {str(e)}" + print(error_msg) + yield Event( + author=self.name, + content=Content(role="agent", parts=[Part(text=error_msg)]), + ) + return # 2. Extract the user's message from the context user_message = None @@ -120,7 +159,16 @@ class A2ACustomAgent(BaseAgent): elif "message" in ctx.session.state: user_message = ctx.session.state["message"] - # 3. 
Create and send the task to the A2A agent + if not user_message: + error_msg = "No user message found" + print(error_msg) + yield Event( + author=self.name, + content=Content(role="agent", parts=[Part(text=error_msg)]), + ) + return + + # 3. Create and format the task to send to the A2A agent print(f"Sending task to A2A agent: {user_message[:100]}...") # Use the session ID as a stable identifier @@ -131,66 +179,239 @@ class A2ACustomAgent(BaseAgent): ) task_id = str(uuid4()) - try: + # Prepare the message for the A2A agent + formatted_message = Message( + role="user", + parts=[TextPart(text=user_message)], + ) - formatted_message: Message = Message( - role="user", - parts=[TextPart(type="text", text=user_message)], - ) + # Prepare the task parameters + task_params = TaskSendParams( + id=task_id, + sessionId=session_id, + message=formatted_message, + acceptedOutputModes=["text"], + ) - request: SendTaskRequest = SendTaskRequest( - params={ - "message": formatted_message, - "sessionId": session_id, - "id": task_id, - } - ) + # 4. 
Check if the agent supports streaming + supports_streaming = ( + agent_card.capabilities.streaming if agent_card.capabilities else False + ) - print(f"Request send task: {request.model_dump()}") + if supports_streaming: + print("Agent supports streaming, using streaming API") + # Process with streaming + try: + # Criar a requisição usando o método correto de tasks/sendSubscribe + request = SendTaskStreamingRequest( + method="tasks/sendSubscribe", params=task_params + ) - # REQUEST POST to url when jsonrpc is 2.0 - task_result = await httpx.AsyncClient().post( - url, json=request.model_dump(), timeout=self.timeout - ) + try: + async with httpx.AsyncClient() as client: + response = await client.post( + self.base_url, + json=request.model_dump(), + headers={"Accept": "text/event-stream"}, + timeout=self.timeout, + ) + response.raise_for_status() - print(f"Task response: {task_result.json()}") - print(f"Task sent successfully, ID: {task_id}") + # Processar manualmente a resposta SSE + async for line in response.aiter_lines(): + if line.startswith("data:"): + data = line[5:].strip() + if data: + try: + result = json.loads(data) + print(f"Stream event received: {result}") - agent_response_parts = task_result.json()["result"]["status"][ - "message" - ]["parts"] + # Check if this is a status update with a message + if ( + "result" in result + and "status" in result["result"] + and "message" + in result["result"]["status"] + and "parts" + in result["result"]["status"]["message"] + ): + message_parts = result["result"][ + "status" + ]["message"]["parts"] + parts = [ + Part(text=part["text"]) + for part in message_parts + if part.get("type") == "text" + and "text" in part + ] - parts = [Part(text=part["text"]) for part in agent_response_parts] + if parts: + yield Event( + author=self.name, + content=Content( + role="agent", parts=parts + ), + ) - yield Event( - author=self.name, - content=Content(role="agent", parts=parts), - ) + # Check if this is a final message + if ( + 
"result" in result + and result.get("result", {}).get( + "final", False + ) + and "status" in result["result"] + and result["result"]["status"].get( + "state" + ) + in [ + TaskState.COMPLETED, + TaskState.CANCELED, + TaskState.FAILED, + ] + ): + print( + "Received final message, stream complete" + ) + break + except json.JSONDecodeError as e: + print(f"Error parsing SSE data: {str(e)}") + except Exception as stream_error: + print( + f"Error in direct streaming: {str(stream_error)}, falling back to regular API" + ) + # If streaming fails, fall back to regular API + # Criar a requisição usando o método correto de tasks/send + fallback_request = SendTaskRequest( + method="tasks/send", params=task_params + ) - # Run sub-agents - for sub_agent in self.sub_agents: - async for event in sub_agent.run_async(ctx): - yield event + async with httpx.AsyncClient() as client: + response = await client.post( + self.base_url, + json=fallback_request.model_dump(), + timeout=self.timeout, + ) + response.raise_for_status() - except Exception as e: - error_msg = f"Error sending request: {str(e)}" - print(error_msg) - print(f"Error type: {type(e).__name__}") - print(f"Error details: {str(e)}") + result = response.json() + print(f"Fallback response: {result}") - yield Event( - author=self.name, - content=Content(role="agent", parts=[Part(text=error_msg)]), - ) - return + # Extract agent message parts + if ( + "result" in result + and "status" in result["result"] + and "message" in result["result"]["status"] + and "parts" in result["result"]["status"]["message"] + ): + message_parts = result["result"]["status"]["message"][ + "parts" + ] + parts = [ + Part(text=part["text"]) + for part in message_parts + if part.get("type") == "text" and "text" in part + ] + + if parts: + yield Event( + author=self.name, + content=Content(role="agent", parts=parts), + ) + else: + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[ + Part( + text="Received response without message 
parts" + ) + ], + ), + ) + except Exception as e: + error_msg = f"Error in streaming: {str(e)}" + print(error_msg) + yield Event( + author=self.name, + content=Content(role="agent", parts=[Part(text=error_msg)]), + ) + else: + print("Agent does not support streaming, using regular API") + # Process with regular request + try: + # Criar a requisição usando o método correto de tasks/send + request = SendTaskRequest(method="tasks/send", params=task_params) + + async with httpx.AsyncClient() as client: + response = await client.post( + self.base_url, + json=request.model_dump(), + timeout=self.timeout, + ) + response.raise_for_status() + + result = response.json() + print(f"Task response: {result}") + + # Extract agent message parts + if ( + "result" in result + and "status" in result["result"] + and "message" in result["result"]["status"] + and "parts" in result["result"]["status"]["message"] + ): + message_parts = result["result"]["status"]["message"][ + "parts" + ] + parts = [ + Part(text=part["text"]) + for part in message_parts + if part.get("type") == "text" and "text" in part + ] + + if parts: + yield Event( + author=self.name, + content=Content(role="agent", parts=parts), + ) + else: + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[ + Part( + text="Received response without message parts" + ) + ], + ), + ) + + except Exception as e: + error_msg = f"Error sending request: {str(e)}" + print(error_msg) + print(f"Error type: {type(e).__name__}") + print(f"Error details: {str(e)}") + + yield Event( + author=self.name, + content=Content(role="agent", parts=[Part(text=error_msg)]), + ) + + # Run sub-agents + for sub_agent in self.sub_agents: + async for event in sub_agent.run_async(ctx): + yield event except Exception as e: # Handle any uncaught error - print(f"Error executing A2A agent: {str(e)}") + error_msg = f"Error executing A2A agent: {str(e)}" + print(error_msg) yield Event( author=self.name, content=Content( role="agent", - 
parts=[Part(text=f"Error interacting with A2A agent: {str(e)}")], + parts=[Part(text=error_msg)], ), ) From 18c68659267e9e9fde5ac27604c04cb4aedcb2e7 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 15:17:15 -0300 Subject: [PATCH 07/11] refactor(a2a_agent): remove commented-out code and improve clarity --- src/services/custom_agents/a2a_agent.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/services/custom_agents/a2a_agent.py b/src/services/custom_agents/a2a_agent.py index ed2588a7..e2c10433 100644 --- a/src/services/custom_agents/a2a_agent.py +++ b/src/services/custom_agents/a2a_agent.py @@ -202,7 +202,6 @@ class A2ACustomAgent(BaseAgent): print("Agent supports streaming, using streaming API") # Process with streaming try: - # Criar a requisição usando o método correto de tasks/sendSubscribe request = SendTaskStreamingRequest( method="tasks/sendSubscribe", params=task_params ) @@ -217,7 +216,6 @@ class A2ACustomAgent(BaseAgent): ) response.raise_for_status() - # Processar manualmente a resposta SSE async for line in response.aiter_lines(): if line.startswith("data:"): data = line[5:].strip() @@ -279,8 +277,6 @@ class A2ACustomAgent(BaseAgent): print( f"Error in direct streaming: {str(stream_error)}, falling back to regular API" ) - # If streaming fails, fall back to regular API - # Criar a requisição usando o método correto de tasks/send fallback_request = SendTaskRequest( method="tasks/send", params=task_params ) @@ -340,7 +336,6 @@ class A2ACustomAgent(BaseAgent): print("Agent does not support streaming, using regular API") # Process with regular request try: - # Criar a requisição usando o método correto de tasks/send request = SendTaskRequest(method="tasks/send", params=task_params) async with httpx.AsyncClient() as client: From 958eeec4a6e247f7d016513104cff2b1c4d0ca80 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 18:22:15 -0300 Subject: [PATCH 08/11] fix(docs): correct Model Control Protocol to Model 
Context Protocol in README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f1892dba..2e805739 100644 --- a/README.md +++ b/README.md @@ -196,7 +196,7 @@ Task Agent is ideal for scenarios where you need to execute a specific, well-def ### MCP Server Configuration -Agents can be integrated with MCP (Model Control Protocol) servers for distributed processing: +Agents can be integrated with MCP (Model Context Protocol) servers for distributed processing: ```json { @@ -543,7 +543,7 @@ cd evo-ai-frontend After installation, follow these steps to set up your first agent: -1. **Configure MCP Server**: Set up your Model Control Protocol server configuration first +1. **Configure MCP Server**: Set up your Model Context Protocol server configuration first 2. **Create Client or Register**: Create a new client or register a user account 3. **Create Agents**: Set up the agents according to your needs (LLM, A2A, Sequential, Parallel, Loop, or Workflow) From 6bf0ea52e05a2ca1ae1e63d9764eddf2de1e63bb Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 22:15:08 -0300 Subject: [PATCH 09/11] feat(a2a): add file support and multimodal content processing for A2A protocol --- CHANGELOG.md | 2 + src/api/a2a_routes.py | 89 ++++++++- src/api/chat_routes.py | 30 ++- src/api/session_routes.py | 167 ++++++++++++++++- src/schemas/chat.py | 46 +++-- src/services/a2a_task_manager.py | 307 +++++++++++++++++++++++++++++-- src/services/agent_runner.py | 138 +++++++++++++- src/utils/a2a_utils.py | 133 +++++++++++++ 8 files changed, 869 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 389534c3..cdd00c64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add Task Agent for structured single-task execution - Improve context management in agent execution +- Add file support for A2A protocol 
(Agent-to-Agent) endpoints +- Implement multimodal content processing in A2A messages ## [0.0.9] - 2025-05-13 diff --git a/src/api/a2a_routes.py b/src/api/a2a_routes.py index b3d7d6d4..712a1b2e 100644 --- a/src/api/a2a_routes.py +++ b/src/api/a2a_routes.py @@ -31,6 +31,7 @@ Routes for the A2A (Agent-to-Agent) protocol. This module implements the standard A2A routes according to the specification. +Supports both text messages and file uploads through the message parts mechanism. """ import uuid @@ -92,7 +93,39 @@ async def process_a2a_request( db: Session = Depends(get_db), a2a_service: A2AService = Depends(get_a2a_service), ): - """Processes an A2A request.""" + """ + Processes an A2A request. + + Supports both text messages and file uploads. For file uploads, + include file parts in the message following the A2A protocol format: + + { + "jsonrpc": "2.0", + "id": "request-id", + "method": "tasks/send", + "params": { + "id": "task-id", + "sessionId": "session-id", + "message": { + "role": "user", + "parts": [ + { + "type": "text", + "text": "Analyze this image" + }, + { + "type": "file", + "file": { + "name": "example.jpg", + "mimeType": "image/jpeg", + "bytes": "base64-encoded-content" + } + } + ] + } + } + } + """ # Verify the API key if not verify_api_key(db, x_api_key): raise HTTPException(status_code=401, detail="Invalid API key") @@ -100,10 +133,60 @@ async def process_a2a_request( # Process the request try: request_body = await request.json() + + debug_request_body = {} + if "method" in request_body: + debug_request_body["method"] = request_body["method"] + if "id" in request_body: + debug_request_body["id"] = request_body["id"] + + logger.info(f"A2A request received: {debug_request_body}") + + # Log if request contains file parts for debugging + if isinstance(request_body, dict) and "params" in request_body: + params = request_body.get("params", {}) + message = params.get("message", {}) + parts = message.get("parts", []) + + logger.info(f"A2A message 
contains {len(parts)} parts") + for i, part in enumerate(parts): + if not isinstance(part, dict): + logger.warning(f"Part {i+1} is not a dictionary: {type(part)}") + continue + + part_type = part.get("type") + logger.info(f"Part {i+1} type: {part_type}") + + if part_type == "file": + file_info = part.get("file", {}) + logger.info( + f"File part found: {file_info.get('name')} ({file_info.get('mimeType')})" + ) + if "bytes" in file_info: + bytes_data = file_info.get("bytes", "") + bytes_size = len(bytes_data) * 0.75 + logger.info(f"File size: ~{bytes_size/1024:.2f} KB") + if bytes_data: + sample = ( + bytes_data[:10] + "..." + if len(bytes_data) > 10 + else bytes_data + ) + logger.info(f"Sample of base64 data: {sample}") + elif part_type == "text": + text_content = part.get("text", "") + preview = ( + text_content[:30] + "..." + if len(text_content) > 30 + else text_content + ) + logger.info(f"Text part found: '{preview}'") + result = await a2a_service.process_request(agent_id, request_body) # If the response is a streaming response, return as EventSourceResponse if hasattr(result, "__aiter__"): + logger.info("Returning streaming response") async def event_generator(): async for item in result: @@ -115,11 +198,15 @@ async def process_a2a_request( return EventSourceResponse(event_generator()) # Otherwise, return as JSONResponse + logger.info("Returning standard JSON response") if hasattr(result, "model_dump"): return JSONResponse(result.model_dump(exclude_none=True)) return JSONResponse(result) except Exception as e: logger.error(f"Error processing A2A request: {e}") + import traceback + + logger.error(f"Full traceback: {traceback.format_exc()}") return JSONResponse( status_code=500, content={ diff --git a/src/api/chat_routes.py b/src/api/chat_routes.py index 8846080b..bf7cd0a2 100644 --- a/src/api/chat_routes.py +++ b/src/api/chat_routes.py @@ -28,6 +28,7 @@ """ import uuid +import base64 from fastapi import ( APIRouter, Depends, @@ -47,7 +48,7 @@ from 
src.core.jwt_middleware import ( from src.services import ( agent_service, ) -from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse +from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse, FileData from src.services.agent_runner import run_agent, run_agent_stream from src.core.exceptions import AgentNotFoundError from src.services.service_providers import ( @@ -59,7 +60,7 @@ from src.services.service_providers import ( from datetime import datetime import logging import json -from typing import Optional, Dict +from typing import Optional, Dict, List, Any logger = logging.getLogger(__name__) @@ -195,6 +196,29 @@ async def websocket_chat( if not message: continue + files = None + if data.get("files") and isinstance(data.get("files"), list): + try: + files = [] + for file_data in data.get("files"): + if ( + isinstance(file_data, dict) + and file_data.get("filename") + and file_data.get("content_type") + and file_data.get("data") + ): + files.append( + FileData( + filename=file_data.get("filename"), + content_type=file_data.get("content_type"), + data=file_data.get("data"), + ) + ) + logger.info(f"Processed {len(files)} files via WebSocket") + except Exception as e: + logger.error(f"Error processing files: {str(e)}") + files = None + async for chunk in run_agent_stream( agent_id=agent_id, external_id=external_id, @@ -203,6 +227,7 @@ async def websocket_chat( artifacts_service=artifacts_service, memory_service=memory_service, db=db, + files=files, ): await websocket.send_json( {"message": json.loads(chunk), "turn_complete": False} @@ -259,6 +284,7 @@ async def chat( artifacts_service, memory_service, db, + files=request.files, ) return { diff --git a/src/api/session_routes.py b/src/api/session_routes.py index 2961293f..500993ae 100644 --- a/src/api/session_routes.py +++ b/src/api/session_routes.py @@ -30,8 +30,9 @@ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from src.config.database 
import get_db -from typing import List +from typing import List, Optional, Dict, Any import uuid +import base64 from src.core.jwt_middleware import ( get_jwt_token, verify_user_client, @@ -48,7 +49,7 @@ from src.services.session_service import ( get_sessions_by_agent, get_sessions_by_client, ) -from src.services.service_providers import session_service +from src.services.service_providers import session_service, artifacts_service import logging logger = logging.getLogger(__name__) @@ -118,13 +119,18 @@ async def get_session( @router.get( "/{session_id}/messages", - response_model=List[Event], ) async def get_agent_messages( session_id: str, db: Session = Depends(get_db), payload: dict = Depends(get_jwt_token), ): + """ + Gets messages from a session with embedded artifacts. + + This function loads all messages from a session and processes any references + to artifacts, loading them and converting them to base64 for direct use in the frontend. + """ # Get the session session = get_session_by_id(session_service, session_id) if not session: @@ -139,7 +145,160 @@ async def get_agent_messages( if agent: await verify_user_client(payload, db, agent.client_id) - return get_session_events(session_service, session_id) + # Parse session ID para obter app_name e user_id + parts = session_id.split("_") + if len(parts) != 2: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid session ID format" + ) + + user_id, app_name = parts[0], parts[1] + + events = get_session_events(session_service, session_id) + + processed_events = [] + for event in events: + event_dict = event.dict() + + def process_dict(d): + if isinstance(d, dict): + for key, value in list(d.items()): + if isinstance(value, bytes): + try: + d[key] = base64.b64encode(value).decode("utf-8") + logger.debug(f"Converted bytes field to base64: {key}") + except Exception as e: + logger.error(f"Error encoding bytes to base64: {str(e)}") + d[key] = None + elif isinstance(value, dict): + 
process_dict(value) + elif isinstance(value, list): + for item in value: + if isinstance(item, (dict, list)): + process_dict(item) + elif isinstance(d, list): + for i, item in enumerate(d): + if isinstance(item, bytes): + try: + d[i] = base64.b64encode(item).decode("utf-8") + except Exception as e: + logger.error( + f"Error encoding bytes to base64 in list: {str(e)}" + ) + d[i] = None + elif isinstance(item, (dict, list)): + process_dict(item) + return d + + # Process all event dictionary + event_dict = process_dict(event_dict) + + # Process the content parts specifically + if event_dict.get("content") and event_dict["content"].get("parts"): + for part in event_dict["content"]["parts"]: + # Process inlineData if present + if part and part.get("inlineData") and part["inlineData"].get("data"): + # Check if it's already a string or if it's bytes + if isinstance(part["inlineData"]["data"], bytes): + # Convert bytes to base64 string + part["inlineData"]["data"] = base64.b64encode( + part["inlineData"]["data"] + ).decode("utf-8") + logger.debug( + f"Converted binary data to base64 in message {event_dict.get('id')}" + ) + + # Process fileData if present (reference to an artifact) + if part and part.get("fileData") and part["fileData"].get("fileId"): + try: + # Extract the file name from the fileId + file_id = part["fileData"]["fileId"] + + # Load the artifact from the artifacts service + artifact = artifacts_service.load_artifact( + app_name=app_name, + user_id=user_id, + session_id=session_id, + filename=file_id, + ) + + if artifact and hasattr(artifact, "inline_data"): + # Extract the data and MIME type + file_bytes = artifact.inline_data.data + mime_type = artifact.inline_data.mime_type + + # Add inlineData with the artifact data + if not part.get("inlineData"): + part["inlineData"] = {} + + # Ensure we're sending a base64 string, not bytes + if isinstance(file_bytes, bytes): + try: + part["inlineData"]["data"] = base64.b64encode( + file_bytes + ).decode("utf-8") + 
except Exception as e: + logger.error( + f"Error encoding artifact to base64: {str(e)}" + ) + part["inlineData"]["data"] = None + else: + part["inlineData"]["data"] = str(file_bytes) + + part["inlineData"]["mimeType"] = mime_type + + logger.debug( + f"Loaded artifact {file_id} for message {event_dict.get('id')}" + ) + except Exception as e: + logger.error(f"Error loading artifact: {str(e)}") + # Don't interrupt the flow if an artifact fails + + # Check artifact_delta in actions + if event_dict.get("actions") and event_dict["actions"].get("artifact_delta"): + artifact_deltas = event_dict["actions"]["artifact_delta"] + for filename, version in artifact_deltas.items(): + try: + # Load the artifact + artifact = artifacts_service.load_artifact( + app_name=app_name, + user_id=user_id, + session_id=session_id, + filename=filename, + version=version, + ) + + if artifact and hasattr(artifact, "inline_data"): + # If the event doesn't have an artifacts section, create it + if "artifacts" not in event_dict: + event_dict["artifacts"] = {} + + # Add the artifact to the event's artifacts list + file_bytes = artifact.inline_data.data + mime_type = artifact.inline_data.mime_type + + # Ensure the bytes are converted to base64 + event_dict["artifacts"][filename] = { + "data": ( + base64.b64encode(file_bytes).decode("utf-8") + if isinstance(file_bytes, bytes) + else str(file_bytes) + ), + "mimeType": mime_type, + "version": version, + } + + logger.debug( + f"Added artifact {filename} (v{version}) to message {event_dict.get('id')}" + ) + except Exception as e: + logger.error( + f"Error processing artifact_delta {filename}: {str(e)}" + ) + + processed_events.append(event_dict) + + return processed_events @router.delete( diff --git a/src/schemas/chat.py b/src/schemas/chat.py index 696860a8..6188ca7a 100644 --- a/src/schemas/chat.py +++ b/src/schemas/chat.py @@ -27,36 +27,42 @@ └──────────────────────────────────────────────────────────────────────────────┘ """ -from pydantic import 
BaseModel, Field -from typing import Dict, Any, Optional +from pydantic import BaseModel, Field, validator +from typing import Dict, List, Optional, Any +from datetime import datetime + + +class FileData(BaseModel): + """Model to represent file data sent in a chat request.""" + + filename: str = Field(..., description="File name") + content_type: str = Field(..., description="File content type") + data: str = Field(..., description="File content encoded in base64") class ChatRequest(BaseModel): - """Schema for chat requests""" + """Model to represent a chat request.""" - agent_id: str = Field( - ..., description="ID of the agent that will process the message" + agent_id: str = Field(..., description="Agent ID to process the message") + external_id: str = Field(..., description="External ID for user identification") + message: str = Field(..., description="User message to the agent") + files: Optional[List[FileData]] = Field( + None, description="List of files attached to the message" ) - external_id: str = Field( - ..., description="ID of the external_id that will process the message" - ) - message: str = Field(..., description="User message") class ChatResponse(BaseModel): - """Schema for chat responses""" + """Model to represent a chat response.""" - response: str = Field(..., description="Agent response") - status: str = Field(..., description="Operation status") - error: Optional[str] = Field(None, description="Error message, if there is one") - timestamp: str = Field(..., description="Timestamp of the response") + response: str = Field(..., description="Response generated by the agent") + message_history: List[Dict[str, Any]] = Field( + default_factory=list, description="Message history" + ) + status: str = Field(..., description="Response status (success/error)") + timestamp: str = Field(..., description="Response timestamp") class ErrorResponse(BaseModel): - """Schema for error responses""" + """Model to represent an error response.""" - error: str = 
Field(..., description="Error message") - status_code: int = Field(..., description="HTTP status code of the error") - details: Optional[Dict[str, Any]] = Field( - None, description="Additional error details" - ) + detail: str = Field(..., description="Error details") diff --git a/src/services/a2a_task_manager.py b/src/services/a2a_task_manager.py index ce86676d..d019cfe7 100644 --- a/src/services/a2a_task_manager.py +++ b/src/services/a2a_task_manager.py @@ -33,8 +33,11 @@ from collections.abc import AsyncIterable from typing import Dict, Optional from uuid import UUID import json +import base64 +import uuid as uuid_pkg from sqlalchemy.orm import Session +from google.genai.types import Part, Blob from src.config.settings import settings from src.services.agent_service import ( @@ -76,6 +79,7 @@ from src.schemas.a2a_types import ( AgentAuthentication, AgentProvider, ) +from src.schemas.chat import FileData logger = logging.getLogger(__name__) @@ -281,12 +285,29 @@ class A2ATaskManager: all_messages.append(agent_message) task_state = self._determine_task_state(result) - artifact = Artifact(parts=agent_message.parts, index=0) + + # Create artifacts for any file content + artifacts = [] + # First, add the main response as an artifact + artifacts.append(Artifact(parts=agent_message.parts, index=0)) + + # Also add any files from the message history + for idx, msg in enumerate(all_messages, 1): + for part in msg.parts: + if hasattr(part, "type") and part.type == "file": + artifacts.append( + Artifact( + parts=[part], + index=idx, + name=part.file.name, + description=f"File from message {idx}", + ) + ) task = await self.update_store( task_params.id, TaskStatus(state=task_state, message=agent_message), - [artifact], + artifacts, ) await self._update_task_history( @@ -400,6 +421,32 @@ class A2ATaskManager: final_message = None + # Check for files in the user message and include them as artifacts + user_files = [] + for part in request.params.message.parts: + if ( + 
hasattr(part, "type") + and part.type == "file" + and hasattr(part, "file") + ): + user_files.append( + Artifact( + parts=[part], + index=0, + name=part.file.name if part.file.name else "file", + description="File from user", + ) + ) + + # Send artifacts for any user files + for artifact in user_files: + yield SendTaskStreamingResponse( + id=request.id, + result=TaskArtifactUpdateEvent( + id=request.params.id, artifact=artifact + ), + ) + async for chunk in run_agent_stream( agent_id=str(agent.id), external_id=external_id, @@ -418,7 +465,48 @@ class A2ATaskManager: parts = content.get("parts", []) if parts: - update_message = Message(role=role, parts=parts) + # Modify to handle file parts as well + agent_parts = [] + for part in parts: + # Handle different part types + if part.get("type") == "text": + agent_parts.append(part) + full_response += part.get("text", "") + elif part.get("inlineData") and part["inlineData"].get( + "data" + ): + # Convert inline data to file part + mime_type = part["inlineData"].get( + "mimeType", "application/octet-stream" + ) + file_name = f"file_{uuid_pkg.uuid4().hex}{self._get_extension_from_mime(mime_type)}" + file_part = { + "type": "file", + "file": { + "name": file_name, + "mimeType": mime_type, + "bytes": part["inlineData"]["data"], + }, + } + agent_parts.append(file_part) + + # Also send as artifact + yield SendTaskStreamingResponse( + id=request.id, + result=TaskArtifactUpdateEvent( + id=request.params.id, + artifact=Artifact( + parts=[file_part], + index=0, + name=file_name, + description=f"Generated {mime_type} file", + ), + ), + ) + + if agent_parts: + update_message = Message(role=role, parts=agent_parts) + final_message = update_message yield SendTaskStreamingResponse( id=request.id, @@ -431,11 +519,6 @@ class A2ATaskManager: final=False, ), ) - - for part in parts: - if part.get("type") == "text": - full_response += part.get("text", "") - final_message = update_message except Exception as e: logger.error(f"Error processing 
chunk: {e}, chunk: {chunk}") continue @@ -485,6 +568,29 @@ class A2ATaskManager: error=InternalError(message=f"Error streaming task process: {str(e)}"), ) + def _get_extension_from_mime(self, mime_type: str) -> str: + """Get a file extension from MIME type.""" + if not mime_type: + return "" + + mime_map = { + "image/jpeg": ".jpg", + "image/png": ".png", + "image/gif": ".gif", + "application/pdf": ".pdf", + "text/plain": ".txt", + "text/html": ".html", + "text/csv": ".csv", + "application/json": ".json", + "application/xml": ".xml", + "application/msword": ".doc", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx", + "application/vnd.ms-excel": ".xls", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx", + } + + return mime_map.get(mime_type, "") + async def update_store( self, task_id: str, @@ -514,19 +620,193 @@ class A2ATaskManager: return task def _extract_user_query(self, task_params: TaskSendParams) -> str: - """Extracts the user query from the task parameters.""" + """Extracts the user query from the task parameters and processes any files.""" if not task_params.message or not task_params.message.parts: raise ValueError("Message or parts are missing in task parameters") - part = task_params.message.parts[0] - if part.type != "text": - raise ValueError("Only text parts are supported") + # Process file parts first + text_parts = [] + has_files = False + file_parts = [] - return part.text + logger.info( + f"Extracting query from message with {len(task_params.message.parts)} parts" + ) + + # Extract text parts and file parts separately + for idx, part in enumerate(task_params.message.parts): + logger.info( + f"Processing part {idx+1}, type: {getattr(part, 'type', 'unknown')}" + ) + if hasattr(part, "type"): + if part.type == "text": + logger.info(f"Found text part: '{part.text[:50]}...' 
(truncated)") + text_parts.append(part.text) + elif part.type == "file": + logger.info( + f"Found file part: {getattr(getattr(part, 'file', None), 'name', 'unnamed')}" + ) + has_files = True + try: + processed_file = self._process_file_part( + part, task_params.sessionId + ) + if processed_file: + file_parts.append(processed_file) + except Exception as e: + logger.error(f"Error processing file part: {e}") + # Continue with other parts even if a file fails + else: + logger.warning(f"Unknown part type: {part.type}") + else: + logger.warning(f"Part has no type attribute: {part}") + + # Store the file parts in self for later use + self._last_processed_files = file_parts if file_parts else None + + # If we have at least one text part, use that as the query + if text_parts: + final_query = " ".join(text_parts) + logger.info( + f"Final query from text parts: '{final_query[:50]}...' (truncated)" + ) + return final_query + # If we only have file parts, create a generic query asking for analysis + elif has_files: + logger.info("No text parts, using generic query for file analysis") + return "Analyze the attached files" + else: + logger.error("No supported content parts found in the message") + raise ValueError("No supported content parts found in the message") + + def _process_file_part(self, part, session_id: str): + """Processes a file part and saves it to the artifact service. 
+ + Returns: + dict: Processed file information to pass to agent_runner + """ + if not hasattr(part, "file") or not part.file: + logger.warning("File part missing file data") + return None + + file_data = part.file + + if not file_data.name: + file_data.name = f"file_{uuid_pkg.uuid4().hex}" + + logger.info(f"Processing file {file_data.name} for session {session_id}") + + if file_data.bytes: + # Process file data provided as base64 string + try: + # Convert base64 to bytes + logger.info(f"Decoding base64 content for file {file_data.name}") + file_bytes = base64.b64decode(file_data.bytes) + + # Determine MIME type based on binary content + mime_type = ( + file_data.mimeType if hasattr(file_data, "mimeType") else None + ) + + if not mime_type or mime_type == "application/octet-stream": + # Detection by byte signature + if file_bytes.startswith(b"\xff\xd8\xff"): # JPEG signature + mime_type = "image/jpeg" + elif file_bytes.startswith(b"\x89PNG\r\n\x1a\n"): # PNG signature + mime_type = "image/png" + elif file_bytes.startswith(b"GIF87a") or file_bytes.startswith( + b"GIF89a" + ): # GIF + mime_type = "image/gif" + elif file_bytes.startswith(b"%PDF"): # PDF + mime_type = "application/pdf" + else: + # Fallback to avoid generic type in images + if file_data.name.lower().endswith((".jpg", ".jpeg")): + mime_type = "image/jpeg" + elif file_data.name.lower().endswith(".png"): + mime_type = "image/png" + elif file_data.name.lower().endswith(".gif"): + mime_type = "image/gif" + elif file_data.name.lower().endswith(".pdf"): + mime_type = "application/pdf" + else: + mime_type = "application/octet-stream" + + logger.info( + f"Decoded file size: {len(file_bytes)} bytes, MIME type: {mime_type}" + ) + + # Split session_id to get app_name and user_id + parts = session_id.split("_") + if len(parts) != 2: + user_id = session_id + app_name = "a2a" + else: + user_id, app_name = parts + + # Create artifact Part + logger.info(f"Creating artifact Part for file {file_data.name}") + artifact = 
Part(inline_data=Blob(mime_type=mime_type, data=file_bytes)) + + # Save to artifact service + logger.info( + f"Saving artifact {file_data.name} to {app_name}/{user_id}/{session_id}" + ) + version = artifacts_service.save_artifact( + app_name=app_name, + user_id=user_id, + session_id=session_id, + filename=file_data.name, + artifact=artifact, + ) + + logger.info( + f"Successfully saved file {file_data.name} (version {version}) for session {session_id}" + ) + + # Import the FileData model from the chat schema + from src.schemas.chat import FileData + + # Create a FileData object instead of a dictionary + # This is compatible with what agent_runner.py expects + return FileData( + filename=file_data.name, + content_type=mime_type, + data=file_data.bytes, # Keep the original base64 format + ) + + except Exception as e: + logger.error(f"Error processing file data: {str(e)}") + # Log more details about the error to help with debugging + import traceback + + logger.error(f"Full traceback: {traceback.format_exc()}") + raise + + elif file_data.uri: + # Handling URIs would require additional implementation + # For now, log that we received a URI but can't process it + logger.warning(f"File URI references not yet implemented: {file_data.uri}") + # Future enhancement: fetch the file from the URI and save it + return None + + return None async def _run_agent(self, agent: Agent, query: str, session_id: str) -> dict: """Executes the agent to process the user query.""" try: + files = getattr(self, "_last_processed_files", None) + + if files: + logger.info(f"Passing {len(files)} files to run_agent") + for file_info in files: + logger.info( + f"File being sent: {file_info.filename} ({file_info.content_type})" + ) + else: + logger.info("No files to pass to run_agent") + # We call the same function used in the chat API return await run_agent( agent_id=str(agent.id), @@ -536,6 +816,7 @@ class A2ATaskManager: artifacts_service=artifacts_service, memory_service=memory_service, db=self.db, 
+ files=files, ) except Exception as e: logger.error(f"Error running agent: {e}") diff --git a/src/services/agent_runner.py b/src/services/agent_runner.py index 1d330754..4e205875 100644 --- a/src/services/agent_runner.py +++ b/src/services/agent_runner.py @@ -28,7 +28,7 @@ """ from google.adk.runners import Runner -from google.genai.types import Content, Part +from google.genai.types import Content, Part, Blob from google.adk.sessions import DatabaseSessionService from google.adk.memory import InMemoryMemoryService from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService @@ -42,6 +42,7 @@ import asyncio import json from src.utils.otel import get_tracer from opentelemetry import trace +import base64 logger = setup_logger(__name__) @@ -56,6 +57,7 @@ async def run_agent( db: Session, session_id: Optional[str] = None, timeout: float = 60.0, + files: Optional[list] = None, ): tracer = get_tracer() with tracer.start_as_current_span( @@ -65,6 +67,7 @@ async def run_agent( "external_id": external_id, "session_id": session_id or f"{external_id}_{agent_id}", "message": message, + "has_files": files is not None and len(files) > 0, }, ): exit_stack = None @@ -74,6 +77,9 @@ async def run_agent( ) logger.info(f"Received message: {message}") + if files and len(files) > 0: + logger.info(f"Received {len(files)} files with message") + get_root_agent = get_agent(db, agent_id) logger.info( f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" @@ -113,7 +119,63 @@ async def run_agent( session_id=adk_session_id, ) - content = Content(role="user", parts=[Part(text=message)]) + file_parts = [] + if files and len(files) > 0: + for file_data in files: + try: + file_bytes = base64.b64decode(file_data.data) + + logger.info(f"DEBUG - Processing file: {file_data.filename}") + logger.info(f"DEBUG - File size: {len(file_bytes)} bytes") + logger.info(f"DEBUG - MIME type: '{file_data.content_type}'") + logger.info(f"DEBUG - First 20 bytes: 
{file_bytes[:20]}") + + try: + file_part = Part( + inline_data=Blob( + mime_type=file_data.content_type, data=file_bytes + ) + ) + logger.info(f"DEBUG - Part created successfully") + except Exception as part_error: + logger.error( + f"DEBUG - Error creating Part: {str(part_error)}" + ) + logger.error( + f"DEBUG - Error type: {type(part_error).__name__}" + ) + import traceback + + logger.error( + f"DEBUG - Stack trace: {traceback.format_exc()}" + ) + raise + + # Save the file in the ArtifactService + version = artifacts_service.save_artifact( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + filename=file_data.filename, + artifact=file_part, + ) + logger.info( + f"Saved file {file_data.filename} as version {version}" + ) + + # Add the Part to the list of parts for the message content + file_parts.append(file_part) + except Exception as e: + logger.error( + f"Error processing file {file_data.filename}: {str(e)}" + ) + + # Create the content with the text message and the files + parts = [Part(text=message)] + if file_parts: + parts.extend(file_parts) + + content = Content(role="user", parts=parts) logger.info("Starting agent execution") final_response_text = "No final response captured." 
@@ -256,6 +318,7 @@ async def run_agent_stream( memory_service: InMemoryMemoryService, db: Session, session_id: Optional[str] = None, + files: Optional[list] = None, ) -> AsyncGenerator[str, None]: tracer = get_tracer() span = tracer.start_span( @@ -265,6 +328,7 @@ async def run_agent_stream( "external_id": external_id, "session_id": session_id or f"{external_id}_{agent_id}", "message": message, + "has_files": files is not None and len(files) > 0, }, ) try: @@ -275,6 +339,9 @@ async def run_agent_stream( ) logger.info(f"Received message: {message}") + if files and len(files) > 0: + logger.info(f"Received {len(files)} files with message") + get_root_agent = get_agent(db, agent_id) logger.info( f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" @@ -314,7 +381,72 @@ async def run_agent_stream( session_id=adk_session_id, ) - content = Content(role="user", parts=[Part(text=message)]) + # Process the received files + file_parts = [] + if files and len(files) > 0: + for file_data in files: + try: + # Decode the base64 file + file_bytes = base64.b64decode(file_data.data) + + # Detailed debug + logger.info( + f"DEBUG - Processing file: {file_data.filename}" + ) + logger.info(f"DEBUG - File size: {len(file_bytes)} bytes") + logger.info( + f"DEBUG - MIME type: '{file_data.content_type}'" + ) + logger.info(f"DEBUG - First 20 bytes: {file_bytes[:20]}") + + # Create a Part for the file using the default constructor + try: + file_part = Part( + inline_data=Blob( + mime_type=file_data.content_type, + data=file_bytes, + ) + ) + logger.info(f"DEBUG - Part created successfully") + except Exception as part_error: + logger.error( + f"DEBUG - Error creating Part: {str(part_error)}" + ) + logger.error( + f"DEBUG - Error type: {type(part_error).__name__}" + ) + import traceback + + logger.error( + f"DEBUG - Stack trace: {traceback.format_exc()}" + ) + raise + + # Save the file in the ArtifactService + version = artifacts_service.save_artifact( + app_name=agent_id, + 
user_id=external_id, + session_id=adk_session_id, + filename=file_data.filename, + artifact=file_part, + ) + logger.info( + f"Saved file {file_data.filename} as version {version}" + ) + + # Add the Part to the list of parts for the message content + file_parts.append(file_part) + except Exception as e: + logger.error( + f"Error processing file {file_data.filename}: {str(e)}" + ) + + # Create the content with the text message and the files + parts = [Part(text=message)] + if file_parts: + parts.extend(file_parts) + + content = Content(role="user", parts=parts) logger.info("Starting agent streaming execution") try: diff --git a/src/utils/a2a_utils.py b/src/utils/a2a_utils.py index e58b968c..62e9bbd5 100644 --- a/src/utils/a2a_utils.py +++ b/src/utils/a2a_utils.py @@ -27,10 +27,16 @@ └──────────────────────────────────────────────────────────────────────────────┘ """ +import base64 +import uuid +from typing import Dict, List, Any, Optional +from google.genai.types import Part, Blob + from src.schemas.a2a_types import ( ContentTypeNotSupportedError, JSONRPCResponse, UnsupportedOperationError, + Message, ) @@ -55,3 +61,130 @@ def new_incompatible_types_error(request_id): def new_not_implemented_error(request_id): return JSONRPCResponse(id=request_id, error=UnsupportedOperationError()) + + +def extract_files_from_message(message: Message) -> List[Dict[str, Any]]: + """ + Extract file parts from an A2A message. + + Args: + message: An A2A Message object + + Returns: + List of file parts extracted from the message + """ + if not message or not message.parts: + return [] + + files = [] + for part in message.parts: + if hasattr(part, "type") and part.type == "file" and hasattr(part, "file"): + files.append(part) + + return files + + +def a2a_part_to_adk_part(a2a_part: Dict[str, Any]) -> Optional[Part]: + """ + Convert an A2A protocol part to an ADK Part object. 
+ + Args: + a2a_part: An A2A part dictionary + + Returns: + Converted ADK Part object or None if conversion not possible + """ + part_type = a2a_part.get("type") + if part_type == "file" and "file" in a2a_part: + file_data = a2a_part["file"] + if "bytes" in file_data: + try: + # Convert base64 to bytes + file_bytes = base64.b64decode(file_data["bytes"]) + mime_type = file_data.get("mimeType", "application/octet-stream") + + # Create ADK Part + return Part(inline_data=Blob(mime_type=mime_type, data=file_bytes)) + except Exception: + return None + elif part_type == "text" and "text" in a2a_part: + # For text parts, we could create a text blob if needed + return None + + return None + + +def adk_part_to_a2a_part( + adk_part: Part, filename: Optional[str] = None +) -> Optional[Dict[str, Any]]: + """ + Convert an ADK Part object to an A2A protocol part. + + Args: + adk_part: An ADK Part object + filename: Optional filename to use + + Returns: + Converted A2A Part dictionary or None if conversion not possible + """ + if hasattr(adk_part, "inline_data") and adk_part.inline_data: + if adk_part.inline_data.data and adk_part.inline_data.mime_type: + # Convert binary data to base64 + file_bytes = adk_part.inline_data.data + mime_type = adk_part.inline_data.mime_type + + # Generate filename if not provided + if not filename: + ext = get_extension_from_mime(mime_type) + filename = f"file_{uuid.uuid4().hex}{ext}" + + # Convert to A2A FilePart dict + return { + "type": "file", + "file": { + "name": filename, + "mimeType": mime_type, + "bytes": ( + base64.b64encode(file_bytes).decode("utf-8") + if isinstance(file_bytes, bytes) + else str(file_bytes) + ), + }, + } + elif hasattr(adk_part, "text") and adk_part.text: + # Convert text part + return {"type": "text", "text": adk_part.text} + + return None + + +def get_extension_from_mime(mime_type: str) -> str: + """ + Get a file extension from MIME type. 
+ + Args: + mime_type: MIME type string + + Returns: + Appropriate file extension with leading dot + """ + if not mime_type: + return "" + + mime_map = { + "image/jpeg": ".jpg", + "image/png": ".png", + "image/gif": ".gif", + "application/pdf": ".pdf", + "text/plain": ".txt", + "text/html": ".html", + "text/csv": ".csv", + "application/json": ".json", + "application/xml": ".xml", + "application/msword": ".doc", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx", + "application/vnd.ms-excel": ".xls", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx", + } + + return mime_map.get(mime_type, "") From 1656fda8dad717db80360d049978407878d83b57 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 14 May 2025 22:21:58 -0300 Subject: [PATCH 10/11] refactor(a2a_task_manager): enhance logging and file handling in streaming task processing --- src/services/a2a_task_manager.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/services/a2a_task_manager.py b/src/services/a2a_task_manager.py index d019cfe7..c37c898e 100644 --- a/src/services/a2a_task_manager.py +++ b/src/services/a2a_task_manager.py @@ -388,6 +388,7 @@ class A2ATaskManager: self, request: SendTaskStreamingRequest, agent: Agent ) -> AsyncIterable[SendTaskStreamingResponse]: """Processes a task in streaming mode using the specified agent.""" + # Extrair e processar arquivos da mesma forma que no método _process_task query = self._extract_user_query(request.params) try: @@ -447,6 +448,23 @@ class A2ATaskManager: ), ) + # Use os arquivos processados do _extract_user_query + files = getattr(self, "_last_processed_files", None) + + # Log sobre os arquivos processados + if files: + logger.info( + f"Streaming: Passando {len(files)} arquivos processados para run_agent_stream" + ) + for file_info in files: + logger.info( + f"Streaming: Arquivo sendo enviado: {file_info.filename} ({file_info.content_type})" + ) + else: + 
logger.warning( + "Streaming: Nenhum arquivo processado disponível para enviar ao agente" + ) + async for chunk in run_agent_stream( agent_id=str(agent.id), external_id=external_id, @@ -455,6 +473,7 @@ class A2ATaskManager: artifacts_service=artifacts_service, memory_service=memory_service, db=self.db, + files=files, # Passar os arquivos processados para o streaming ): try: chunk_data = json.loads(chunk) From 22a771abd87c7cba4c33fc81ea5fab2377a49c5a Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Thu, 15 May 2025 19:01:08 -0300 Subject: [PATCH 11/11] chore(changelog): update version date for release 0.0.10 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdd00c64..c632d49e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.0.10] - develop +## [0.0.10] - 2025-05-15 ### Added