diff --git a/planejamento_atualizado.md b/planejamento_atualizado.md deleted file mode 100644 index 00a5a78d..00000000 --- a/planejamento_atualizado.md +++ /dev/null @@ -1,191 +0,0 @@ -# Planejamento de Implementação - A2A Streaming (Atualizado) - -## 1. Visão Geral - -Implementar suporte a Server-Sent Events (SSE) para streaming de atualizações de tarefas em tempo real, seguindo a especificação oficial do A2A. - -## 2. Componentes Necessários - -### 2.2 Estrutura de Arquivos - -``` -src/ -├── api/ -│ └── agent_routes.py (modificação) -├── schemas/ -│ └── streaming.py (novo) -├── services/ -│ └── streaming_service.py (novo) -└── utils/ - └── streaming.py (novo) -``` - -## 3. Implementação - -### 3.1 Schemas (Pydantic) - -```python -# schemas/streaming.py -- TaskStatusUpdateEvent - - state: str (working, completed, failed) - - timestamp: datetime - - message: Optional[Message] - - error: Optional[Error] - -- TaskArtifactUpdateEvent - - type: str - - content: str - - metadata: Dict[str, Any] - -- JSONRPCRequest - - jsonrpc: str = "2.0" - - id: str - - method: str = "tasks/sendSubscribe" - - params: Dict[str, Any] - -- Message - - role: str - - parts: List[MessagePart] - -- MessagePart - - type: str - - text: str -``` - -### 3.2 Serviço de Streaming - -````python -# services/streaming_service.py -- send_task_streaming() - - Monta payload JSON-RPC conforme especificação: - ```json - { - "jsonrpc": "2.0", - "id": "", - "method": "tasks/sendSubscribe", - "params": { - "id": "", - "sessionId": "", - "message": { - "role": "user", - "parts": [{"type": "text", "text": ""}] - } - } - } - ``` - - Configura headers: - - Accept: text/event-stream - - Authorization: x-api-key - - Gerencia conexão SSE - - Processa eventos em tempo real -```` - -### 3.3 Rota de Streaming - -```python -# api/agent_routes.py -- Nova rota POST /{agent_id}/tasks/sendSubscribe - - Validação de API key - - Gerenciamento de sessão - - Streaming de eventos SSE - - Tratamento de erros JSON-RPC -``` - -### 3.4 Utilitários - -```python -# utils/streaming.py -- Helpers para SSE - - Formatação de eventos - - Tratamento de reconexão - - Timeout e retry -- Processamento de eventos - - Parsing de eventos SSE - - Validação de payloads -- Formatação de respostas - - Conformidade com JSON-RPC 2.0 -``` - -## 4. Fluxo de Dados - -1. Cliente envia requisição JSON-RPC para `/tasks/sendSubscribe` -2. Servidor valida API key e configura sessão -3. Inicia streaming de eventos SSE -4. Envia atualizações em tempo real: - - TaskStatusUpdateEvent (estado da tarefa) - - TaskArtifactUpdateEvent (artefatos gerados) - - Mensagens do histórico - -## 5. Exemplo de Uso - -```python -async def exemplo_uso(): - agent_id = "uuid-do-agente" - api_key = "sua-api-key" - mensagem = "Olá, como posso ajudar?" - - async with httpx.AsyncClient() as client: - # Configura headers - headers = { - "Accept": "text/event-stream", - "Authorization": f"x-api-key {api_key}" - } - - # Monta payload JSON-RPC - payload = { - "jsonrpc": "2.0", - "id": str(uuid.uuid4()), - "method": "tasks/sendSubscribe", - "params": { - "id": str(uuid.uuid4()), - "message": { - "role": "user", - "parts": [{"type": "text", "text": mensagem}] - } - } - } - - # Inicia streaming - async with connect_sse(client, "POST", f"/agents/{agent_id}/tasks/sendSubscribe", - json=payload, headers=headers) as event_source: - async for event in event_source.aiter_sse(): - if event.event == "message": - data = json.loads(event.data) - print(f"Evento recebido: {data}") -``` - -## 6. Considerações de Segurança - -- Validação rigorosa de API keys -- Timeout de conexão SSE (30 segundos) -- Tratamento de erros e reconexão automática -- Limites de taxa (rate limiting) -- Validação de payloads JSON-RPC -- Sanitização de inputs - -## 7. Testes - -- Testes unitários para schemas -- Testes de integração para streaming -- Testes de carga e performance -- Testes de reconexão e resiliência -- Testes de conformidade JSON-RPC - -## 8. Documentação - -- Atualizar documentação da API -- Adicionar exemplos de uso -- Documentar formatos de eventos -- Guia de troubleshooting -- Referência à especificação A2A - -## 9. Próximos Passos - -1. Implementar schemas Pydantic conforme especificação -2. Desenvolver serviço de streaming com suporte a JSON-RPC -3. Adicionar rota SSE com validação de payloads -4. Implementar utilitários de streaming -5. Escrever testes de conformidade -6. Atualizar documentação -7. Revisão de código -8. Deploy em ambiente de teste diff --git a/src/api/agent_routes.py b/src/api/agent_routes.py index 36835cde..8b86b95d 100644 --- a/src/api/agent_routes.py +++ b/src/api/agent_routes.py @@ -24,6 +24,9 @@ from src.services.service_providers import ( memory_service, ) import logging +from fastapi.responses import StreamingResponse +from ..services.streaming_service import StreamingService +from ..schemas.streaming import JSONRPCRequest from src.services.session_service import get_session_events @@ -74,6 +77,8 @@ router = APIRouter( responses={404: {"description": "Not found"}}, ) +streaming_service = StreamingService() + @router.post("/", response_model=Agent, status_code=status.HTTP_201_CREATED) async def create_agent( @@ -192,8 +197,8 @@ async def get_agent_json( }, "version": os.getenv("API_VERSION", ""), "capabilities": { - "streaming": False, - "pushNotifications": False, + "streaming": True, + "pushNotifications": True, "stateTransitionHistory": True, }, "authentication": { @@ -355,6 +360,11 @@ async def handle_task( except Exception as e: logger.error(f"Error processing history: {str(e)}") + + # pushNotification = task_request.get("pushNotification", False) + # if pushNotification: + # await send_push_notification(task_id, final_response_text) + return response_task except HTTPException: @@ -365,3 +375,51 @@ async def handle_task( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error", ) + + +@router.post("/{agent_id}/tasks/sendSubscribe") +async def subscribe_task_streaming( + agent_id: str, + request: JSONRPCRequest, + x_api_key: str = Header(None), + db: Session = Depends(get_db), +): + """ + Endpoint para streaming de eventos SSE de uma tarefa. + + Args: + agent_id: ID do agente + request: Requisição JSON-RPC + x_api_key: Chave de API no header + db: Sessão do banco de dados + + Returns: + StreamingResponse com eventos SSE + """ + if not x_api_key: + raise HTTPException(status_code=401, detail="API key é obrigatória") + + # Extrai mensagem do payload + message = request.params.get("message", {}).get("parts", [{}])[0].get("text", "") + session_id = request.params.get("sessionId") + + # Configura streaming + async def event_generator(): + async for event in streaming_service.send_task_streaming( + agent_id=agent_id, + api_key=x_api_key, + message=message, + session_id=session_id, + db=db, + ): + yield event + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/src/schemas/streaming.py b/src/schemas/streaming.py new file mode 100644 index 00000000..94c45af1 --- /dev/null +++ b/src/schemas/streaming.py @@ -0,0 +1,40 @@ +from datetime import datetime +from typing import Dict, List, Optional, Any, Literal +from pydantic import BaseModel, Field + + +class MessagePart(BaseModel): + type: str + text: str + + +class Message(BaseModel): + role: str + parts: List[MessagePart] + + +class TaskStatusUpdateEvent(BaseModel): + state: str = Field(..., description="Estado da tarefa (working, completed, failed)") + timestamp: datetime = Field(default_factory=datetime.utcnow) + message: Optional[Message] = None + error: Optional[Dict[str, Any]] = None + + +class TaskArtifactUpdateEvent(BaseModel): + type: str + content: str + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class JSONRPCRequest(BaseModel): + jsonrpc: Literal["2.0"] = "2.0" + id: str + method: Literal["tasks/sendSubscribe"] = "tasks/sendSubscribe" + params: Dict[str, Any] + + +class JSONRPCResponse(BaseModel): + jsonrpc: Literal["2.0"] = "2.0" + id: str + result: Optional[Dict[str, Any]] = None + error: Optional[Dict[str, Any]] = None diff --git a/src/services/streaming_service.py b/src/services/streaming_service.py new file mode 100644 index 00000000..8ea3f67d --- /dev/null +++ b/src/services/streaming_service.py @@ -0,0 +1,140 @@ +import uuid +import json +from typing import AsyncGenerator, Dict, Any +from fastapi import HTTPException +from datetime import datetime +from ..schemas.streaming import ( + JSONRPCRequest, + TaskStatusUpdateEvent, + TaskArtifactUpdateEvent, +) +from ..services.agent_runner import run_agent +from ..services.service_providers import ( + session_service, + artifacts_service, + memory_service, +) +from sqlalchemy.orm import Session + + +class StreamingService: + def __init__(self): + self.active_connections: Dict[str, Any] = {} + + async def send_task_streaming( + self, + agent_id: str, + api_key: str, + message: str, + session_id: str = None, + db: Session = None, + ) -> AsyncGenerator[str, None]: + """ + Inicia o streaming de eventos SSE para uma tarefa. + + Args: + agent_id: ID do agente + api_key: Chave de API para autenticação + message: Mensagem inicial + session_id: ID da sessão (opcional) + db: Sessão do banco de dados + + Yields: + Eventos SSE formatados + """ + # Validação básica da API key + if not api_key: + raise HTTPException(status_code=401, detail="API key é obrigatória") + + # Gera IDs únicos + task_id = str(uuid.uuid4()) + request_id = str(uuid.uuid4()) + + # Monta payload JSON-RPC + payload = JSONRPCRequest( + id=request_id, + params={ + "id": task_id, + "sessionId": session_id, + "message": { + "role": "user", + "parts": [{"type": "text", "text": message}], + }, + }, + ) + + # Registra conexão + self.active_connections[task_id] = { + "agent_id": agent_id, + "api_key": api_key, + "session_id": session_id, + } + + try: + # Envia evento de início + yield self._format_sse_event( + "status", + TaskStatusUpdateEvent( + state="working", + timestamp=datetime.now().isoformat(), + message=payload.params["message"], + ).model_dump_json(), + ) + + # Executa o agente + result = await run_agent( + str(agent_id), + task_id, + message, + session_service, + artifacts_service, + memory_service, + db, + session_id, + ) + + # Envia a resposta do agente como um evento separado + yield self._format_sse_event( + "message", + json.dumps( + { + "role": "agent", + "content": result, + "timestamp": datetime.now().isoformat(), + } + ), + ) + + # Evento de conclusão + yield self._format_sse_event( + "status", + TaskStatusUpdateEvent( + state="completed", + timestamp=datetime.now().isoformat(), + ).model_dump_json(), + ) + + except Exception as e: + # Evento de erro + yield self._format_sse_event( + "status", + TaskStatusUpdateEvent( + state="failed", + timestamp=datetime.now().isoformat(), + error={"message": str(e)}, + ).model_dump_json(), + ) + raise + + finally: + # Limpa conexão + self.active_connections.pop(task_id, None) + + def _format_sse_event(self, event_type: str, data: str) -> str: + """Formata um evento SSE.""" + return f"event: {event_type}\ndata: {data}\n\n" + + async def close_connection(self, task_id: str): + """Fecha uma conexão de streaming.""" + if task_id in self.active_connections: + self.active_connections.pop(task_id) diff --git a/src/utils/streaming.py b/src/utils/streaming.py new file mode 100644 index 00000000..46167178 --- /dev/null +++ b/src/utils/streaming.py @@ -0,0 +1,70 @@ +import asyncio +from typing import AsyncGenerator, Optional +from fastapi import HTTPException + + +class SSEUtils: + @staticmethod + async def with_timeout( + generator: AsyncGenerator, timeout: int = 30, retry_attempts: int = 3 + ) -> AsyncGenerator: + """ + Adiciona timeout e retry a um gerador de eventos SSE. + + Args: + generator: Gerador de eventos + timeout: Tempo máximo de espera em segundos + retry_attempts: Número de tentativas de reconexão + + Yields: + Eventos do gerador + """ + attempts = 0 + while attempts < retry_attempts: + try: + async for event in asyncio.wait_for(generator, timeout): + yield event + break + except asyncio.TimeoutError: + attempts += 1 + if attempts >= retry_attempts: + raise HTTPException( + status_code=408, detail="Timeout após múltiplas tentativas" + ) + await asyncio.sleep(1) # Espera antes de tentar novamente + + @staticmethod + def format_error_event(error: Exception) -> str: + """ + Formata um evento de erro SSE. + + Args: + error: Exceção ocorrida + + Returns: + String formatada do evento SSE + """ + return f"event: error\ndata: {str(error)}\n\n" + + @staticmethod + def validate_sse_headers(headers: dict) -> None: + """ + Valida headers necessários para SSE. + + Args: + headers: Dicionário de headers + + Raises: + HTTPException se headers inválidos + """ + required_headers = { + "Accept": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + } + + for header, value in required_headers.items(): + if headers.get(header) != value: + raise HTTPException( + status_code=400, detail=f"Header {header} inválido ou ausente" + ) diff --git a/static/test_a2a_stream.html b/static/test_a2a_stream.html new file mode 100644 index 00000000..257b198b --- /dev/null +++ b/static/test_a2a_stream.html @@ -0,0 +1,295 @@ + + + + + + + Teste de Streaming A2A + + + + + +
+

