diff --git a/CHANGELOG.md b/CHANGELOG.md index 20deee8f..d0123340 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add CrewAI agents +- Add Task Agent for structured single-task execution +- Improve context management in agent execution ## [0.0.9] - 2025-05-13 diff --git a/README.md b/README.md index 76a545f5..ee50710d 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,39 @@ Allows organizing agents into a "crew" with specific tasks assigned to each agen } ``` +### 8. Task Agent + +Executes a specific task using a target agent. Task Agent provides a streamlined approach for structured task execution, where the agent_id specifies which agent will process the task, and the task description can include dynamic content placeholders. + +```json +{ + "client_id": "{{client_id}}", + "name": "web_search_task", + "type": "task", + "folder_id": "folder_id (optional)", + "config": { + "tasks": [ + { + "agent_id": "search-agent-uuid", + "description": "Search the web for information about {content}", + "expected_output": "Comprehensive search results with relevant information" + } + ], + "sub_agents": ["post-processing-agent-uuid"] + } +} +``` + +Key features of Task Agent: + +- Passes structured task instructions to the designated agent +- Supports variable content using {content} placeholder in the task description +- Provides clear task definition with instructions and expected output format +- Can execute sub-agents after the main task is completed +- Simplifies orchestration for single-focused task execution + +Task Agent is ideal for scenarios where you need to execute a specific, well-defined task with clear instructions and expectations. + ### Common Characteristics - All agent types can have sub-agents diff --git a/migrations/versions/2df073c7b564_add_task_agent_type_agents_table.py b/migrations/versions/2df073c7b564_add_task_agent_type_agents_table.py new file mode 100644 index 00000000..c0e579b8 --- /dev/null +++ b/migrations/versions/2df073c7b564_add_task_agent_type_agents_table.py @@ -0,0 +1,43 @@ +"""add_task_agent_type_agents_table + +Revision ID: 2df073c7b564 +Revises: 611d84e70bb2 +Create Date: 2025-05-14 11:46:39.573247 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "2df073c7b564" +down_revision: Union[str, None] = "611d84e70bb2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint("check_agent_type", "agents", type_="check") + op.create_check_constraint( + "check_agent_type", + "agents", + "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai', 'task')", + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint("check_agent_type", "agents", type_="check") + op.create_check_constraint( + "check_agent_type", + "agents", + "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai')", + ) + # ### end Alembic commands ### diff --git a/src/models/models.py b/src/models/models.py index 5693b254..546073f5 100644 --- a/src/models/models.py +++ b/src/models/models.py @@ -123,7 +123,7 @@ class Agent(Base): __table_args__ = ( CheckConstraint( - "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai')", + "type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai', 'task')", name="check_agent_type", ), ) diff --git a/src/schemas/schemas.py b/src/schemas/schemas.py index a955fe63..6bbdc22d 100644 --- a/src/schemas/schemas.py +++ b/src/schemas/schemas.py @@ -98,7 +98,7 @@ class AgentBase(BaseModel): goal: Optional[str] = Field(None, description="Agent goal or objective") type: str = Field( ..., - description="Agent type (llm, sequential, parallel, loop, a2a, workflow, crew_ai)", + description="Agent type (llm, sequential, parallel, loop, a2a, workflow, crew_ai, task)", ) model: Optional[str] = Field( None, description="Agent model (required only for llm type)" @@ -137,9 +137,10 @@ class AgentBase(BaseModel): "a2a", "workflow", "crew_ai", + "task", ]: raise ValueError( - "Invalid agent type. Must be: llm, sequential, parallel, loop, a2a, workflow or crew_ai" + "Invalid agent type. Must be: llm, sequential, parallel, loop, a2a, workflow, crew_ai or task" ) return v @@ -199,7 +200,7 @@ class AgentBase(BaseModel): raise ValueError( f'Agent {values["type"]} must have at least one sub-agent' ) - elif values["type"] == "crew_ai": + elif values["type"] == "crew_ai" or values["type"] == "task": if not isinstance(v, dict): raise ValueError(f'Invalid configuration for agent {values["type"]}') if "tasks" not in v: diff --git a/src/services/agent_builder.py b/src/services/agent_builder.py index 321a2a1a..a615e221 100644 --- a/src/services/agent_builder.py +++ b/src/services/agent_builder.py @@ -41,6 +41,7 @@ from src.services.mcp_service import MCPService from src.services.custom_agents.a2a_agent import A2ACustomAgent from src.services.custom_agents.workflow_agent import WorkflowAgent from src.services.custom_agents.crew_ai_agent import CrewAIAgent +from src.services.custom_agents.task_agent import TaskAgent from src.services.apikey_service import get_decrypted_api_key from sqlalchemy.orm import Session from contextlib import AsyncExitStack @@ -314,6 +315,55 @@ class AgentBuilder: logger.error(f"Error building Workflow agent: {str(e)}") raise ValueError(f"Error building Workflow agent: {str(e)}") + async def build_task_agent( + self, root_agent + ) -> Tuple[TaskAgent, Optional[AsyncExitStack]]: + """Build a task agent with its sub-agents.""" + logger.info(f"Creating Task agent: {root_agent.name}") + + agent_config = root_agent.config or {} + + if not agent_config.get("tasks"): + raise ValueError("tasks are required for Task agents") + + try: + # Get sub-agents if there are any + sub_agents = [] + if root_agent.config.get("sub_agents"): + sub_agents_with_stacks = await self._get_sub_agents( + root_agent.config.get("sub_agents") + ) + sub_agents = [agent for agent, _ in sub_agents_with_stacks] + + # Additional configurations + config = root_agent.config or {} + + # Convert tasks to the expected format by TaskAgent + tasks = [] + for task_config in config.get("tasks", []): + task = CrewAITask( + agent_id=task_config.get("agent_id"), + description=task_config.get("description", ""), + expected_output=task_config.get("expected_output", ""), + ) + tasks.append(task) + + # Create the Task agent + task_agent = TaskAgent( + name=root_agent.name, + tasks=tasks, + db=self.db, + sub_agents=sub_agents, + ) + + logger.info(f"Task agent created successfully: {root_agent.name}") + + return task_agent, None + + except Exception as e: + logger.error(f"Error building Task agent: {str(e)}") + raise ValueError(f"Error building CrewAI agent: {str(e)}") + async def build_crew_ai_agent( self, root_agent: Agent ) -> Tuple[CrewAIAgent, Optional[AsyncExitStack]]: @@ -434,7 +484,8 @@ class AgentBuilder: | LoopAgent | A2ACustomAgent | WorkflowAgent - | CrewAIAgent, + | CrewAIAgent + | TaskAgent, Optional[AsyncExitStack], ]: """Build the appropriate agent based on the type of the root agent.""" @@ -446,5 +497,7 @@ class AgentBuilder: return await self.build_workflow_agent(root_agent) elif root_agent.type == "crew_ai": return await self.build_crew_ai_agent(root_agent) + elif root_agent.type == "task": + return await self.build_task_agent(root_agent) else: return await self.build_composite_agent(root_agent) diff --git a/src/services/agent_service.py b/src/services/agent_service.py index f271ad26..c079cf0b 100644 --- a/src/services/agent_service.py +++ b/src/services/agent_service.py @@ -202,7 +202,7 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: if "api_key" not in agent.config or not agent.config["api_key"]: agent.config["api_key"] = generate_api_key() - elif agent.type == "crew_ai": + elif agent.type == "crew_ai" or agent.type == "task": if not isinstance(agent.config, dict): agent.config = {} raise HTTPException( @@ -213,7 +213,7 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: if "tasks" not in agent.config: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid configuration: tasks is required for crew_ai agents", + detail=f"Invalid configuration: tasks is required for {agent.type} agents", ) if not agent.config["tasks"]: @@ -682,14 +682,14 @@ async def update_agent( if "config" not in agent_data: agent_data["config"] = agent_config - if ("type" in agent_data and agent_data["type"] == "crew_ai") or ( - agent.type == "crew_ai" and "config" in agent_data + if ("type" in agent_data and agent_data["type"] in ["crew_ai", "task"]) or ( + agent.type in ["crew_ai", "task"] and "config" in agent_data ): config = agent_data.get("config", {}) if "tasks" not in config: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid configuration: tasks is required for crew_ai agents", + detail=f"Invalid configuration: tasks is required for {agent_data.get('type', agent.type)} agents", ) if not config["tasks"]: diff --git a/src/services/custom_agents/crew_ai_agent.py b/src/services/custom_agents/crew_ai_agent.py index 1afffeb2..65fc002e 100644 --- a/src/services/custom_agents/crew_ai_agent.py +++ b/src/services/custom_agents/crew_ai_agent.py @@ -1,9 +1,9 @@ """ ┌──────────────────────────────────────────────────────────────────────────────┐ │ @author: Davidson Gomes │ -│ @file: a2a_agent.py │ +│ @file: crew_ai_agent.py │ │ Developed by: Davidson Gomes │ -│ Creation date: May 13, 2025 │ +│ Creation date: May 14, 2025 │ │ Contact: contato@evolution-api.com │ ├──────────────────────────────────────────────────────────────────────────────┤ │ @copyright © Evolution API 2025. All rights reserved. │ diff --git a/src/services/custom_agents/task_agent.py b/src/services/custom_agents/task_agent.py new file mode 100644 index 00000000..a93611bd --- /dev/null +++ b/src/services/custom_agents/task_agent.py @@ -0,0 +1,231 @@ +""" +┌──────────────────────────────────────────────────────────────────────────────┐ +│ @author: Davidson Gomes │ +│ @file: task_agent.py │ +│ Developed by: Davidson Gomes │ +│ Creation date: May 14, 2025 │ +│ Contact: contato@evolution-api.com │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @copyright © Evolution API 2025. All rights reserved. │ +│ Licensed under the Apache License, Version 2.0 │ +│ │ +│ You may not use this file except in compliance with the License. │ +│ You may obtain a copy of the License at │ +│ │ +│ http://www.apache.org/licenses/LICENSE-2.0 │ +│ │ +│ Unless required by applicable law or agreed to in writing, software │ +│ distributed under the License is distributed on an "AS IS" BASIS, │ +│ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. │ +│ See the License for the specific language governing permissions and │ +│ limitations under the License. │ +├──────────────────────────────────────────────────────────────────────────────┤ +│ @important │ +│ For any future changes to the code in this file, it is recommended to │ +│ include, together with the modification, the information of the developer │ +│ who changed it and the date of modification. │ +└──────────────────────────────────────────────────────────────────────────────┘ +""" + +from attr import Factory +from google.adk.agents import BaseAgent +from google.adk.agents.invocation_context import InvocationContext +from google.adk.events import Event +from google.genai.types import Content, Part +from src.services.agent_service import get_agent +from src.services.apikey_service import get_decrypted_api_key + +from sqlalchemy.orm import Session + +from typing import AsyncGenerator, List + +from src.schemas.agent_config import CrewAITask + + +class TaskAgent(BaseAgent): + """ + Custom agent that implements the Task function. + + This agent implements the interaction with an external Task service. + """ + + # Field declarations for Pydantic + tasks: List[CrewAITask] + db: Session + + def __init__( + self, + name: str, + tasks: List[CrewAITask], + db: Session, + sub_agents: List[BaseAgent] = [], + **kwargs, + ): + """ + Initialize the Task agent. + + Args: + name: Agent name + tasks: List of tasks to be executed + db: Database session + sub_agents: List of sub-agents to be executed after the Task agent + """ + # Initialize base class + super().__init__( + name=name, + tasks=tasks, + db=db, + sub_agents=sub_agents, + **kwargs, + ) + + async def _run_async_impl( + self, ctx: InvocationContext + ) -> AsyncGenerator[Event, None]: + """ + Implementation of the Task agent. + + This method follows the pattern of implementing custom agents, + sending the user's message to the Task service and monitoring the response. + """ + exit_stack = None + + try: + # Extract the user's message from the context + user_message = None + + # Search for the user's message in the session events + if ctx.session and hasattr(ctx.session, "events") and ctx.session.events: + for event in reversed(ctx.session.events): + if event.author == "user" and event.content and event.content.parts: + user_message = event.content.parts[0].text + print("Message found in session events") + break + + # Check in the session state if the message was not found in the events + if not user_message and ctx.session and ctx.session.state: + if "user_message" in ctx.session.state: + user_message = ctx.session.state["user_message"] + elif "message" in ctx.session.state: + user_message = ctx.session.state["message"] + + if not user_message: + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text="User message not found")], + ), + ) + return + + # Start the agent status + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text=f"Starting {self.name} task processing...")], + ), + ) + + try: + # Replace any {content} in the task descriptions with the user's input + task = self.tasks[0] + task.description = task.description.replace("{content}", user_message) + + agent = get_agent(self.db, task.agent_id) + + if not agent: + yield Event( + author=self.name, + content=Content(parts=[Part(text="Agent not found")]), + ) + return + + # Prepare task instructions + task_message_instructions = f""" + + + Execute the following task: + + {task.description} + {task.expected_output} + + """ + + # Send task instructions as an event + yield Event( + author=f"{self.name} - Task executor", + content=Content( + role="agent", + parts=[Part(text=task_message_instructions)], + ), + ) + + from src.services.agent_builder import AgentBuilder + + print(f"Building agent in Task agent: {agent.name}") + agent_builder = AgentBuilder(self.db) + root_agent, exit_stack = await agent_builder.build_agent(agent) + + # Store task instructions in context for reference by sub-agents + ctx.session.state["task_instructions"] = task_message_instructions + + # Process the agent responses + try: + async for event in root_agent.run_async(ctx): + yield event + except GeneratorExit: + print("Generator was closed prematurely, handling cleanup...") + # Allow the exception to propagate after cleanup + raise + except Exception as e: + error_msg = f"Error during agent execution: {str(e)}" + print(error_msg) + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text=error_msg)], + ), + ) + + except Exception as e: + error_msg = f"Error sending request: {str(e)}" + print(error_msg) + print(f"Error type: {type(e).__name__}") + print(f"Error details: {str(e)}") + + yield Event( + author=self.name, + content=Content(role="agent", parts=[Part(text=error_msg)]), + ) + + except Exception as e: + # Handle any uncaught error + print(f"Error executing Task agent: {str(e)}") + yield Event( + author=self.name, + content=Content( + role="agent", + parts=[Part(text=f"Error interacting with Task agent: {str(e)}")], + ), + ) + finally: + # Ensure we close the exit_stack in the same task context where it was created + if exit_stack: + try: + await exit_stack.aclose() + print("Exit stack closed successfully") + except Exception as e: + print(f"Error closing exit_stack: {str(e)}") + + # Execute sub-agents only if no exception occurred + try: + if "e" not in locals(): + for sub_agent in self.sub_agents: + async for event in sub_agent.run_async(ctx): + yield event + except Exception as sub_e: + print(f"Error executing sub-agents: {str(sub_e)}") + # We don't yield a new event here to avoid raising during cleanup