From 96df2db27d999719e12c3cdd6397e44cc76e3dd1 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 30 Apr 2025 17:15:42 -0300 Subject: [PATCH] refactor(api): remove unused A2A client test and documentation subproject --- a2a_client_test.py | 187 ----------------------- docs/A2A | 1 - src/api/a2a_routes.py | 50 +----- src/api/auth_routes.py | 6 +- src/api/chat_routes.py | 10 +- src/services/a2a_task_manager_service.py | 2 +- 6 files changed, 13 insertions(+), 243 deletions(-) delete mode 100644 a2a_client_test.py delete mode 160000 docs/A2A diff --git a/a2a_client_test.py b/a2a_client_test.py deleted file mode 100644 index 96442d25..00000000 --- a/a2a_client_test.py +++ /dev/null @@ -1,187 +0,0 @@ -import logging -import httpx -from httpx_sse import connect_sse -from typing import Any, AsyncIterable, Optional -from docs.A2A.samples.python.common.types import ( - AgentCard, - GetTaskRequest, - SendTaskRequest, - SendTaskResponse, - JSONRPCRequest, - GetTaskResponse, - CancelTaskResponse, - CancelTaskRequest, - SetTaskPushNotificationRequest, - SetTaskPushNotificationResponse, - GetTaskPushNotificationRequest, - GetTaskPushNotificationResponse, - A2AClientHTTPError, - A2AClientJSONError, - SendTaskStreamingRequest, - SendTaskStreamingResponse, -) -import json -import asyncio -import uuid - - -# Configurar logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger("a2a_client_runner") - - -class A2ACardResolver: - def __init__(self, base_url, agent_card_path="/.well-known/agent.json"): - self.base_url = base_url.rstrip("/") - self.agent_card_path = agent_card_path.lstrip("/") - - def get_agent_card(self) -> AgentCard: - with httpx.Client() as client: - response = client.get(self.base_url + "/" + self.agent_card_path) - response.raise_for_status() - try: - return AgentCard(**response.json()) - except json.JSONDecodeError as e: - raise A2AClientJSONError(str(e)) from e - - -class A2AClient: - def __init__( - self, - agent_card: AgentCard = None, - url: str = None, - api_key: Optional[str] = None, - ): - if agent_card: - self.url = agent_card.url - elif url: - self.url = url - else: - raise ValueError("Must provide either agent_card or url") - self.api_key = api_key - self.headers = {"x-api-key": api_key} if api_key else {} - - async def send_task(self, payload: dict[str, Any]) -> SendTaskResponse: - request = SendTaskRequest(params=payload) - return SendTaskResponse(**await self._send_request(request)) - - async def send_task_streaming( - self, payload: dict[str, Any] - ) -> AsyncIterable[SendTaskStreamingResponse]: - request = SendTaskStreamingRequest(params=payload) - with httpx.Client(timeout=None) as client: - with connect_sse( - client, - "POST", - self.url, - json=request.model_dump(), - headers=self.headers, - ) as event_source: - try: - for sse in event_source.iter_sse(): - yield SendTaskStreamingResponse(**json.loads(sse.data)) - except json.JSONDecodeError as e: - raise A2AClientJSONError(str(e)) from e - except httpx.RequestError as e: - raise A2AClientHTTPError(400, str(e)) from e - - async def _send_request(self, request: JSONRPCRequest) -> dict[str, Any]: - async with httpx.AsyncClient() as client: - try: - # Image generation could take time, adding timeout - response = await client.post( - self.url, - json=request.model_dump(), - headers=self.headers, - timeout=30, - ) - response.raise_for_status() - return response.json() - except httpx.HTTPStatusError as e: - raise A2AClientHTTPError(e.response.status_code, str(e)) from e - except json.JSONDecodeError as e: - raise A2AClientJSONError(str(e)) from e - - async def get_task(self, payload: dict[str, Any]) -> GetTaskResponse: - request = GetTaskRequest(params=payload) - return GetTaskResponse(**await self._send_request(request)) - - async def cancel_task(self, payload: dict[str, Any]) -> CancelTaskResponse: - request = CancelTaskRequest(params=payload) - return CancelTaskResponse(**await self._send_request(request)) - - async def set_task_callback( - self, payload: dict[str, Any] - ) -> SetTaskPushNotificationResponse: - request = SetTaskPushNotificationRequest(params=payload) - return SetTaskPushNotificationResponse(**await self._send_request(request)) - - async def get_task_callback( - self, payload: dict[str, Any] - ) -> GetTaskPushNotificationResponse: - request = GetTaskPushNotificationRequest(params=payload) - return GetTaskPushNotificationResponse(**await self._send_request(request)) - - -async def main(): - # Configurações - BASE_URL = "http://localhost:8000/api/v1/a2a/18a2889e-8573-4e70-833c-7d9e00a8fd80" - API_KEY = "83c2c19f-dc2e-4abe-9a41-ef7d2eb079d6" - - try: - # Obter o card do agente - logger.info("Obtendo card do agente...") - card_resolver = A2ACardResolver(BASE_URL) - try: - card = card_resolver.get_agent_card() - logger.info(f"Card do agente: {card}") - except Exception as e: - logger.error(f"Erro ao obter card do agente: {e}") - return - - # Criar cliente A2A com API key - client = A2AClient(card, api_key=API_KEY) - - # Exemplo 1: Enviar tarefa síncrona - logger.info("\n=== TESTE DE TAREFA SÍNCRONA ===") - task_id = str(uuid.uuid4()) - session_id = "test-session-1" - - # Preparar payload da tarefa - payload = { - "id": task_id, - "sessionId": session_id, - "message": { - "role": "user", - "parts": [ - { - "type": "text", - "text": "Quais são os três maiores países do mundo em área territorial?", - } - ], - }, - } - - logger.info(f"Enviando tarefa com ID: {task_id}") - async for streaming_response in client.send_task_streaming(payload): - if hasattr(streaming_response.result, "artifact"): - # Processar conteúdo parcial - print(streaming_response.result.artifact.parts[0].text) - elif ( - hasattr(streaming_response.result, "status") - and streaming_response.result.status.state == "completed" - ): - # Tarefa concluída - print( - "Resposta final:", - streaming_response.result.status.message.parts[0].text, - ) - - except Exception as e: - logger.error(f"Erro durante execução dos testes: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/docs/A2A b/docs/A2A deleted file mode 160000 index 502a4d0f..00000000 --- a/docs/A2A +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 502a4d0fdd73bb0b8afb8163907c601e08f40525 diff --git a/src/api/a2a_routes.py b/src/api/a2a_routes.py index a90b7295..effae9fb 100644 --- a/src/api/a2a_routes.py +++ b/src/api/a2a_routes.py @@ -32,7 +32,6 @@ from src.services.streaming_service import StreamingService logger = logging.getLogger(__name__) -# Create router with prefix /a2a according to the standard protocol router = APIRouter( prefix="/a2a", tags=["a2a"], @@ -44,13 +43,10 @@ router = APIRouter( }, ) -# Singleton instances for shared resources streaming_service = StreamingService() redis_cache_service = RedisCacheService() streaming_adapter = StreamingServiceAdapter(streaming_service) -# Cache dictionary para manter instâncias de A2ATaskManager por agente -# Isso evita criar novas instâncias a cada request _task_manager_cache = {} _agent_runner_cache = {} @@ -68,23 +64,14 @@ def get_agent_runner_adapter(db=None, reuse=True, agent_id=None): Agent runner adapter instance """ cache_key = str(agent_id) if agent_id else "default" - logger.info( - f"[DEBUG] get_agent_runner_adapter chamado para agent_id={agent_id}, reuse={reuse}, cache_key={cache_key}" - ) if reuse and cache_key in _agent_runner_cache: adapter = _agent_runner_cache[cache_key] - logger.info( - f"[DEBUG] Reutilizando agent_runner_adapter existente para {cache_key}" - ) - # Atualizar a sessão DB se fornecida + if db is not None: adapter.db = db return adapter - logger.info( - f"[IMPORTANTE] Criando NOVA instância de AgentRunnerAdapter para {cache_key}" - ) adapter = AgentRunnerAdapter( agent_runner_func=run_agent, session_service=session_service, @@ -94,7 +81,6 @@ def get_agent_runner_adapter(db=None, reuse=True, agent_id=None): ) if reuse: - logger.info(f"[DEBUG] Armazenando nova instância no cache para {cache_key}") _agent_runner_cache[cache_key] = adapter return adapter @@ -103,26 +89,22 @@ def get_agent_runner_adapter(db=None, reuse=True, agent_id=None): def get_task_manager(agent_id, db=None, reuse=True, operation_type="query"): cache_key = str(agent_id) - # Para operações de consulta, NUNCA crie um agent_runner if operation_type == "query": if cache_key in _task_manager_cache: - # Reutilize existente + task_manager = _task_manager_cache[cache_key] task_manager.db = db return task_manager - # Se não existe, crie um task_manager SEM agent_runner para consultas return A2ATaskManager( redis_cache=redis_cache_service, - agent_runner=None, # Sem agent_runner para consultas! + agent_runner=None, streaming_service=streaming_adapter, push_notification_service=push_notification_service, db=db, ) - # Para operações de execução, use o fluxo normal if reuse and cache_key in _task_manager_cache: - # Atualize o db task_manager = _task_manager_cache[cache_key] task_manager.db = db return task_manager @@ -171,29 +153,18 @@ async def process_a2a_request( JSON-RPC response or streaming (SSE) depending on the method """ try: - # Detailed request log - logger.info(f"Request received for A2A agent {agent_id}") - logger.info(f"Headers: {dict(request.headers)}") - try: body = await request.json() method = body.get("method", "unknown") - logger.info(f"[IMPORTANTE] Método solicitado: {method}") - logger.info(f"Request body: {body}") - # Determinar se é uma solicitação de consulta (get_task) ou execução (send_task) is_query_request = method in [ "tasks/get", "tasks/cancel", "tasks/pushNotification/get", "tasks/resubscribe", ] - # Para consultas, reutilizamos os componentes; para execuções, - # criamos novos para garantir estado limpo + reuse_components = is_query_request - logger.info( - f"[IMPORTANTE] Is query request: {is_query_request}, Reuse components: {reuse_components}" - ) except Exception as e: logger.error(f"Error reading request body: {e}") @@ -225,8 +196,6 @@ async def process_a2a_request( # Verify API key agent_config = agent.config - logger.info(f"Received API Key: {x_api_key}") - logger.info(f"Expected API Key: {agent_config.get('api_key')}") if x_api_key and agent_config.get("api_key") != x_api_key: logger.warning(f"Invalid API Key for agent {agent_id}") @@ -239,7 +208,6 @@ async def process_a2a_request( }, ) - # Obter o task manager para este agente (reutilizando se possível) a2a_task_manager = get_task_manager( agent_id, db=db, @@ -248,8 +216,6 @@ async def process_a2a_request( ) a2a_server = A2AServer(task_manager=a2a_task_manager) - # Configure agent_card for the A2A server - logger.info("Configuring agent_card for A2A server") agent_card = create_agent_card_from_agent(agent, db) a2a_server.agent_card = agent_card @@ -269,7 +235,6 @@ async def process_a2a_request( }, ) - # Verify the method if not body.get("method"): logger.error("Method not specified in request") return JSONResponse( @@ -285,12 +250,6 @@ async def process_a2a_request( }, ) - logger.info(f"Processing method: {body.get('method')}") - - # Process the request with the A2A server - logger.info("Sending request to A2A server") - - # Pass the agent_id and db directly to the process_request method return await a2a_server.process_request(request, agent_id=str(agent_id), db=db) except Exception as e: @@ -338,7 +297,6 @@ async def get_agent_card( agent_card = create_agent_card_from_agent(agent, db) - # Obter o task manager para este agente (reutilizando se possível) a2a_task_manager = get_task_manager(agent_id, db=db, reuse=True) a2a_server = A2AServer(task_manager=a2a_task_manager) diff --git a/src/api/auth_routes.py b/src/api/auth_routes.py index 06615f86..95856561 100644 --- a/src/api/auth_routes.py +++ b/src/api/auth_routes.py @@ -54,10 +54,10 @@ async def register_user(user_data: UserCreate, db: Session = Depends(get_db)): """ user, message = create_user(db, user_data, is_admin=False) if not user: - logger.error(f"Erro ao registrar usuário: {message}") + logger.error(f"Error registering user: {message}") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) - logger.info(f"Usuário registrado com sucesso: {user.email}") + logger.info(f"User registered successfully: {user.email}") return user @@ -86,7 +86,7 @@ async def register_admin( """ user, message = create_user(db, user_data, is_admin=True) if not user: - logger.error(f"Erro ao registrar administrador: {message}") + logger.error(f"Error registering admin: {message}") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) logger.info( diff --git a/src/api/chat_routes.py b/src/api/chat_routes.py index 5062a2e7..3a282d7e 100644 --- a/src/api/chat_routes.py +++ b/src/api/chat_routes.py @@ -50,7 +50,7 @@ async def websocket_chat( await websocket.accept() logger.info("WebSocket connection accepted, waiting for authentication") - # Aguardar mensagem de autenticação + # Wait for authentication message try: auth_data = await websocket.receive_json() logger.info(f"Received authentication data: {auth_data}") @@ -70,14 +70,14 @@ async def websocket_chat( await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return - # Verificar se o agente pertence ao cliente do usuário + # Verify if the agent belongs to the user's client agent = agent_service.get_agent(db, agent_id) if not agent: logger.warning(f"Agent {agent_id} not found") await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return - # Verificar se o usuário tem acesso ao agente (via client) + # Verify if the user has access to the agent (via client) await verify_user_client(payload, db, agent.client_id) logger.info( @@ -102,12 +102,12 @@ async def websocket_chat( memory_service=memory_service, db=db, ): - # Enviar cada chunk como uma mensagem JSON + # Send each chunk as a JSON message await websocket.send_json( {"message": chunk, "turn_complete": False} ) - # Enviar sinal de turno completo + # Send signal of complete turn await websocket.send_json({"message": "", "turn_complete": True}) except WebSocketDisconnect: diff --git a/src/services/a2a_task_manager_service.py b/src/services/a2a_task_manager_service.py index 94f490c5..40b029a9 100644 --- a/src/services/a2a_task_manager_service.py +++ b/src/services/a2a_task_manager_service.py @@ -602,7 +602,7 @@ class A2ATaskManager: # Processa a resposta do agente if response and isinstance(response, dict): # Extrai texto da resposta - response_text = response.get("text", "") + response_text = response.get("content", "") if not response_text and "message" in response: message = response.get("message", {}) parts = message.get("parts", [])