refactor(api): remove unused A2A client test and documentation subproject

This commit is contained in:
Davidson Gomes
2025-04-30 17:15:42 -03:00
parent 4901be8e4c
commit 96df2db27d
6 changed files with 13 additions and 243 deletions

View File

@@ -32,7 +32,6 @@ from src.services.streaming_service import StreamingService
logger = logging.getLogger(__name__)
# Create router with prefix /a2a according to the standard protocol
router = APIRouter(
prefix="/a2a",
tags=["a2a"],
@@ -44,13 +43,10 @@ router = APIRouter(
},
)
# Singleton instances for shared resources
streaming_service = StreamingService()
redis_cache_service = RedisCacheService()
streaming_adapter = StreamingServiceAdapter(streaming_service)
# Cache dictionary para manter instâncias de A2ATaskManager por agente
# Isso evita criar novas instâncias a cada request
_task_manager_cache = {}
_agent_runner_cache = {}
@@ -68,23 +64,14 @@ def get_agent_runner_adapter(db=None, reuse=True, agent_id=None):
Agent runner adapter instance
"""
cache_key = str(agent_id) if agent_id else "default"
logger.info(
f"[DEBUG] get_agent_runner_adapter chamado para agent_id={agent_id}, reuse={reuse}, cache_key={cache_key}"
)
if reuse and cache_key in _agent_runner_cache:
adapter = _agent_runner_cache[cache_key]
logger.info(
f"[DEBUG] Reutilizando agent_runner_adapter existente para {cache_key}"
)
# Atualizar a sessão DB se fornecida
if db is not None:
adapter.db = db
return adapter
logger.info(
f"[IMPORTANTE] Criando NOVA instância de AgentRunnerAdapter para {cache_key}"
)
adapter = AgentRunnerAdapter(
agent_runner_func=run_agent,
session_service=session_service,
@@ -94,7 +81,6 @@ def get_agent_runner_adapter(db=None, reuse=True, agent_id=None):
)
if reuse:
logger.info(f"[DEBUG] Armazenando nova instância no cache para {cache_key}")
_agent_runner_cache[cache_key] = adapter
return adapter
@@ -103,26 +89,22 @@ def get_agent_runner_adapter(db=None, reuse=True, agent_id=None):
def get_task_manager(agent_id, db=None, reuse=True, operation_type="query"):
cache_key = str(agent_id)
# Para operações de consulta, NUNCA crie um agent_runner
if operation_type == "query":
if cache_key in _task_manager_cache:
# Reutilize existente
task_manager = _task_manager_cache[cache_key]
task_manager.db = db
return task_manager
# Se não existe, crie um task_manager SEM agent_runner para consultas
return A2ATaskManager(
redis_cache=redis_cache_service,
agent_runner=None, # Sem agent_runner para consultas!
agent_runner=None,
streaming_service=streaming_adapter,
push_notification_service=push_notification_service,
db=db,
)
# Para operações de execução, use o fluxo normal
if reuse and cache_key in _task_manager_cache:
# Atualize o db
task_manager = _task_manager_cache[cache_key]
task_manager.db = db
return task_manager
@@ -171,29 +153,18 @@ async def process_a2a_request(
JSON-RPC response or streaming (SSE) depending on the method
"""
try:
# Detailed request log
logger.info(f"Request received for A2A agent {agent_id}")
logger.info(f"Headers: {dict(request.headers)}")
try:
body = await request.json()
method = body.get("method", "unknown")
logger.info(f"[IMPORTANTE] Método solicitado: {method}")
logger.info(f"Request body: {body}")
# Determinar se é uma solicitação de consulta (get_task) ou execução (send_task)
is_query_request = method in [
"tasks/get",
"tasks/cancel",
"tasks/pushNotification/get",
"tasks/resubscribe",
]
# Para consultas, reutilizamos os componentes; para execuções,
# criamos novos para garantir estado limpo
reuse_components = is_query_request
logger.info(
f"[IMPORTANTE] Is query request: {is_query_request}, Reuse components: {reuse_components}"
)
except Exception as e:
logger.error(f"Error reading request body: {e}")
@@ -225,8 +196,6 @@ async def process_a2a_request(
# Verify API key
agent_config = agent.config
logger.info(f"Received API Key: {x_api_key}")
logger.info(f"Expected API Key: {agent_config.get('api_key')}")
if x_api_key and agent_config.get("api_key") != x_api_key:
logger.warning(f"Invalid API Key for agent {agent_id}")
@@ -239,7 +208,6 @@ async def process_a2a_request(
},
)
# Obter o task manager para este agente (reutilizando se possível)
a2a_task_manager = get_task_manager(
agent_id,
db=db,
@@ -248,8 +216,6 @@ async def process_a2a_request(
)
a2a_server = A2AServer(task_manager=a2a_task_manager)
# Configure agent_card for the A2A server
logger.info("Configuring agent_card for A2A server")
agent_card = create_agent_card_from_agent(agent, db)
a2a_server.agent_card = agent_card
@@ -269,7 +235,6 @@ async def process_a2a_request(
},
)
# Verify the method
if not body.get("method"):
logger.error("Method not specified in request")
return JSONResponse(
@@ -285,12 +250,6 @@ async def process_a2a_request(
},
)
logger.info(f"Processing method: {body.get('method')}")
# Process the request with the A2A server
logger.info("Sending request to A2A server")
# Pass the agent_id and db directly to the process_request method
return await a2a_server.process_request(request, agent_id=str(agent_id), db=db)
except Exception as e:
@@ -338,7 +297,6 @@ async def get_agent_card(
agent_card = create_agent_card_from_agent(agent, db)
# Obter o task manager para este agente (reutilizando se possível)
a2a_task_manager = get_task_manager(agent_id, db=db, reuse=True)
a2a_server = A2AServer(task_manager=a2a_task_manager)

View File

@@ -54,10 +54,10 @@ async def register_user(user_data: UserCreate, db: Session = Depends(get_db)):
"""
user, message = create_user(db, user_data, is_admin=False)
if not user:
logger.error(f"Erro ao registrar usuário: {message}")
logger.error(f"Error registering user: {message}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
logger.info(f"Usuário registrado com sucesso: {user.email}")
logger.info(f"User registered successfully: {user.email}")
return user
@@ -86,7 +86,7 @@ async def register_admin(
"""
user, message = create_user(db, user_data, is_admin=True)
if not user:
logger.error(f"Erro ao registrar administrador: {message}")
logger.error(f"Error registering admin: {message}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
logger.info(

View File

@@ -50,7 +50,7 @@ async def websocket_chat(
await websocket.accept()
logger.info("WebSocket connection accepted, waiting for authentication")
# Aguardar mensagem de autenticação
# Wait for authentication message
try:
auth_data = await websocket.receive_json()
logger.info(f"Received authentication data: {auth_data}")
@@ -70,14 +70,14 @@ async def websocket_chat(
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
# Verificar se o agente pertence ao cliente do usuário
# Verify if the agent belongs to the user's client
agent = agent_service.get_agent(db, agent_id)
if not agent:
logger.warning(f"Agent {agent_id} not found")
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
# Verificar se o usuário tem acesso ao agente (via client)
# Verify if the user has access to the agent (via client)
await verify_user_client(payload, db, agent.client_id)
logger.info(
@@ -102,12 +102,12 @@ async def websocket_chat(
memory_service=memory_service,
db=db,
):
# Enviar cada chunk como uma mensagem JSON
# Send each chunk as a JSON message
await websocket.send_json(
{"message": chunk, "turn_complete": False}
)
# Enviar sinal de turno completo
# Send signal of complete turn
await websocket.send_json({"message": "", "turn_complete": True})
except WebSocketDisconnect:

View File

@@ -602,7 +602,7 @@ class A2ATaskManager:
# Processa a resposta do agente
if response and isinstance(response, dict):
# Extrai texto da resposta
response_text = response.get("text", "")
response_text = response.get("content", "")
if not response_text and "message" in response:
message = response.get("message", {})
parts = message.get("parts", [])