Compare commits

...

30 Commits
2.1 ... main

Author SHA1 Message Date
Fábio Cavalcanti
c42476549e ajuste audio source download 2025-02-22 08:13:38 -03:00
Fábio Cavalcanti
e89787e715 ajuste readme 2025-01-24 22:48:06 -03:00
Fábio Cavalcanti
f63dcc40d1 adicionado sistema de seleção de provedor de llm para GROQ ou Open ai 2025-01-24 22:27:02 -03:00
Fábio Cavalcanti
af20510b2b corrigido sistem de validação de chaves groq 2025-01-24 21:06:13 -03:00
Fábio Cavalcanti
a25dc9c4e7 ajuste para verificar formdata e json na chamada groq 2025-01-23 15:54:21 -03:00
Fábio Cavalcanti
a4ba9d02bc ajuste identacao 2025-01-23 15:38:49 -03:00
Fábio Cavalcanti
4c7d346a3c ajuste identacao 2025-01-23 15:27:35 -03:00
Fábio Cavalcanti
be82707ccc melhoria no sistema de chaves groq 2025-01-23 15:19:39 -03:00
Fábio Cavalcanti
3cd75903fc hub do readme 2025-01-18 13:58:46 -03:00
Fábio Cavalcanti
d43f62c316 implementação de hub de redirecionamentos de webhooks 2025-01-18 13:53:27 -03:00
Fábio Cavalcanti
0745132b98 ajuste da conexão com redis para contemplar usuarios que usam senha no redis ou redis cloud 2025-01-17 19:06:53 -03:00
Fábio Cavalcanti
0b5ad96508 ajuste no storage para o db redis 2025-01-17 17:57:35 -03:00
Fábio Cavalcanti
22217802a2 ajuste de stack docker 2025-01-16 17:51:13 -03:00
Fábio Cavalcanti
facfcb4559 ajuste de docker compose referencia 2025-01-16 17:08:26 -03:00
Fábio Cavalcanti
7d8c91fbc9 definição do banco do redis opcional na variavel de ambiente do docker compose 2025-01-16 17:00:09 -03:00
Fábio Cavalcanti
6fd1ec33e9 ajuste readme 2025-01-10 10:59:37 -03:00
Fábio Cavalcanti
bac9469b05 ajuste docker standalone no readme 2025-01-10 10:27:10 -03:00
Fábio Cavalcanti
943d5be2c8 orientações do readme 2025-01-08 20:52:40 -03:00
Fábio Cavalcanti
ae74686c9b ajuste do logoff 2025-01-08 20:25:04 -03:00
Fábio Cavalcanti
2fadd723dc adicionado sessão de login persistente 2025-01-08 19:06:55 -03:00
Fábio Cavalcanti
2a296d759f correção de detecção automatica 2025-01-08 17:16:39 -03:00
Fábio Cavalcanti
f558542359 ajuste de função de tradução simultanea 2025-01-08 13:26:08 -03:00
Fábio Cavalcanti
6a9ba1f087 ajustes de tradução e fuso horario 2025-01-07 18:10:27 -03:00
Fábio Cavalcanti
b86c7ac764 adicionado funções de detalhamento de idiomas e comportamento de tradutor automatico 2025-01-07 15:44:59 -03:00
Fábio Cavalcanti
9a072aee22 corrigido modo de processamento 2025-01-07 13:50:24 -03:00
Fábio Cavalcanti
eeffecb091 adicionado configuração de modo de processamento de mensagens apenas em grupos ou grupos e privado 2025-01-07 13:35:44 -03:00
Fábio Cavalcanti
69cb3b1965 personalização de comportamento e mensagens de transcrição 2025-01-07 11:36:39 -03:00
Fábio Cavalcanti
ec65839beb correção na lógica de resumos 2025-01-07 10:15:56 -03:00
Fábio Cavalcanti
abc4c4298a romeno no readme 2024-12-19 18:08:40 -03:00
Fábio Cavalcanti
161251a403 adicionado Romeno as linguagens novas 2024-12-19 18:05:59 -03:00
14 changed files with 2576 additions and 369 deletions

View File

@ -1,12 +1,54 @@
#-----------------------------------------------
# Configurações do Servidor
#-----------------------------------------------
# Configurações do UVICORN
UVICORN_PORT=8005
UVICORN_HOST=0.0.0.0
UVICORN_RELOAD=true
UVICORN_WORKERS=1
# Domínios da Aplicação
API_DOMAIN=seu.dominio.com # Subdomínio para a API (ex: api.seudominio.com)
MANAGER_DOMAIN=manager.seu.dominio.com # Subdomínio para o Manager (ex: manager.seudominio.com)
# Debug e Logs
DEBUG_MODE=false
LOG_LEVEL=INFO
# Credenciais do Gerenciador
MANAGER_USER=admin
MANAGER_PASSWORD=impacteai2024
#-----------------------------------------------
# Credenciais de Acesso
#-----------------------------------------------
# Credenciais do Painel Administrativo
MANAGER_USER=seu_usuario_admin # Username para acessar o painel admin
MANAGER_PASSWORD=sua_senha_segura # Senha para acessar o painel admin
# Configurações do Servidor
FASTAPI_PORT=8005
STREAMLIT_PORT=8501
HOST=0.0.0.0
#-----------------------------------------------
# Configurações do Redis
#-----------------------------------------------
# Configurações Básicas
REDIS_HOST=redis-transcrevezap # Host do Redis (use redis-transcrevezap para docker-compose)
REDIS_PORT=6380 # Porta do Redis
REDIS_DB=0 # Número do banco de dados Redis
# Autenticação Redis (opcional)
REDIS_USERNAME= # Deixe em branco se não usar autenticação
REDIS_PASSWORD= # Deixe em branco se não usar autenticação
#-----------------------------------------------
# Configurações de Rede
#-----------------------------------------------
# Nome da Rede Docker Externa
NETWORK_NAME=sua_rede_externa # Nome da sua rede Docker externa
#-----------------------------------------------
# Configurações do Traefik (se estiver usando)
#-----------------------------------------------
# Certificados SSL
SSL_RESOLVER=letsencryptresolver # Resolvedor SSL do Traefik
SSL_ENTRYPOINT=websecure # Entrypoint SSL do Traefik
#-----------------------------------------------
# Portas da Aplicação
#-----------------------------------------------
API_PORT=8005 # Porta para a API FastAPI
MANAGER_PORT=8501 # Porta para o Streamlit Manager

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.pythonPath": "d:\\Estudando CODE\\ESTUDOS PYTHON\\transcreve-audio-exemplo\\.venv\\Scripts\\python.exe"
}

View File