Teste de Streaming A2A

+ +
+ + +
+ +
+ + +
+ +
+ + +
+ +
+ + +
+ +
+ Status: Não conectado +
+ +
+ +
+
+ + + + + \ No newline at end of file diff --git a/static/test.html b/static/test_chat_stream.html similarity index 50% rename from static/test.html rename to static/test_chat_stream.html index b7a44d90..bf8c9c6e 100644 --- a/static/test.html +++ b/static/test_chat_stream.html @@ -3,40 +3,199 @@ ADK Streaming Test + @@ -44,14 +203,16 @@

ADK Streaming Test

-
+
Desconectado
-
-
-
-

- - + + + + +
+ + +
@@ -86,12 +247,10 @@ return; } - // Fechar conexão existente se houver if (ws) { ws.close(); } - // Criar nova conexão WebSocket const ws_url = `ws://${window.location.host}/api/v1/chat/ws/${agentId}/${contactId}`; log('Connecting to WebSocket', { url: ws_url @@ -101,7 +260,6 @@ ws.onopen = function () { log('WebSocket connected, sending authentication'); - // Adicionar token no header const authMessage = { type: 'authorization', token: token @@ -110,7 +268,7 @@ log('Authentication sent', authMessage); statusDiv.textContent = 'Conectado'; - statusDiv.style.color = 'green'; + statusDiv.className = 'connected'; sendButton.disabled = false; connectButton.disabled = true; }; @@ -144,7 +302,7 @@ wasClean: event.wasClean }); statusDiv.textContent = 'Desconectado'; - statusDiv.style.color = 'red'; + statusDiv.className = 'disconnected'; sendButton.disabled = true; connectButton.disabled = false; ws = null; @@ -153,7 +311,7 @@ ws.onerror = function (error) { log('WebSocket error', error); statusDiv.textContent = 'Erro na conexão'; - statusDiv.style.color = 'red'; + statusDiv.className = 'disconnected'; }; }; @@ -163,13 +321,11 @@ const message = messageInput.value; if (message && ws) { - // Criar div para mensagem do usuário const userMessageDiv = document.createElement('div'); userMessageDiv.className = 'message user-message'; userMessageDiv.textContent = message; messagesDiv.appendChild(userMessageDiv); - // Enviar mensagem via WebSocket const messagePacket = { message: message };