From d43f62c316710b1be222b4caa71dd03db57b7ee8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A1bio=20Cavalcanti?= Date: Sat, 18 Jan 2025 13:53:27 -0300 Subject: [PATCH] =?UTF-8?q?implementa=C3=A7=C3=A3o=20de=20hub=20de=20redir?= =?UTF-8?q?ecionamentos=20de=20webhooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/settings.json | 3 + main.py | 45 ++++++- manager.py | 165 +++++++++++++++++++++++++- storage.py | 270 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 479 insertions(+), 4 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7c743f0 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.pythonPath": "d:\\Estudando CODE\\ESTUDOS PYTHON\\transcreve-audio-exemplo\\.venv\\Scripts\\python.exe" +} \ No newline at end of file diff --git a/main.py b/main.py index 919cc06..7fa930d 100644 --- a/main.py +++ b/main.py @@ -11,6 +11,8 @@ from config import logger, settings, redis_client from storage import StorageHandler import traceback import os +import asyncio +import aiohttp app = FastAPI() storage = StorageHandler() @@ -40,12 +42,53 @@ def load_dynamic_settings(): "DEBUG_MODE": get_config("DEBUG_MODE", "false") == "true", } +async def forward_to_webhooks(body: dict, storage: StorageHandler): + """Encaminha o payload para todos os webhooks cadastrados.""" + webhooks = storage.get_webhook_redirects() + + async with aiohttp.ClientSession() as session: + for webhook in webhooks: + try: + # Configura os headers mantendo o payload intacto + headers = { + "Content-Type": "application/json", + "X-TranscreveZAP-Forward": "true", # Header para identificação da origem + "X-TranscreveZAP-Webhook-ID": webhook["id"] + } + + async with session.post( + webhook["url"], + json=body, # Envia o payload original sem modificações + headers=headers, + timeout=10 + ) as response: + if response.status in [200, 201, 202]: + storage.update_webhook_stats(webhook["id"], True) + else: + error_text = await response.text() + storage.update_webhook_stats( + webhook["id"], + False, + f"Status {response.status}: {error_text}" + ) + # Registra falha para retry posterior + storage.add_failed_delivery(webhook["id"], body) + except Exception as e: + storage.update_webhook_stats( + webhook["id"], + False, + f"Erro ao encaminhar: {str(e)}" + ) + # Registra falha para retry posterior + storage.add_failed_delivery(webhook["id"], body) + @app.post("/transcreve-audios") async def transcreve_audios(request: Request): try: body = await request.json() dynamic_settings = load_dynamic_settings() - + # Iniciar o encaminhamento em background + asyncio.create_task(forward_to_webhooks(body, storage)) # Log inicial da requisição storage.add_log("INFO", "Nova requisição de transcrição recebida", { "instance": body.get("instance"), diff --git a/manager.py b/manager.py index 6054224..0ea62a9 100644 --- a/manager.py +++ b/manager.py @@ -252,7 +252,7 @@ def login_page(): # Modificar a função de logout no dashboard def dashboard(): # Versão do sistema - APP_VERSION = "2.3" + APP_VERSION = "2.3.1" show_logo() st.sidebar.markdown('', unsafe_allow_html=True) @@ -265,7 +265,7 @@ def dashboard(): page = st.sidebar.radio( "Navegação", - ["📊 Painel de Controle", "👥 Gerenciar Grupos", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"] + ["📊 Painel de Controle", "👥 Gerenciar Grupos", "🔄 Hub de Redirecionamento", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"] ) # Seção de logout com confirmação @@ -300,6 +300,8 @@ def dashboard(): show_statistics() elif page == "👥 Gerenciar Grupos": manage_groups() + elif page == "🔄 Hub de Redirecionamento": + manage_webhooks() elif page == "🚫 Gerenciar Bloqueios": manage_blocks() elif page == "⚙️ Configurações": @@ -425,6 +427,165 @@ def manage_groups(): else: st.info("Nenhum grupo permitido.") +def manage_webhooks(): + st.title("🔄 Hub de Redirecionamento") + st.markdown(""" + Configure aqui os webhooks para onde você deseja redirecionar as mensagens recebidas. + Cada webhook receberá uma cópia exata do payload original da Evolution API. + """) + + # Adicionar novo webhook + st.subheader("Adicionar Novo Webhook") + with st.form("add_webhook"): + col1, col2 = st.columns([3, 1]) + with col1: + webhook_url = st.text_input( + "URL do Webhook", + placeholder="https://seu-sistema.com/webhook" + ) + with col2: + if st.form_submit_button("🔍 Testar Conexão"): + if webhook_url: + with st.spinner("Testando webhook..."): + success, message = storage.test_webhook(webhook_url) + if success: + st.success(message) + else: + st.error(message) + else: + st.warning("Por favor, insira uma URL válida") + + webhook_description = st.text_input( + "Descrição", + placeholder="Ex: URL de Webhook do N8N, Sistema de CRM, etc." + ) + + if st.form_submit_button("Adicionar Webhook"): + if webhook_url: + try: + # Testar antes de adicionar + success, message = storage.test_webhook(webhook_url) + if success: + storage.add_webhook_redirect(webhook_url, webhook_description) + st.success("✅ Webhook testado e adicionado com sucesso!") + st.experimental_rerun() + else: + st.error(f"Erro ao adicionar webhook: {message}") + except Exception as e: + st.error(f"Erro ao adicionar webhook: {str(e)}") + else: + st.warning("Por favor, insira uma URL válida") + + # Listar webhooks existentes + st.subheader("Webhooks Configurados") + webhooks = storage.get_webhook_redirects() + + if not webhooks: + st.info("Nenhum webhook configurado ainda.") + return + + for webhook in webhooks: + # Obter métricas de saúde + health = storage.get_webhook_health(webhook["id"]) + + # Definir cor baseada no status + status_colors = { + "healthy": "🟢", + "warning": "🟡", + "critical": "🔴", + "unknown": "⚪" + } + + status_icon = status_colors.get(health["health_status"], "⚪") + + with st.expander( + f"{status_icon} {webhook['description'] or webhook['url']}", + expanded=True + ): + col1, col2 = st.columns([3, 1]) + + with col1: + st.text_input( + "URL", + value=webhook["url"], + key=f"url_{webhook['id']}", + disabled=True + ) + if webhook["description"]: + st.text_input( + "Descrição", + value=webhook["description"], + key=f"desc_{webhook['id']}", + disabled=True + ) + + with col2: + # Métricas de saúde + st.metric( + "Taxa de Sucesso", + f"{health['success_rate']:.1f}%" + ) + + # Alertas baseados na saúde + if health["health_status"] == "critical": + st.error("⚠️ Taxa de erro crítica!") + elif health["health_status"] == "warning": + st.warning("⚠️ Taxa de erro elevada") + + # Botões de ação + col1, col2 = st.columns(2) + with col1: + if st.button("🔄 Retry", key=f"retry_{webhook['id']}"): + failed_deliveries = storage.get_failed_deliveries(webhook["id"]) + if failed_deliveries: + with st.spinner("Reenviando mensagens..."): + success_count = 0 + for delivery in failed_deliveries: + if storage.retry_webhook(webhook["id"], delivery["payload"]): + success_count += 1 + st.success(f"Reenviadas {success_count} de {len(failed_deliveries)} mensagens!") + else: + st.info("Não há mensagens pendentes para reenvio") + + with col2: + if st.button("🗑️", key=f"remove_{webhook['id']}", help="Remover webhook"): + if st.session_state.get(f"confirm_remove_{webhook['id']}", False): + storage.remove_webhook_redirect(webhook["id"]) + st.success("Webhook removido!") + st.experimental_rerun() + else: + st.session_state[f"confirm_remove_{webhook['id']}"] = True + st.warning("Clique novamente para confirmar") + + # Estatísticas detalhadas + st.markdown("### Estatísticas") + col1, col2, col3 = st.columns(3) + with col1: + st.metric("Total de Sucessos", webhook["success_count"]) + with col2: + st.metric("Total de Erros", webhook["error_count"]) + with col3: + last_success = webhook.get("last_success") + if last_success: + last_success = datetime.fromisoformat(last_success).strftime("%d/%m/%Y %H:%M") + st.metric("Último Sucesso", last_success or "Nunca") + + # Exibir último erro (se houver) + if webhook.get("last_error"): + st.error( + f"Último erro: {webhook['last_error']['message']} " + f"({datetime.fromisoformat(webhook['last_error']['timestamp']).strftime('%d/%m/%Y %H:%M')})" + ) + + # Lista de entregas falhas + failed_deliveries = storage.get_failed_deliveries(webhook["id"]) + if failed_deliveries: + st.markdown("### Entregas Pendentes") + st.warning(f"{len(failed_deliveries)} mensagens aguardando reenvio") + if st.button("📋 Ver Detalhes", key=f"details_{webhook['id']}"): + for delivery in failed_deliveries: + st.code(json.dumps(delivery, indent=2)) + def manage_blocks(): st.title("🚫 Gerenciar Bloqueios") st.subheader("Bloquear Usuário") diff --git a/storage.py b/storage.py index 9486686..b91f48c 100644 --- a/storage.py +++ b/storage.py @@ -6,8 +6,13 @@ import traceback import logging import redis from utils import create_redis_client +import uuid class StorageHandler: + # Chaves Redis para webhooks + WEBHOOK_KEY = "webhook_redirects" # Chave para armazenar os webhooks + WEBHOOK_STATS_KEY = "webhook_stats" # Chave para estatísticas + def __init__(self): # Configuração de logger self.logger = logging.getLogger("StorageHandler") @@ -402,4 +407,267 @@ class StorageHandler: return None return data except: - return None \ No newline at end of file + return None + + def get_webhook_redirects(self) -> List[Dict]: + """Obtém todos os webhooks de redirecionamento cadastrados.""" + webhooks_raw = self.redis.hgetall(self._get_redis_key("webhook_redirects")) + webhooks = [] + + for webhook_id, data in webhooks_raw.items(): + webhook_data = json.loads(data) + webhook_data['id'] = webhook_id + webhooks.append(webhook_data) + + return webhooks + + def validate_webhook_url(self, url: str) -> bool: + """Valida se a URL do webhook é acessível.""" + try: + from urllib.parse import urlparse + parsed = urlparse(url) + return all([parsed.scheme, parsed.netloc]) + except Exception as e: + self.logger.error(f"URL inválida: {url} - {str(e)}") + return False + + def add_webhook_redirect(self, url: str, description: str = "") -> str: + """ + Adiciona um novo webhook de redirecionamento. + Retorna o ID do webhook criado. + """ + webhook_id = str(uuid.uuid4()) + webhook_data = { + "url": url, + "description": description, + "created_at": datetime.now().isoformat(), + "status": "active", + "error_count": 0, + "success_count": 0, + "last_success": None, + "last_error": None + } + + self.redis.hset( + self._get_redis_key("webhook_redirects"), + webhook_id, + json.dumps(webhook_data) + ) + return webhook_id + + def clean_webhook_data(self, webhook_id: str): + """ + Remove todos os dados relacionados a um webhook específico do Redis. + + Args: + webhook_id: ID do webhook a ser limpo + """ + try: + # Lista de chaves relacionadas ao webhook que precisam ser removidas + keys_to_remove = [ + f"webhook_failed_{webhook_id}", # Entregas falhas + f"webhook_stats_{webhook_id}", # Estatísticas específicas + ] + + # Remove cada chave associada ao webhook + for key in keys_to_remove: + full_key = self._get_redis_key(key) + self.redis.delete(full_key) + self.logger.debug(f"Chave removida: {full_key}") + + self.logger.info(f"Dados do webhook {webhook_id} limpos com sucesso") + + except Exception as e: + self.logger.error(f"Erro ao limpar dados do webhook {webhook_id}: {str(e)}") + raise + + def remove_webhook_redirect(self, webhook_id: str): + """Remove um webhook de redirecionamento e todos os seus dados associados.""" + try: + # Primeiro remove os dados associados + self.clean_webhook_data(webhook_id) + + # Depois remove o webhook em si + self.redis.hdel(self._get_redis_key("webhook_redirects"), webhook_id) + self.logger.info(f"Webhook {webhook_id} removido com sucesso") + + except Exception as e: + self.logger.error(f"Erro ao remover webhook {webhook_id}: {str(e)}") + raise + + def update_webhook_stats(self, webhook_id: str, success: bool, error_message: str = None): + """Atualiza as estatísticas de um webhook.""" + try: + webhook_data = json.loads( + self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id) + ) + + if success: + webhook_data["success_count"] += 1 + webhook_data["last_success"] = datetime.now().isoformat() + else: + webhook_data["error_count"] += 1 + webhook_data["last_error"] = { + "timestamp": datetime.now().isoformat(), + "message": error_message + } + + self.redis.hset( + self._get_redis_key("webhook_redirects"), + webhook_id, + json.dumps(webhook_data) + ) + except Exception as e: + self.logger.error(f"Erro ao atualizar estatísticas do webhook {webhook_id}: {e}") + + def retry_failed_webhooks(self): + """Tenta reenviar webhooks que falharam nas últimas 24h.""" + webhooks = self.get_webhook_redirects() + for webhook in webhooks: + if webhook.get("last_error"): + error_time = datetime.fromisoformat(webhook["last_error"]["timestamp"]) + if datetime.now() - error_time < timedelta(hours=24): + # Tentar reenviar + pass + + def test_webhook(self, url: str) -> tuple[bool, str]: + """ + Testa um webhook antes de salvá-lo. + Retorna uma tupla (sucesso, mensagem) + """ + try: + import aiohttp + import asyncio + + async def _test_webhook(): + async with aiohttp.ClientSession() as session: + test_payload = { + "test": True, + "timestamp": datetime.now().isoformat(), + "message": "Teste de conexão do TranscreveZAP" + } + async with session.post( + url, + json=test_payload, + headers={"Content-Type": "application/json"}, + timeout=10 + ) as response: + return response.status, await response.text() + + status, response = asyncio.run(_test_webhook()) + if status in [200, 201, 202]: + return True, "Webhook testado com sucesso!" + return False, f"Erro no teste: Status {status} - {response}" + + except Exception as e: + return False, f"Erro ao testar webhook: {str(e)}" + + def get_webhook_health(self, webhook_id: str) -> dict: + """ + Calcula métricas de saúde do webhook + """ + try: + webhook_data = json.loads( + self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id) + ) + + total_requests = webhook_data["success_count"] + webhook_data["error_count"] + if total_requests == 0: + return { + "health_status": "unknown", + "error_rate": 0, + "success_rate": 0, + "total_requests": 0 + } + + error_rate = (webhook_data["error_count"] / total_requests) * 100 + success_rate = (webhook_data["success_count"] / total_requests) * 100 + + # Definir status de saúde + if error_rate >= 50: + health_status = "critical" + elif error_rate >= 20: + health_status = "warning" + else: + health_status = "healthy" + + return { + "health_status": health_status, + "error_rate": error_rate, + "success_rate": success_rate, + "total_requests": total_requests + } + + except Exception as e: + self.logger.error(f"Erro ao calcular saúde do webhook {webhook_id}: {e}") + return None + + def retry_webhook(self, webhook_id: str, payload: dict) -> bool: + """ + Tenta reenviar um payload para um webhook específico mantendo o payload original intacto. + + Args: + webhook_id: ID do webhook para reenvio + payload: Payload original para reenvio + + Returns: + bool: True se o reenvio foi bem sucedido, False caso contrário + """ + try: + import aiohttp + import asyncio + + webhook_data = json.loads( + self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id) + ) + + async def _retry_webhook(): + async with aiohttp.ClientSession() as session: + headers = { + "Content-Type": "application/json", + "X-TranscreveZAP-Forward": "true", + "X-TranscreveZAP-Webhook-ID": webhook_id, + "X-TranscreveZAP-Retry": "true" + } + + async with session.post( + webhook_data["url"], + json=payload, # Envia o payload original sem modificações + headers=headers, + timeout=10 + ) as response: + return response.status in [200, 201, 202] + + success = asyncio.run(_retry_webhook()) + if success: + self.update_webhook_stats(webhook_id, True) + else: + self.update_webhook_stats(webhook_id, False, "Falha no retry") + + return success + + except Exception as e: + self.logger.error(f"Erro no retry do webhook {webhook_id}: {e}") + return False + + def get_failed_deliveries(self, webhook_id: str) -> List[Dict]: + """ + Retorna lista de entregas falhas para um webhook + """ + key = self._get_redis_key(f"webhook_failed_{webhook_id}") + failed = self.redis.lrange(key, 0, -1) + return [json.loads(x) for x in failed] + + def add_failed_delivery(self, webhook_id: str, payload: dict): + """ + Registra uma entrega falha para retry posterior + """ + key = self._get_redis_key(f"webhook_failed_{webhook_id}") + failed_delivery = { + "timestamp": datetime.now().isoformat(), + "payload": payload, + "retry_count": 0 + } + self.redis.lpush(key, json.dumps(failed_delivery)) + # Manter apenas as últimas 100 falhas + self.redis.ltrim(key, 0, 99) \ No newline at end of file