feat(api): implement WebSocket support for real-time task updates and add streaming service

This commit is contained in:
Davidson Gomes 2025-04-29 20:15:02 -03:00
parent d97ddc06c9
commit 34734b6da7
8 changed files with 613 additions and 16 deletions

19
.env
View File

@ -6,28 +6,23 @@ API_URL="http://localhost:8000"
ORGANIZATION_NAME="Evo AI"
ORGANIZATION_URL="https://evoai.evoapicloud.com"
# Configurações do banco de dados
# Database settings
POSTGRES_CONNECTION_STRING="postgresql://postgres:root@localhost:5432/evo_ai"
# Configurações de logging
# Logging settings
LOG_LEVEL="INFO"
LOG_DIR="logs"
# Configurações da API de Conhecimento
KNOWLEDGE_API_URL="http://localhost:5540"
KNOWLEDGE_API_KEY="sua-chave-api-conhecimento"
TENANT_ID="seu-tenant-id"
# Configurações do Redis
# Redis settings
REDIS_HOST="localhost"
REDIS_PORT=6379
REDIS_DB=8
REDIS_PASSWORD=""
# TTL do cache de ferramentas em segundos (1 hora)
# Tools cache TTL in seconds (1 hour)
TOOLS_CACHE_TTL=3600
# Configurações JWT
# JWT settings
JWT_SECRET_KEY="f6884ef5be4c279686ff90f0ed9d4656685eef9807245019ac94a3fbe32b0938"
JWT_ALGORITHM="HS256"
JWT_EXPIRATION_TIME=3600
@ -37,12 +32,12 @@ SENDGRID_API_KEY="SG.lfmOfb13QseRA0AHTLlKlw.H9RX5wKx37URMPohaAU1D4tJimG4g0FPR2iU
EMAIL_FROM="noreply@evolution-api.com"
APP_URL="https://evoai.evoapicloud.com"
# Configurações do Servidor
# Server settings
HOST="0.0.0.0"
PORT=8000
DEBUG=false
# Configurações de Seeders
# Seeders settings
ADMIN_EMAIL="admin@evoai.com"
ADMIN_INITIAL_PASSWORD="senhaforte123"
DEMO_EMAIL="demo@exemplo.com"

191
planejamento_atualizado.md Normal file
View File

@ -0,0 +1,191 @@
# Planejamento de Implementação - A2A Streaming (Atualizado)
## 1. Visão Geral
Implementar suporte a Server-Sent Events (SSE) para streaming de atualizações de tarefas em tempo real, seguindo a especificação oficial do A2A.
## 2. Componentes Necessários
### 2.2 Estrutura de Arquivos
```
src/
├── api/
│ └── agent_routes.py (modificação)
├── schemas/
│ └── streaming.py (novo)
├── services/
│ └── streaming_service.py (novo)
└── utils/
└── streaming.py (novo)
```
## 3. Implementação
### 3.1 Schemas (Pydantic)
```python
# schemas/streaming.py
- TaskStatusUpdateEvent
- state: str (working, completed, failed)
- timestamp: datetime
- message: Optional[Message]
- error: Optional[Error]
- TaskArtifactUpdateEvent
- type: str
- content: str
- metadata: Dict[str, Any]
- JSONRPCRequest
- jsonrpc: str = "2.0"
- id: str
- method: str = "tasks/sendSubscribe"
- params: Dict[str, Any]
- Message
- role: str
- parts: List[MessagePart]
- MessagePart
- type: str
- text: str
```
### 3.2 Serviço de Streaming
````python
# services/streaming_service.py
- send_task_streaming()
- Monta payload JSON-RPC conforme especificação:
```json
{
"jsonrpc": "2.0",
"id": "<uuid>",
"method": "tasks/sendSubscribe",
"params": {
"id": "<uuid>",
"sessionId": "<opcional>",
"message": {
"role": "user",
"parts": [{"type": "text", "text": "<mensagem>"}]
}
}
}
```
- Configura headers:
- Accept: text/event-stream
- Authorization: x-api-key <SUA_API_KEY>
- Gerencia conexão SSE
- Processa eventos em tempo real
````
### 3.3 Rota de Streaming
```python
# api/agent_routes.py
- Nova rota POST /{agent_id}/tasks/sendSubscribe
- Validação de API key
- Gerenciamento de sessão
- Streaming de eventos SSE
- Tratamento de erros JSON-RPC
```
### 3.4 Utilitários
```python
# utils/streaming.py
- Helpers para SSE
- Formatação de eventos
- Tratamento de reconexão
- Timeout e retry
- Processamento de eventos
- Parsing de eventos SSE
- Validação de payloads
- Formatação de respostas
- Conformidade com JSON-RPC 2.0
```
## 4. Fluxo de Dados
1. Cliente envia requisição JSON-RPC para `/tasks/sendSubscribe`
2. Servidor valida API key e configura sessão
3. Inicia streaming de eventos SSE
4. Envia atualizações em tempo real:
- TaskStatusUpdateEvent (estado da tarefa)
- TaskArtifactUpdateEvent (artefatos gerados)
- Mensagens do histórico
## 5. Exemplo de Uso
```python
async def exemplo_uso():
agent_id = "uuid-do-agente"
api_key = "sua-api-key"
mensagem = "Olá, como posso ajudar?"
async with httpx.AsyncClient() as client:
# Configura headers
headers = {
"Accept": "text/event-stream",
"Authorization": f"x-api-key {api_key}"
}
# Monta payload JSON-RPC
payload = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "tasks/sendSubscribe",
"params": {
"id": str(uuid.uuid4()),
"message": {
"role": "user",
"parts": [{"type": "text", "text": mensagem}]
}
}
}
# Inicia streaming
async with connect_sse(client, "POST", f"/agents/{agent_id}/tasks/sendSubscribe",
json=payload, headers=headers) as event_source:
async for event in event_source.aiter_sse():
if event.event == "message":
data = json.loads(event.data)
print(f"Evento recebido: {data}")
```
## 6. Considerações de Segurança
- Validação rigorosa de API keys
- Timeout de conexão SSE (30 segundos)
- Tratamento de erros e reconexão automática
- Limites de taxa (rate limiting)
- Validação de payloads JSON-RPC
- Sanitização de inputs
## 7. Testes
- Testes unitários para schemas
- Testes de integração para streaming
- Testes de carga e performance
- Testes de reconexão e resiliência
- Testes de conformidade JSON-RPC
## 8. Documentação
- Atualizar documentação da API
- Adicionar exemplos de uso
- Documentar formatos de eventos
- Guia de troubleshooting
- Referência à especificação A2A
## 9. Próximos Passos
1. Implementar schemas Pydantic conforme especificação
2. Desenvolver serviço de streaming com suporte a JSON-RPC
3. Adicionar rota SSE com validação de payloads
4. Implementar utilitários de streaming
5. Escrever testes de conformidade
6. Atualizar documentação
7. Revisão de código
8. Deploy em ambiente de teste