@ -1,36 +1,46 @@
# Usar uma imagem oficial do Python como base
# Imagem base do Python 3.10-slim
FROM python:3.10-slim
# Instalar dependências do sistema, incluindo redis-tools
# Configuração básica de timezone
ENV TZ=America/Sao_Paulo
# Instalação de dependências mínimas necessárias
RUN apt-get update && apt-get install -y --no-install-recommends \
redis-tools \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
tzdata \
dos2unix \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* \
&& ln -snf /usr/share/zoneinfo/$TZ /etc/localtime \
&& echo $TZ > /etc/timezone
# Definir o diretório de trabalho
# Configuração do ambiente de trabalho
WORKDIR /app
# Copiar o arquivo requirements.txt e instalar dependências
# Instalação das dependências Python
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copiar todo o código da aplicação
# Copia dos arquivos da aplicação
COPY . .
# Garantir que o diretório static existe
RUN mkdir -p /app/static
# Preparação do diretório de estáticos
RUN mkdir -p /app/static && \
if [ -d "static" ]; then cp -r static/* /app/static/ 2>/dev/null || true; fi
# Copiar arquivos estáticos para o diretório apropriado
COPY static/ /app/static/
# Configuração do script de inicialização
RUN chmod +x start.sh && \
dos2unix start.sh && \
apt-get purge -y dos2unix && \
apt-get autoremove -y
# Garantir permissões de execução ao script inicial
COPY start.sh .
RUN chmod +x start.sh
# Converter possíveis caracteres de retorno de carro do Windows
RUN apt-get update && apt-get install -y dos2unix && dos2unix start.sh && apt-get remove -y dos2unix && apt-get autoremove -y && apt-get clean
# Expor as portas usadas pela aplicação
# Portas da aplicação
EXPOSE 8005 8501
# Definir o comando inicial
# Valores padrão para Redis
ENV REDIS_HOST=redis-transcrevezap \
REDIS_PORT=6380 \
REDIS_DB=0
# Comando de inicialização
CMD ["/bin/bash", "/app/start.sh"]

View File

@ -1,6 +1,7 @@
import logging
import redis
import os
from utils import create_redis_client
# Configuração de logging com cores e formatação melhorada
class ColoredFormatter(logging.Formatter):
@ -30,12 +31,7 @@ logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Conexão com o Redis
redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6380)),
db=0,
decode_responses=True
)
redis_client = create_redis_client()
class Settings:
"""Classe para gerenciar configurações do sistema."""
@ -43,6 +39,8 @@ class Settings:
"""Inicializa as configurações."""
logger.debug("Carregando configurações do Redis...")
self.ACTIVE_LLM_PROVIDER = self.get_redis_value("ACTIVE_LLM_PROVIDER", "groq")
self.OPENAI_API_KEY = self.get_redis_value("OPENAI_API_KEY", "")
self.GROQ_API_KEY = self.get_redis_value("GROQ_API_KEY", "gsk_default_key")
self.BUSINESS_MESSAGE = self.get_redis_value("BUSINESS_MESSAGE", "*Impacte AI* Premium Services")
self.PROCESS_GROUP_MESSAGES = self.get_redis_value("PROCESS_GROUP_MESSAGES", "false").lower() == "true"

View File

@ -2,7 +2,7 @@ version: "3.7"
services:
tcaudio:
image: impacteai/transcrevezap:latest
image: impacteai/transcrevezap:dev
networks:
- sua_rede_externa # Substitua pelo nome da sua rede externa
ports:
@ -20,6 +20,10 @@ services:
- MANAGER_PASSWORD=sua_senha_segura # Defina Senha do Manager
- REDIS_HOST=redis-transcrevezap
- REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP
- REDIS_DB=0 # Opcional: pode ser removida para usar o valor padrão
# Autenticação Redis (opcional - descomente se necessário, se estiver usando autenticação)
# - REDIS_USERNAME=${REDIS_USERNAME:-} # Nome do usuário definido no comando do Redis
# - REDIS_PASSWORD=${REDIS_PASSWORD:-} # Senha definida no comando do Redis (sem o '>')
depends_on:
- redis-transcrevezap
deploy:
@ -48,11 +52,30 @@ services:
redis-transcrevezap:
image: redis:6
# 1. Configuração SEM autenticação (padrão):
command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user seuusuario on '>minhasenha' '~*' '+@all'
# # Explicação dos parâmetros:
# # --user seuusuario: nome do usuário
# # on: indica início da configuração do usuário
# # '>minhasenha': senha do usuário (mantenha o '>')
# # '~*': permite acesso a todas as chaves
# # '+@all': concede todas as permissões
volumes:
- redis_transcrevezap_data:/data
networks:
- sua_rede_externa # Substitua pelo nome da sua rede externa
deploy:
mode: replicated
replicas: 1
placement:
constraints:
- node.role == manager
networks:
sua_rede_externa: # Substitua pelo nome da sua rede externa

111
groq_handler.py Normal file
View File

@ -0,0 +1,111 @@
import aiohttp
import json
from typing import Optional, Tuple, Any
from datetime import datetime
import logging
from storage import StorageHandler
import asyncio
logger = logging.getLogger("GROQHandler")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
async def test_groq_key(key: str) -> bool:
"""Teste se uma chave GROQ é válida e está funcionando."""
url = "https://api.groq.com/openai/v1/models"
headers = {"Authorization": f"Bearer {key}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return bool(data.get("data"))
return False
except Exception as e:
logger.error(f"Erro ao testar chave GROQ: {e}")
return False
async def validate_transcription_response(response_text: str) -> bool:
"""Valide se a resposta da transcrição é significativa."""
try:
cleaned_text = response_text.strip()
return len(cleaned_text) >= 10
except Exception as e:
logger.error(f"Erro ao validar resposta da transcrição: {e}")
return False
async def get_working_groq_key(storage: StorageHandler) -> Optional[str]:
"""Obtenha uma chave GROQ funcional do pool disponível."""
keys = storage.get_groq_keys()
for _ in range(len(keys)):
key = storage.get_next_groq_key()
if not key:
continue
penalized_until = storage.get_penalized_until(key)
if penalized_until and penalized_until > datetime.utcnow():
continue
if await test_groq_key(key):
return key
else:
storage.penalize_key(key, penalty_duration=300)
storage.add_log("ERROR", "Nenhuma chave GROQ funcional disponível.")
return None
async def handle_groq_request(
url: str,
headers: dict,
data: Any,
storage: StorageHandler,
is_form_data: bool = False
) -> Tuple[bool, dict, str]:
"""Lida com requisições para a API GROQ com suporte a retries e rotação de chaves."""
max_retries = len(storage.get_groq_keys())
for attempt in range(max_retries):
try:
storage.add_log("DEBUG", "Iniciando tentativa de requisição para GROQ", {
"url": url,
"is_form_data": is_form_data,
"attempt": attempt + 1
})
async with aiohttp.ClientSession() as session:
if is_form_data:
async with session.post(url, headers=headers, data=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("text"):
return True, response_data, ""
else:
async with session.post(url, headers=headers, json=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("choices"):
return True, response_data, ""
error_msg = response_data.get("error", {}).get("message", "")
if "organization_restricted" in error_msg or "invalid_api_key" in error_msg:
new_key = await get_working_groq_key(storage)
if new_key:
headers["Authorization"] = f"Bearer {new_key}"
await asyncio.sleep(1)
continue
return False, response_data, error_msg
except Exception as e:
storage.add_log("ERROR", "Erro na requisição", {"error": str(e)})
if attempt < max_retries - 1:
await asyncio.sleep(1)
continue
return False, {}, f"Request failed: {str(e)}"
storage.add_log("ERROR", "Todas as chaves GROQ falharam.")
return False, {}, "All GROQ keys exhausted."

138
main.py
View File

@ -5,12 +5,15 @@ from services import (
send_message_to_whatsapp,
get_audio_base64,
summarize_text_if_needed,
download_remote_audio,
)
from models import WebhookRequest
from config import logger, settings, redis_client
from storage import StorageHandler
import traceback
import os
import asyncio
import aiohttp
app = FastAPI()
storage = StorageHandler()
@ -40,12 +43,53 @@ def load_dynamic_settings():
"DEBUG_MODE": get_config("DEBUG_MODE", "false") == "true",
}
async def forward_to_webhooks(body: dict, storage: StorageHandler):
"""Encaminha o payload para todos os webhooks cadastrados."""
webhooks = storage.get_webhook_redirects()
async with aiohttp.ClientSession() as session:
for webhook in webhooks:
try:
# Configura os headers mantendo o payload intacto
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true", # Header para identificação da origem
"X-TranscreveZAP-Webhook-ID": webhook["id"]
}
async with session.post(
webhook["url"],
json=body, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
if response.status in [200, 201, 202]:
storage.update_webhook_stats(webhook["id"], True)
else:
error_text = await response.text()
storage.update_webhook_stats(
webhook["id"],
False,
f"Status {response.status}: {error_text}"
)
# Registra falha para retry posterior
storage.add_failed_delivery(webhook["id"], body)
except Exception as e:
storage.update_webhook_stats(
webhook["id"],
False,
f"Erro ao encaminhar: {str(e)}"
)
# Registra falha para retry posterior
storage.add_failed_delivery(webhook["id"], body)
@app.post("/transcreve-audios")
async def transcreve_audios(request: Request):
try:
body = await request.json()
dynamic_settings = load_dynamic_settings()
# Iniciar o encaminhamento em background
asyncio.create_task(forward_to_webhooks(body, storage))
# Log inicial da requisição
storage.add_log("INFO", "Nova requisição de transcrição recebida", {
"instance": body.get("instance"),
@ -87,6 +131,18 @@ async def transcreve_audios(request: Request):
)
return {"message": "Mensagem não autorizada para processamento"}
# Verificação do modo de processamento (grupos/todos)
process_mode = storage.get_process_mode()
is_group = "@g.us" in remote_jid
if process_mode == "groups_only" and not is_group:
storage.add_log("INFO", "Mensagem ignorada - modo apenas grupos ativo", {
"remote_jid": remote_jid,
"process_mode": process_mode,
"is_group": is_group
})
return {"message": "Modo apenas grupos ativo - mensagens privadas ignoradas"}
if from_me and not dynamic_settings["PROCESS_SELF_MESSAGES"]:
storage.add_log("INFO", "Mensagem própria ignorada", {
"remote_jid": remote_jid
@ -96,33 +152,71 @@ async def transcreve_audios(request: Request):
# Obter áudio
try:
if "mediaUrl" in body["data"]["message"]:
audio_source = body["data"]["message"]["mediaUrl"]
storage.add_log("DEBUG", "Usando mediaUrl para áudio", {
"mediaUrl": audio_source
})
media_url = body["data"]["message"]["mediaUrl"]
storage.add_log("DEBUG", "Baixando áudio via URL", {"mediaUrl": media_url})
audio_source = await download_remote_audio(media_url) # Baixa o arquivo remoto e retorna o caminho local
else:
storage.add_log("DEBUG", "Obtendo áudio via base64")
base64_audio = await get_audio_base64(server_url, instance, apikey, audio_key)
audio_source = await convert_base64_to_file(base64_audio)
storage.add_log("DEBUG", "Áudio convertido", {
"source": audio_source
})
storage.add_log("DEBUG", "Áudio convertido", {"source": audio_source})
# Carregar configurações de formatação
output_mode = get_config("output_mode", "both")
summary_header = get_config("summary_header", "🤖 *Resumo do áudio:*")
transcription_header = get_config("transcription_header", "🔊 *Transcrição do áudio:*")
character_limit = int(get_config("character_limit", "500"))
# Verificar se timestamps estão habilitados
use_timestamps = get_config("use_timestamps", "false") == "true"
storage.add_log("DEBUG", "Informações da mensagem", {
"from_me": from_me,
"remote_jid": remote_jid,
"is_group": is_group
})
# Transcrever áudio
storage.add_log("INFO", "Iniciando transcrição")
transcription_text, _ = await transcribe_audio(audio_source)
# Resumir se necessário
summary_text = await summarize_text_if_needed(transcription_text)
# Formatar mensagem
summary_message = (
f"🤖 *Resumo do áudio:*\n\n"
f"{summary_text}\n\n"
f"🔊 *Transcrição do áudio:*\n\n"
f"{transcription_text}\n\n"
f"{dynamic_settings['BUSINESS_MESSAGE']}"
transcription_text, has_timestamps = await transcribe_audio(
audio_source,
apikey=apikey,
remote_jid=remote_jid,
from_me=from_me,
use_timestamps=use_timestamps
)
# Log do resultado
storage.add_log("INFO", "Transcrição concluída", {
"has_timestamps": has_timestamps,
"text_length": len(transcription_text),
"remote_jid": remote_jid
})
# Determinar se precisa de resumo baseado no modo de saída
summary_text = None
if output_mode in ["both", "summary_only"] or (
output_mode == "smart" and len(transcription_text) > character_limit
):
summary_text = await summarize_text_if_needed(transcription_text)
# Construir mensagem baseada no modo de saída
message_parts = []
if output_mode == "smart":
if len(transcription_text) > character_limit:
message_parts.append(f"{summary_header}\n\n{summary_text}")
else:
message_parts.append(f"{transcription_header}\n\n{transcription_text}")
else:
if output_mode in ["both", "summary_only"] and summary_text:
message_parts.append(f"{summary_header}\n\n{summary_text}")
if output_mode in ["both", "transcription_only"]:
message_parts.append(f"{transcription_header}\n\n{transcription_text}")
# Adicionar mensagem de negócio
message_parts.append(dynamic_settings['BUSINESS_MESSAGE'])
# Juntar todas as partes da mensagem
summary_message = "\n\n".join(message_parts)
# Enviar resposta
await send_message_to_whatsapp(
@ -138,8 +232,8 @@ async def transcreve_audios(request: Request):
storage.record_processing(remote_jid)
storage.add_log("INFO", "Áudio processado com sucesso", {
"remote_jid": remote_jid,
"transcription_length": len(transcription_text),
"summary_length": len(summary_text)
"transcription_length": len(transcription_text) if transcription_text else 0,
"summary_length": len(summary_text) if summary_text else 0 # Adiciona verificação
})
return {"message": "Áudio transcrito e resposta enviada com sucesso"}

File diff suppressed because it is too large Load Diff

74
openai_handler.py Normal file
View File

@ -0,0 +1,74 @@
import aiohttp
import json
from datetime import datetime
import logging
from storage import StorageHandler
logger = logging.getLogger("OpenAIHandler")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
async def test_openai_key(key: str) -> bool:
"""Test if an OpenAI key is valid and working."""
url = "https://api.openai.com/v1/models"
headers = {"Authorization": f"Bearer {key}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return len(data.get("data", [])) > 0
return False
except Exception as e:
logger.error(f"Error testing OpenAI key: {e}")
return False
async def handle_openai_request(
url: str,
headers: dict,
data: any,
storage: StorageHandler,
is_form_data: bool = False
) -> tuple[bool, dict, str]:
"""Handle requests to OpenAI API with retries."""
max_retries = 3
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
if is_form_data:
async with session.post(url, headers=headers, data=data) as response:
response_data = await response.json()
if response.status == 200:
if is_form_data and response_data.get("text"):
return True, response_data, ""
elif not is_form_data and response_data.get("choices"):
return True, response_data, ""
else:
async with session.post(url, headers=headers, json=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("choices"):
return True, response_data, ""
error_msg = response_data.get("error", {}).get("message", "")
if "invalid_api_key" in error_msg or "invalid authorization" in error_msg.lower():
logger.error(f"OpenAI API key invalid or expired")
return False, response_data, error_msg
if attempt < max_retries - 1:
continue
return False, response_data, error_msg
except Exception as e:
logger.error(f"Error in request: {str(e)}")
if attempt < max_retries - 1:
continue
return False, {}, f"Request failed: {str(e)}"
return False, {}, "All retries failed"

312
readme.md
View File

@ -1,9 +1,22 @@
# TranscreveZAP 2.0
## Transcrição e Resumo de Áudios no WhatsApp usando Python com interface em Streamlit
![ImpacteAI](./fluxo.png)
# TranscreveZAP 2.3- Plataforma de Gestão e Automação de Áudios do WhatsApp
Este projeto permite transcrever e resumir áudios enviados pelo WhatsApp usando inteligência artificial e integração com APIs. Ideal para automatizar o processamento de mensagens de áudio, oferecendo um resumo claro e prático.
### Sistema Inteligente de Transcrição, Resumo e Tradução Automática de Áudios para WhatsApp
*Desenvolvido com Python, FastAPI e Streamlit*
---
Uma solução completa para automatizar e gerenciar mensagens de áudio no WhatsApp, oferecendo:
- Transcrição automática multilíngue
- Resumos inteligentes de áudios
- Detecção e tradução automática entre idiomas
- Seleção de plataforma LLM (GROQ ou OpenAI)
- Interface administrativa completa
- Sistema de rodízio de chaves API
- Gestão avançada de grupos e usuários
- Personalização de formatação e saída
- Sistema de Redirecionamento de Webhooks
Contato de email: contato@impacte.ai
([ACESSE NOSSO SITE](https://impacte.ai/))
@ -16,50 +29,97 @@ Antes de começar, certifique-se de ter os seguintes requisitos:
- Python 3.10+ instalado ([Download](https://www.python.org/downloads/))
- Docker e Docker Compose instalados ([Instruções](https://docs.docker.com/get-docker/))
- Uma conta Evolution API com chave válida
- Uma conta GROQ API com chave válida (começa com 'gsk_') ([Crie sua CONTA](https://console.groq.com/login))
- Chaves GROQ (começa com `gsk_`) e/ou chaves OpenAI (começa com `sk-`) configuradas ([Crie sua conta GROQ](https://console.groq.com/login))
* Em caso de uso com Proxy Reverso Aponte um Subdomínio para a API e outro para o MANAGER da aplicação
---
## 🚀 **Novidade: Escolha do Provedor LLM**
Agora você pode escolher entre dois provedores para transcrições e resumos:
1. **GROQ** (open-source): Configuração padrão.
2. **OpenAI** (API paga): Integração com modelos GPT.
### Configuração:
- Acesse: **Configurações > Provedor LLM** na interface administrativa.
- Escolha entre `groq` e `openai`.
- Adicione as chaves correspondentes para cada provedor.
---
## 🚀 **Instalação e Configuração**
### 🐳 Docker Compose
1. Clone o repositório:
```bash
git clone https://github.com/seu-usuario/transcrevezap.git
cd transcrevezap
```
2. Configure o arquivo docker-compose.yaml:
1. Configure o arquivo docker-compose.yaml:
```yaml
version: "3.7"
services:
tcaudio:
image: impacteai/transcrevezap:latest
ports:
- 8005:8005 # Porta para FastAPI
- 8501:8501 # Porta para Streamlit
environment:
- REDIS_HOST=redis
- REDIS_PORT=6380
- API_DOMAIN=seu-ip
- DEBUG_MODE=false
- LOG_LEVEL=INFO
- MANAGER_USER=admin
- MANAGER_PASSWORD=sua_senha_aqui
depends_on:
- redis
version: "3.7"
redis:
image: redis:6
command: redis-server --port 6380 --appendonly yes
volumes:
- redis_data:/data
services:
# Serviço principal do TranscreveZAP
tcaudio:
image: impacteai/transcrevezap:latest
build:
context: .
ports:
- "8005:8005" # API FastAPI - Use esta porta para configurar o webhook
- "8501:8501" # Interface Web Streamlit - Acesse o painel por esta porta
environment:
# Configurações do Servidor
- UVICORN_PORT=8005
- UVICORN_HOST=0.0.0.0
- UVICORN_RELOAD=true
- UVICORN_WORKERS=1
- API_DOMAIN=localhost # Para uso local mantenha localhost
# Modo Debug e Logs
- DEBUG_MODE=false
- LOG_LEVEL=INFO
# Credenciais do Painel Admin (ALTERE ESTAS CREDENCIAIS!)
- MANAGER_USER=admin
- MANAGER_PASSWORD=sua_senha_aqui
# Configurações do Redis
- REDIS_HOST=redis-transcrevezap # Nome do serviço Redis
- REDIS_PORT=6380 # Porta do Redis
- REDIS_DB=0 # Banco de dados Redis
# Autenticação Redis (opcional - descomente se necessário)
# - REDIS_USERNAME=seu_usuario # Nome do usuário Redis
# - REDIS_PASSWORD=sua_senha # Senha do Redis
depends_on:
- redis-transcrevezap
command: ./start.sh
# Serviço Redis para armazenamento de dados
redis-transcrevezap:
image: redis:6
# Escolha UMA das configurações do Redis abaixo:
# 1. Configuração simples SEM autenticação:
command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user admin on '>sua_senha' '~*' '+@all'
volumes:
redis_data:
- redis_transcrevezap_data:/data # Persistência dos dados
# Volumes para persistência
volumes:
redis_transcrevezap_data:
driver: local
# Instruções de Uso:
# 1. Salve este arquivo como docker-compose.yml
# 2. Execute com: docker compose up -d
# 3. Acesse o painel em: http://localhost:8501
# 4. Configure o webhook da Evolution API para: http://localhost:8005/transcreve-audios
```
3. Inicie os serviços:
2. Inicie os serviços:
```bash
docker-compose up -d
```
@ -136,7 +196,7 @@ version: "3.7"
services:
tcaudio:
image: impacteai/transcrevezap:latest
image: impacteai/transcrevezap:dev
networks:
- sua_rede_externa # Substitua pelo nome da sua rede externa
ports:
@ -154,6 +214,10 @@ services:
- MANAGER_PASSWORD=sua_senha_segura # Defina Senha do Manager
- REDIS_HOST=redis-transcrevezap
- REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP
- REDIS_DB=0 # Opcional: pode ser removida para usar o valor padrão
# Autenticação Redis (opcional - descomente se necessário, se estiver usando autenticação)
# - REDIS_USERNAME=${REDIS_USERNAME:-} # Nome do usuário definido no comando do Redis
# - REDIS_PASSWORD=${REDIS_PASSWORD:-} # Senha definida no comando do Redis (sem o '>')
depends_on:
- redis-transcrevezap
deploy:
@ -182,11 +246,30 @@ services:
redis-transcrevezap:
image: redis:6
# 1. Configuração SEM autenticação (padrão):
command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user seuusuario on '>minhasenha' '~*' '+@all'
# # Explicação dos parâmetros:
# # --user seuusuario: nome do usuário
# # on: indica início da configuração do usuário
# # '>minhasenha': senha do usuário (mantenha o '>')
# # '~*': permite acesso a todas as chaves
# # '+@all': concede todas as permissões
volumes:
- redis_transcrevezap_data:/data
networks:
- sua_rede_externa # Substitua pelo nome da sua rede externa
deploy:
mode: replicated
replicas: 1
placement:
constraints:
- node.role == manager
networks:
sua_rede_externa: # Substitua pelo nome da sua rede externa
@ -219,13 +302,38 @@ Para usar com Traefik, certifique-se de:
- Em produção, recomenda-se DEBUG_MODE=false
- Configure LOG_LEVEL=DEBUG apenas para troubleshooting
## ✨ Novos Recursos na v2.1
## 🚀 Novo Recurso v2.3.1: Hub de Redirecionamento
O TranscreveZAP agora oferece um sistema robusto para redirecionamento de mensagens, permitindo que você encaminhe os webhooks da Evolution API para múltiplos destinos simultaneamente.
### Principais Recursos
- Interface dedicada para gerenciamento de webhooks
- Redirecionamento sem alteração do payload original
- Monitoramento de saúde dos webhooks em tempo real
- Sistema de retry automático para reenvio de mensagens falhas
- Headers de rastreamento para identificação de origem (`X-TranscreveZAP-Forward`)
- Suporte a descrições personalizadas para cada webhook
- Limpeza automática de dados ao remover webhooks
### Compatibilidade
- Mantém o payload da Evolution API intacto
- Suporta múltiplos endpoints simultaneamente
- Compatível com qualquer sistema que aceite webhooks via POST
- Preserva todos os dados originais da mensagem
## ✨ Novos Recursos na v2.3
### 🌍 Suporte Multilíngue
- Transcrição e resumo em 10+ idiomas
- Transcrição e resumo com suporte para 16 idiomas principais
- Mudança instantânea de idioma
- Interface intuitiva para seleção de idioma
- Mantém consistência entre transcrição e resumo
- Configuração manual de idioma por contato
- Detecção automática de idioma
- Tradução automática integrada
### 🔄 Sistema de Cache para Idiomas
Implementação de cache inteligente para otimizar a detecção e processamento de idiomas.
### 🔄 Sistema Inteligente de Rodízio de Chaves
- Suporte a múltiplas chaves GROQ
@ -233,27 +341,116 @@ Para usar com Traefik, certifique-se de:
- Maior redundância e disponibilidade
- Gestão simplificada de chaves via interface
## 🌍 Sistema de Idiomas
O TranscreveZAP agora suporta transcrição e resumo em múltiplos idiomas. Na seção "Configurações", você pode:
### ⏱️ Timestamps em Transcrições
Nova funcionalidade de timestamps que adiciona marcadores de tempo precisos em cada trecho da transcrição.
## 📋 Detalhamento das Funcionalidades
### 🌍 Sistema de Idiomas
O TranscreveZAP suporta transcrição e resumo em múltiplos idiomas. Na seção "Configurações", você pode:
1. Selecionar o idioma principal para transcrição e resumo
2. O sistema manterá Português como padrão se nenhum outro for selecionado
3. A mudança de idioma é aplicada instantaneamente após salvar
Idiomas suportados:
- 🇧🇷 Português (padrão)
- 🇺🇸 Inglês
- 🇩🇪 Alemão
- 🇸🇦 Árabe
- 🇨🇳 Chinês
- 🇰🇷 Coreano
- 🇪🇸 Espanhol
- 🇫🇷 Francês
- 🇩🇪 Alemão
- 🇮🇳 Hindi
- 🇳🇱 Holandês
- 🇬🇧 Inglês
- 🇮🇹 Italiano
- 🇯🇵 Japonês
- 🇰🇷 Coreano
- 🇨🇳 Chinês
- 🇵🇱 Polonês
- 🇧🇷 Português (padrão)
- 🇷🇴 Romeno
- 🇷🇺 Russo
- 🇹🇷 Turco
### 🌐 Gestão de Idiomas por Contato
#### Configuração Manual
```markdown
1. Acesse o Manager > Configurações > Idiomas e Transcrição
2. Expanda "Adicionar Novo Contato"
3. Digite o número do contato (formato: 5521999999999)
4. Selecione o idioma desejado
5. Clique em "Adicionar Contato"
```
### 🔄 Detecção Automática de Idioma
Nova funcionalidade que detecta automaticamente o idioma do contato:
- Ativação via Manager > Configurações > Idiomas e Transcrição
- Analisa o primeiro áudio de cada contato
- Cache inteligente de 24 horas
- Funciona apenas em conversas privadas
- Mantém configuração global para grupos
### ⚡ Tradução Automática
Sistema inteligente de tradução que:
- Traduz automaticamente áudios recebidos para seu idioma principal
- Mantém o contexto e estilo original da mensagem
- Preserva formatações especiais (emojis, negrito, itálico)
- Otimizado para comunicação natural
### ⏱️ Sistema de Timestamps
Nova funcionalidade que adiciona marcadores de tempo:
- Formato [MM:SS] no início de cada trecho
- Ativação via Manager > Configurações > Idiomas e Transcrição
- Precisão de segundos
- Ideal para referência e navegação em áudios longos
#### Exemplo de Saída com Timestamps:
```
[00:00] Bom dia pessoal
[00:02] Hoje vamos falar sobre
[00:05] O novo sistema de timestamps
```
## 🔧 Configuração e Uso
### Configuração de Idiomas
1. **Configuração Global**
- Defina o idioma padrão do sistema
- Acesse: Manager > Configurações > Configurações Gerais
- Selecione o idioma principal em "Idioma para Transcrição e Resumo"
2. **Configuração por Contato**
- Acesse: Manager > Configurações > Idiomas e Transcrição
- Use "Adicionar Novo Contato" ou gerencie contatos existentes
- Cada contato pode ter seu próprio idioma configurado
3. **Detecção Automática**
- Ative/Desative a detecção automática
- Configure o tempo de cache
- Gerencie exceções e configurações manuais
### Configuração de Timestamps
1. Acesse: Manager > Configurações > Idiomas e Transcrição
2. Localize a seção "Timestamps na Transcrição"
3. Use o toggle para ativar/desativar
4. As mudanças são aplicadas imediatamente
## 📊 Monitoramento e Estatísticas
### Estatísticas de Idiomas
O sistema agora oferece estatísticas detalhadas:
- Total de transcrições por idioma
- Número de detecções automáticas
- Divisão entre mensagens enviadas/recebidas
- Histórico de uso por idioma
### Visualização de Dados
- Gráficos de uso por idioma
- Distribuição de idiomas
- Estatísticas de tradução
- Performance do sistema
## 🔄 Sistema de Rodízio de Chaves GROQ
O TranscreveZAP agora suporta múltiplas chaves GROQ com sistema de rodízio automático para melhor distribuição de carga e redundância.
O TranscreveZAP suporta múltiplas chaves GROQ com sistema de rodízio automático para melhor distribuição de carga e redundância.
### Funcionalidades:
1. Adicione múltiplas chaves GROQ para distribuição de carga
@ -294,6 +491,29 @@ Se encontrar problemas:
3. Reinicie o serviço se as alterações não forem aplicadas
4. Verifique os logs para confirmar o idioma em uso
## 📝 Notas Adicionais
### Recomendações de Uso
- Configure idiomas manualmente para contatos frequentes
- Use detecção automática como fallback
- Monitore estatísticas de uso
- Faça backups regulares das configurações
### Limitações Conhecidas
- Detecção automática requer primeiro áudio
- Cache limitado a 24 horas
- Timestamps podem variar em áudios muito longos
## 🤝 Contribuição
Agradecemos feedback e contribuições! Reporte issues e sugira melhorias em nosso GitHub.
---
### 📞 Suporte
Para suporte adicional ou dúvidas:
- WhatsApp: [Entre no GRUPO](https://chat.whatsapp.com/L9jB1SlcmQFIVxzN71Y6KG)
- Email: contato@impacte.ai
- Site: [impacte.ai](https://impacte.ai)
## 📄 **Licença**
Este projeto está licenciado sob a Licença MIT - veja o arquivo [LICENSE](LICENSE) para detalhes.

View File

@ -7,7 +7,8 @@ from storage import StorageHandler
import os
import json
import tempfile
import traceback
from groq_handler import get_working_groq_key, validate_transcription_response, handle_groq_request
# Inicializa o storage handler
storage = StorageHandler()
@ -46,6 +47,7 @@ async def summarize_text_if_needed(text):
storage.add_log("DEBUG", "Iniciando processo de resumo", {
"text_length": len(text)
})
provider = storage.get_llm_provider()
# Obter idioma configurado
language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
@ -53,10 +55,20 @@ async def summarize_text_if_needed(text):
"language": language,
"redis_value": redis_client.get("TRANSCRIPTION_LANGUAGE")
})
url_completions = "https://api.groq.com/openai/v1/chat/completions"
groq_key = await get_groq_key()
if provider == "openai":
api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
headers = {
"Authorization": f"Bearer {groq_key}",
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
@ -118,6 +130,13 @@ async def summarize_text_if_needed(text):
请仅以摘要的形式回答就好像是你在发送这条消息
不要问候也不要在摘要前后写任何内容只需用一句简短的话总结音频中所说的内容
""",
"ro": """
Înțelege contextul acestui audio și creează un rezumat foarte concis despre ce este vorba.
Acest audio a fost trimis prin WhatsApp, de cineva, către Fabio.
Scrie DOAR rezumatul audio-ului ca și cum tu ai trimite acest mesaj.
Nu saluta, nu scrie nimic înainte sau după rezumat, răspunde doar cu un rezumat concis despre ce s-a spus în audio.
""",
"ru": """
Поймите контекст этого аудио и сделайте очень краткое резюме, о чем идет речь.
Это аудио было отправлено через WhatsApp кем-то Фабио.
@ -133,29 +152,33 @@ async def summarize_text_if_needed(text):
"role": "user",
"content": f"{base_prompt}\n\nTexto para resumir: {text}",
}],
"model": "llama-3.3-70b-versatile",
"model": model,
}
try:
async with aiohttp.ClientSession() as session:
storage.add_log("DEBUG", "Enviando requisição para API GROQ")
async with session.post(url_completions, headers=headers, json=json_data) as summary_response:
if summary_response.status == 200:
summary_result = await summary_response.json()
summary_text = summary_result["choices"][0]["message"]["content"]
storage.add_log("INFO", "Resumo gerado com sucesso", {
"original_length": len(text),
"summary_length": len(summary_text),
"language": language
})
return summary_text
else:
error_text = await summary_response.text()
storage.add_log("ERROR", "Erro na API GROQ", {
"error": error_text,
"status": summary_response.status
})
raise Exception(f"Erro ao resumir o texto: {error_text}")
success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
if not success:
raise Exception(error)
summary_text = response_data["choices"][0]["message"]["content"]
# Validar se o resumo não está vazio
if not await validate_transcription_response(summary_text):
storage.add_log("ERROR", "Resumo vazio ou inválido recebido")
raise Exception("Resumo vazio ou inválido recebido")
# Validar se o resumo é menor que o texto original
if len(summary_text) >= len(text):
storage.add_log("WARNING", "Resumo maior que texto original", {
"original_length": len(text),
"summary_length": len(summary_text)
})
storage.add_log("INFO", "Resumo gerado com sucesso", {
"original_length": len(text),
"summary_length": len(summary_text),
"language": language
})
return summary_text
except Exception as e:
storage.add_log("ERROR", "Erro no processo de resumo", {
"error": str(e),
@ -163,76 +186,212 @@ async def summarize_text_if_needed(text):
})
raise
async def transcribe_audio(audio_source, apikey=None):
"""Transcreve áudio usando a API GROQ com sistema de rodízio de chaves"""
storage.add_log("INFO", "Iniciando processo de transcrição")
url = "https://api.groq.com/openai/v1/audio/transcriptions"
groq_key = await get_groq_key()
groq_headers = {"Authorization": f"Bearer {groq_key}"}
# Obter idioma configurado
language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
storage.add_log("DEBUG", "Idioma configurado para transcrição", {
"language": language,
"redis_value": redis_client.get("TRANSCRIPTION_LANGUAGE")
async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=False, use_timestamps=False):
"""
Transcreve áudio com suporte a detecção de idioma e tradução automática.
Args:
audio_source: Caminho do arquivo de áudio ou URL
apikey: Chave da API opcional para download de áudio
remote_jid: ID do remetente/destinatário
from_me: Se o áudio foi enviado pelo próprio usuário
use_timestamps: Se True, usa verbose_json para incluir timestamps
Returns:
tuple: (texto_transcrito, has_timestamps)
"""
storage.add_log("INFO", "Iniciando processo de transcrição", {
"from_me": from_me,
"remote_jid": remote_jid
})
provider = storage.get_llm_provider()
if provider == "openai":
api_key = storage.get_openai_keys()[0] # Get first OpenAI key
url = "https://api.openai.com/v1/audio/transcriptions"
model = "whisper-1"
else: # groq
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
url = "https://api.groq.com/openai/v1/audio/transcriptions"
model = "whisper-large-v3"
headers = {"Authorization": f"Bearer {api_key}"}
# Inicializar variáveis
contact_language = None
system_language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
is_private = remote_jid and "@s.whatsapp.net" in remote_jid
# Determinar idioma do contato em conversas privadas
if is_private:
# Remover @s.whatsapp.net do ID para buscar no cache
contact_id = remote_jid.split('@')[0]
# 1. Primeiro tentar obter idioma configurado manualmente
contact_language = storage.get_contact_language(contact_id)
if contact_language:
storage.add_log("DEBUG", "Usando idioma configurado manualmente", {
"contact_language": contact_language,
"from_me": from_me,
"remote_jid": remote_jid,
"is_private": is_private
})
# 2. Se não houver configuração manual e detecção automática estiver ativa
elif storage.get_auto_language_detection():
# Verificar cache primeiro
cached_lang = storage.get_cached_language(contact_id)
if cached_lang:
contact_language = cached_lang.get('language')
storage.add_log("DEBUG", "Usando idioma do cache", {
"contact_language": contact_language,
"auto_detected": True
})
# Se não há cache ou está expirado, fazer detecção
elif not from_me: # Só detecta em mensagens recebidas
try:
# Realizar transcrição inicial sem idioma específico
with open(audio_source, 'rb') as audio_file:
data = aiohttp.FormData()
data.add_field('file', audio_file, filename='audio.mp3')
data.add_field('model', model)
success, response_data, error = await handle_groq_request(url, headers, data, storage, is_form_data=True)
if success:
initial_text = response_data.get("text", "")
# Detectar idioma do texto transcrito
detected_lang = await detect_language(initial_text)
# Salvar no cache E na configuração do contato
storage.cache_language_detection(contact_id, detected_lang)
storage.set_contact_language(contact_id, detected_lang)
contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e configurado", {
"language": detected_lang,
"remote_jid": remote_jid,
"auto_detected": True
})
except Exception as e:
storage.add_log("WARNING", "Erro na detecção automática de idioma", {
"error": str(e),
"remote_jid": remote_jid
})
if not contact_language:
storage.add_log("DEBUG", "Usando idioma padrão do sistema", {
"from_me": from_me,
"remote_jid": remote_jid,
"is_private": is_private,
"system_language": system_language
})
# Definir idioma de transcrição e tradução baseado no contexto
if is_private and contact_language:
if from_me:
# Se estou enviando para um contato com idioma configurado
transcription_language = contact_language # Transcrever no idioma do contato
target_language = contact_language # Não precisa traduzir
storage.add_log("DEBUG", "Usando idioma do contato para áudio enviado", {
"transcription_language": transcription_language,
"target_language": target_language
})
else:
# Se estou recebendo
transcription_language = contact_language # Transcrever no idioma do contato
target_language = system_language # Traduzir para o idioma do sistema
storage.add_log("DEBUG", "Processando áudio recebido com tradução", {
"transcription_language": transcription_language,
"target_language": target_language
})
else:
# Caso padrão: usar idioma do sistema
transcription_language = system_language
target_language = system_language
storage.add_log("DEBUG", "Usando idioma do sistema", {
"transcription_language": transcription_language,
"target_language": target_language
})
storage.add_log("DEBUG", "Configuração de idiomas definida", {
"transcription_language": transcription_language,
"target_language": target_language,
"from_me": from_me,
"is_private": is_private,
"contact_language": contact_language
})
try:
async with aiohttp.ClientSession() as session:
# Se o audio_source for uma URL
if isinstance(audio_source, str) and audio_source.startswith('http'):
storage.add_log("DEBUG", "Baixando áudio da URL", {
"url": audio_source
})
download_headers = {"apikey": apikey} if apikey else {}
async with session.get(audio_source, headers=download_headers) as response:
if response.status != 200:
error_text = await response.text()
storage.add_log("ERROR", "Erro no download do áudio", {
"status": response.status,
"error": error_text
})
raise Exception(f"Erro ao baixar áudio: {error_text}")
audio_data = await response.read()
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
temp_file.write(audio_data)
audio_source = temp_file.name
storage.add_log("DEBUG", "Áudio salvo temporariamente", {
"path": audio_source
})
# Preparar dados para transcrição
# Realizar transcrição
with open(audio_source, 'rb') as audio_file:
data = aiohttp.FormData()
data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3')
data.add_field('model', 'whisper-large-v3')
data.add_field('language', language)
data.add_field('file', audio_file, filename='audio.mp3')
data.add_field('model', model)
data.add_field('language', transcription_language)
storage.add_log("DEBUG", "Enviando áudio para transcrição")
async with session.post(url, headers=groq_headers, data=data) as response:
if response.status == 200:
result = await response.json()
message = result.get("text", "")
storage.add_log("INFO", "Transcrição concluída com sucesso", {
"text_length": len(message)
if use_timestamps:
data.add_field('response_format', 'verbose_json')
# Usar handle_groq_request para ter retry e validação
success, response_data, error = await handle_groq_request(url, headers, data, storage, is_form_data=True)
if not success:
raise Exception(f"Erro na transcrição: {error}")
transcription = format_timestamped_result(response_data) if use_timestamps else response_data.get("text", "")
# Validar o conteúdo da transcrição
if not await validate_transcription_response(transcription):
storage.add_log("ERROR", "Transcrição vazia ou inválida recebida")
raise Exception("Transcrição vazia ou inválida recebida")
# Detecção automática para novos contatos
if (is_private and storage.get_auto_language_detection() and
not from_me and not contact_language):
try:
detected_lang = await detect_language(transcription)
storage.cache_language_detection(remote_jid, detected_lang)
contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e cacheado", {
"language": detected_lang,
"remote_jid": remote_jid
})
except Exception as e:
storage.add_log("WARNING", "Erro na detecção de idioma", {"error": str(e)})
is_summary = False
if len(message) > 1000:
storage.add_log("DEBUG", "Texto longo detectado, iniciando resumo", {
"text_length": len(message)
})
is_summary = True
message = await summarize_text_if_needed(message)
# Tradução quando necessário
need_translation = (
is_private and contact_language and
(
(from_me and transcription_language != target_language) or
(not from_me and target_language != transcription_language)
)
)
return message, is_summary
else:
error_text = await response.text()
storage.add_log("ERROR", "Erro na transcrição", {
"error": error_text,
"status": response.status
if need_translation:
try:
transcription = await translate_text(
transcription,
transcription_language,
target_language
)
storage.add_log("INFO", "Texto traduzido automaticamente", {
"from": transcription_language,
"to": target_language
})
raise Exception(f"Erro na transcrição: {error_text}")
except Exception as e:
storage.add_log("ERROR", "Erro na tradução", {"error": str(e)})
# Registrar estatísticas de uso
used_language = contact_language if contact_language else system_language
storage.record_language_usage(
used_language,
from_me,
bool(contact_language and contact_language != system_language)
)
return transcription, use_timestamps
except Exception as e:
storage.add_log("ERROR", "Erro no processo de transcrição", {
@ -243,7 +402,130 @@ async def transcribe_audio(audio_source, apikey=None):
finally:
# Limpar arquivos temporários
if isinstance(audio_source, str) and os.path.exists(audio_source):
os.unlink(audio_source)
try:
os.unlink(audio_source)
except Exception as e:
storage.add_log("WARNING", "Erro ao remover arquivo temporário", {
"error": str(e)
})
def format_timestamped_result(result):
"""
Formata o resultado da transcrição com timestamps
"""
segments = result.get("segments", [])
formatted_lines = []
for segment in segments:
start_time = format_timestamp(segment.get("start", 0))
end_time = format_timestamp(segment.get("end", 0))
text = segment.get("text", "").strip()
if text:
formatted_lines.append(f"[{start_time} -> {end_time}] {text}")
return "\n".join(formatted_lines)
def format_timestamp(seconds):
"""
Converte segundos em formato MM:SS
"""
minutes = int(seconds // 60)
remaining_seconds = int(seconds % 60)
return f"{minutes:02d}:{remaining_seconds:02d}"
# Função para detecção de idioma
async def detect_language(text: str) -> str:
"""
Detecta o idioma do texto usando a API GROQ
Args:
text: Texto para detectar idioma
Returns:
str: Código ISO 639-1 do idioma detectado
"""
provider = storage.get_llm_provider()
storage.add_log("DEBUG", "Iniciando detecção de idioma", {
"text_length": len(text)
})
# Lista de idiomas suportados
SUPPORTED_LANGUAGES = {
"pt", "en", "es", "fr", "de", "it", "ja", "ko",
"zh", "ro", "ru", "ar", "hi", "nl", "pl", "tr"
}
if provider == "openai":
api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
# Prompt melhorado com exemplos e restrições
prompt = """
Analise o texto e retorne APENAS o código ISO 639-1 do idioma principal.
Regras:
1. Retorne APENAS o código de 2 letras
2. Use somente códigos permitidos: pt, en, es, fr, de, it, ja, ko, zh, ro, ru, ar, hi, nl, pl, tr
3. Se não tiver certeza ou o idioma não estiver na lista, retorne "en"
4. Não inclua pontuação, espaços extras ou explicações
Exemplos corretos:
"Hello world" -> en
"Bonjour le monde" -> fr
"Olá mundo" -> pt
Texto para análise:
"""
json_data = {
"messages": [{
"role": "system",
"content": "Você é um detector de idiomas preciso que retorna apenas códigos ISO 639-1."
}, {
"role": "user",
"content": f"{prompt}\n\n{text[:500]}" # Limitando para os primeiros 500 caracteres
}],
"model": model,
"temperature": 0.1
}
try:
success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
if not success:
raise Exception(f"Falha na detecção de idioma: {error}")
detected_language = response_data["choices"][0]["message"]["content"].strip().lower()
# Validar o resultado
if detected_language not in SUPPORTED_LANGUAGES:
storage.add_log("WARNING", "Idioma detectado não suportado", {
"detected": detected_language,
"fallback": "en"
})
detected_language = "en"
storage.add_log("INFO", "Idioma detectado com sucesso", {
"detected_language": detected_language
})
return detected_language
except Exception as e:
storage.add_log("ERROR", "Erro no processo de detecção de idioma", {
"error": str(e),
"type": type(e).__name__
})
raise
async def send_message_to_whatsapp(server_url, instance, apikey, message, remote_jid, message_id):
"""Envia mensagem via WhatsApp"""
@ -348,3 +630,158 @@ async def get_audio_base64(server_url, instance, apikey, message_id):
"message_id": message_id
})
raise
async def format_message(transcription_text, summary_text=None):
"""Formata a mensagem baseado nas configurações."""
settings = storage.get_message_settings()
message_parts = []
# Determinar modo de saída
output_mode = settings["output_mode"]
char_limit = int(settings["character_limit"])
if output_mode == "smart":
# Modo inteligente baseado no tamanho
if len(transcription_text) > char_limit:
if summary_text:
message_parts.append(f"{settings['summary_header']}\n\n{summary_text}")
else:
message_parts.append(f"{settings['transcription_header']}\n\n{transcription_text}")
elif output_mode == "summary_only":
if summary_text:
message_parts.append(f"{settings['summary_header']}\n\n{summary_text}")
elif output_mode == "transcription_only":
message_parts.append(f"{settings['transcription_header']}\n\n{transcription_text}")
else: # both
if summary_text:
message_parts.append(f"{settings['summary_header']}\n\n{summary_text}")
message_parts.append(f"{settings['transcription_header']}\n\n{transcription_text}")
# Adicionar mensagem de negócio
message_parts.append(dynamic_settings['BUSINESS_MESSAGE'])
return "\n\n".join(message_parts)
async def translate_text(text: str, source_language: str, target_language: str) -> str:
"""
Traduz o texto usando a API GROQ
Args:
text: Texto para traduzir
source_language: Código ISO 639-1 do idioma de origem
target_language: Código ISO 639-1 do idioma de destino
Returns:
str: Texto traduzido
"""
provider = storage.get_llm_provider()
storage.add_log("DEBUG", "Iniciando tradução", {
"source_language": source_language,
"target_language": target_language,
"text_length": len(text)
})
# Se os idiomas forem iguais, retorna o texto original
if source_language == target_language:
return text
if provider == "openai":
api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
prompt = f"""
Você é um tradutor profissional especializado em manter o tom e estilo do texto original.
Instruções:
1. Traduza o texto de {source_language} para {target_language}
2. Preserve todas as formatações (negrito, itálico, emojis)
3. Mantenha os mesmos parágrafos e quebras de linha
4. Preserve números, datas e nomes próprios
5. Não adicione ou remova informações
6. Não inclua notas ou explicações
7. Mantenha o mesmo nível de formalidade
Texto para tradução:
{text}
"""
json_data = {
"messages": [{
"role": "system",
"content": "Você é um tradutor profissional que mantém o estilo e formatação do texto original."
}, {
"role": "user",
"content": prompt
}],
"model": model,
"temperature": 0.3
}
try:
success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
if not success:
raise Exception(f"Falha na tradução: {error}")
translated_text = response_data["choices"][0]["message"]["content"].strip()
# Verificar se a tradução manteve aproximadamente o mesmo tamanho
length_ratio = len(translated_text) / len(text)
if not (0.5 <= length_ratio <= 1.5):
storage.add_log("WARNING", "Possível erro na tradução - diferença significativa no tamanho", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
# Validar se a tradução não está vazia
if not await validate_transcription_response(translated_text):
storage.add_log("ERROR", "Tradução vazia ou inválida recebida")
raise Exception("Tradução vazia ou inválida recebida")
storage.add_log("INFO", "Tradução concluída com sucesso", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
return translated_text
except Exception as e:
storage.add_log("ERROR", "Erro no processo de tradução", {
"error": str(e),
"type": type(e).__name__
})
raise
# Nova função para baixar áudio remoto
async def download_remote_audio(url: str) -> str:
"""
Baixa um arquivo de áudio remoto e salva localmente como um arquivo temporário.
Retorna o caminho para o arquivo salvo.
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
audio_data = await response.read()
# Cria um arquivo temporário para armazenar o áudio (pode ajustar o sufixo caso necessário)
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
temp_file.write(audio_data)
local_path = temp_file.name
return local_path
else:
raise Exception(f"Falha no download, código de status: {response.status}")
except Exception as e:
raise Exception(f"Erro ao baixar áudio remoto: {str(e)}")

View File

@ -1,22 +1,47 @@
#!/bin/bash
# Função para inicializar configurações no Redis se não existirem
# Função para construir o comando redis-cli com autenticação condicional
build_redis_cli_cmd() {
cmd="redis-cli -h ${REDIS_HOST:-localhost} -p ${REDIS_PORT:-6380}"
if [ ! -z "$REDIS_USERNAME" ]; then
cmd="$cmd --user $REDIS_USERNAME"
fi
if [ ! -z "$REDIS_PASSWORD" ]; then
cmd="$cmd -a $REDIS_PASSWORD"
fi
if [ ! -z "$REDIS_DB" ]; then
cmd="$cmd -n $REDIS_DB"
fi
echo "$cmd"
}
# Função para inicializar configurações no Redis
initialize_redis_config() {
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET GROQ_API_KEY "sua_api_key_aqui" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET BUSINESS_MESSAGE "*Impacte AI* Premium Services" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET PROCESS_GROUP_MESSAGES "false" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET PROCESS_SELF_MESSAGES "true" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET API_DOMAIN "$API_DOMAIN" NX
redis_cmd=$(build_redis_cli_cmd)
$redis_cmd SET GROQ_API_KEY "sua_api_key_aqui" NX
$redis_cmd SET BUSINESS_MESSAGE "*Impacte AI* Premium Services" NX
$redis_cmd SET PROCESS_GROUP_MESSAGES "false" NX
$redis_cmd SET PROCESS_SELF_MESSAGES "true" NX
$redis_cmd SET API_DOMAIN "$API_DOMAIN" NX
}
# Aguardar o Redis estar pronto
echo "Aguardando o Redis ficar disponível..."
until redis-cli -h $REDIS_HOST -p $REDIS_PORT PING; do
redis_cmd=$(build_redis_cli_cmd)
until $redis_cmd PING 2>/dev/null; do
echo "Redis não está pronto - aguardando..."
sleep 5
done
# Inicializar configurações no Redis (apenas se não existirem)
echo "Redis disponível!"
# Inicializar configurações
initialize_redis_config
# Iniciar o FastAPI em background

View File

@ -1,12 +1,18 @@
import json
import os
from typing import List, Dict
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import traceback
import logging
import redis
from utils import create_redis_client
import uuid
class StorageHandler:
# Chaves Redis para webhooks
WEBHOOK_KEY = "webhook_redirects" # Chave para armazenar os webhooks
WEBHOOK_STATS_KEY = "webhook_stats" # Chave para estatísticas
def __init__(self):
# Configuração de logger
self.logger = logging.getLogger("StorageHandler")
@ -20,17 +26,19 @@ class StorageHandler:
self.logger.info("StorageHandler inicializado.")
# Conexão com o Redis
self.redis = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6380)),
db=0,
decode_responses=True
)
self.redis = create_redis_client()
# Retenção de logs e backups
self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48))
self.backup_retention_days = int(os.getenv('BACKUP_RETENTION_DAYS', 7))
# Garantir valores padrão para configurações de idioma
if not self.redis.exists(self._get_redis_key("auto_translation")):
self.redis.set(self._get_redis_key("auto_translation"), "false")
if not self.redis.exists(self._get_redis_key("auto_language_detection")):
self.redis.set(self._get_redis_key("auto_language_detection"), "false")
def _get_redis_key(self, key):
return f"transcrevezap:{key}"
@ -200,3 +208,510 @@ class StorageHandler:
self.redis.set(self._get_redis_key("groq_key_counter"), str(next_counter))
return keys[counter % len(keys)]
def get_penalized_until(self, key: str) -> Optional[datetime]:
"""
Retorna o timestamp até quando a chave está penalizada, ou None se não estiver penalizada.
"""
penalized_key = self._get_redis_key(f"groq_key_penalized_{key}")
penalized_until = self.redis.get(penalized_key)
if penalized_until:
return datetime.fromisoformat(penalized_until)
return None
def penalize_key(self, key: str, penalty_duration: int):
"""
Penaliza uma chave por um tempo determinado (em segundos).
"""
penalized_key = self._get_redis_key(f"groq_key_penalized_{key}")
penalized_until = datetime.utcnow() + timedelta(seconds=penalty_duration)
self.redis.set(penalized_key, penalized_until.isoformat())
self.redis.expire(penalized_key, penalty_duration) # Expira a chave após o tempo de penalidade
self.add_log("INFO", "Chave GROQ penalizada", {
"key": key,
"penalized_until": penalized_until.isoformat()
})
def get_message_settings(self):
"""Obtém as configurações de mensagens."""
return {
"summary_header": self.redis.get(self._get_redis_key("summary_header")) or "🤖 *Resumo do áudio:*",
"transcription_header": self.redis.get(self._get_redis_key("transcription_header")) or "🔊 *Transcrição do áudio:*",
"output_mode": self.redis.get(self._get_redis_key("output_mode")) or "both",
"character_limit": int(self.redis.get(self._get_redis_key("character_limit")) or "500"),
}
def save_message_settings(self, settings: dict):
"""Salva as configurações de mensagens."""
for key, value in settings.items():
self.redis.set(self._get_redis_key(key), str(value))
def get_process_mode(self):
"""Retorna o modo de processamento configurado"""
mode = self.redis.get(self._get_redis_key("process_mode")) or "all"
self.logger.debug(f"Modo de processamento atual: {mode}")
return mode
def get_contact_language(self, contact_id: str) -> str:
"""
Obtém o idioma configurado para um contato específico.
O contact_id pode vir com ou sem @s.whatsapp.net
"""
# Remover @s.whatsapp.net se presente
contact_id = contact_id.split('@')[0]
return self.redis.hget(self._get_redis_key("contact_languages"), contact_id)
def set_contact_language(self, contact_id: str, language: str):
"""
Define o idioma para um contato específico
"""
# Remover @s.whatsapp.net se presente
contact_id = contact_id.split('@')[0]
self.redis.hset(self._get_redis_key("contact_languages"), contact_id, language)
self.logger.info(f"Idioma {language} definido para o contato {contact_id}")
def get_all_contact_languages(self) -> dict:
"""
Retorna um dicionário com todos os contatos e seus idiomas configurados
"""
return self.redis.hgetall(self._get_redis_key("contact_languages"))
def remove_contact_language(self, contact_id: str):
"""
Remove a configuração de idioma de um contato
"""
contact_id = contact_id.split('@')[0]
self.redis.hdel(self._get_redis_key("contact_languages"), contact_id)
self.logger.info(f"Configuração de idioma removida para o contato {contact_id}")
def get_auto_language_detection(self) -> bool:
"""
Verifica se a detecção automática de idioma está ativada
"""
return self.redis.get(self._get_redis_key("auto_language_detection")) == "true"
def set_auto_language_detection(self, enabled: bool):
"""
Ativa ou desativa a detecção automática de idioma
"""
self.redis.set(self._get_redis_key("auto_language_detection"), str(enabled).lower())
self.logger.info(f"Detecção automática de idioma {'ativada' if enabled else 'desativada'}")
def get_auto_translation(self) -> bool:
"""
Verifica se a tradução automática está ativada
"""
return self.redis.get(self._get_redis_key("auto_translation")) == "true"
def set_auto_translation(self, enabled: bool):
"""
Ativa ou desativa a tradução automática
"""
self.redis.set(self._get_redis_key("auto_translation"), str(enabled).lower())
self.logger.info(f"Tradução automática {'ativada' if enabled else 'desativada'}")
def record_language_usage(self, language: str, from_me: bool, auto_detected: bool = False):
"""
Registra estatísticas de uso de idiomas
Args:
language: Código do idioma (ex: 'pt', 'en')
from_me: Se o áudio foi enviado por nós
auto_detected: Se o idioma foi detectado automaticamente
"""
try:
# Validar idioma
if not language:
self.add_log("WARNING", "Tentativa de registrar uso sem idioma definido")
return
# Incrementar contagem total do idioma
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_total",
1
)
# Incrementar contagem por direção (enviado/recebido)
direction = 'sent' if from_me else 'received'
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_{direction}",
1
)
# Se foi detecção automática, registrar
if auto_detected:
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_auto_detected",
1
)
# Registrar última utilização
self.redis.hset(
self._get_redis_key("language_stats"),
f"{language}_last_used",
datetime.now().isoformat()
)
# Log detalhado
self.add_log("DEBUG", "Uso de idioma registrado", {
"language": language,
"direction": direction,
"auto_detected": auto_detected
})
except Exception as e:
self.add_log("ERROR", "Erro ao registrar uso de idioma", {
"error": str(e),
"type": type(e).__name__
})
def get_language_statistics(self) -> Dict:
"""
Obtém estatísticas de uso de idiomas
"""
try:
stats_raw = self.redis.hgetall(self._get_redis_key("language_stats"))
# Organizar estatísticas por idioma
stats = {}
for key, value in stats_raw.items():
lang, metric = key.split('_', 1)
if lang not in stats:
stats[lang] = {}
if metric == 'last_used':
stats[lang][metric] = value
else:
stats[lang][metric] = int(value)
return stats
except Exception as e:
self.logger.error(f"Erro ao obter estatísticas de idioma: {e}")
return {}
def cache_language_detection(self, contact_id: str, language: str, confidence: float = 1.0):
"""
Armazena em cache o idioma detectado para um contato
"""
contact_id = contact_id.split('@')[0]
cache_data = {
'language': language,
'confidence': confidence,
'timestamp': datetime.now().isoformat(),
'auto_detected': True
}
self.redis.hset(
self._get_redis_key("language_detection_cache"),
contact_id,
json.dumps(cache_data)
)
def get_cached_language(self, contact_id: str) -> Dict:
"""
Obtém o idioma em cache para um contato
Retorna None se não houver cache ou se estiver expirado
"""
contact_id = contact_id.split('@')[0]
cached = self.redis.hget(
self._get_redis_key("language_detection_cache"),
contact_id
)
if not cached:
return None
try:
data = json.loads(cached)
# Verificar se o cache expirou (24 horas)
cache_time = datetime.fromisoformat(data['timestamp'])
if datetime.now() - cache_time > timedelta(hours=24):
return None
return data
except:
return None
def get_webhook_redirects(self) -> List[Dict]:
"""Obtém todos os webhooks de redirecionamento cadastrados."""
webhooks_raw = self.redis.hgetall(self._get_redis_key("webhook_redirects"))
webhooks = []
for webhook_id, data in webhooks_raw.items():
webhook_data = json.loads(data)
webhook_data['id'] = webhook_id
webhooks.append(webhook_data)
return webhooks
def validate_webhook_url(self, url: str) -> bool:
"""Valida se a URL do webhook é acessível."""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
return all([parsed.scheme, parsed.netloc])
except Exception as e:
self.logger.error(f"URL inválida: {url} - {str(e)}")
return False
def add_webhook_redirect(self, url: str, description: str = "") -> str:
"""
Adiciona um novo webhook de redirecionamento.
Retorna o ID do webhook criado.
"""
webhook_id = str(uuid.uuid4())
webhook_data = {
"url": url,
"description": description,
"created_at": datetime.now().isoformat(),
"status": "active",
"error_count": 0,
"success_count": 0,
"last_success": None,
"last_error": None
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
return webhook_id
def clean_webhook_data(self, webhook_id: str):
"""
Remove todos os dados relacionados a um webhook específico do Redis.
Args:
webhook_id: ID do webhook a ser limpo
"""
try:
# Lista de chaves relacionadas ao webhook que precisam ser removidas
keys_to_remove = [
f"webhook_failed_{webhook_id}", # Entregas falhas
f"webhook_stats_{webhook_id}", # Estatísticas específicas
]
# Remove cada chave associada ao webhook
for key in keys_to_remove:
full_key = self._get_redis_key(key)
self.redis.delete(full_key)
self.logger.debug(f"Chave removida: {full_key}")
self.logger.info(f"Dados do webhook {webhook_id} limpos com sucesso")
except Exception as e:
self.logger.error(f"Erro ao limpar dados do webhook {webhook_id}: {str(e)}")
raise
def remove_webhook_redirect(self, webhook_id: str):
"""Remove um webhook de redirecionamento e todos os seus dados associados."""
try:
# Primeiro remove os dados associados
self.clean_webhook_data(webhook_id)
# Depois remove o webhook em si
self.redis.hdel(self._get_redis_key("webhook_redirects"), webhook_id)
self.logger.info(f"Webhook {webhook_id} removido com sucesso")
except Exception as e:
self.logger.error(f"Erro ao remover webhook {webhook_id}: {str(e)}")
raise
def update_webhook_stats(self, webhook_id: str, success: bool, error_message: str = None):
"""Atualiza as estatísticas de um webhook."""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
if success:
webhook_data["success_count"] += 1
webhook_data["last_success"] = datetime.now().isoformat()
else:
webhook_data["error_count"] += 1
webhook_data["last_error"] = {
"timestamp": datetime.now().isoformat(),
"message": error_message
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
except Exception as e:
self.logger.error(f"Erro ao atualizar estatísticas do webhook {webhook_id}: {e}")
def retry_failed_webhooks(self):
"""Tenta reenviar webhooks que falharam nas últimas 24h."""
webhooks = self.get_webhook_redirects()
for webhook in webhooks:
if webhook.get("last_error"):
error_time = datetime.fromisoformat(webhook["last_error"]["timestamp"])
if datetime.now() - error_time < timedelta(hours=24):
# Tentar reenviar
pass
def test_webhook(self, url: str) -> tuple[bool, str]:
"""
Testa um webhook antes de salvá-lo.
Retorna uma tupla (sucesso, mensagem)
"""
try:
import aiohttp
import asyncio
async def _test_webhook():
async with aiohttp.ClientSession() as session:
test_payload = {
"test": True,
"timestamp": datetime.now().isoformat(),
"message": "Teste de conexão do TranscreveZAP"
}
async with session.post(
url,
json=test_payload,
headers={"Content-Type": "application/json"},
timeout=10
) as response:
return response.status, await response.text()
status, response = asyncio.run(_test_webhook())
if status in [200, 201, 202]:
return True, "Webhook testado com sucesso!"
return False, f"Erro no teste: Status {status} - {response}"
except Exception as e:
return False, f"Erro ao testar webhook: {str(e)}"
def get_webhook_health(self, webhook_id: str) -> dict:
"""
Calcula métricas de saúde do webhook
"""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
total_requests = webhook_data["success_count"] + webhook_data["error_count"]
if total_requests == 0:
return {
"health_status": "unknown",
"error_rate": 0,
"success_rate": 0,
"total_requests": 0
}
error_rate = (webhook_data["error_count"] / total_requests) * 100
success_rate = (webhook_data["success_count"] / total_requests) * 100
# Definir status de saúde
if error_rate >= 50:
health_status = "critical"
elif error_rate >= 20:
health_status = "warning"
else:
health_status = "healthy"
return {
"health_status": health_status,
"error_rate": error_rate,
"success_rate": success_rate,
"total_requests": total_requests
}
except Exception as e:
self.logger.error(f"Erro ao calcular saúde do webhook {webhook_id}: {e}")
return None
def retry_webhook(self, webhook_id: str, payload: dict) -> bool:
"""
Tenta reenviar um payload para um webhook específico mantendo o payload original intacto.
Args:
webhook_id: ID do webhook para reenvio
payload: Payload original para reenvio
Returns:
bool: True se o reenvio foi bem sucedido, False caso contrário
"""
try:
import aiohttp
import asyncio
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
async def _retry_webhook():
async with aiohttp.ClientSession() as session:
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true",
"X-TranscreveZAP-Webhook-ID": webhook_id,
"X-TranscreveZAP-Retry": "true"
}
async with session.post(
webhook_data["url"],
json=payload, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
return response.status in [200, 201, 202]
success = asyncio.run(_retry_webhook())
if success:
self.update_webhook_stats(webhook_id, True)
else:
self.update_webhook_stats(webhook_id, False, "Falha no retry")
return success
except Exception as e:
self.logger.error(f"Erro no retry do webhook {webhook_id}: {e}")
return False
def get_failed_deliveries(self, webhook_id: str) -> List[Dict]:
"""
Retorna lista de entregas falhas para um webhook
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed = self.redis.lrange(key, 0, -1)
return [json.loads(x) for x in failed]
def add_failed_delivery(self, webhook_id: str, payload: dict):
"""
Registra uma entrega falha para retry posterior
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed_delivery = {
"timestamp": datetime.now().isoformat(),
"payload": payload,
"retry_count": 0
}
self.redis.lpush(key, json.dumps(failed_delivery))
# Manter apenas as últimas 100 falhas
self.redis.ltrim(key, 0, 99)
def get_llm_provider(self) -> str:
"""Returns active LLM provider (groq or openai)"""
return self.redis.get(self._get_redis_key("active_llm_provider")) or "groq"
def set_llm_provider(self, provider: str):
"""Sets active LLM provider"""
if provider not in ["groq", "openai"]:
raise ValueError("Provider must be 'groq' or 'openai'")
self.redis.set(self._get_redis_key("active_llm_provider"), provider)
def get_openai_keys(self) -> List[str]:
"""Get stored OpenAI API keys"""
return list(self.redis.smembers(self._get_redis_key("openai_keys")))
def add_openai_key(self, key: str):
"""Add OpenAI API key"""
if key and key.startswith("sk-"):
self.redis.sadd(self._get_redis_key("openai_keys"), key)
return True
return False

49
utils.py Normal file
View File

@ -0,0 +1,49 @@
import os
import redis
import logging
logger = logging.getLogger("TranscreveZAP")
def get_redis_connection_params():
"""
Retorna os parâmetros de conexão do Redis baseado nas variáveis de ambiente.
Retira parâmetros de autenticação se não estiverem configurados.
"""
params = {
'host': os.getenv('REDIS_HOST', 'localhost'),
'port': int(os.getenv('REDIS_PORT', 6380)),
'db': int(os.getenv('REDIS_DB', '0')),
'decode_responses': True
}
# Adiciona credenciais apenas se estiverem configuradas
username = os.getenv('REDIS_USERNAME')
password = os.getenv('REDIS_PASSWORD')
if username and username.strip():
params['username'] = username
if password and password.strip():
params['password'] = password
return params
def create_redis_client():
"""
Cria e testa a conexão com o Redis.
Retorna o cliente Redis se bem sucedido.
"""
try:
params = get_redis_connection_params()
client = redis.Redis(**params)
client.ping() # Testa a conexão
logger.info("Conexão com Redis estabelecida com sucesso!")
return client
except redis.exceptions.AuthenticationError:
logger.error("Falha de autenticação no Redis. Verifique as credenciais.")
raise
except redis.exceptions.ConnectionError as e:
logger.error(f"Erro de conexão com Redis: {e}")
raise
except Exception as e:
logger.error(f"Erro ao configurar Redis: {e}")
raise