diff --git a/src/api/a2a_routes.py b/src/api/a2a_routes.py index effae9fb..bbf17dd0 100644 --- a/src/api/a2a_routes.py +++ b/src/api/a2a_routes.py @@ -124,7 +124,7 @@ def get_task_manager(agent_id, db=None, reuse=True, operation_type="query"): return task_manager -@router.post("/{agent_id}") +@router.post("/{agent_id}/rpc") async def process_a2a_request( agent_id: uuid.UUID, request: Request, diff --git a/src/services/agent_builder.py b/src/services/agent_builder.py index 02240cfa..9c21f5ce 100644 --- a/src/services/agent_builder.py +++ b/src/services/agent_builder.py @@ -24,7 +24,7 @@ class AgentBuilder: self.custom_tool_builder = CustomToolBuilder() self.mcp_service = MCPService() - def _get_current_time(self, city: str = "new york") -> dict: + def get_current_time(self, city: str = "new york") -> dict: """Get the current time in a city.""" city_timezones = { "new york": "America/New_York", @@ -99,7 +99,7 @@ class AgentBuilder: ) # Combine all tools - all_tools = custom_tools + mcp_tools + [self._get_current_time] + all_tools = custom_tools + mcp_tools + [self.get_current_time] now = datetime.now() current_datetime = now.strftime("%d/%m/%Y %H:%M") @@ -116,7 +116,7 @@ class AgentBuilder: ) # Add get_current_time instructions to prompt - formatted_prompt += "\n\nUse the get_current_time tool to get the current time in a city. The tool is available in the tools section of the configuration. Use 'new york' by default if no city is provided.\n\n" + formatted_prompt += "Use the get_current_time tool to get the current time in a city. The tool is available in the tools section of the configuration. Use 'new york' by default if no city is provided.ALWAYS use the 'get_current_time' tool when you need to use the current date and time in any type of situation, whether in interaction with the user or for calling other tools.\n\n" # Check if load_memory is enabled # before_model_callback_func = None @@ -144,10 +144,15 @@ class AgentBuilder: """Get and create LLM sub-agents.""" sub_agents = [] for sub_agent_id in sub_agent_ids: - agent = get_agent(self.db, sub_agent_id) + sub_agent_id_str = str(sub_agent_id) + + agent = get_agent(self.db, sub_agent_id_str) if agent is None: - raise AgentNotFoundError(f"Agent with ID {sub_agent_id} not found") + logger.error(f"Sub-agent not found: {sub_agent_id_str}") + raise AgentNotFoundError(f"Agent with ID {sub_agent_id_str} not found") + + logger.info(f"Sub-agent found: {agent.name} (type: {agent.type})") if agent.type == "llm": sub_agent, exit_stack = await self._create_llm_agent(agent) @@ -163,6 +168,10 @@ class AgentBuilder: raise ValueError(f"Invalid agent type: {agent.type}") sub_agents.append((sub_agent, exit_stack)) + logger.info(f"Sub-agent added: {agent.name}") + + logger.info(f"Sub-agents created: {len(sub_agents)}") + logger.info(f"Sub-agents: {str(sub_agents)}") return sub_agents @@ -224,15 +233,33 @@ class AgentBuilder: self, root_agent ) -> Tuple[SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]]: """Build a composite agent (Sequential, Parallel or Loop) with its sub-agents.""" - logger.info(f"Processing sub-agents for agent {root_agent.type}") + logger.info( + f"Processing sub-agents for agent {root_agent.type} (ID: {root_agent.id}, Name: {root_agent.name})" + ) + + if not root_agent.config.get("sub_agents"): + logger.error( + f"Sub_agents configuration not found or empty for agent {root_agent.name}" + ) + raise ValueError(f"Missing sub_agents configuration for {root_agent.name}") + + logger.info( + f"Sub-agents IDs to be processed: {root_agent.config.get('sub_agents', [])}" + ) sub_agents_with_stacks = await self._get_sub_agents( root_agent.config.get("sub_agents", []) ) + + logger.info( + f"Sub-agents processed: {len(sub_agents_with_stacks)} of {len(root_agent.config.get('sub_agents', []))}" + ) + sub_agents = [agent for agent, _ in sub_agents_with_stacks] + logger.info(f"Extracted sub-agents: {[agent.name for agent in sub_agents]}") if root_agent.type == "sequential": - logger.info("Creating SequentialAgent") + logger.info(f"Creating SequentialAgent with {len(sub_agents)} sub-agents") return ( SequentialAgent( name=root_agent.name, @@ -242,7 +269,7 @@ class AgentBuilder: None, ) elif root_agent.type == "parallel": - logger.info("Creating ParallelAgent") + logger.info(f"Creating ParallelAgent with {len(sub_agents)} sub-agents") return ( ParallelAgent( name=root_agent.name, @@ -252,7 +279,7 @@ class AgentBuilder: None, ) elif root_agent.type == "loop": - logger.info("Creating LoopAgent") + logger.info(f"Creating LoopAgent with {len(sub_agents)} sub-agents") return ( LoopAgent( name=root_agent.name, diff --git a/src/services/agent_runner.py b/src/services/agent_runner.py index 9ee4702b..01b7449d 100644 --- a/src/services/agent_runner.py +++ b/src/services/agent_runner.py @@ -85,23 +85,31 @@ async def run_agent( new_message=content, ) + last_response = None + all_responses = [] + async for event in events_async: - if event.is_final_response(): - if event.content and event.content.parts: - # Assuming text response in the first part - await response_queue.put(event.content.parts[0].text) - elif event.actions and event.actions.escalate: - await response_queue.put( - f"Agent escalated: {event.error_message or 'No specific message.'}" - ) + if ( + event.content + and event.content.parts + and event.content.parts[0].text + ): + current_text = event.content.parts[0].text + last_response = current_text + all_responses.append(current_text) + if event.actions and event.actions.escalate: + escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}" + await response_queue.put(escalate_text) execution_completed.set() - break + return - if not execution_completed.is_set(): + if last_response: + await response_queue.put(last_response) + else: await response_queue.put("Finished without specific response") - execution_completed.set() + execution_completed.set() except Exception as e: logger.error(f"Error in process_events: {str(e)}") await response_queue.put(f"Error: {str(e)}") diff --git a/src/services/agent_service.py b/src/services/agent_service.py index bcaab7a6..07df77b8 100644 --- a/src/services/agent_service.py +++ b/src/services/agent_service.py @@ -35,12 +35,26 @@ def _convert_uuid_to_str(obj): return obj -def validate_sub_agents(db: Session, sub_agents: List[uuid.UUID]) -> bool: +def validate_sub_agents(db: Session, sub_agents: List[Union[uuid.UUID, str]]) -> bool: """Validate if all sub-agents exist""" + logger.info(f"Validando sub-agentes: {sub_agents}") + + if not sub_agents: + logger.warning("Lista de sub-agentes vazia") + return False + for agent_id in sub_agents: - agent = get_agent(db, agent_id) + # Garantir que o ID esteja no formato correto + agent_id_str = str(agent_id) + logger.info(f"Validando sub-agente com ID: {agent_id_str}") + + agent = get_agent(db, agent_id_str) if not agent: + logger.warning(f"Sub-agente não encontrado: {agent_id_str}") return False + logger.info(f"Sub-agente válido: {agent.name} (ID: {agent_id_str})") + + logger.info(f"Todos os {len(sub_agents)} sub-agentes são válidos") return True @@ -133,31 +147,29 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent: detail=f"Failed to process agent card: {str(e)}", ) - # Additional sub-agent validation (for non-llm and non-a2a types) - elif agent.type != "llm": - if not isinstance(agent.config, dict): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid configuration: must be an object with sub_agents", - ) + if not isinstance(agent.config, dict): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid configuration: must be an object with sub_agents", + ) - if "sub_agents" not in agent.config: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid configuration: sub_agents is required for sequential, parallel or loop agents", - ) + if "sub_agents" not in agent.config: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid configuration: sub_agents is required for sequential, parallel or loop agents", + ) - if not agent.config["sub_agents"]: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid configuration: sub_agents cannot be empty", - ) + if not agent.config["sub_agents"]: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid configuration: sub_agents cannot be empty", + ) - if not validate_sub_agents(db, agent.config["sub_agents"]): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="One or more sub-agents do not exist", - ) + if not validate_sub_agents(db, agent.config["sub_agents"]): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="One or more sub-agents do not exist", + ) # Process the configuration before creating the agent config = agent.config