feat(api): remove outdated planning document and implement streaming service for real-time task updates

This commit is contained in:
Davidson Gomes
2025-04-29 20:35:33 -03:00
parent 34734b6da7
commit 690168fa5d
7 changed files with 793 additions and 225 deletions

View File

@@ -24,6 +24,9 @@ from src.services.service_providers import (
memory_service,
)
import logging
from fastapi.responses import StreamingResponse
from ..services.streaming_service import StreamingService
from ..schemas.streaming import JSONRPCRequest
from src.services.session_service import get_session_events
@@ -74,6 +77,8 @@ router = APIRouter(
responses={404: {"description": "Not found"}},
)
streaming_service = StreamingService()
@router.post("/", response_model=Agent, status_code=status.HTTP_201_CREATED)
async def create_agent(
@@ -192,8 +197,8 @@ async def get_agent_json(
},
"version": os.getenv("API_VERSION", ""),
"capabilities": {
"streaming": False,
"pushNotifications": False,
"streaming": True,
"pushNotifications": True,
"stateTransitionHistory": True,
},
"authentication": {
@@ -355,6 +360,11 @@ async def handle_task(
except Exception as e:
logger.error(f"Error processing history: {str(e)}")
# pushNotification = task_request.get("pushNotification", False)
# if pushNotification:
# await send_push_notification(task_id, final_response_text)
return response_task
except HTTPException:
@@ -365,3 +375,51 @@ async def handle_task(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal server error",
)
@router.post("/{agent_id}/tasks/sendSubscribe")
async def subscribe_task_streaming(
agent_id: str,
request: JSONRPCRequest,
x_api_key: str = Header(None),
db: Session = Depends(get_db),
):
"""
Endpoint para streaming de eventos SSE de uma tarefa.
Args:
agent_id: ID do agente
request: Requisição JSON-RPC
x_api_key: Chave de API no header
db: Sessão do banco de dados
Returns:
StreamingResponse com eventos SSE
"""
if not x_api_key:
raise HTTPException(status_code=401, detail="API key é obrigatória")
# Extrai mensagem do payload
message = request.params.get("message", {}).get("parts", [{}])[0].get("text", "")
session_id = request.params.get("sessionId")
# Configura streaming
async def event_generator():
async for event in streaming_service.send_task_streaming(
agent_id=agent_id,
api_key=x_api_key,
message=message,
session_id=session_id,
db=db,
):
yield event
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)

40
src/schemas/streaming.py Normal file
View File

@@ -0,0 +1,40 @@
from datetime import datetime
from typing import Dict, List, Optional, Any, Literal
from pydantic import BaseModel, Field
class MessagePart(BaseModel):
type: str
text: str
class Message(BaseModel):
role: str
parts: List[MessagePart]
class TaskStatusUpdateEvent(BaseModel):
state: str = Field(..., description="Estado da tarefa (working, completed, failed)")
timestamp: datetime = Field(default_factory=datetime.utcnow)
message: Optional[Message] = None
error: Optional[Dict[str, Any]] = None
class TaskArtifactUpdateEvent(BaseModel):
type: str
content: str
metadata: Dict[str, Any] = Field(default_factory=dict)
class JSONRPCRequest(BaseModel):
jsonrpc: Literal["2.0"] = "2.0"
id: str
method: Literal["tasks/sendSubscribe"] = "tasks/sendSubscribe"
params: Dict[str, Any]
class JSONRPCResponse(BaseModel):
jsonrpc: Literal["2.0"] = "2.0"
id: str
result: Optional[Dict[str, Any]] = None
error: Optional[Dict[str, Any]] = None

View File

@@ -0,0 +1,140 @@
import uuid
import json
from typing import AsyncGenerator, Dict, Any
from fastapi import HTTPException
from datetime import datetime
from ..schemas.streaming import (
JSONRPCRequest,
TaskStatusUpdateEvent,
TaskArtifactUpdateEvent,
)
from ..services.agent_runner import run_agent
from ..services.service_providers import (
session_service,
artifacts_service,
memory_service,
)
from sqlalchemy.orm import Session
class StreamingService:
def __init__(self):
self.active_connections: Dict[str, Any] = {}
async def send_task_streaming(
self,
agent_id: str,
api_key: str,
message: str,
session_id: str = None,
db: Session = None,
) -> AsyncGenerator[str, None]:
"""
Inicia o streaming de eventos SSE para uma tarefa.
Args:
agent_id: ID do agente
api_key: Chave de API para autenticação
message: Mensagem inicial
session_id: ID da sessão (opcional)
db: Sessão do banco de dados
Yields:
Eventos SSE formatados
"""
# Validação básica da API key
if not api_key:
raise HTTPException(status_code=401, detail="API key é obrigatória")
# Gera IDs únicos
task_id = str(uuid.uuid4())
request_id = str(uuid.uuid4())
# Monta payload JSON-RPC
payload = JSONRPCRequest(
id=request_id,
params={
"id": task_id,
"sessionId": session_id,
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}],
},
},
)
# Registra conexão
self.active_connections[task_id] = {
"agent_id": agent_id,
"api_key": api_key,
"session_id": session_id,
}
try:
# Envia evento de início
yield self._format_sse_event(
"status",
TaskStatusUpdateEvent(
state="working",
timestamp=datetime.now().isoformat(),
message=payload.params["message"],
).model_dump_json(),
)
# Executa o agente
result = await run_agent(
str(agent_id),
task_id,
message,
session_service,
artifacts_service,
memory_service,
db,
session_id,
)
# Envia a resposta do agente como um evento separado
yield self._format_sse_event(
"message",
json.dumps(
{
"role": "agent",
"content": result,
"timestamp": datetime.now().isoformat(),
}
),
)
# Evento de conclusão
yield self._format_sse_event(
"status",
TaskStatusUpdateEvent(
state="completed",
timestamp=datetime.now().isoformat(),
).model_dump_json(),
)
except Exception as e:
# Evento de erro
yield self._format_sse_event(
"status",
TaskStatusUpdateEvent(
state="failed",
timestamp=datetime.now().isoformat(),
error={"message": str(e)},
).model_dump_json(),
)
raise
finally:
# Limpa conexão
self.active_connections.pop(task_id, None)
def _format_sse_event(self, event_type: str, data: str) -> str:
"""Formata um evento SSE."""
return f"event: {event_type}\ndata: {data}\n\n"
async def close_connection(self, task_id: str):
"""Fecha uma conexão de streaming."""
if task_id in self.active_connections:
self.active_connections.pop(task_id)

