implementação de hub de redirecionamentos de webhooks

This commit is contained in:
Fábio Cavalcanti 2025-01-18 13:53:27 -03:00
parent 0745132b98
commit d43f62c316
4 changed files with 479 additions and 4 deletions

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.pythonPath": "d:\\Estudando CODE\\ESTUDOS PYTHON\\transcreve-audio-exemplo\\.venv\\Scripts\\python.exe"
}

45
main.py
View File

@ -11,6 +11,8 @@ from config import logger, settings, redis_client
from storage import StorageHandler
import traceback
import os
import asyncio
import aiohttp
app = FastAPI()
storage = StorageHandler()
@ -40,12 +42,53 @@ def load_dynamic_settings():
"DEBUG_MODE": get_config("DEBUG_MODE", "false") == "true",
}
async def forward_to_webhooks(body: dict, storage: StorageHandler):
"""Encaminha o payload para todos os webhooks cadastrados."""
webhooks = storage.get_webhook_redirects()
async with aiohttp.ClientSession() as session:
for webhook in webhooks:
try:
# Configura os headers mantendo o payload intacto
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true", # Header para identificação da origem
"X-TranscreveZAP-Webhook-ID": webhook["id"]
}
async with session.post(
webhook["url"],
json=body, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
if response.status in [200, 201, 202]:
storage.update_webhook_stats(webhook["id"], True)
else:
error_text = await response.text()
storage.update_webhook_stats(
webhook["id"],
False,
f"Status {response.status}: {error_text}"
)
# Registra falha para retry posterior
storage.add_failed_delivery(webhook["id"], body)
except Exception as e:
storage.update_webhook_stats(
webhook["id"],
False,
f"Erro ao encaminhar: {str(e)}"
)
# Registra falha para retry posterior
storage.add_failed_delivery(webhook["id"], body)
@app.post("/transcreve-audios")
async def transcreve_audios(request: Request):
try:
body = await request.json()
dynamic_settings = load_dynamic_settings()
# Iniciar o encaminhamento em background
asyncio.create_task(forward_to_webhooks(body, storage))
# Log inicial da requisição
storage.add_log("INFO", "Nova requisição de transcrição recebida", {
"instance": body.get("instance"),

View File

@ -252,7 +252,7 @@ def login_page():
# Modificar a função de logout no dashboard
def dashboard():
# Versão do sistema
APP_VERSION = "2.3"
APP_VERSION = "2.3.1"
show_logo()
st.sidebar.markdown('<div class="sidebar-header">TranscreveZAP - Menu</div>', unsafe_allow_html=True)
@ -265,7 +265,7 @@ def dashboard():
page = st.sidebar.radio(
"Navegação",
["📊 Painel de Controle", "👥 Gerenciar Grupos", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"]
["📊 Painel de Controle", "👥 Gerenciar Grupos", "🔄 Hub de Redirecionamento", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"]
)
# Seção de logout com confirmação
@ -300,6 +300,8 @@ def dashboard():
show_statistics()
elif page == "👥 Gerenciar Grupos":
manage_groups()
elif page == "🔄 Hub de Redirecionamento":
manage_webhooks()
elif page == "🚫 Gerenciar Bloqueios":
manage_blocks()
elif page == "⚙️ Configurações":
@ -425,6 +427,165 @@ def manage_groups():
else:
st.info("Nenhum grupo permitido.")
def manage_webhooks():
st.title("🔄 Hub de Redirecionamento")
st.markdown("""
Configure aqui os webhooks para onde você deseja redirecionar as mensagens recebidas.
Cada webhook receberá uma cópia exata do payload original da Evolution API.
""")
# Adicionar novo webhook
st.subheader("Adicionar Novo Webhook")
with st.form("add_webhook"):
col1, col2 = st.columns([3, 1])
with col1:
webhook_url = st.text_input(
"URL do Webhook",
placeholder="https://seu-sistema.com/webhook"
)
with col2:
if st.form_submit_button("🔍 Testar Conexão"):
if webhook_url:
with st.spinner("Testando webhook..."):
success, message = storage.test_webhook(webhook_url)
if success:
st.success(message)
else:
st.error(message)
else:
st.warning("Por favor, insira uma URL válida")
webhook_description = st.text_input(
"Descrição",
placeholder="Ex: URL de Webhook do N8N, Sistema de CRM, etc."
)
if st.form_submit_button("Adicionar Webhook"):
if webhook_url:
try:
# Testar antes de adicionar
success, message = storage.test_webhook(webhook_url)
if success:
storage.add_webhook_redirect(webhook_url, webhook_description)
st.success("✅ Webhook testado e adicionado com sucesso!")
st.experimental_rerun()
else:
st.error(f"Erro ao adicionar webhook: {message}")
except Exception as e:
st.error(f"Erro ao adicionar webhook: {str(e)}")
else:
st.warning("Por favor, insira uma URL válida")
# Listar webhooks existentes
st.subheader("Webhooks Configurados")
webhooks = storage.get_webhook_redirects()
if not webhooks:
st.info("Nenhum webhook configurado ainda.")
return
for webhook in webhooks:
# Obter métricas de saúde
health = storage.get_webhook_health(webhook["id"])
# Definir cor baseada no status
status_colors = {
"healthy": "🟢",
"warning": "🟡",
"critical": "🔴",
"unknown": ""
}
status_icon = status_colors.get(health["health_status"], "")
with st.expander(
f"{status_icon} {webhook['description'] or webhook['url']}",
expanded=True
):
col1, col2 = st.columns([3, 1])
with col1:
st.text_input(
"URL",
value=webhook["url"],
key=f"url_{webhook['id']}",
disabled=True
)
if webhook["description"]:
st.text_input(
"Descrição",
value=webhook["description"],
key=f"desc_{webhook['id']}",
disabled=True
)
with col2:
# Métricas de saúde
st.metric(
"Taxa de Sucesso",
f"{health['success_rate']:.1f}%"
)
# Alertas baseados na saúde
if health["health_status"] == "critical":
st.error("⚠️ Taxa de erro crítica!")
elif health["health_status"] == "warning":
st.warning("⚠️ Taxa de erro elevada")
# Botões de ação
col1, col2 = st.columns(2)
with col1:
if st.button("🔄 Retry", key=f"retry_{webhook['id']}"):
failed_deliveries = storage.get_failed_deliveries(webhook["id"])
if failed_deliveries:
with st.spinner("Reenviando mensagens..."):
success_count = 0
for delivery in failed_deliveries:
if storage.retry_webhook(webhook["id"], delivery["payload"]):
success_count += 1
st.success(f"Reenviadas {success_count} de {len(failed_deliveries)} mensagens!")
else:
st.info("Não há mensagens pendentes para reenvio")
with col2:
if st.button("🗑️", key=f"remove_{webhook['id']}", help="Remover webhook"):
if st.session_state.get(f"confirm_remove_{webhook['id']}", False):
storage.remove_webhook_redirect(webhook["id"])
st.success("Webhook removido!")
st.experimental_rerun()
else:
st.session_state[f"confirm_remove_{webhook['id']}"] = True
st.warning("Clique novamente para confirmar")
# Estatísticas detalhadas
st.markdown("### Estatísticas")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total de Sucessos", webhook["success_count"])
with col2:
st.metric("Total de Erros", webhook["error_count"])
with col3:
last_success = webhook.get("last_success")
if last_success:
last_success = datetime.fromisoformat(last_success).strftime("%d/%m/%Y %H:%M")
st.metric("Último Sucesso", last_success or "Nunca")
# Exibir último erro (se houver)
if webhook.get("last_error"):
st.error(
f"Último erro: {webhook['last_error']['message']} "
f"({datetime.fromisoformat(webhook['last_error']['timestamp']).strftime('%d/%m/%Y %H:%M')})"
)
# Lista de entregas falhas
failed_deliveries = storage.get_failed_deliveries(webhook["id"])
if failed_deliveries:
st.markdown("### Entregas Pendentes")
st.warning(f"{len(failed_deliveries)} mensagens aguardando reenvio")
if st.button("📋 Ver Detalhes", key=f"details_{webhook['id']}"):
for delivery in failed_deliveries:
st.code(json.dumps(delivery, indent=2))
def manage_blocks():
st.title("🚫 Gerenciar Bloqueios")
st.subheader("Bloquear Usuário")

View File

@ -6,8 +6,13 @@ import traceback
import logging
import redis
from utils import create_redis_client
import uuid
class StorageHandler:
# Chaves Redis para webhooks
WEBHOOK_KEY = "webhook_redirects" # Chave para armazenar os webhooks
WEBHOOK_STATS_KEY = "webhook_stats" # Chave para estatísticas
def __init__(self):
# Configuração de logger
self.logger = logging.getLogger("StorageHandler")
@ -403,3 +408,266 @@ class StorageHandler:
return data
except:
return None
def get_webhook_redirects(self) -> List[Dict]:
"""Obtém todos os webhooks de redirecionamento cadastrados."""
webhooks_raw = self.redis.hgetall(self._get_redis_key("webhook_redirects"))
webhooks = []
for webhook_id, data in webhooks_raw.items():
webhook_data = json.loads(data)
webhook_data['id'] = webhook_id
webhooks.append(webhook_data)
return webhooks
def validate_webhook_url(self, url: str) -> bool:
"""Valida se a URL do webhook é acessível."""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
return all([parsed.scheme, parsed.netloc])
except Exception as e:
self.logger.error(f"URL inválida: {url} - {str(e)}")
return False
def add_webhook_redirect(self, url: str, description: str = "") -> str:
"""
Adiciona um novo webhook de redirecionamento.
Retorna o ID do webhook criado.
"""
webhook_id = str(uuid.uuid4())
webhook_data = {
"url": url,
"description": description,
"created_at": datetime.now().isoformat(),
"status": "active",
"error_count": 0,
"success_count": 0,
"last_success": None,
"last_error": None
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
return webhook_id
def clean_webhook_data(self, webhook_id: str):
"""
Remove todos os dados relacionados a um webhook específico do Redis.
Args:
webhook_id: ID do webhook a ser limpo
"""
try:
# Lista de chaves relacionadas ao webhook que precisam ser removidas
keys_to_remove = [
f"webhook_failed_{webhook_id}", # Entregas falhas
f"webhook_stats_{webhook_id}", # Estatísticas específicas
]
# Remove cada chave associada ao webhook
for key in keys_to_remove:
full_key = self._get_redis_key(key)
self.redis.delete(full_key)
self.logger.debug(f"Chave removida: {full_key}")
self.logger.info(f"Dados do webhook {webhook_id} limpos com sucesso")
except Exception as e:
self.logger.error(f"Erro ao limpar dados do webhook {webhook_id}: {str(e)}")
raise
def remove_webhook_redirect(self, webhook_id: str):
"""Remove um webhook de redirecionamento e todos os seus dados associados."""
try:
# Primeiro remove os dados associados
self.clean_webhook_data(webhook_id)
# Depois remove o webhook em si
self.redis.hdel(self._get_redis_key("webhook_redirects"), webhook_id)
self.logger.info(f"Webhook {webhook_id} removido com sucesso")
except Exception as e:
self.logger.error(f"Erro ao remover webhook {webhook_id}: {str(e)}")
raise
def update_webhook_stats(self, webhook_id: str, success: bool, error_message: str = None):
"""Atualiza as estatísticas de um webhook."""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
if success:
webhook_data["success_count"] += 1
webhook_data["last_success"] = datetime.now().isoformat()
else:
webhook_data["error_count"] += 1
webhook_data["last_error"] = {
"timestamp": datetime.now().isoformat(),
"message": error_message
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
except Exception as e:
self.logger.error(f"Erro ao atualizar estatísticas do webhook {webhook_id}: {e}")
def retry_failed_webhooks(self):
"""Tenta reenviar webhooks que falharam nas últimas 24h."""
webhooks = self.get_webhook_redirects()
for webhook in webhooks:
if webhook.get("last_error"):
error_time = datetime.fromisoformat(webhook["last_error"]["timestamp"])
if datetime.now() - error_time < timedelta(hours=24):
# Tentar reenviar
pass
def test_webhook(self, url: str) -> tuple[bool, str]:
"""
Testa um webhook antes de salvá-lo.
Retorna uma tupla (sucesso, mensagem)
"""
try:
import aiohttp
import asyncio
async def _test_webhook():
async with aiohttp.ClientSession() as session:
test_payload = {
"test": True,
"timestamp": datetime.now().isoformat(),
"message": "Teste de conexão do TranscreveZAP"
}
async with session.post(
url,
json=test_payload,
headers={"Content-Type": "application/json"},
timeout=10
) as response:
return response.status, await response.text()
status, response = asyncio.run(_test_webhook())
if status in [200, 201, 202]:
return True, "Webhook testado com sucesso!"
return False, f"Erro no teste: Status {status} - {response}"
except Exception as e:
return False, f"Erro ao testar webhook: {str(e)}"
def get_webhook_health(self, webhook_id: str) -> dict:
"""
Calcula métricas de saúde do webhook
"""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
total_requests = webhook_data["success_count"] + webhook_data["error_count"]
if total_requests == 0:
return {
"health_status": "unknown",
"error_rate": 0,
"success_rate": 0,
"total_requests": 0
}
error_rate = (webhook_data["error_count"] / total_requests) * 100
success_rate = (webhook_data["success_count"] / total_requests) * 100
# Definir status de saúde
if error_rate >= 50:
health_status = "critical"
elif error_rate >= 20:
health_status = "warning"
else:
health_status = "healthy"
return {
"health_status": health_status,
"error_rate": error_rate,
"success_rate": success_rate,
"total_requests": total_requests
}
except Exception as e:
self.logger.error(f"Erro ao calcular saúde do webhook {webhook_id}: {e}")
return None
def retry_webhook(self, webhook_id: str, payload: dict) -> bool:
"""
Tenta reenviar um payload para um webhook específico mantendo o payload original intacto.
Args:
webhook_id: ID do webhook para reenvio
payload: Payload original para reenvio
Returns:
bool: True se o reenvio foi bem sucedido, False caso contrário
"""
try:
import aiohttp
import asyncio
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
async def _retry_webhook():
async with aiohttp.ClientSession() as session:
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true",
"X-TranscreveZAP-Webhook-ID": webhook_id,
"X-TranscreveZAP-Retry": "true"
}
async with session.post(
webhook_data["url"],
json=payload, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
return response.status in [200, 201, 202]
success = asyncio.run(_retry_webhook())
if success:
self.update_webhook_stats(webhook_id, True)
else:
self.update_webhook_stats(webhook_id, False, "Falha no retry")
return success
except Exception as e:
self.logger.error(f"Erro no retry do webhook {webhook_id}: {e}")
return False
def get_failed_deliveries(self, webhook_id: str) -> List[Dict]:
"""
Retorna lista de entregas falhas para um webhook
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed = self.redis.lrange(key, 0, -1)
return [json.loads(x) for x in failed]
def add_failed_delivery(self, webhook_id: str, payload: dict):
"""
Registra uma entrega falha para retry posterior
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed_delivery = {
"timestamp": datetime.now().isoformat(),
"payload": payload,
"retry_count": 0
}
self.redis.lpush(key, json.dumps(failed_delivery))
# Manter apenas as últimas 100 falhas
self.redis.ltrim(key, 0, 99)