Merge branch 'release/0.0.10'

This commit is contained in:
Davidson Gomes 2025-05-15 19:01:18 -03:00
commit 17d72238c7
23 changed files with 1889 additions and 151 deletions

View File

@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.10] - 2025-05-15
### Added
- Add Task Agent for structured single-task execution
- Improve context management in agent execution
- Add file support for A2A protocol (Agent-to-Agent) endpoints
- Implement multimodal content processing in A2A messages
## [0.0.9] - 2025-05-13
### Added

View File

@ -11,9 +11,10 @@ The Evo AI platform allows:
- Client management
- MCP server configuration
- Custom tools management
- **[Google Agent Development Kit (ADK)](https://google.github.io/adk-docs/)**: Base framework for agent development, providing support for LLM Agents, Sequential Agents, Loop Agents, Parallel Agents and Custom Agents
- JWT authentication with email verification
- **Agent 2 Agent (A2A) Protocol Support**: Interoperability between AI agents following Google's A2A specification
- **Workflow Agent with LangGraph**: Building complex agent workflows with LangGraph and ReactFlow
- **[Agent 2 Agent (A2A) Protocol Support](https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/)**: Interoperability between AI agents following Google's A2A specification
- **[Workflow Agent with LangGraph](https://www.langchain.com/langgraph)**: Building complex agent workflows with LangGraph and ReactFlow
- **Secure API Key Management**: Encrypted storage of API keys with Fernet encryption
- **Agent Organization**: Folder structure for organizing agents by categories
@ -30,6 +31,8 @@ Agent based on language models like GPT-4, Claude, etc. Can be configured with t
"client_id": "{{client_id}}",
"name": "personal_assistant",
"description": "Specialized personal assistant",
"role": "Personal Assistant",
"goal": "Help users with daily tasks and provide relevant information",
"type": "llm",
"model": "gpt-4",
"api_key_id": "stored-api-key-uuid",
@ -150,6 +153,39 @@ Executes sub-agents in a custom workflow defined by a graph structure. This agen
The workflow structure is built using ReactFlow in the frontend, allowing visual creation and editing of complex agent workflows with nodes (representing agents or decision points) and edges (representing flow connections).
### 7. Task Agent
Executes a specific task using a target agent. Task Agent provides a streamlined approach for structured task execution, where the agent_id specifies which agent will process the task, and the task description can include dynamic content placeholders.
```json
{
"client_id": "{{client_id}}",
"name": "web_search_task",
"type": "task",
"folder_id": "folder_id (optional)",
"config": {
"tasks": [
{
"agent_id": "search-agent-uuid",
"description": "Search the web for information about {content}",
"expected_output": "Comprehensive search results with relevant information"
}
],
"sub_agents": ["post-processing-agent-uuid"]
}
}
```
Key features of Task Agent:
- Passes structured task instructions to the designated agent
- Supports variable content via the {content} placeholder in the task description
- Provides clear task definition with instructions and expected output format
- Can execute sub-agents after the main task is completed
- Simplifies orchestration for single-focused task execution
Task Agent is ideal for scenarios where you need to execute a specific, well-defined task with clear instructions and expectations.
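The placeholder mechanics described above can be sketched as follows. This is an illustrative approximation only: the `build_task_prompt` helper is hypothetical (not part of this codebase), and it assumes `{content}` is resolved with a plain string substitution before the task is handed to the target agent:

```python
# Hypothetical sketch of how a Task Agent could resolve the {content}
# placeholder before delegating to the target agent. Names are illustrative.

def build_task_prompt(description: str, expected_output: str, content: str) -> str:
    """Fill the {content} placeholder and append the expected-output contract."""
    instruction = description.replace("{content}", content)
    return f"{instruction}\n\nExpected output: {expected_output}"

prompt = build_task_prompt(
    description="Search the web for information about {content}",
    expected_output="Comprehensive search results with relevant information",
    content="the A2A protocol",
)
print(prompt.splitlines()[0])
# Search the web for information about the A2A protocol
```

The target agent identified by `agent_id` then receives the resolved instruction, and any configured `sub_agents` run after the main task completes.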
### Common Characteristics
- All agent types can have sub-agents
@ -160,7 +196,7 @@ The workflow structure is built using ReactFlow in the frontend, allowing visual
### MCP Server Configuration
Agents can be integrated with MCP (Model Control Protocol) servers for distributed processing:
Agents can be integrated with MCP (Model Context Protocol) servers for distributed processing:
```json
{
@ -355,7 +391,7 @@ Evo AI implements the Google's Agent 2 Agent (A2A) protocol, enabling seamless c
- **Standardized Communication**: Agents can communicate using a common protocol regardless of their underlying implementation
- **Interoperability**: Support for agents built with different frameworks and technologies
- **Well-Known Endpoints**: Standardized endpoints for agent discovery and interaction
- **Task Management**: Support for task-based interactions between agents
- **Task Management**: Support for task creation, execution, and status tracking
- **State Management**: Tracking of agent states and conversation history
- **Authentication**: Secure API key-based authentication for agent interactions
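A client-side sketch of assembling a `tasks/send` request with an attached file is shown below. The JSON-RPC shape and the `x-api-key` header come from this diff; the exact endpoint path is an assumption and may differ in the deployed API:

```python
import base64
import json
import uuid


def build_a2a_file_request(task_id: str, session_id: str, text: str,
                           filename: str, mime_type: str, data: bytes) -> dict:
    """Build a JSON-RPC 2.0 tasks/send request with a text part and a file part."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "tasks/send",
        "params": {
            "id": task_id,
            "sessionId": session_id,
            "message": {
                "role": "user",
                "parts": [
                    {"type": "text", "text": text},
                    # File content travels as a base64-encoded string.
                    {"type": "file", "file": {
                        "name": filename,
                        "mimeType": mime_type,
                        "bytes": base64.b64encode(data).decode("utf-8"),
                    }},
                ],
            },
        },
    }


req = build_a2a_file_request(
    task_id="task-123", session_id="session-456",
    text="Analyze this image", filename="example.jpg",
    mime_type="image/jpeg", data=b"\xff\xd8\xff",
)
# The serialized body would be POSTed to the agent's A2A endpoint
# (path assumed) with the client's key in the x-api-key header.
body = json.dumps(req)
```

The server-side handler validates the API key, logs the message parts, and forwards the request to the A2A service, as shown in the route changes below.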
@ -507,7 +543,7 @@ cd evo-ai-frontend
After installation, follow these steps to set up your first agent:
1. **Configure MCP Server**: Set up your Model Control Protocol server configuration first
1. **Configure MCP Server**: Set up your Model Context Protocol server configuration first
2. **Create Client or Register**: Create a new client or register a user account
3. **Create Agents**: Set up the agents according to your needs (LLM, A2A, Sequential, Parallel, Loop, or Workflow)

View File

@ -0,0 +1,43 @@
"""add_task_agent_type_agents_table
Revision ID: 2df073c7b564
Revises: 611d84e70bb2
Create Date: 2025-05-14 11:46:39.573247
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "2df073c7b564"
down_revision: Union[str, None] = "611d84e70bb2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint("check_agent_type", "agents", type_="check")
op.create_check_constraint(
"check_agent_type",
"agents",
"type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai', 'task')",
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint("check_agent_type", "agents", type_="check")
op.create_check_constraint(
"check_agent_type",
"agents",
"type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai')",
)
# ### end Alembic commands ###

View File

@ -0,0 +1,34 @@
"""add_crew_ai_coluns_agents_table
Revision ID: 611d84e70bb2
Revises: bdc5d363e2e1
Create Date: 2025-05-14 07:31:08.741620
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '611d84e70bb2'
down_revision: Union[str, None] = 'bdc5d363e2e1'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('agents', sa.Column('role', sa.String(), nullable=True))
op.add_column('agents', sa.Column('goal', sa.Text(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('agents', 'goal')
op.drop_column('agents', 'role')
# ### end Alembic commands ###

View File

@ -0,0 +1,43 @@
"""add_crew_ai_agent_type_agents_table
Revision ID: bdc5d363e2e1
Revises: 6db4a526335b
Create Date: 2025-05-14 06:23:14.701878
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "bdc5d363e2e1"
down_revision: Union[str, None] = "6db4a526335b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint("check_agent_type", "agents", type_="check")
op.create_check_constraint(
"check_agent_type",
"agents",
"type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai')",
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint("check_agent_type", "agents", type_="check")
op.create_check_constraint(
"check_agent_type",
"agents",
"type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow')",
)
# ### end Alembic commands ###

View File

@ -30,7 +30,7 @@ dependencies = [
"google-cloud-aiplatform==1.90.0",
"python-dotenv==1.1.0",
"google-adk==0.3.0",
"litellm==1.68.1",
"litellm>=1.68.0,<1.69.0",
"python-multipart==0.0.20",
"alembic==1.15.2",
"asyncpg==0.30.0",

View File

@ -31,6 +31,7 @@
Routes for the A2A (Agent-to-Agent) protocol.
This module implements the standard A2A routes according to the specification.
Supports both text messages and file uploads through the message parts mechanism.
"""
import uuid
@ -92,7 +93,39 @@ async def process_a2a_request(
db: Session = Depends(get_db),
a2a_service: A2AService = Depends(get_a2a_service),
):
"""Processes an A2A request."""
"""
Processes an A2A request.
Supports both text messages and file uploads. For file uploads,
include file parts in the message following the A2A protocol format:
{
"jsonrpc": "2.0",
"id": "request-id",
"method": "tasks/send",
"params": {
"id": "task-id",
"sessionId": "session-id",
"message": {
"role": "user",
"parts": [
{
"type": "text",
"text": "Analyze this image"
},
{
"type": "file",
"file": {
"name": "example.jpg",
"mimeType": "image/jpeg",
"bytes": "base64-encoded-content"
}
}
]
}
}
}
"""
# Verify the API key
if not verify_api_key(db, x_api_key):
raise HTTPException(status_code=401, detail="Invalid API key")
@ -100,10 +133,60 @@ async def process_a2a_request(
# Process the request
try:
request_body = await request.json()
debug_request_body = {}
if "method" in request_body:
debug_request_body["method"] = request_body["method"]
if "id" in request_body:
debug_request_body["id"] = request_body["id"]
logger.info(f"A2A request received: {debug_request_body}")
# Log if request contains file parts for debugging
if isinstance(request_body, dict) and "params" in request_body:
params = request_body.get("params", {})
message = params.get("message", {})
parts = message.get("parts", [])
logger.info(f"A2A message contains {len(parts)} parts")
for i, part in enumerate(parts):
if not isinstance(part, dict):
logger.warning(f"Part {i+1} is not a dictionary: {type(part)}")
continue
part_type = part.get("type")
logger.info(f"Part {i+1} type: {part_type}")
if part_type == "file":
file_info = part.get("file", {})
logger.info(
f"File part found: {file_info.get('name')} ({file_info.get('mimeType')})"
)
if "bytes" in file_info:
bytes_data = file_info.get("bytes", "")
bytes_size = len(bytes_data) * 0.75
logger.info(f"File size: ~{bytes_size/1024:.2f} KB")
if bytes_data:
sample = (
bytes_data[:10] + "..."
if len(bytes_data) > 10
else bytes_data
)
logger.info(f"Sample of base64 data: {sample}")
elif part_type == "text":
text_content = part.get("text", "")
preview = (
text_content[:30] + "..."
if len(text_content) > 30
else text_content
)
logger.info(f"Text part found: '{preview}'")
result = await a2a_service.process_request(agent_id, request_body)
# If the response is a streaming response, return as EventSourceResponse
if hasattr(result, "__aiter__"):
logger.info("Returning streaming response")
async def event_generator():
async for item in result:
@ -115,11 +198,15 @@ async def process_a2a_request(
return EventSourceResponse(event_generator())
# Otherwise, return as JSONResponse
logger.info("Returning standard JSON response")
if hasattr(result, "model_dump"):
return JSONResponse(result.model_dump(exclude_none=True))
return JSONResponse(result)
except Exception as e:
logger.error(f"Error processing A2A request: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
return JSONResponse(
status_code=500,
content={

View File

@ -28,6 +28,7 @@
"""
import uuid
import base64
from fastapi import (
APIRouter,
Depends,
@ -47,7 +48,7 @@ from src.core.jwt_middleware import (
from src.services import (
agent_service,
)
from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse
from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse, FileData
from src.services.agent_runner import run_agent, run_agent_stream
from src.core.exceptions import AgentNotFoundError
from src.services.service_providers import (
@ -59,7 +60,7 @@ from src.services.service_providers import (
from datetime import datetime
import logging
import json
from typing import Optional, Dict
from typing import Optional, Dict, List, Any
logger = logging.getLogger(__name__)
@ -195,6 +196,29 @@ async def websocket_chat(
if not message:
continue
files = None
if data.get("files") and isinstance(data.get("files"), list):
try:
files = []
for file_data in data.get("files"):
if (
isinstance(file_data, dict)
and file_data.get("filename")
and file_data.get("content_type")
and file_data.get("data")
):
files.append(
FileData(
filename=file_data.get("filename"),
content_type=file_data.get("content_type"),
data=file_data.get("data"),
)
)
logger.info(f"Processed {len(files)} files via WebSocket")
except Exception as e:
logger.error(f"Error processing files: {str(e)}")
files = None
async for chunk in run_agent_stream(
agent_id=agent_id,
external_id=external_id,
@ -203,6 +227,7 @@ async def websocket_chat(
artifacts_service=artifacts_service,
memory_service=memory_service,
db=db,
files=files,
):
await websocket.send_json(
{"message": json.loads(chunk), "turn_complete": False}
@ -259,6 +284,7 @@ async def chat(
artifacts_service,
memory_service,
db,
files=request.files,
)
return {

View File

@ -30,8 +30,9 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from src.config.database import get_db
from typing import List
from typing import List, Optional, Dict, Any
import uuid
import base64
from src.core.jwt_middleware import (
get_jwt_token,
verify_user_client,
@ -48,7 +49,7 @@ from src.services.session_service import (
get_sessions_by_agent,
get_sessions_by_client,
)
from src.services.service_providers import session_service
from src.services.service_providers import session_service, artifacts_service
import logging
logger = logging.getLogger(__name__)
@ -118,13 +119,18 @@ async def get_session(
@router.get(
"/{session_id}/messages",
response_model=List[Event],
)
async def get_agent_messages(
session_id: str,
db: Session = Depends(get_db),
payload: dict = Depends(get_jwt_token),
):
"""
Gets messages from a session with embedded artifacts.
This function loads all messages from a session and processes any references
to artifacts, loading them and converting them to base64 for direct use in the frontend.
"""
# Get the session
session = get_session_by_id(session_service, session_id)
if not session:
@ -139,7 +145,160 @@ async def get_agent_messages(
if agent:
await verify_user_client(payload, db, agent.client_id)
return get_session_events(session_service, session_id)
# Parse the session ID to obtain user_id and app_name
parts = session_id.split("_")
if len(parts) != 2:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid session ID format"
)
user_id, app_name = parts[0], parts[1]
events = get_session_events(session_service, session_id)
processed_events = []
for event in events:
event_dict = event.dict()
def process_dict(d):
if isinstance(d, dict):
for key, value in list(d.items()):
if isinstance(value, bytes):
try:
d[key] = base64.b64encode(value).decode("utf-8")
logger.debug(f"Converted bytes field to base64: {key}")
except Exception as e:
logger.error(f"Error encoding bytes to base64: {str(e)}")
d[key] = None
elif isinstance(value, dict):
process_dict(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, (dict, list)):
process_dict(item)
elif isinstance(d, list):
for i, item in enumerate(d):
if isinstance(item, bytes):
try:
d[i] = base64.b64encode(item).decode("utf-8")
except Exception as e:
logger.error(
f"Error encoding bytes to base64 in list: {str(e)}"
)
d[i] = None
elif isinstance(item, (dict, list)):
process_dict(item)
return d
# Process the entire event dictionary recursively
event_dict = process_dict(event_dict)
# Process the content parts specifically
if event_dict.get("content") and event_dict["content"].get("parts"):
for part in event_dict["content"]["parts"]:
# Process inlineData if present
if part and part.get("inlineData") and part["inlineData"].get("data"):
# Check if it's already a string or if it's bytes
if isinstance(part["inlineData"]["data"], bytes):
# Convert bytes to base64 string
part["inlineData"]["data"] = base64.b64encode(
part["inlineData"]["data"]
).decode("utf-8")
logger.debug(
f"Converted binary data to base64 in message {event_dict.get('id')}"
)
# Process fileData if present (reference to an artifact)
if part and part.get("fileData") and part["fileData"].get("fileId"):
try:
# Extract the file name from the fileId
file_id = part["fileData"]["fileId"]
# Load the artifact from the artifacts service
artifact = artifacts_service.load_artifact(
app_name=app_name,
user_id=user_id,
session_id=session_id,
filename=file_id,
)
if artifact and hasattr(artifact, "inline_data"):
# Extract the data and MIME type
file_bytes = artifact.inline_data.data
mime_type = artifact.inline_data.mime_type
# Add inlineData with the artifact data
if not part.get("inlineData"):
part["inlineData"] = {}
# Ensure we're sending a base64 string, not bytes
if isinstance(file_bytes, bytes):
try:
part["inlineData"]["data"] = base64.b64encode(
file_bytes
).decode("utf-8")
except Exception as e:
logger.error(
f"Error encoding artifact to base64: {str(e)}"
)
part["inlineData"]["data"] = None
else:
part["inlineData"]["data"] = str(file_bytes)
part["inlineData"]["mimeType"] = mime_type
logger.debug(
f"Loaded artifact {file_id} for message {event_dict.get('id')}"
)
except Exception as e:
logger.error(f"Error loading artifact: {str(e)}")
# Don't interrupt the flow if an artifact fails
# Check artifact_delta in actions
if event_dict.get("actions") and event_dict["actions"].get("artifact_delta"):
artifact_deltas = event_dict["actions"]["artifact_delta"]
for filename, version in artifact_deltas.items():
try:
# Load the artifact
artifact = artifacts_service.load_artifact(
app_name=app_name,
user_id=user_id,
session_id=session_id,
filename=filename,
version=version,
)
if artifact and hasattr(artifact, "inline_data"):
# If the event doesn't have an artifacts section, create it
if "artifacts" not in event_dict:
event_dict["artifacts"] = {}
# Add the artifact to the event's artifacts list
file_bytes = artifact.inline_data.data
mime_type = artifact.inline_data.mime_type
# Ensure the bytes are converted to base64
event_dict["artifacts"][filename] = {
"data": (
base64.b64encode(file_bytes).decode("utf-8")
if isinstance(file_bytes, bytes)
else str(file_bytes)
),
"mimeType": mime_type,
"version": version,
}
logger.debug(
f"Added artifact {filename} (v{version}) to message {event_dict.get('id')}"
)
except Exception as e:
logger.error(
f"Error processing artifact_delta for {filename}: {str(e)}"
)
processed_events.append(event_dict)
return processed_events
@router.delete(

View File

@ -100,6 +100,8 @@ class Agent(Base):
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
client_id = Column(UUID(as_uuid=True), ForeignKey("clients.id", ondelete="CASCADE"))
name = Column(String, nullable=False)
role = Column(String, nullable=True)
goal = Column(Text, nullable=True)
description = Column(Text, nullable=True)
type = Column(String, nullable=False)
model = Column(String, nullable=True, default="")
@ -121,7 +123,7 @@ class Agent(Base):
__table_args__ = (
CheckConstraint(
"type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow')",
"type IN ('llm', 'sequential', 'parallel', 'loop', 'a2a', 'workflow', 'crew_ai', 'task')",
name="check_agent_type",
),
)

View File

@ -29,7 +29,7 @@
from datetime import datetime
from enum import Enum
from typing import Annotated, Any, Literal
from typing import Annotated, Any, Literal, Union, Dict, List, Optional
from uuid import uuid4
from typing_extensions import Self

View File

@ -32,6 +32,8 @@ from pydantic import BaseModel, Field
from uuid import UUID
import secrets
import string
import uuid
from pydantic import validator
class ToolConfig(BaseModel):
@ -234,3 +236,45 @@ class WorkflowConfig(BaseModel):
class Config:
from_attributes = True
class AgentTask(BaseModel):
"""Task configuration for agents"""
agent_id: Union[UUID, str] = Field(
..., description="ID of the agent assigned to this task"
)
enabled_tools: Optional[List[str]] = Field(
default_factory=list, description="List of tool names to be used in the task"
)
description: str = Field(..., description="Description of the task to be performed")
expected_output: str = Field(..., description="Expected output from this task")
@validator("agent_id")
def validate_agent_id(cls, v):
if isinstance(v, str):
try:
return uuid.UUID(v)
except ValueError:
raise ValueError(f"Invalid UUID format for agent_id: {v}")
return v
class Config:
from_attributes = True
class AgentConfig(BaseModel):
"""Configuration for agents"""
tasks: List[AgentTask] = Field(
..., description="List of tasks to be performed by the agent"
)
api_key: Optional[str] = Field(
default_factory=generate_api_key, description="API key for the agent"
)
sub_agents: Optional[List[UUID]] = Field(
default_factory=list, description="List of IDs of sub-agents used by the agent"
)
class Config:
from_attributes = True

View File

@ -27,36 +27,42 @@
"""
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional
from pydantic import BaseModel, Field, validator
from typing import Dict, List, Optional, Any
from datetime import datetime
class FileData(BaseModel):
"""Model to represent file data sent in a chat request."""
filename: str = Field(..., description="File name")
content_type: str = Field(..., description="File content type")
data: str = Field(..., description="File content encoded in base64")
class ChatRequest(BaseModel):
"""Schema for chat requests"""
"""Model to represent a chat request."""
agent_id: str = Field(
..., description="ID of the agent that will process the message"
agent_id: str = Field(..., description="Agent ID to process the message")
external_id: str = Field(..., description="External ID for user identification")
message: str = Field(..., description="User message to the agent")
files: Optional[List[FileData]] = Field(
None, description="List of files attached to the message"
)
external_id: str = Field(
..., description="ID of the external_id that will process the message"
)
message: str = Field(..., description="User message")
class ChatResponse(BaseModel):
"""Schema for chat responses"""
"""Model to represent a chat response."""
response: str = Field(..., description="Agent response")
status: str = Field(..., description="Operation status")
error: Optional[str] = Field(None, description="Error message, if there is one")
timestamp: str = Field(..., description="Timestamp of the response")
response: str = Field(..., description="Response generated by the agent")
message_history: List[Dict[str, Any]] = Field(
default_factory=list, description="Message history"
)
status: str = Field(..., description="Response status (success/error)")
timestamp: str = Field(..., description="Response timestamp")
class ErrorResponse(BaseModel):
"""Schema for error responses"""
"""Model to represent an error response."""
error: str = Field(..., description="Error message")
status_code: int = Field(..., description="HTTP status code of the error")
details: Optional[Dict[str, Any]] = Field(
None, description="Additional error details"
)
detail: str = Field(..., description="Error details")

View File

@ -33,7 +33,7 @@ from datetime import datetime
from uuid import UUID
import uuid
import re
from src.schemas.agent_config import LLMConfig
from src.schemas.agent_config import LLMConfig, AgentConfig
class ClientBase(BaseModel):
@ -94,8 +94,11 @@ class AgentBase(BaseModel):
None, description="Agent name (no spaces or special characters)"
)
description: Optional[str] = Field(None, description="Agent description")
role: Optional[str] = Field(None, description="Agent role in the system")
goal: Optional[str] = Field(None, description="Agent goal or objective")
type: str = Field(
..., description="Agent type (llm, sequential, parallel, loop, a2a, workflow)"
...,
description="Agent type (llm, sequential, parallel, loop, a2a, workflow, task)",
)
model: Optional[str] = Field(
None, description="Agent model (required only for llm type)"
@ -126,9 +129,17 @@ class AgentBase(BaseModel):
@validator("type")
def validate_type(cls, v):
if v not in ["llm", "sequential", "parallel", "loop", "a2a", "workflow"]:
if v not in [
"llm",
"sequential",
"parallel",
"loop",
"a2a",
"workflow",
"task",
]:
raise ValueError(
"Invalid agent type. Must be: llm, sequential, parallel, loop, a2a or workflow"
"Invalid agent type. Must be: llm, sequential, parallel, loop, a2a, workflow or task"
)
return v
@ -188,6 +199,28 @@ class AgentBase(BaseModel):
raise ValueError(
f'Agent {values["type"]} must have at least one sub-agent'
)
elif values["type"] == "task":
if not isinstance(v, dict):
raise ValueError(f'Invalid configuration for agent {values["type"]}')
if "tasks" not in v:
raise ValueError(f'Agent {values["type"]} must have tasks')
if not isinstance(v["tasks"], list):
raise ValueError("tasks must be a list")
if not v["tasks"]:
raise ValueError(f'Agent {values["type"]} must have at least one task')
for task in v["tasks"]:
if not isinstance(task, dict):
raise ValueError("Each task must be a dictionary")
required_fields = ["agent_id", "description", "expected_output"]
for field in required_fields:
if field not in task:
raise ValueError(f"Task missing required field: {field}")
if "sub_agents" in v and v["sub_agents"] is not None:
if not isinstance(v["sub_agents"], list):
raise ValueError("sub_agents must be a list")
return v
return v

View File

@ -33,8 +33,11 @@ from collections.abc import AsyncIterable
from typing import Dict, Optional
from uuid import UUID
import json
import base64
import uuid as uuid_pkg
from sqlalchemy.orm import Session
from google.genai.types import Part, Blob
from src.config.settings import settings
from src.services.agent_service import (
@ -76,6 +79,7 @@ from src.schemas.a2a_types import (
AgentAuthentication,
AgentProvider,
)
from src.schemas.chat import FileData
logger = logging.getLogger(__name__)
@ -281,12 +285,29 @@ class A2ATaskManager:
all_messages.append(agent_message)
task_state = self._determine_task_state(result)
artifact = Artifact(parts=agent_message.parts, index=0)
# Create artifacts for any file content
artifacts = []
# First, add the main response as an artifact
artifacts.append(Artifact(parts=agent_message.parts, index=0))
# Also add any files from the message history
for idx, msg in enumerate(all_messages, 1):
for part in msg.parts:
if hasattr(part, "type") and part.type == "file":
artifacts.append(
Artifact(
parts=[part],
index=idx,
name=part.file.name,
description=f"File from message {idx}",
)
)
task = await self.update_store(
task_params.id,
TaskStatus(state=task_state, message=agent_message),
[artifact],
artifacts,
)
await self._update_task_history(
@ -367,6 +388,7 @@ class A2ATaskManager:
self, request: SendTaskStreamingRequest, agent: Agent
) -> AsyncIterable[SendTaskStreamingResponse]:
"""Processes a task in streaming mode using the specified agent."""
# Extract and process files the same way as in the _process_task method
query = self._extract_user_query(request.params)
try:
@ -400,6 +422,49 @@ class A2ATaskManager:
final_message = None
# Check for files in the user message and include them as artifacts
user_files = []
for part in request.params.message.parts:
if (
hasattr(part, "type")
and part.type == "file"
and hasattr(part, "file")
):
user_files.append(
Artifact(
parts=[part],
index=0,
name=part.file.name if part.file.name else "file",
description="File from user",
)
)
# Send artifacts for any user files
for artifact in user_files:
yield SendTaskStreamingResponse(
id=request.id,
result=TaskArtifactUpdateEvent(
id=request.params.id, artifact=artifact
),
)
# Use the files processed by _extract_user_query
files = getattr(self, "_last_processed_files", None)
# Log information about the processed files
if files:
logger.info(
f"Streaming: passing {len(files)} processed files to run_agent_stream"
)
for file_info in files:
logger.info(
f"Streaming: sending file {file_info.filename} ({file_info.content_type})"
)
else:
logger.warning(
"Streaming: no processed files available to send to the agent"
)
async for chunk in run_agent_stream(
agent_id=str(agent.id),
external_id=external_id,
@ -408,6 +473,7 @@ class A2ATaskManager:
artifacts_service=artifacts_service,
memory_service=memory_service,
db=self.db,
files=files,  # Pass the processed files along to the streaming call
):
try:
chunk_data = json.loads(chunk)
@ -418,7 +484,48 @@ class A2ATaskManager:
parts = content.get("parts", [])
if parts:
update_message = Message(role=role, parts=parts)
# Modify to handle file parts as well
agent_parts = []
for part in parts:
# Handle different part types
if part.get("type") == "text":
agent_parts.append(part)
full_response += part.get("text", "")
elif part.get("inlineData") and part["inlineData"].get(
"data"
):
# Convert inline data to file part
mime_type = part["inlineData"].get(
"mimeType", "application/octet-stream"
)
file_name = f"file_{uuid_pkg.uuid4().hex}{self._get_extension_from_mime(mime_type)}"
file_part = {
"type": "file",
"file": {
"name": file_name,
"mimeType": mime_type,
"bytes": part["inlineData"]["data"],
},
}
agent_parts.append(file_part)
# Also send as artifact
yield SendTaskStreamingResponse(
id=request.id,
result=TaskArtifactUpdateEvent(
id=request.params.id,
artifact=Artifact(
parts=[file_part],
index=0,
name=file_name,
description=f"Generated {mime_type} file",
),
),
)
if agent_parts:
update_message = Message(role=role, parts=agent_parts)
final_message = update_message
yield SendTaskStreamingResponse(
id=request.id,
@ -431,11 +538,6 @@ class A2ATaskManager:
final=False,
),
)
for part in parts:
if part.get("type") == "text":
full_response += part.get("text", "")
final_message = update_message
except Exception as e:
logger.error(f"Error processing chunk: {e}, chunk: {chunk}")
continue
@ -485,6 +587,29 @@ class A2ATaskManager:
error=InternalError(message=f"Error streaming task process: {str(e)}"),
)
def _get_extension_from_mime(self, mime_type: str) -> str:
"""Get a file extension from MIME type."""
if not mime_type:
return ""
mime_map = {
"image/jpeg": ".jpg",
"image/png": ".png",
"image/gif": ".gif",
"application/pdf": ".pdf",
"text/plain": ".txt",
"text/html": ".html",
"text/csv": ".csv",
"application/json": ".json",
"application/xml": ".xml",
"application/msword": ".doc",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
"application/vnd.ms-excel": ".xls",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
}
return mime_map.get(mime_type, "")
async def update_store(
self,
task_id: str,
@ -514,19 +639,193 @@ class A2ATaskManager:
return task
def _extract_user_query(self, task_params: TaskSendParams) -> str:
"""Extracts the user query from the task parameters."""
"""Extracts the user query from the task parameters and processes any files."""
if not task_params.message or not task_params.message.parts:
raise ValueError("Message or parts are missing in task parameters")
part = task_params.message.parts[0]
if part.type != "text":
raise ValueError("Only text parts are supported")
# Process file parts first
text_parts = []
has_files = False
file_parts = []
return part.text
logger.info(
f"Extracting query from message with {len(task_params.message.parts)} parts"
)
# Extract text parts and file parts separately
for idx, part in enumerate(task_params.message.parts):
logger.info(
f"Processing part {idx+1}, type: {getattr(part, 'type', 'unknown')}"
)
if hasattr(part, "type"):
if part.type == "text":
logger.info(f"Found text part: '{part.text[:50]}...' (truncated)")
text_parts.append(part.text)
elif part.type == "file":
logger.info(
f"Found file part: {getattr(getattr(part, 'file', None), 'name', 'unnamed')}"
)
has_files = True
try:
processed_file = self._process_file_part(
part, task_params.sessionId
)
if processed_file:
file_parts.append(processed_file)
except Exception as e:
logger.error(f"Error processing file part: {e}")
# Continue with other parts even if a file fails
else:
logger.warning(f"Unknown part type: {part.type}")
else:
logger.warning(f"Part has no type attribute: {part}")
# Store the file parts in self for later use
self._last_processed_files = file_parts if file_parts else None
# If we have at least one text part, use that as the query
if text_parts:
final_query = " ".join(text_parts)
logger.info(
f"Final query from text parts: '{final_query[:50]}...' (truncated)"
)
return final_query
# If we only have file parts, create a generic query asking for analysis
elif has_files:
logger.info("No text parts, using generic query for file analysis")
return "Analyze the attached files"
else:
logger.error("No supported content parts found in the message")
raise ValueError("No supported content parts found in the message")
def _process_file_part(self, part, session_id: str):
"""Processes a file part and saves it to the artifact service.
Returns:
dict: Processed file information to pass to agent_runner
"""
if not hasattr(part, "file") or not part.file:
logger.warning("File part missing file data")
return None
file_data = part.file
if not file_data.name:
file_data.name = f"file_{uuid_pkg.uuid4().hex}"
logger.info(f"Processing file {file_data.name} for session {session_id}")
if file_data.bytes:
# Process file data provided as base64 string
try:
# Convert base64 to bytes
logger.info(f"Decoding base64 content for file {file_data.name}")
file_bytes = base64.b64decode(file_data.bytes)
# Determine MIME type based on binary content
mime_type = (
file_data.mimeType if hasattr(file_data, "mimeType") else None
)
if not mime_type or mime_type == "application/octet-stream":
# Detection by byte signature
if file_bytes.startswith(b"\xff\xd8\xff"): # JPEG signature
mime_type = "image/jpeg"
elif file_bytes.startswith(b"\x89PNG\r\n\x1a\n"): # PNG signature
mime_type = "image/png"
elif file_bytes.startswith(b"GIF87a") or file_bytes.startswith(
b"GIF89a"
): # GIF
mime_type = "image/gif"
elif file_bytes.startswith(b"%PDF"): # PDF
mime_type = "application/pdf"
else:
# Fallback to avoid generic type in images
if file_data.name.lower().endswith((".jpg", ".jpeg")):
mime_type = "image/jpeg"
elif file_data.name.lower().endswith(".png"):
mime_type = "image/png"
elif file_data.name.lower().endswith(".gif"):
mime_type = "image/gif"
elif file_data.name.lower().endswith(".pdf"):
mime_type = "application/pdf"
else:
mime_type = "application/octet-stream"
logger.info(
f"Decoded file size: {len(file_bytes)} bytes, MIME type: {mime_type}"
)
# Split session_id to get app_name and user_id
parts = session_id.split("_")
if len(parts) != 2:
user_id = session_id
app_name = "a2a"
else:
user_id, app_name = parts
# Create artifact Part
logger.info(f"Creating artifact Part for file {file_data.name}")
artifact = Part(inline_data=Blob(mime_type=mime_type, data=file_bytes))
# Save to artifact service
logger.info(
f"Saving artifact {file_data.name} to {app_name}/{user_id}/{session_id}"
)
version = artifacts_service.save_artifact(
app_name=app_name,
user_id=user_id,
session_id=session_id,
filename=file_data.name,
artifact=artifact,
)
logger.info(
f"Successfully saved file {file_data.name} (version {version}) for session {session_id}"
)
# Import the FileData model from the chat schema
from src.schemas.chat import FileData
# Create a FileData object instead of a dictionary
# This is compatible with what agent_runner.py expects
return FileData(
filename=file_data.name,
content_type=mime_type,
data=file_data.bytes, # Keep the original base64 format
)
except Exception as e:
logger.error(f"Error processing file data: {str(e)}")
# Log more details about the error to help with debugging
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
raise
elif file_data.uri:
# Handling URIs would require additional implementation
# For now, log that we received a URI but can't process it
logger.warning(f"File URI references not yet implemented: {file_data.uri}")
# Future enhancement: fetch the file from the URI and save it
return None
return None
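The MIME detection performed inside `_process_file_part` (byte signature first, filename extension second, generic type last) can be factored into a standalone helper; this is a sketch with an illustrative name, not the method the codebase exposes:

```python
def sniff_mime(data: bytes, filename: str = "") -> str:
    """Detect common MIME types by magic bytes, then extension, else octet-stream."""
    signatures = [
        (b"\xff\xd8\xff", "image/jpeg"),
        (b"\x89PNG\r\n\x1a\n", "image/png"),
        (b"GIF87a", "image/gif"),
        (b"GIF89a", "image/gif"),
        (b"%PDF", "application/pdf"),
    ]
    for magic, mime in signatures:
        if data.startswith(magic):
            return mime
    ext_map = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".pdf": "application/pdf",
    }
    for ext, mime in ext_map.items():
        if filename.lower().endswith(ext):
            return mime
    return "application/octet-stream"
```

Signature-based detection is preferred because uploaded files frequently arrive as `application/octet-stream` regardless of their real content.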
async def _run_agent(self, agent: Agent, query: str, session_id: str) -> dict:
"""Executes the agent to process the user query."""
try:
files = getattr(self, "_last_processed_files", None)
if files:
logger.info(f"Passing {len(files)} files to run_agent")
for file_info in files:
logger.info(
f"File being sent: {file_info.filename} ({file_info.content_type})"
)
else:
logger.info("No files to pass to run_agent")
# We call the same function used in the chat API
return await run_agent(
agent_id=str(agent.id),
@ -536,6 +835,7 @@ class A2ATaskManager:
artifacts_service=artifacts_service,
memory_service=memory_service,
db=self.db,
files=files,
)
except Exception as e:
logger.error(f"Error running agent: {e}")

View File

@ -32,13 +32,15 @@ from google.adk.agents.llm_agent import LlmAgent
from google.adk.agents import SequentialAgent, ParallelAgent, LoopAgent, BaseAgent
from google.adk.models.lite_llm import LiteLlm
from google.adk.tools.agent_tool import AgentTool
from src.schemas.schemas import Agent
from src.utils.logger import setup_logger
from src.core.exceptions import AgentNotFoundError
from src.services.agent_service import get_agent
from src.services.custom_tools import CustomToolBuilder
from src.services.mcp_service import MCPService
from src.services.a2a_agent import A2ACustomAgent
from src.services.workflow_agent import WorkflowAgent
from src.services.custom_agents.a2a_agent import A2ACustomAgent
from src.services.custom_agents.workflow_agent import WorkflowAgent
from src.services.custom_agents.task_agent import TaskAgent
from src.services.apikey_service import get_decrypted_api_key
from sqlalchemy.orm import Session
from contextlib import AsyncExitStack
@ -47,6 +49,8 @@ from google.adk.tools import load_memory
from datetime import datetime
import uuid
from src.schemas.agent_config import AgentTask
logger = setup_logger(__name__)
@ -69,7 +73,7 @@ class AgentBuilder:
return agent_tools
async def _create_llm_agent(
self, agent
self, agent, enabled_tools: List[str] = []
) -> Tuple[LlmAgent, Optional[AsyncExitStack]]:
"""Create an LLM agent from the agent data."""
# Get custom tools from the configuration
@ -90,6 +94,10 @@ class AgentBuilder:
# Combine all tools
all_tools = custom_tools + mcp_tools + agent_tools
if enabled_tools:
all_tools = [tool for tool in all_tools if tool.name in enabled_tools]
logger.info(f"Enabled tools enabled. Total tools: {len(all_tools)}")
now = datetime.now()
current_datetime = now.strftime("%d/%m/%Y %H:%M")
current_day_of_week = now.strftime("%A")
@ -104,6 +112,18 @@ class AgentBuilder:
current_time=current_time,
)
# add role on beginning of the prompt
if agent.role:
formatted_prompt = (
f"<agent_role>{agent.role}</agent_role>\n\n{formatted_prompt}"
)
# add goal on beginning of the prompt
if agent.goal:
formatted_prompt = (
f"<agent_goal>{agent.goal}</agent_goal>\n\n{formatted_prompt}"
)
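Because the role tag is prepended first and the goal tag second, the goal ends up outermost in the final prompt. A minimal sketch of that composition (function name is illustrative):

```python
from typing import Optional


def prefix_prompt(prompt: str, role: Optional[str], goal: Optional[str]) -> str:
    """Prepend optional role/goal tags; the goal tag lands first in the result."""
    if role:
        prompt = f"<agent_role>{role}</agent_role>\n\n{prompt}"
    if goal:
        prompt = f"<agent_goal>{goal}</agent_goal>\n\n{prompt}"
    return prompt
```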
# Check if load_memory is enabled
if agent.config.get("load_memory"):
all_tools.append(load_memory)
@ -183,6 +203,8 @@ class AgentBuilder:
sub_agent, exit_stack = await self.build_a2a_agent(agent)
elif agent.type == "workflow":
sub_agent, exit_stack = await self.build_workflow_agent(agent)
elif agent.type == "task":
sub_agent, exit_stack = await self.build_task_agent(agent)
elif agent.type == "sequential":
sub_agent, exit_stack = await self.build_composite_agent(agent)
elif agent.type == "parallel":
@ -201,7 +223,7 @@ class AgentBuilder:
return sub_agents
async def build_llm_agent(
self, root_agent
self, root_agent, enabled_tools: List[str] = []
) -> Tuple[LlmAgent, Optional[AsyncExitStack]]:
"""Build an LLM agent with its sub-agents."""
logger.info("Creating LLM agent")
@ -213,7 +235,9 @@ class AgentBuilder:
)
sub_agents = [agent for agent, _ in sub_agents_with_stacks]
root_llm_agent, exit_stack = await self._create_llm_agent(root_agent)
root_llm_agent, exit_stack = await self._create_llm_agent(
root_agent, enabled_tools
)
if sub_agents:
root_llm_agent.sub_agents = sub_agents
@ -298,6 +322,56 @@ class AgentBuilder:
logger.error(f"Error building Workflow agent: {str(e)}")
raise ValueError(f"Error building Workflow agent: {str(e)}")
async def build_task_agent(
self, root_agent
) -> Tuple[TaskAgent, Optional[AsyncExitStack]]:
"""Build a task agent with its sub-agents."""
logger.info(f"Creating Task agent: {root_agent.name}")
agent_config = root_agent.config or {}
if not agent_config.get("tasks"):
raise ValueError("tasks are required for Task agents")
try:
# Get sub-agents if there are any
sub_agents = []
if root_agent.config.get("sub_agents"):
sub_agents_with_stacks = await self._get_sub_agents(
root_agent.config.get("sub_agents")
)
sub_agents = [agent for agent, _ in sub_agents_with_stacks]
# Additional configurations
config = root_agent.config or {}
# Convert tasks to the expected format by TaskAgent
tasks = []
for task_config in config.get("tasks", []):
task = AgentTask(
agent_id=task_config.get("agent_id"),
description=task_config.get("description", ""),
expected_output=task_config.get("expected_output", ""),
enabled_tools=task_config.get("enabled_tools", []),
)
tasks.append(task)
# Create the Task agent
task_agent = TaskAgent(
name=root_agent.name,
tasks=tasks,
db=self.db,
sub_agents=sub_agents,
)
logger.info(f"Task agent created successfully: {root_agent.name}")
return task_agent, None
except Exception as e:
logger.error(f"Error building Task agent: {str(e)}")
raise ValueError(f"Error building Task agent: {str(e)}")
async def build_composite_agent(
self, root_agent
) -> Tuple[SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]]:
@ -361,21 +435,24 @@ class AgentBuilder:
else:
raise ValueError(f"Invalid agent type: {root_agent.type}")
async def build_agent(self, root_agent) -> Tuple[
async def build_agent(self, root_agent, enabled_tools: List[str] = []) -> Tuple[
LlmAgent
| SequentialAgent
| ParallelAgent
| LoopAgent
| A2ACustomAgent
| WorkflowAgent,
| WorkflowAgent
| TaskAgent,
Optional[AsyncExitStack],
]:
"""Build the appropriate agent based on the type of the root agent."""
if root_agent.type == "llm":
return await self.build_llm_agent(root_agent)
return await self.build_llm_agent(root_agent, enabled_tools)
elif root_agent.type == "a2a":
return await self.build_a2a_agent(root_agent)
elif root_agent.type == "workflow":
return await self.build_workflow_agent(root_agent)
elif root_agent.type == "task":
return await self.build_task_agent(root_agent)
else:
return await self.build_composite_agent(root_agent)

View File

@ -28,7 +28,7 @@
"""
from google.adk.runners import Runner
from google.genai.types import Content, Part
from google.genai.types import Content, Part, Blob
from google.adk.sessions import DatabaseSessionService
from google.adk.memory import InMemoryMemoryService
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
@ -42,6 +42,7 @@ import asyncio
import json
from src.utils.otel import get_tracer
from opentelemetry import trace
import base64
logger = setup_logger(__name__)
@ -56,6 +57,7 @@ async def run_agent(
db: Session,
session_id: Optional[str] = None,
timeout: float = 60.0,
files: Optional[list] = None,
):
tracer = get_tracer()
with tracer.start_as_current_span(
@ -65,6 +67,7 @@ async def run_agent(
"external_id": external_id,
"session_id": session_id or f"{external_id}_{agent_id}",
"message": message,
"has_files": files is not None and len(files) > 0,
},
):
exit_stack = None
@ -74,6 +77,9 @@ async def run_agent(
)
logger.info(f"Received message: {message}")
if files and len(files) > 0:
logger.info(f"Received {len(files)} files with message")
get_root_agent = get_agent(db, agent_id)
logger.info(
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
@ -113,7 +119,63 @@ async def run_agent(
session_id=adk_session_id,
)
content = Content(role="user", parts=[Part(text=message)])
file_parts = []
if files and len(files) > 0:
for file_data in files:
try:
file_bytes = base64.b64decode(file_data.data)
logger.info(f"DEBUG - Processing file: {file_data.filename}")
logger.info(f"DEBUG - File size: {len(file_bytes)} bytes")
logger.info(f"DEBUG - MIME type: '{file_data.content_type}'")
logger.info(f"DEBUG - First 20 bytes: {file_bytes[:20]}")
try:
file_part = Part(
inline_data=Blob(
mime_type=file_data.content_type, data=file_bytes
)
)
logger.info(f"DEBUG - Part created successfully")
except Exception as part_error:
logger.error(
f"DEBUG - Error creating Part: {str(part_error)}"
)
logger.error(
f"DEBUG - Error type: {type(part_error).__name__}"
)
import traceback
logger.error(
f"DEBUG - Stack trace: {traceback.format_exc()}"
)
raise
# Save the file in the ArtifactService
version = artifacts_service.save_artifact(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
filename=file_data.filename,
artifact=file_part,
)
logger.info(
f"Saved file {file_data.filename} as version {version}"
)
# Add the Part to the list of parts for the message content
file_parts.append(file_part)
except Exception as e:
logger.error(
f"Error processing file {file_data.filename}: {str(e)}"
)
# Create the content with the text message and the files
parts = [Part(text=message)]
if file_parts:
parts.extend(file_parts)
content = Content(role="user", parts=parts)
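The text-plus-files assembly above (text part first, one binary part per successfully decoded file, decode failures logged and skipped) can be sketched without the ADK types; `MsgPart` is a stand-in for `google.genai` `Part`/`Blob`:

```python
import base64
from dataclasses import dataclass
from typing import Optional


@dataclass
class MsgPart:
    text: Optional[str] = None
    mime_type: Optional[str] = None
    data: Optional[bytes] = None


def build_parts(message: str, files: list) -> list:
    """Text part first, then one binary part per successfully decoded file."""
    parts = [MsgPart(text=message)]
    for f in files:
        try:
            parts.append(
                MsgPart(mime_type=f["content_type"], data=base64.b64decode(f["data"]))
            )
        except Exception:
            continue  # skip undecodable files, mirroring the log-and-continue above
    return parts
```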
logger.info("Starting agent execution")
final_response_text = "No final response captured."
@ -256,6 +318,7 @@ async def run_agent_stream(
memory_service: InMemoryMemoryService,
db: Session,
session_id: Optional[str] = None,
files: Optional[list] = None,
) -> AsyncGenerator[str, None]:
tracer = get_tracer()
span = tracer.start_span(
@ -265,6 +328,7 @@ async def run_agent_stream(
"external_id": external_id,
"session_id": session_id or f"{external_id}_{agent_id}",
"message": message,
"has_files": files is not None and len(files) > 0,
},
)
try:
@ -275,6 +339,9 @@ async def run_agent_stream(
)
logger.info(f"Received message: {message}")
if files and len(files) > 0:
logger.info(f"Received {len(files)} files with message")
get_root_agent = get_agent(db, agent_id)
logger.info(
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
@ -314,7 +381,72 @@ async def run_agent_stream(
session_id=adk_session_id,
)
content = Content(role="user", parts=[Part(text=message)])
# Process the received files
file_parts = []
if files and len(files) > 0:
for file_data in files:
try:
# Decode the base64 file
file_bytes = base64.b64decode(file_data.data)
# Detailed debug
logger.info(
f"DEBUG - Processing file: {file_data.filename}"
)
logger.info(f"DEBUG - File size: {len(file_bytes)} bytes")
logger.info(
f"DEBUG - MIME type: '{file_data.content_type}'"
)
logger.info(f"DEBUG - First 20 bytes: {file_bytes[:20]}")
# Create a Part for the file using the default constructor
try:
file_part = Part(
inline_data=Blob(
mime_type=file_data.content_type,
data=file_bytes,
)
)
logger.info(f"DEBUG - Part created successfully")
except Exception as part_error:
logger.error(
f"DEBUG - Error creating Part: {str(part_error)}"
)
logger.error(
f"DEBUG - Error type: {type(part_error).__name__}"
)
import traceback
logger.error(
f"DEBUG - Stack trace: {traceback.format_exc()}"
)
raise
# Save the file in the ArtifactService
version = artifacts_service.save_artifact(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
filename=file_data.filename,
artifact=file_part,
)
logger.info(
f"Saved file {file_data.filename} as version {version}"
)
# Add the Part to the list of parts for the message content
file_parts.append(file_part)
except Exception as e:
logger.error(
f"Error processing file {file_data.filename}: {str(e)}"
)
# Create the content with the text message and the files
parts = [Part(text=message)]
if file_parts:
parts.extend(file_parts)
content = Content(role="user", parts=parts)
logger.info("Starting agent streaming execution")
try:

View File

@ -103,6 +103,14 @@ def get_agent(db: Session, agent_id: Union[uuid.UUID, str]) -> Optional[Agent]:
logger.warning(f"Agent not found: {agent_id}")
return None
# Sanitize agent name if it contains spaces or special characters
if agent.name and any(c for c in agent.name if not (c.isalnum() or c == "_")):
agent.name = "".join(
c if c.isalnum() or c == "_" else "_" for c in agent.name
)
# Update in database
db.commit()
return agent
except SQLAlchemyError as e:
logger.error(f"Error searching for agent {agent_id}: {str(e)}")
@ -144,6 +152,17 @@ def get_agents_by_client(
agents = query.offset(skip).limit(limit).all()
# Sanitize agent names if they contain spaces or special characters
for agent in agents:
if agent.name and any(
c for c in agent.name if not (c.isalnum() or c == "_")
):
agent.name = "".join(
c if c.isalnum() or c == "_" else "_" for c in agent.name
)
# Update in database
db.commit()
return agents
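The same alphanumeric-or-underscore sanitization appears at several points in this diff (single-agent lookup, client listing, and the agent-card name paths). A shared helper would keep the sites in sync; a sketch with an illustrative name:

```python
def sanitize_agent_name(name: str) -> str:
    """Replace every character that is not alphanumeric or '_' with '_'."""
    return "".join(c if c.isalnum() or c == "_" else "_" for c in name)
```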
except SQLAlchemyError as e:
logger.error(f"Error searching for client agents {client_id}: {str(e)}")
@ -176,7 +195,15 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent:
agent_card = response.json()
# Update agent with information from agent card
agent.name = agent_card.get("name", "Unknown Agent")
# Only update name if not provided or empty, or sanitize it
if not agent.name or agent.name.strip() == "":
# Sanitize name: remove spaces and special characters
card_name = agent_card.get("name", "Unknown Agent")
sanitized_name = "".join(
c if c.isalnum() or c == "_" else "_" for c in card_name
)
agent.name = sanitized_name
agent.description = agent_card.get("description", "")
if agent.config is None:
@ -202,6 +229,51 @@ async def create_agent(db: Session, agent: AgentCreate) -> Agent:
if "api_key" not in agent.config or not agent.config["api_key"]:
agent.config["api_key"] = generate_api_key()
elif agent.type == "task":
if not isinstance(agent.config, dict):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid configuration: must be an object with tasks",
)
if "tasks" not in agent.config:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid configuration: tasks is required for {agent.type} agents",
)
if not agent.config["tasks"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid configuration: tasks cannot be empty",
)
for task in agent.config["tasks"]:
if "agent_id" not in task:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Each task must have an agent_id",
)
agent_id = task["agent_id"]
task_agent = get_agent(db, agent_id)
if not task_agent:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Agent not found for task: {agent_id}",
)
if "sub_agents" in agent.config and agent.config["sub_agents"]:
if not validate_sub_agents(db, agent.config["sub_agents"]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="One or more sub-agents do not exist",
)
if "api_key" not in agent.config or not agent.config["api_key"]:
agent.config["api_key"] = generate_api_key()
# Additional sub-agent validation (for non-llm and non-a2a types)
elif agent.type != "llm":
if not isinstance(agent.config, dict):
@ -454,7 +526,14 @@ async def update_agent(
)
agent_card = response.json()
agent_data["name"] = agent_card.get("name", "Unknown Agent")
# Only update name if the original update doesn't specify a name
if "name" not in agent_data or not agent_data["name"].strip():
# Sanitize name: remove spaces and special characters
card_name = agent_card.get("name", "Unknown Agent")
sanitized_name = "".join(
c if c.isalnum() or c == "_" else "_" for c in card_name
)
agent_data["name"] = sanitized_name
agent_data["description"] = agent_card.get("description", "")
if "config" not in agent_data or agent_data["config"] is None:
@ -492,7 +571,14 @@ async def update_agent(
)
agent_card = response.json()
agent_data["name"] = agent_card.get("name", "Unknown Agent")
# Only update name if the original update doesn't specify a name
if "name" not in agent_data or not agent_data["name"].strip():
# Sanitize name: remove spaces and special characters
card_name = agent_card.get("name", "Unknown Agent")
sanitized_name = "".join(
c if c.isalnum() or c == "_" else "_" for c in card_name
)
agent_data["name"] = sanitized_name
agent_data["description"] = agent_card.get("description", "")
if "config" not in agent_data or agent_data["config"] is None:
@ -637,6 +723,45 @@ async def update_agent(
if "config" not in agent_data:
agent_data["config"] = agent_config
if ("type" in agent_data and agent_data["type"] in ["task"]) or (
agent.type in ["task"] and "config" in agent_data
):
config = agent_data.get("config", {})
if "tasks" not in config:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid configuration: tasks is required for {agent_data.get('type', agent.type)} agents",
)
if not config["tasks"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid configuration: tasks cannot be empty",
)
for task in config["tasks"]:
if "agent_id" not in task:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Each task must have an agent_id",
)
agent_id = task["agent_id"]
task_agent = get_agent(db, agent_id)
if not task_agent:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Agent not found for task: {agent_id}",
)
# Validate sub_agents if present
if "sub_agents" in config and config["sub_agents"]:
if not validate_sub_agents(db, config["sub_agents"]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="One or more sub-agents do not exist",
)
if not agent_config.get("api_key") and (
"config" not in agent_data or not agent_data["config"].get("api_key")
):

View File

View File

@ -0,0 +1,412 @@
"""
@author: Davidson Gomes
@file: a2a_agent.py
Developed by: Davidson Gomes
Creation date: May 13, 2025
Contact: contato@evolution-api.com
@copyright © Evolution API 2025. All rights reserved.
Licensed under the Apache License, Version 2.0
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@important
For any future changes to the code in this file, it is recommended to
include, together with the modification, the information of the developer
who changed it and the date of modification.
"""
from google.adk.agents import BaseAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.genai.types import Content, Part
from typing import AsyncGenerator, List, Dict, Any, Optional
import json
import httpx
from httpx_sse import connect_sse
from src.schemas.a2a_types import (
AgentCard,
Message,
TextPart,
TaskSendParams,
SendTaskRequest,
SendTaskStreamingRequest,
TaskState,
)
from uuid import uuid4
class A2ACustomAgent(BaseAgent):
"""
Custom agent that implements the A2A protocol directly.
This agent implements the interaction with an external A2A service.
"""
# Field declarations for Pydantic
agent_card_url: str
agent_card: Optional[AgentCard]
timeout: int
base_url: str
def __init__(
self,
name: str,
agent_card_url: str,
timeout: int = 300,
sub_agents: List[BaseAgent] = [],
**kwargs,
):
"""
Initialize the A2A agent.
Args:
name: Agent name
agent_card_url: A2A agent card URL
timeout: Maximum execution time (seconds)
sub_agents: List of sub-agents to be executed after the A2A agent
"""
# Create base_url from agent_card_url
base_url = agent_card_url
if "/.well-known/agent.json" in base_url:
base_url = base_url.split("/.well-known/agent.json")[0]
print(f"A2A agent initialized for URL: {agent_card_url}")
# Initialize base class
super().__init__(
name=name,
agent_card_url=agent_card_url,
base_url=base_url, # Pass base_url here
agent_card=None,
timeout=timeout,
sub_agents=sub_agents,
**kwargs,
)
async def fetch_agent_card(self) -> AgentCard:
"""Fetch the agent card from the A2A service."""
if self.agent_card:
return self.agent_card
card_url = f"{self.base_url}/.well-known/agent.json"
print(f"Fetching agent card from: {card_url}")
async with httpx.AsyncClient() as client:
response = await client.get(card_url)
response.raise_for_status()
try:
card_data = response.json()
self.agent_card = AgentCard(**card_data)
return self.agent_card
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse agent card: {str(e)}")
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""
Implementation of the A2A protocol according to the Google ADK documentation.
This method follows the pattern of implementing custom agents,
sending the user's message to the A2A service and monitoring the response.
"""
try:
# 1. First, fetch the agent card if we haven't already
try:
agent_card = await self.fetch_agent_card()
print(f"Agent card fetched: {agent_card.name}")
except Exception as e:
error_msg = f"Failed to fetch agent card: {str(e)}"
print(error_msg)
yield Event(
author=self.name,
content=Content(role="agent", parts=[Part(text=error_msg)]),
)
return
# 2. Extract the user's message from the context
user_message = None
# Search for the user's message in the session events
if ctx.session and hasattr(ctx.session, "events") and ctx.session.events:
for event in reversed(ctx.session.events):
if event.author == "user" and event.content and event.content.parts:
user_message = event.content.parts[0].text
print("Message found in session events")
break
# Check in the session state if the message was not found in the events
if not user_message and ctx.session and ctx.session.state:
if "user_message" in ctx.session.state:
user_message = ctx.session.state["user_message"]
elif "message" in ctx.session.state:
user_message = ctx.session.state["message"]
if not user_message:
error_msg = "No user message found"
print(error_msg)
yield Event(
author=self.name,
content=Content(role="agent", parts=[Part(text=error_msg)]),
)
return
# 3. Create and format the task to send to the A2A agent
print(f"Sending task to A2A agent: {user_message[:100]}...")
# Use the session ID as a stable identifier
session_id = (
str(ctx.session.id)
if ctx.session and hasattr(ctx.session, "id")
else str(uuid4())
)
task_id = str(uuid4())
# Prepare the message for the A2A agent
formatted_message = Message(
role="user",
parts=[TextPart(text=user_message)],
)
# Prepare the task parameters
task_params = TaskSendParams(
id=task_id,
sessionId=session_id,
message=formatted_message,
acceptedOutputModes=["text"],
)
# 4. Check if the agent supports streaming
supports_streaming = (
agent_card.capabilities.streaming if agent_card.capabilities else False
)
if supports_streaming:
print("Agent supports streaming, using streaming API")
# Process with streaming
try:
request = SendTaskStreamingRequest(
method="tasks/sendSubscribe", params=task_params
)
try:
async with httpx.AsyncClient() as client:
response = await client.post(
self.base_url,
json=request.model_dump(),
headers={"Accept": "text/event-stream"},
timeout=self.timeout,
)
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data:"):
data = line[5:].strip()
if data:
try:
result = json.loads(data)
print(f"Stream event received: {result}")
# Check if this is a status update with a message
if (
"result" in result
and "status" in result["result"]
and "message"
in result["result"]["status"]
and "parts"
in result["result"]["status"]["message"]
):
message_parts = result["result"][
"status"
]["message"]["parts"]
parts = [
Part(text=part["text"])
for part in message_parts
if part.get("type") == "text"
and "text" in part
]
if parts:
yield Event(
author=self.name,
content=Content(
role="agent", parts=parts
),
)
# Check if this is a final message
if (
"result" in result
and result.get("result", {}).get(
"final", False
)
and "status" in result["result"]
and result["result"]["status"].get(
"state"
)
in [
TaskState.COMPLETED,
TaskState.CANCELED,
TaskState.FAILED,
]
):
print(
"Received final message, stream complete"
)
break
except json.JSONDecodeError as e:
print(f"Error parsing SSE data: {str(e)}")
except Exception as stream_error:
print(
f"Error in direct streaming: {str(stream_error)}, falling back to regular API"
)
fallback_request = SendTaskRequest(
method="tasks/send", params=task_params
)
async with httpx.AsyncClient() as client:
response = await client.post(
self.base_url,
json=fallback_request.model_dump(),
timeout=self.timeout,
)
response.raise_for_status()
result = response.json()
print(f"Fallback response: {result}")
# Extract agent message parts
if (
"result" in result
and "status" in result["result"]
and "message" in result["result"]["status"]
and "parts" in result["result"]["status"]["message"]
):
message_parts = result["result"]["status"]["message"][
"parts"
]
parts = [
Part(text=part["text"])
for part in message_parts
if part.get("type") == "text" and "text" in part
]
if parts:
yield Event(
author=self.name,
content=Content(role="agent", parts=parts),
)
else:
yield Event(
author=self.name,
content=Content(
role="agent",
parts=[
Part(
text="Received response without message parts"
)
],
),
)
except Exception as e:
error_msg = f"Error in streaming: {str(e)}"
print(error_msg)
yield Event(
author=self.name,
content=Content(role="agent", parts=[Part(text=error_msg)]),
)
else:
print("Agent does not support streaming, using regular API")
# Process with regular request
try:
request = SendTaskRequest(method="tasks/send", params=task_params)
async with httpx.AsyncClient() as client:
response = await client.post(
self.base_url,
json=request.model_dump(),
timeout=self.timeout,
)
response.raise_for_status()
result = response.json()
print(f"Task response: {result}")
# Extract agent message parts
if (
"result" in result
and "status" in result["result"]
and "message" in result["result"]["status"]
and "parts" in result["result"]["status"]["message"]
):
message_parts = result["result"]["status"]["message"][
"parts"
]
parts = [
Part(text=part["text"])
for part in message_parts
if part.get("type") == "text" and "text" in part
]
if parts:
yield Event(
author=self.name,
content=Content(role="agent", parts=parts),
)
else:
yield Event(
author=self.name,
content=Content(
role="agent",
parts=[
Part(
text="Received response without message parts"
)
],
),
)
except Exception as e:
error_msg = f"Error sending request: {str(e)}"
print(error_msg)
print(f"Error type: {type(e).__name__}")
print(f"Error details: {str(e)}")
yield Event(
author=self.name,
content=Content(role="agent", parts=[Part(text=error_msg)]),
)
# Run sub-agents
for sub_agent in self.sub_agents:
async for event in sub_agent.run_async(ctx):
yield event
except Exception as e:
# Handle any uncaught error
error_msg = f"Error executing A2A agent: {str(e)}"
print(error_msg)
yield Event(
author=self.name,
content=Content(
role="agent",
parts=[Part(text=error_msg)],
),
)
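The `tasks/send` request the agent posts above can be sketched as a plain JSON-RPC 2.0 payload, assuming the field names used in this file (`sessionId`, `acceptedOutputModes`, typed `parts`) match the A2A specification:

```python
from uuid import uuid4


def build_send_task_payload(message_text: str, session_id: str) -> dict:
    """JSON-RPC 2.0 envelope for an A2A tasks/send call."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid4()),
        "method": "tasks/send",
        "params": {
            "id": str(uuid4()),  # task id
            "sessionId": session_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message_text}],
            },
            "acceptedOutputModes": ["text"],
        },
    }
```

For streaming, the same `params` are sent with method `tasks/sendSubscribe` and an `Accept: text/event-stream` header, as the streaming branch above does.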

View File

@ -1,9 +1,9 @@
"""
@author: Davidson Gomes
@file: a2a_agent.py
@file: task_agent.py
Developed by: Davidson Gomes
Creation date: May 13, 2025
Creation date: May 14, 2025
Contact: contato@evolution-api.com
@copyright © Evolution API 2025. All rights reserved.
@ -27,82 +27,70 @@
"""
from google.adk.agents import BaseAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.genai.types import Content, Part
from src.services.agent_service import get_agent
from sqlalchemy.orm import Session
from typing import AsyncGenerator, List
from src.schemas.a2a_types import (
SendTaskRequest,
Message,
TextPart,
)
import httpx
from uuid import uuid4
from src.schemas.agent_config import AgentTask
class A2ACustomAgent(BaseAgent):
class TaskAgent(BaseAgent):
"""
Custom agent that implements the A2A protocol directly.
Custom agent that implements structured single-task execution.
This agent implements the interaction with an external A2A service.
This agent renders task instructions and delegates execution to a locally built agent.
"""
# Field declarations for Pydantic
agent_card_url: str
timeout: int
tasks: List[AgentTask]
db: Session
def __init__(
self,
name: str,
agent_card_url: str,
timeout: int = 300,
tasks: List[AgentTask],
db: Session,
sub_agents: List[BaseAgent] = [],
**kwargs,
):
"""
Initialize the A2A agent.
Initialize the Task agent.
Args:
name: Agent name
agent_card_url: A2A agent card URL
timeout: Maximum execution time (seconds)
sub_agents: List of sub-agents to be executed after the A2A agent
tasks: List of tasks to be executed
db: Database session
sub_agents: List of sub-agents to be executed after the Task agent
"""
# Initialize base class
super().__init__(
name=name,
agent_card_url=agent_card_url,
timeout=timeout,
tasks=tasks,
db=db,
sub_agents=sub_agents,
**kwargs,
)
print(f"A2A agent initialized for URL: {agent_card_url}")
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""
Implementation of the A2A protocol according to the Google ADK documentation.
Implementation of the Task agent.
This method follows the pattern of implementing custom agents,
sending the user's message to the A2A service and monitoring the response.
rendering the task instructions and running the configured agent to completion.
"""
exit_stack = None
try:
# Prepare the base URL for the A2A
url = self.agent_card_url
# Ensure that there is no /.well-known/agent.json in the url
if "/.well-known/agent.json" in url:
url = url.split("/.well-known/agent.json")[0]
# 2. Extract the user's message from the context
# Extract the user's message from the context
user_message = None
# Search for the user's message in the session events
@@ -120,57 +108,89 @@ class A2ACustomAgent(BaseAgent):
elif "message" in ctx.session.state:
user_message = ctx.session.state["message"]
# 3. Create and send the task to the A2A agent
print(f"Sending task to A2A agent: {user_message[:100]}...")
# Use the session ID as a stable identifier
session_id = (
str(ctx.session.id)
if ctx.session and hasattr(ctx.session, "id")
else str(uuid4())
)
task_id = str(uuid4())
try:
formatted_message: Message = Message(
role="user",
parts=[TextPart(type="text", text=user_message)],
)
request: SendTaskRequest = SendTaskRequest(
params={
"message": formatted_message,
"sessionId": session_id,
"id": task_id,
}
)
print(f"Request send task: {request.model_dump()}")
# POST the JSON-RPC 2.0 request to the agent URL
task_result = await httpx.AsyncClient().post(
url, json=request.model_dump(), timeout=self.timeout
)
print(f"Task response: {task_result.json()}")
print(f"Task sent successfully, ID: {task_id}")
agent_response_parts = task_result.json()["result"]["status"][
"message"
]["parts"]
parts = [Part(text=part["text"]) for part in agent_response_parts]
if not user_message:
yield Event(
author=self.name,
content=Content(role="agent", parts=parts),
content=Content(
role="agent",
parts=[Part(text="User message not found")],
),
)
return
# Emit an initial status event before task processing
yield Event(
author=self.name,
content=Content(
role="agent",
parts=[Part(text=f"Starting {self.name} task processing...")],
),
)
try:
# Replace any {content} in the task descriptions with the user's input
task = self.tasks[0]
task.description = task.description.replace("{content}", user_message)
task.enabled_tools = task.enabled_tools or []
agent = get_agent(self.db, task.agent_id)
if not agent:
yield Event(
author=self.name,
content=Content(parts=[Part(text="Agent not found")]),
)
return
# Prepare task instructions
task_message_instructions = f"""
<task>
<instructions>
Execute the following task:
</instructions>
<description>{task.description}</description>
<expected_output>{task.expected_output}</expected_output>
</task>
"""
# Send task instructions as an event
yield Event(
author=f"{self.name} - Task executor",
content=Content(
role="agent",
parts=[Part(text=task_message_instructions)],
),
)
# Run sub-agents
for sub_agent in self.sub_agents:
async for event in sub_agent.run_async(ctx):
from src.services.agent_builder import AgentBuilder
print(f"Building agent in Task agent: {agent.name}")
agent_builder = AgentBuilder(self.db)
root_agent, exit_stack = await agent_builder.build_agent(
agent, task.enabled_tools
)
# Store task instructions in context for reference by sub-agents
ctx.session.state["task_instructions"] = task_message_instructions
# Process the agent responses
try:
async for event in root_agent.run_async(ctx):
yield event
except GeneratorExit:
print("Generator was closed prematurely, handling cleanup...")
# Allow the exception to propagate after cleanup
raise
except Exception as e:
error_msg = f"Error during agent execution: {str(e)}"
print(error_msg)
yield Event(
author=self.name,
content=Content(
role="agent",
parts=[Part(text=error_msg)],
),
)
except Exception as e:
error_msg = f"Error sending request: {str(e)}"
@@ -182,15 +202,32 @@ class A2ACustomAgent(BaseAgent):
author=self.name,
content=Content(role="agent", parts=[Part(text=error_msg)]),
)
return
except Exception as e:
# Handle any uncaught error
print(f"Error executing A2A agent: {str(e)}")
print(f"Error executing Task agent: {str(e)}")
yield Event(
author=self.name,
content=Content(
role="agent",
parts=[Part(text=f"Error interacting with A2A agent: {str(e)}")],
parts=[Part(text=f"Error interacting with Task agent: {str(e)}")],
),
)
finally:
# Ensure we close the exit_stack in the same task context where it was created
if exit_stack:
try:
await exit_stack.aclose()
print("Exit stack closed successfully")
except Exception as e:
print(f"Error closing exit_stack: {str(e)}")
# Execute sub-agents only if no exception occurred
try:
if "e" not in locals():
for sub_agent in self.sub_agents:
async for event in sub_agent.run_async(ctx):
yield event
except Exception as sub_e:
print(f"Error executing sub-agents: {str(sub_e)}")
# We don't yield a new event here to avoid raising during cleanup
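The `{content}` substitution and XML-style instruction template used by `TaskAgent` above can be sketched in isolation (field names follow the task objects in the code; the wrapper function itself is hypothetical):

```python
def render_task_instructions(
    description: str, expected_output: str, user_message: str
) -> str:
    """Build the <task> instruction block, substituting {content} with user input."""
    description = description.replace("{content}", user_message)
    return (
        "<task>\n"
        "    <instructions>\n"
        "        Execute the following task:\n"
        "    </instructions>\n"
        f"    <description>{description}</description>\n"
        f"    <expected_output>{expected_output}</expected_output>\n"
        "</task>"
    )


text = render_task_instructions(
    description="Summarize: {content}",
    expected_output="A two-sentence summary",
    user_message="quarterly report",
)
print(text)
```

Substituting before rendering means a literal `{content}` placeholder in the stored task description is the only templating mechanism, so user input cannot inject further template expansion.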

View File

@@ -27,10 +27,16 @@
"""
import base64
import uuid
from typing import Dict, List, Any, Optional
from google.genai.types import Part, Blob
from src.schemas.a2a_types import (
ContentTypeNotSupportedError,
JSONRPCResponse,
UnsupportedOperationError,
Message,
)
@@ -55,3 +61,130 @@ def new_incompatible_types_error(request_id):
def new_not_implemented_error(request_id):
return JSONRPCResponse(id=request_id, error=UnsupportedOperationError())
def extract_files_from_message(message: Message) -> List[Dict[str, Any]]:
"""
Extract file parts from an A2A message.
Args:
message: An A2A Message object
Returns:
List of file parts extracted from the message
"""
if not message or not message.parts:
return []
files = []
for part in message.parts:
if hasattr(part, "type") and part.type == "file" and hasattr(part, "file"):
files.append(part)
return files
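A quick usage sketch of the filter above, with `SimpleNamespace` standing in for the Pydantic `Message`/part models (which are assumed to expose `parts`, `type`, and `file` attributes):

```python
from types import SimpleNamespace


def extract_files_from_message(message):
    """Same logic as above: keep parts that are file-typed and carry a payload."""
    if not message or not message.parts:
        return []
    return [
        part
        for part in message.parts
        if hasattr(part, "type") and part.type == "file" and hasattr(part, "file")
    ]


msg = SimpleNamespace(
    parts=[
        SimpleNamespace(type="text", text="see attachment"),
        SimpleNamespace(type="file", file={"name": "report.pdf"}),
    ]
)
files = extract_files_from_message(msg)
print(len(files))  # -> 1
```

The `hasattr` checks make the filter tolerant of mixed part models: a text part simply lacks a `file` attribute and is skipped rather than raising.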
def a2a_part_to_adk_part(a2a_part: Dict[str, Any]) -> Optional[Part]:
"""
Convert an A2A protocol part to an ADK Part object.
Args:
a2a_part: An A2A part dictionary
Returns:
Converted ADK Part object or None if conversion not possible
"""
part_type = a2a_part.get("type")
if part_type == "file" and "file" in a2a_part:
file_data = a2a_part["file"]
if "bytes" in file_data:
try:
# Convert base64 to bytes
file_bytes = base64.b64decode(file_data["bytes"])
mime_type = file_data.get("mimeType", "application/octet-stream")
# Create ADK Part
return Part(inline_data=Blob(mime_type=mime_type, data=file_bytes))
except Exception:
return None
elif part_type == "text" and "text" in a2a_part:
# For text parts, we could create a text blob if needed
return None
return None
def adk_part_to_a2a_part(
adk_part: Part, filename: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""
Convert an ADK Part object to an A2A protocol part.
Args:
adk_part: An ADK Part object
filename: Optional filename to use
Returns:
Converted A2A Part dictionary or None if conversion not possible
"""
if hasattr(adk_part, "inline_data") and adk_part.inline_data:
if adk_part.inline_data.data and adk_part.inline_data.mime_type:
# Convert binary data to base64
file_bytes = adk_part.inline_data.data
mime_type = adk_part.inline_data.mime_type
# Generate filename if not provided
if not filename:
ext = get_extension_from_mime(mime_type)
filename = f"file_{uuid.uuid4().hex}{ext}"
# Convert to A2A FilePart dict
return {
"type": "file",
"file": {
"name": filename,
"mimeType": mime_type,
"bytes": (
base64.b64encode(file_bytes).decode("utf-8")
if isinstance(file_bytes, bytes)
else str(file_bytes)
),
},
}
elif hasattr(adk_part, "text") and adk_part.text:
# Convert text part
return {"type": "text", "text": adk_part.text}
return None
def get_extension_from_mime(mime_type: str) -> str:
"""
Get a file extension from MIME type.
Args:
mime_type: MIME type string
Returns:
Appropriate file extension with leading dot
"""
if not mime_type:
return ""
mime_map = {
"image/jpeg": ".jpg",
"image/png": ".png",
"image/gif": ".gif",
"application/pdf": ".pdf",
"text/plain": ".txt",
"text/html": ".html",
"text/csv": ".csv",
"application/json": ".json",
"application/xml": ".xml",
"application/msword": ".doc",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
"application/vnd.ms-excel": ".xls",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
}
return mime_map.get(mime_type, "")