View File

@ -41,6 +41,8 @@ dependencies = [
"bcrypt==4.3.0",
"jinja2==3.1.6",
"pydantic[email]==2.11.3",
"httpx==0.28.1",
"httpx-sse==0.4.0",
]
[project.optional-dependencies]

View File

@ -1,15 +1,23 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import (
APIRouter,
Depends,
HTTPException,
status,
WebSocket,
WebSocketDisconnect,
)
from sqlalchemy.orm import Session
from src.config.database import get_db
from src.core.jwt_middleware import (
get_jwt_token,
verify_user_client,
get_jwt_token_ws,
)
from src.services import (
agent_service,
)
from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse
from src.services.agent_runner import run_agent
from src.services.agent_runner import run_agent, run_agent_stream
from src.core.exceptions import AgentNotFoundError
from src.services.service_providers import (
session_service,
@ -19,6 +27,8 @@ from src.services.service_providers import (
from datetime import datetime
import logging
import json
from fastapi.responses import StreamingResponse
logger = logging.getLogger(__name__)
@ -29,6 +39,103 @@ router = APIRouter(
)
@router.websocket("/ws/{agent_id}/{contact_id}")
async def websocket_chat(
websocket: WebSocket,
agent_id: str,
contact_id: str,
db: Session = Depends(get_db),
):
try:
# Accept the connection
await websocket.accept()
logger.info("WebSocket connection accepted, waiting for authentication")
# Aguardar mensagem de autenticação
try:
auth_data = await websocket.receive_json()
logger.info(f"Received authentication data: {auth_data}")
if not auth_data.get("type") == "authorization" or not auth_data.get(
"token"
):
logger.warning("Invalid authentication message")
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
token = auth_data["token"]
# Verify the token
payload = await get_jwt_token_ws(token)
if not payload:
logger.warning("Invalid token")
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
# Verificar se o agente pertence ao cliente do usuário
agent = agent_service.get_agent(db, agent_id)
if not agent:
logger.warning(f"Agent {agent_id} not found")
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
# Verificar se o usuário tem acesso ao agente (via client)
await verify_user_client(payload, db, agent.client_id)
logger.info(
f"WebSocket connection established for agent {agent_id} and contact {contact_id}"
)
while True:
try:
data = await websocket.receive_json()
logger.info(f"Received message: {data}")
message = data.get("message")
if not message:
continue
async for chunk in run_agent_stream(
agent_id=agent_id,
contact_id=contact_id,
message=message,
session_service=session_service,
artifacts_service=artifacts_service,
memory_service=memory_service,
db=db,
):
# Enviar cada chunk como uma mensagem JSON
await websocket.send_json(
{"message": chunk, "turn_complete": False}
)
# Enviar sinal de turno completo
await websocket.send_json({"message": "", "turn_complete": True})
except WebSocketDisconnect:
logger.info("Client disconnected")
break
except json.JSONDecodeError:
logger.warning("Invalid JSON message received")
continue
except Exception as e:
logger.error(f"Error in WebSocket message handling: {str(e)}")
await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
break
except WebSocketDisconnect:
logger.info("Client disconnected during authentication")
except json.JSONDecodeError:
logger.warning("Invalid authentication message format")
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
except Exception as e:
logger.error(f"Error during authentication: {str(e)}")
await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
except Exception as e:
logger.error(f"WebSocket error: {str(e)}")
await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
@router.post(
"/",
response_model=ChatResponse,

View File

@ -149,3 +149,17 @@ def get_current_user_client_id(
return UUID(client_id)
return None
async def get_jwt_token_ws(token: str) -> Optional[dict]:
"""
Verifica e decodifica o token JWT para WebSocket.
Retorna o payload se o token for válido, None caso contrário.
"""
try:
payload = jwt.decode(
token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]
)
return payload
except JWTError:
return None

View File

@ -3,6 +3,7 @@ import sys
from pathlib import Path
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from src.config.database import engine, Base
from src.config.settings import settings
from src.utils.logger import setup_logger
@ -39,12 +40,18 @@ app = FastAPI(
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_origins=["*"], # Permite todas as origens em desenvolvimento
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Configuração de arquivos estáticos
static_dir = Path("static")
if not static_dir.exists():
static_dir.mkdir(parents=True)
app.mount("/static", StaticFiles(directory=static_dir), name="static")
# PostgreSQL configuration
POSTGRES_CONNECTION_STRING = os.getenv(
"POSTGRES_CONNECTION_STRING", "postgresql://postgres:root@localhost:5432/evo_ai"

View File

@ -8,7 +8,8 @@ from src.core.exceptions import AgentNotFoundError, InternalServerError
from src.services.agent_service import get_agent
from src.services.agent_builder import AgentBuilder
from sqlalchemy.orm import Session
from typing import Optional
from typing import Optional, AsyncGenerator
import asyncio
logger = setup_logger(__name__)
@ -101,3 +102,96 @@ async def run_agent(
except Exception as e:
logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
raise InternalServerError(str(e))
async def run_agent_stream(
agent_id: str,
contact_id: str,
message: str,
session_service: DatabaseSessionService,
artifacts_service: InMemoryArtifactService,
memory_service: InMemoryMemoryService,
db: Session,
session_id: Optional[str] = None,
) -> AsyncGenerator[str, None]:
try:
logger.info(
f"Starting streaming execution of agent {agent_id} for contact {contact_id}"
)
logger.info(f"Received message: {message}")
get_root_agent = get_agent(db, agent_id)
logger.info(
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
)
if get_root_agent is None:
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
# Using the AgentBuilder to create the agent
agent_builder = AgentBuilder(db)
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
logger.info("Configuring Runner")
agent_runner = Runner(
agent=root_agent,
app_name=agent_id,
session_service=session_service,
artifact_service=artifacts_service,
memory_service=memory_service,
)
adk_session_id = contact_id + "_" + agent_id
if session_id is None:
session_id = adk_session_id
logger.info(f"Searching session for contact {contact_id}")
session = session_service.get_session(
app_name=agent_id,
user_id=contact_id,
session_id=adk_session_id,
)
if session is None:
logger.info(f"Creating new session for contact {contact_id}")
session = session_service.create_session(
app_name=agent_id,
user_id=contact_id,
session_id=adk_session_id,
)
content = Content(role="user", parts=[Part(text=message)])
logger.info("Starting agent streaming execution")
try:
for event in agent_runner.run(
user_id=contact_id,
session_id=adk_session_id,
new_message=content,
):
if event.content and event.content.parts:
text = event.content.parts[0].text
if text:
yield text
await asyncio.sleep(0) # Allow other tasks to run
completed_session = session_service.get_session(
app_name=agent_id,
user_id=contact_id,
session_id=adk_session_id,
)
memory_service.add_session_to_memory(completed_session)
finally:
# Ensure the exit_stack is closed correctly
if exit_stack:
await exit_stack.aclose()
logger.info("Agent streaming execution completed successfully")
except AgentNotFoundError as e:
logger.error(f"Error processing request: {str(e)}")
raise e
except Exception as e:
logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
raise InternalServerError(str(e))

187
static/test.html Normal file
View File

@ -0,0 +1,187 @@
<!DOCTYPE html>
<html>
<head>
<title>ADK Streaming Test</title>
<style>
#messages {
height: 400px;
overflow-y: auto;
border: 1px solid #ccc;
padding: 10px;
margin-bottom: 10px;
}
.message {
margin: 5px 0;
padding: 5px;
border-radius: 5px;
}
.user-message {
background-color: #e3f2fd;
margin-left: 20%;
margin-right: 5px;
}
.agent-message {
background-color: #f5f5f5;
margin-right: 20%;
margin-left: 5px;
}
#debug {
margin-top: 20px;
padding: 10px;
background-color: #f8f9fa;
border: 1px solid #ddd;
font-family: monospace;
white-space: pre-wrap;
}
</style>
</head>
<body>
<h1>ADK Streaming Test</h1>
<div id="messages"></div>
<div id="connection-status" style="margin-bottom: 10px;"></div>
<form id="messageForm">
<input type="text" id="agentId" placeholder="Agent ID" style="margin-bottom: 10px; width: 300px;"><br>
<input type="text" id="contactId" placeholder="Contact ID" style="margin-bottom: 10px; width: 300px;"><br>
<input type="text" id="token" placeholder="JWT Token" style="margin-bottom: 10px; width: 300px;"><br>
<button type="button" id="connectButton">Conectar</button><br><br>
<input type="text" id="message" placeholder="Digite sua mensagem..." style="width: 300px;">
<button type="submit" id="sendButton" disabled>Enviar</button>
</form>
<div id="debug"></div>
<script>
let ws = null;
let currentMessageId = null;
const messagesDiv = document.getElementById('messages');
const messageForm = document.getElementById('messageForm');
const connectButton = document.getElementById('connectButton');
const sendButton = document.getElementById('sendButton');
const statusDiv = document.getElementById('connection-status');
const debugDiv = document.getElementById('debug');
function log(message, data = null) {
const timestamp = new Date().toISOString();
let logMessage = `${timestamp} - ${message}`;
if (data) {
logMessage += '\n' + JSON.stringify(data, null, 2);
}
debugDiv.textContent = logMessage + '\n\n' + debugDiv.textContent;
console.log(message, data);
}
connectButton.onclick = function () {
const agentId = document.getElementById('agentId').value;
const contactId = document.getElementById('contactId').value;
const token = document.getElementById('token').value;
if (!agentId || !contactId || !token) {
alert('Por favor, preencha todos os campos');
return;
}
// Fechar conexão existente se houver
if (ws) {
ws.close();
}
// Criar nova conexão WebSocket
const ws_url = `ws://${window.location.host}/api/v1/chat/ws/${agentId}/${contactId}`;
log('Connecting to WebSocket', {
url: ws_url
});
ws = new WebSocket(ws_url);
ws.onopen = function () {
log('WebSocket connected, sending authentication');
// Adicionar token no header
const authMessage = {
type: 'authorization',
token: token
};
ws.send(JSON.stringify(authMessage));
log('Authentication sent', authMessage);
statusDiv.textContent = 'Conectado';
statusDiv.style.color = 'green';
sendButton.disabled = false;
connectButton.disabled = true;
};
ws.onmessage = function (event) {
const packet = JSON.parse(event.data);
log('Received message', packet);
if (packet.turn_complete) {
currentMessageId = null;
return;
}
if (currentMessageId == null) {
currentMessageId = Math.random().toString(36).substring(7);
const messageDiv = document.createElement('div');
messageDiv.id = currentMessageId;
messageDiv.className = 'message agent-message';
messagesDiv.appendChild(messageDiv);
}
const messageDiv = document.getElementById(currentMessageId);
messageDiv.textContent = (messageDiv.textContent || '') + packet.message;
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
ws.onclose = function (event) {
log('WebSocket closed', {
code: event.code,
reason: event.reason,
wasClean: event.wasClean
});
statusDiv.textContent = 'Desconectado';
statusDiv.style.color = 'red';
sendButton.disabled = true;
connectButton.disabled = false;
ws = null;
};
ws.onerror = function (error) {
log('WebSocket error', error);
statusDiv.textContent = 'Erro na conexão';
statusDiv.style.color = 'red';
};
};
messageForm.onsubmit = function (e) {
e.preventDefault();
const messageInput = document.getElementById('message');
const message = messageInput.value;
if (message && ws) {
// Criar div para mensagem do usuário
const userMessageDiv = document.createElement('div');
userMessageDiv.className = 'message user-message';
userMessageDiv.textContent = message;
messagesDiv.appendChild(userMessageDiv);
// Enviar mensagem via WebSocket
const messagePacket = {
message: message
};
log('Sending message', messagePacket);
ws.send(JSON.stringify(messagePacket));
messageInput.value = '';
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
return false;
};
</script>
</body>
</html>