diff --git a/.gitignore b/.gitignore index 63ae9339..fe0630c1 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ coverage.xml *.cover .hypothesis/ .pytest_cache/ +docs/A2A # Environments .env diff --git a/a2a_checklist.md b/a2a_checklist.md new file mode 100644 index 00000000..3277bad4 --- /dev/null +++ b/a2a_checklist.md @@ -0,0 +1,252 @@ +# Checklist de Implementação do Protocolo A2A com Redis + +## 1. Configuração Inicial + +- [ ] **Configurar dependências no arquivo pyproject.toml** + + - Adicionar Redis e dependências relacionadas: + ``` + redis = "^5.3.0" + sse-starlette = "^2.3.3" + jwcrypto = "^1.5.6" + pyjwt = {extras = ["crypto"], version = "^2.10.1"} + ``` + +- [ ] **Configurar variáveis de ambiente para Redis** + + - Adicionar em `.env.example` e `.env`: + ``` + REDIS_HOST=localhost + REDIS_PORT=6379 + REDIS_PASSWORD= + REDIS_DB=0 + REDIS_SSL=false + REDIS_KEY_PREFIX=a2a: + REDIS_TTL=3600 + ``` + +- [ ] **Configurar Redis no docker-compose.yml** + - Adicionar serviço Redis com portas e volumes apropriados + - Configurar segurança básica (senha, se necessário) + +## 2. Implementação de Modelos e Schemas + +- [ ] **Criar schemas A2A em `src/schemas/a2a.py`** + + - Implementar tipos conforme `docs/A2A/samples/python/common/types.py`: + - Enums (TaskState, etc.) + - Classes de mensagens (TextPart, FilePart, etc.) + - Classes de tarefas (Task, TaskStatus, etc.) + - Estruturas JSON-RPC + - Tipos de erros + +- [ ] **Implementar validadores de modelo** + - Validadores para conteúdos de arquivo + - Validadores para formatos de mensagem + - Conversores de formato para compatibilidade com o protocolo + +## 3. Implementação do Cache Redis + +- [ ] **Criar configuração Redis em `src/config/redis.py`** + + - Implementar função de conexão com pool + - Configurar opções de segurança (SSL, autenticação) + - Configurar TTL padrão para diferentes tipos de dados + +- [ ] **Criar serviço de cache Redis em `src/services/redis_cache_service.py`** + + - Implementar métodos do exemplo com suporte a Redis: + ```python + class RedisCache: + async def get_task(self, task_id: str) -> dict + async def save_task(self, task_id: str, task_data: dict, ttl: int = 3600) -> None + async def update_task_status(self, task_id: str, status: dict) -> bool + async def append_to_history(self, task_id: str, message: dict) -> bool + async def save_push_notification_config(self, task_id: str, config: dict) -> None + async def get_push_notification_config(self, task_id: str) -> dict + async def save_sse_client(self, task_id: str, client_id: str) -> None + async def get_sse_clients(self, task_id: str) -> list + async def remove_sse_client(self, task_id: str, client_id: str) -> None + ``` + +- [ ] **Implementar funcionalidades para gerenciamento de conexões** + - Reconexão automática + - Fallback para cache em memória em caso de falha + - Métricas de desempenho + +## 4. Serviços A2A + +- [ ] **Implementar utilitários A2A em `src/utils/a2a_utils.py`** + + - Implementar funções conforme `docs/A2A/samples/python/common/server/utils.py`: + ```python + def are_modalities_compatible(server_output_modes, client_output_modes) + def new_incompatible_types_error(request_id) + def new_not_implemented_error(request_id) + ``` + +- [ ] **Implementar `A2ATaskManager` em `src/services/a2a_task_manager_service.py`** + + - Seguir a interface do `TaskManager` do exemplo + - Implementar todos os métodos abstratos: + ```python + async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse + async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse + async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse + async def on_send_task_subscribe(self, request) -> Union[AsyncIterable, JSONRPCResponse] + async def on_set_task_push_notification(self, request) -> SetTaskPushNotificationResponse + async def on_get_task_push_notification(self, request) -> GetTaskPushNotificationResponse + async def on_resubscribe_to_task(self, request) -> Union[AsyncIterable, JSONRPCResponse] + ``` + - Utilizar Redis para persistência de dados de tarefa + +- [ ] **Implementar `A2AServer` em `src/services/a2a_server_service.py`** + + - Processar requisições JSON-RPC conforme `docs/A2A/samples/python/common/server/server.py`: + ```python + async def _process_request(self, request: Request) + def _handle_exception(self, e: Exception) -> JSONResponse + def _create_response(self, result: Any) -> Union[JSONResponse, EventSourceResponse] + ``` + +- [ ] **Integrar com agent_runner.py existente** + + - Adaptar `run_agent` para uso no contexto de tarefas A2A + - Implementar mapeamento entre formatos de mensagem + +- [ ] **Integrar com streaming_service.py existente** + - Adaptar para formato de eventos compatível com A2A + - Implementar suporte a streaming de múltiplos tipos de eventos + +## 5. Autenticação e Push Notifications + +- [ ] **Implementar `PushNotificationAuth` em `src/services/push_notification_auth_service.py`** + + - Seguir o exemplo em `docs/A2A/samples/python/common/utils/push_notification_auth.py` + - Implementar: + ```python + def generate_jwk(self) + def handle_jwks_endpoint(self, request: Request) + async def send_authenticated_push_notification(self, url: str, data: dict) + ``` + +- [ ] **Implementar verificação de URL de notificação** + + - Seguir método `verify_push_notification_url` do exemplo + - Implementar validação de token para verificação + +- [ ] **Implementar armazenamento seguro de chaves** + - Armazenar chaves privadas de forma segura + - Rotação periódica de chaves + - Gerenciamento do ciclo de vida das chaves + +## 6. Rotas A2A + +- [ ] **Implementar rotas em `src/api/a2a_routes.py`** + + - Criar endpoint principal para processamento de requisições JSON-RPC: + + ```python + @router.post("/{agent_id}") + async def process_a2a_request(agent_id: str, request: Request, x_api_key: str = Header(None)) + ``` + + - Implementar endpoint do Agent Card reutilizando lógica existente: + + ```python + @router.get("/{agent_id}/.well-known/agent.json") + async def get_agent_card(agent_id: str, db: Session = Depends(get_db)) + ``` + + - Implementar endpoint JWKS para autenticação de push notifications: + ```python + @router.get("/{agent_id}/.well-known/jwks.json") + async def get_jwks(agent_id: str, db: Session = Depends(get_db)) + ``` + +- [ ] **Registrar rotas A2A no aplicativo principal** + - Adicionar importação e inclusão em `src/main.py`: + ```python + app.include_router(a2a_routes.router, prefix="/api/v1") + ``` + +## 7. Testes + +- [ ] **Criar testes unitários para schemas A2A** + + - Testar validadores + - Testar conversões de formato + - Testar compatibilidade de modalidades + +- [ ] **Criar testes unitários para cache Redis** + + - Testar todas as operações CRUD + - Testar expiração de dados + - Testar comportamento com falhas de conexão + +- [ ] **Criar testes unitários para gerenciador de tarefas** + + - Testar ciclo de vida da tarefa + - Testar cancelamento de tarefas + - Testar notificações push + +- [ ] **Criar testes de integração para endpoints A2A** + - Testar requisições completas + - Testar streaming + - Testar cenários de erro + +## 8. Segurança + +- [ ] **Implementar validação de API key** + + - Verificar API key para todas as requisições + - Implementar rate limiting por agente/cliente + +- [ ] **Configurar segurança no Redis** + + - Ativar autenticação e SSL em produção + - Definir políticas de retenção de dados + - Implementar backup e recuperação + +- [ ] **Configurar segurança para push notifications** + - Implementar assinatura JWT + - Validar URLs de callback + - Implementar retry com backoff para falhas + +## 9. Monitoramento e Métricas + +- [ ] **Implementar métricas de Redis** + + - Taxa de acertos/erros do cache + - Tempo de resposta + - Uso de memória + +- [ ] **Implementar métricas de tarefas A2A** + + - Número de tarefas por estado + - Tempo médio de processamento + - Taxa de erros + +- [ ] **Configurar logging apropriado** + - Registrar eventos importantes + - Mascarar dados sensíveis + - Implementar níveis de log configuráveis + +## 10. Documentação + +- [ ] **Documentar API A2A** + + - Descrever endpoints e formatos + - Fornecer exemplos de uso + - Documentar erros e soluções + +- [ ] **Documentar integração com Redis** + + - Descrever configuração + - Explicar estratégia de cache + - Documentar TTLs e políticas de expiração + +- [ ] **Criar exemplos de clients** + - Implementar exemplos de uso em Python + - Documentar fluxos comuns + - Fornecer snippets para linguagens populares diff --git a/a2a_feature.md b/a2a_feature.md new file mode 100644 index 00000000..c9d1b2f0 --- /dev/null +++ b/a2a_feature.md @@ -0,0 +1,491 @@ +# Implementação do Servidor A2A (Agent-to-Agent) + +## Visão Geral + +Este documento descreve o plano de implementação para integrar o servidor A2A (Agent-to-Agent) no sistema existente. A implementação seguirá os exemplos fornecidos nos arquivos de referência, adaptando-os à estrutura atual do projeto. + +## Componentes a Serem Implementados + +### 1. Servidor A2A + +Implementação da classe `A2AServer` como serviço para gerenciar requisições JSON-RPC compatíveis com o protocolo A2A. + +### 2. Gerenciador de Tarefas + +Implementação do `TaskManager` como serviço para gerenciar o ciclo de vida das tarefas do agente. + +### 3. Adaptadores para Integração + +Criação de adaptadores para integrar o servidor A2A com os serviços existentes, como o streaming_service e push_notification_service. + +## Rotas e Endpoints A2A + +O protocolo A2A requer a implementação das seguintes rotas JSON-RPC: + +### 1. `POST /a2a/{agent_id}` + +Endpoint principal que processa todas as solicitações JSON-RPC para um agente específico. + +### 2. `GET /a2a/{agent_id}/.well-known/agent.json` + +Retorna o Agent Card contendo as informações do agente. + +### Métodos JSON-RPC implementados: + +1. **tasks/send** - Envia uma nova tarefa para o agente. + + - Parâmetros: `id`, `sessionId`, `message`, `acceptedOutputModes` (opcional), `pushNotification` (opcional) + - Retorna: Status da tarefa, artefatos gerados e histórico + +2. **tasks/sendSubscribe** - Envia uma tarefa e assina para receber atualizações via streaming (SSE). + + - Parâmetros: Mesmos do `tasks/send` + - Retorna: Stream de eventos SSE com atualizações de status e artefatos + +3. **tasks/get** - Obtém o status atual de uma tarefa. + + - Parâmetros: `id`, `historyLength` (opcional) + - Retorna: Status atual da tarefa, artefatos e histórico + +4. **tasks/cancel** - Tenta cancelar uma tarefa em execução. + + - Parâmetros: `id` + - Retorna: Status atualizado da tarefa + +5. **tasks/pushNotification/set** - Configura notificações push para uma tarefa. + + - Parâmetros: `id`, `pushNotificationConfig` (URL e autenticação) + - Retorna: Configuração de notificação atualizada + +6. **tasks/pushNotification/get** - Obtém a configuração de notificações push de uma tarefa. + + - Parâmetros: `id` + - Retorna: Configuração de notificação atual + +7. **tasks/resubscribe** - Reassina para receber eventos de uma tarefa existente. + - Parâmetros: `id` + - Retorna: Stream de eventos SSE + +## Streaming e Push Notifications + +### Streaming (SSE) + +O streaming será implementado usando Server-Sent Events (SSE) para enviar atualizações em tempo real aos clientes. + +#### Integração com streaming_service.py existente + +O serviço atual `StreamingService` já implementa funcionalidades de streaming SSE. Iremos expandir e adaptar: + +```python +# Exemplo de integração com o streaming_service.py existente +async def send_task_streaming(request: SendTaskStreamingRequest) -> AsyncIterable[SendTaskStreamingResponse]: + stream_service = StreamingService() + + async for event in stream_service.send_task_streaming( + agent_id=request.params.metadata.get("agent_id"), + api_key=api_key, + message=request.params.message.parts[0].text, + session_id=request.params.sessionId, + db=db + ): + # Converter formato de evento SSE para formato A2A + yield SendTaskStreamingResponse( + id=request.id, + result=convert_to_a2a_event_format(event) + ) +``` + +### Push Notifications + +O sistema de notificações push permitirá que o agente envie atualizações para URLs de callback configuradas pelos clientes. + +#### Integração com push_notification_service.py existente + +O serviço atual `PushNotificationService` já implementa o envio de notificações. Iremos adaptar: + +```python +# Exemplo de integração com o push_notification_service.py existente +async def send_push_notification(task_id, state, message=None): + notification_config = await get_push_notification_config(task_id) + if notification_config: + await push_notification_service.send_notification( + url=notification_config.url, + task_id=task_id, + state=state, + message=message, + headers=notification_config.headers + ) +``` + +#### Autenticação de Push Notifications + +Implementaremos autenticação segura para as notificações push baseada em JWT usando o `PushNotificationAuth`: + +```python +# Exemplo de como configurar autenticação nas notificações +push_auth = PushNotificationSenderAuth() +push_auth.generate_jwk() + +# Incluir rota para obter as chaves públicas +@router.get("/{agent_id}/.well-known/jwks.json") +async def get_jwks(agent_id: str): + return push_auth.handle_jwks_endpoint(request) + +# Integrar autenticação ao enviar notificações +async def send_authenticated_push_notification(url, data): + await push_auth.send_push_notification(url, data) +``` + +## Estratégia de Armazenamento de Dados + +### Uso de Redis para Dados Temporários + +Utilizaremos Redis para armazenamento e gerenciamento dos dados temporários das tarefas A2A, substituindo o cache em memória do exemplo original: + +```python +from src.services.redis_cache_service import RedisCacheService + +class RedisCache: + def __init__(self, redis_service: RedisCacheService): + self.redis = redis_service + + # Métodos para gerenciamento de tarefas + async def get_task(self, task_id: str) -> dict: + """Recupera uma tarefa pelo ID.""" + return self.redis.get(f"task:{task_id}") + + async def save_task(self, task_id: str, task_data: dict, ttl: int = 3600) -> None: + """Salva uma tarefa com TTL configurável.""" + self.redis.set(f"task:{task_id}", task_data, ttl=ttl) + + async def update_task_status(self, task_id: str, status: dict) -> bool: + """Atualiza o status de uma tarefa.""" + task_data = await self.get_task(task_id) + if not task_data: + return False + task_data["status"] = status + await self.save_task(task_id, task_data) + return True + + # Métodos para histórico de tarefas + async def append_to_history(self, task_id: str, message: dict) -> bool: + """Adiciona uma mensagem ao histórico da tarefa.""" + task_data = await self.get_task(task_id) + if not task_data: + return False + + if "history" not in task_data: + task_data["history"] = [] + + task_data["history"].append(message) + await self.save_task(task_id, task_data) + return True + + # Métodos para notificações push + async def save_push_notification_config(self, task_id: str, config: dict) -> None: + """Salva a configuração de notificação push para uma tarefa.""" + self.redis.set(f"push_notification:{task_id}", config, ttl=3600) + + async def get_push_notification_config(self, task_id: str) -> dict: + """Recupera a configuração de notificação push de uma tarefa.""" + return self.redis.get(f"push_notification:{task_id}") + + # Métodos para SSE (Server-Sent Events) + async def save_sse_client(self, task_id: str, client_id: str) -> None: + """Registra um cliente SSE para uma tarefa.""" + self.redis.set_hash(f"sse_clients:{task_id}", client_id, "active") + + async def get_sse_clients(self, task_id: str) -> list: + """Recupera todos os clientes SSE registrados para uma tarefa.""" + return self.redis.get_all_hash(f"sse_clients:{task_id}") + + async def remove_sse_client(self, task_id: str, client_id: str) -> None: + """Remove um cliente SSE do registro.""" + self.redis.delete_hash(f"sse_clients:{task_id}", client_id) +``` + +O serviço Redis será configurado com TTL (time-to-live) para garantir a limpeza automática de dados temporários: + +```python +# Configuração de TTL para diferentes tipos de dados +TASK_TTL = 3600 # 1 hora para tarefas +HISTORY_TTL = 86400 # 24 horas para histórico +PUSH_NOTIFICATION_TTL = 3600 # 1 hora para configurações de notificação +SSE_CLIENT_TTL = 300 # 5 minutos para clientes SSE +``` + +### Modelos Existentes e Redis + +O sistema continuará utilizando os modelos SQLAlchemy existentes para dados permanentes: + +- **Agent**: Dados do agente e configurações +- **Session**: Sessões persistentes +- **MCPServer**: Configurações de servidores de ferramentas + +Para dados temporários (tarefas A2A, histórico, streaming), utilizaremos Redis que oferece: + +1. **Performance**: Operações em memória com persistência opcional +2. **TTL**: Expiração automática de dados temporários +3. **Estruturas de dados**: Suporte a strings, hashes, listas para diferentes necessidades +4. **Pub/Sub**: Mecanismo para notificações em tempo real +5. **Escalabilidade**: Melhor suporte a múltiplas instâncias do que cache em memória + +### Implementação do TaskManager com Redis + +O `A2ATaskManager` implementará a mesma interface que o `TaskManager` do exemplo, mas utilizando Redis: + +```python +class A2ATaskManager: + """ + Gerenciador de tarefas A2A usando Redis para armazenamento. + Implementa a interface do protocolo A2A para gerenciamento do ciclo de vida das tarefas. + """ + + def __init__( + self, + redis_cache: RedisCacheService, + session_service=None, + artifacts_service=None, + memory_service=None, + push_notification_service=None + ): + self.redis_cache = redis_cache + self.session_service = session_service + self.artifacts_service = artifacts_service + self.memory_service = memory_service + self.push_notification_service = push_notification_service + self.lock = asyncio.Lock() + self.subscriber_lock = asyncio.Lock() + self.task_sse_subscribers = {} + + async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse: + """ + Obtém o status atual de uma tarefa. + + Args: + request: Requisição JSON-RPC para obter dados da tarefa + + Returns: + Resposta com dados da tarefa ou erro + """ + logger.info(f"Getting task {request.params.id}") + task_query_params = request.params + + task_data = self.redis_cache.get(f"task:{task_query_params.id}") + if not task_data: + return GetTaskResponse(id=request.id, error=TaskNotFoundError()) + + # Processar histórico conforme solicitado + if task_query_params.historyLength and task_data.get("history"): + task_data["history"] = task_data["history"][-task_query_params.historyLength:] + + return GetTaskResponse(id=request.id, result=task_data) +``` + +## Implementação do A2A Server Service + +O serviço `A2AServer` processará as requisições JSON-RPC conforme o protocolo A2A: + +```python +class A2AServer: + """ + Servidor A2A que implementa o protocolo JSON-RPC para processamento de tarefas de agentes. + """ + + def __init__( + self, + endpoint: str = "/", + agent_card = None, + task_manager = None, + streaming_service = None, + ): + self.endpoint = endpoint + self.agent_card = agent_card + self.task_manager = task_manager + self.streaming_service = streaming_service + + async def _process_request(self, request: Request): + """ + Processa uma requisição JSON-RPC do protocolo A2A. + + Args: + request: Requisição HTTP + + Returns: + Resposta JSON-RPC ou stream de eventos + """ + try: + body = await request.json() + json_rpc_request = A2ARequest.validate_python(body) + + # Delegar para o handler apropriado com base no tipo de requisição + if isinstance(json_rpc_request, GetTaskRequest): + result = await self.task_manager.on_get_task(json_rpc_request) + elif isinstance(json_rpc_request, SendTaskRequest): + result = await self.task_manager.on_send_task(json_rpc_request) + elif isinstance(json_rpc_request, SendTaskStreamingRequest): + result = await self.task_manager.on_send_task_subscribe(json_rpc_request) + elif isinstance(json_rpc_request, CancelTaskRequest): + result = await self.task_manager.on_cancel_task(json_rpc_request) + elif isinstance(json_rpc_request, SetTaskPushNotificationRequest): + result = await self.task_manager.on_set_task_push_notification(json_rpc_request) + elif isinstance(json_rpc_request, GetTaskPushNotificationRequest): + result = await self.task_manager.on_get_task_push_notification(json_rpc_request) + elif isinstance(json_rpc_request, TaskResubscriptionRequest): + result = await self.task_manager.on_resubscribe_to_task(json_rpc_request) + else: + logger.warning(f"Unexpected request type: {type(json_rpc_request)}") + raise ValueError(f"Unexpected request type: {type(json_rpc_request)}") + + return self._create_response(result) + + except Exception as e: + return self._handle_exception(e) +``` + +## Implementação da Autenticação para Push Notifications + +Implementaremos autenticação JWT para notificações push, seguindo o exemplo: + +```python +class PushNotificationAuth: + def __init__(self): + self.public_keys = [] + self.private_key_jwk = None + + def generate_jwk(self): + key = jwk.JWK.generate(kty='RSA', size=2048, kid=str(uuid.uuid4()), use="sig") + self.public_keys.append(key.export_public(as_dict=True)) + self.private_key_jwk = PyJWK.from_json(key.export_private()) + + def handle_jwks_endpoint(self, request: Request): + """Retorna as chaves públicas para clientes.""" + return JSONResponse({"keys": self.public_keys}) + + async def send_authenticated_push_notification(self, url: str, data: dict): + """Envia notificação push assinada com JWT.""" + jwt_token = self._generate_jwt(data) + headers = {'Authorization': f"Bearer {jwt_token}"} + async with httpx.AsyncClient(timeout=10) as client: + try: + response = await client.post(url, json=data, headers=headers) + response.raise_for_status() + logger.info(f"Push-notification sent to URL: {url}") + except Exception as e: + logger.warning(f"Error sending push-notification to URL {url}: {e}") +``` + +## Revisão das Rotas A2A + +As rotas A2A serão implementadas em `src/api/a2a_routes.py`, utilizando a lógica de AgentCard do código existente: + +```python +@router.post("/{agent_id}") +async def process_a2a_request( + agent_id: str, + request: Request, + x_api_key: str = Header(None, alias="x-api-key"), + db: Session = Depends(get_db), +): + """ + Endpoint que processa requisições JSON-RPC do protocolo A2A. + """ + # Validar agente e API key + agent = get_agent(db, agent_id) + if not agent: + return JSONResponse( + status_code=404, + content={"detail": "Agente não encontrado"} + ) + + if agent.config.get("api_key") != x_api_key: + return JSONResponse( + status_code=401, + content={"detail": "Chave API inválida"} + ) + + # Criar Agent Card para o agente (reutilizando lógica existente) + agent_card = create_agent_card_from_agent(agent, db) + + # Configurar o servidor A2A para este agente + a2a_server.agent_card = agent_card + + # Processar a requisição A2A + return await a2a_server._process_request(request) +``` + +## Arquivos a Serem Criados/Atualizados + +### Novos Arquivos + +1. `src/schemas/a2a.py` - Modelos Pydantic para o protocolo A2A +2. `src/services/redis_cache_service.py` - Serviço de cache Redis +3. `src/config/redis.py` - Configuração do cliente Redis +4. `src/utils/a2a_utils.py` - Utilitários para o protocolo A2A +5. `src/services/a2a_task_manager_service.py` - Gerenciador de tarefas A2A +6. `src/services/a2a_server_service.py` - Servidor A2A +7. `src/services/push_notification_auth_service.py` - Autenticação para push notifications +8. `src/api/a2a_routes.py` - Rotas para o protocolo A2A + +### Arquivos a Serem Atualizados + +1. `src/main.py` - Registrar novas rotas A2A +2. `pyproject.toml` - Adicionar dependências (Redis, jwcrypto, etc.) + +## Plano de Implementação + +### Fase 1: Criação dos Esquemas + +1. Criar arquivo `src/schemas/a2a.py` com os modelos Pydantic baseados no arquivo `common/types.py` +2. Adaptar os tipos para a estrutura do projeto e adicionar suporte para streaming e push notifications + +### Fase 2: Implementação do Serviço de Cache Redis + +1. Criar arquivo `src/config/redis.py` para configuração do cliente Redis +2. Criar arquivo `src/services/redis_cache_service.py` para gerenciamento de cache + +### Fase 3: Implementação de Utilitários + +1. Criar arquivo `src/utils/a2a_utils.py` com funções utilitárias baseadas em `common/server/utils.py` +2. Adaptar o `PushNotificationAuth` para uso no contexto A2A + +### Fase 4: Implementação do Gerenciador de Tarefas + +1. Criar arquivo `src/services/a2a_task_manager_service.py` com a implementação do `A2ATaskManager` +2. Integrar com serviços existentes: + - agent_runner.py para execução de agentes + - streaming_service.py para streaming SSE + - push_notification_service.py para push notifications + - redis_cache_service.py para cache de tarefas + +### Fase 5: Implementação do Servidor A2A + +1. Criar arquivo `src/services/a2a_server_service.py` com a implementação do `A2AServer` +2. Implementar processamento de requisições JSON-RPC para todas as operações A2A + +### Fase 6: Integração + +1. Criar arquivo `src/api/a2a_routes.py` com rotas para o protocolo A2A +2. Registrar as rotas no aplicativo FastAPI principal +3. Assegurar que todas as operações A2A funcionem corretamente, incluindo streaming e push notifications + +## Adaptações Necessárias + +1. **Esquemas**: Adaptar os modelos do protocolo A2A para usar os esquemas Pydantic existentes quando possível +2. **Autenticação**: Integrar com o sistema de autenticação existente usando API keys +3. **Streaming**: Adaptar o `StreamingService` existente para o formato de eventos A2A +4. **Push Notifications**: Integrar o `PushNotificationService` existente e adicionar suporte a autenticação JWT +5. **Cache**: Utilizar Redis para armazenamento temporário de tarefas e eventos +6. **Execução de Agentes**: Reutilizar o serviço existente `agent_runner.py` para execução + +## Próximos Passos + +1. Configurar dependências do Redis no projeto +2. Implementar os esquemas em `src/schemas/a2a.py` +3. Implementar o serviço de cache Redis +4. Implementar as funções utilitárias em `src/utils/a2a_utils.py` +5. Implementar o gerenciador de tarefas em `src/services/a2a_task_manager_service.py` +6. Implementar o servidor A2A em `src/services/a2a_server_service.py` +7. Implementar as rotas em `src/api/a2a_routes.py` +8. Registrar as rotas no aplicativo principal `src/main.py` +9. Testar a implementação com casos de uso completos, incluindo streaming e push notifications diff --git a/docs/A2A b/docs/A2A new file mode 160000 index 00000000..502a4d0f --- /dev/null +++ b/docs/A2A @@ -0,0 +1 @@ +Subproject commit 502a4d0fdd73bb0b8afb8163907c601e08f40525