70
src/utils/streaming.py Normal file
View File

@@ -0,0 +1,70 @@
import asyncio
from typing import AsyncGenerator, Optional
from fastapi import HTTPException
class SSEUtils:
@staticmethod
async def with_timeout(
generator: AsyncGenerator, timeout: int = 30, retry_attempts: int = 3
) -> AsyncGenerator:
"""
Adiciona timeout e retry a um gerador de eventos SSE.
Args:
generator: Gerador de eventos
timeout: Tempo máximo de espera em segundos
retry_attempts: Número de tentativas de reconexão
Yields:
Eventos do gerador
"""
attempts = 0
while attempts < retry_attempts:
try:
async for event in asyncio.wait_for(generator, timeout):
yield event
break
except asyncio.TimeoutError:
attempts += 1
if attempts >= retry_attempts:
raise HTTPException(
status_code=408, detail="Timeout após múltiplas tentativas"
)
await asyncio.sleep(1) # Espera antes de tentar novamente
@staticmethod
def format_error_event(error: Exception) -> str:
"""
Formata um evento de erro SSE.
Args:
error: Exceção ocorrida
Returns:
String formatada do evento SSE
"""
return f"event: error\ndata: {str(error)}\n\n"
@staticmethod
def validate_sse_headers(headers: dict) -> None:
"""
Valida headers necessários para SSE.
Args:
headers: Dicionário de headers
Raises:
HTTPException se headers inválidos
"""
required_headers = {
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
for header, value in required_headers.items():
if headers.get(header) != value:
raise HTTPException(
status_code=400, detail=f"Header {header} inválido ou ausente"
)