Compare commits

...

16 Commits
2.3 ... main

Author SHA1 Message Date
Fábio Cavalcanti
c42476549e ajuste audio source download 2025-02-22 08:13:38 -03:00
Fábio Cavalcanti
e89787e715 ajuste readme 2025-01-24 22:48:06 -03:00
Fábio Cavalcanti
f63dcc40d1 adicionado sistema de seleção de provedor de llm para GROQ ou Open ai 2025-01-24 22:27:02 -03:00
Fábio Cavalcanti
af20510b2b corrigido sistem de validação de chaves groq 2025-01-24 21:06:13 -03:00
Fábio Cavalcanti
a25dc9c4e7 ajuste para verificar formdata e json na chamada groq 2025-01-23 15:54:21 -03:00
Fábio Cavalcanti
a4ba9d02bc ajuste identacao 2025-01-23 15:38:49 -03:00
Fábio Cavalcanti
4c7d346a3c ajuste identacao 2025-01-23 15:27:35 -03:00
Fábio Cavalcanti
be82707ccc melhoria no sistema de chaves groq 2025-01-23 15:19:39 -03:00
Fábio Cavalcanti
3cd75903fc hub do readme 2025-01-18 13:58:46 -03:00
Fábio Cavalcanti
d43f62c316 implementação de hub de redirecionamentos de webhooks 2025-01-18 13:53:27 -03:00
Fábio Cavalcanti
0745132b98 ajuste da conexão com redis para contemplar usuarios que usam senha no redis ou redis cloud 2025-01-17 19:06:53 -03:00
Fábio Cavalcanti
0b5ad96508 ajuste no storage para o db redis 2025-01-17 17:57:35 -03:00
Fábio Cavalcanti
22217802a2 ajuste de stack docker 2025-01-16 17:51:13 -03:00
Fábio Cavalcanti
facfcb4559 ajuste de docker compose referencia 2025-01-16 17:08:26 -03:00
Fábio Cavalcanti
7d8c91fbc9 definição do banco do redis opcional na variavel de ambiente do docker compose 2025-01-16 17:00:09 -03:00
Fábio Cavalcanti
6fd1ec33e9 ajuste readme 2025-01-10 10:59:37 -03:00
14 changed files with 1287 additions and 275 deletions

View File

@ -1,12 +1,54 @@
#-----------------------------------------------
# Configurações do Servidor
#-----------------------------------------------
# Configurações do UVICORN
UVICORN_PORT=8005
UVICORN_HOST=0.0.0.0
UVICORN_RELOAD=true
UVICORN_WORKERS=1
# Domínios da Aplicação
API_DOMAIN=seu.dominio.com # Subdomínio para a API (ex: api.seudominio.com)
MANAGER_DOMAIN=manager.seu.dominio.com # Subdomínio para o Manager (ex: manager.seudominio.com)
# Debug e Logs # Debug e Logs
DEBUG_MODE=false DEBUG_MODE=false
LOG_LEVEL=INFO LOG_LEVEL=INFO
# Credenciais do Gerenciador #-----------------------------------------------
MANAGER_USER=admin # Credenciais de Acesso
MANAGER_PASSWORD=impacteai2024 #-----------------------------------------------
# Credenciais do Painel Administrativo
MANAGER_USER=seu_usuario_admin # Username para acessar o painel admin
MANAGER_PASSWORD=sua_senha_segura # Senha para acessar o painel admin
# Configurações do Servidor #-----------------------------------------------
FASTAPI_PORT=8005 # Configurações do Redis
STREAMLIT_PORT=8501 #-----------------------------------------------
HOST=0.0.0.0 # Configurações Básicas
REDIS_HOST=redis-transcrevezap # Host do Redis (use redis-transcrevezap para docker-compose)
REDIS_PORT=6380 # Porta do Redis
REDIS_DB=0 # Número do banco de dados Redis
# Autenticação Redis (opcional)
REDIS_USERNAME= # Deixe em branco se não usar autenticação
REDIS_PASSWORD= # Deixe em branco se não usar autenticação
#-----------------------------------------------
# Configurações de Rede
#-----------------------------------------------
# Nome da Rede Docker Externa
NETWORK_NAME=sua_rede_externa # Nome da sua rede Docker externa
#-----------------------------------------------
# Configurações do Traefik (se estiver usando)
#-----------------------------------------------
# Certificados SSL
SSL_RESOLVER=letsencryptresolver # Resolvedor SSL do Traefik
SSL_ENTRYPOINT=websecure # Entrypoint SSL do Traefik
#-----------------------------------------------
# Portas da Aplicação
#-----------------------------------------------
API_PORT=8005 # Porta para a API FastAPI
MANAGER_PORT=8501 # Porta para o Streamlit Manager

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.pythonPath": "d:\\Estudando CODE\\ESTUDOS PYTHON\\transcreve-audio-exemplo\\.venv\\Scripts\\python.exe"
}

View File

@ -1,41 +1,46 @@
# Usar uma imagem oficial do Python como base # Imagem base do Python 3.10-slim
FROM python:3.10-slim FROM python:3.10-slim
# Instalar dependências do sistema, incluindo redis-tools e tzdata para fuso horário # Configuração básica de timezone
ENV TZ=America/Sao_Paulo
# Instalação de dependências mínimas necessárias
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
redis-tools \ redis-tools \
tzdata \ tzdata \
&& apt-get clean && rm -rf /var/lib/apt/lists/* dos2unix \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* \
&& ln -snf /usr/share/zoneinfo/$TZ /etc/localtime \
&& echo $TZ > /etc/timezone
# Configurar o fuso horário # Configuração do ambiente de trabalho
ENV TZ=America/Sao_Paulo
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Definir o diretório de trabalho
WORKDIR /app WORKDIR /app
# Copiar o arquivo requirements.txt e instalar dependências # Instalação das dependências Python
COPY requirements.txt . COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Copiar todo o código da aplicação # Copia dos arquivos da aplicação
COPY . . COPY . .
# Garantir que o diretório static existe # Preparação do diretório de estáticos
RUN mkdir -p /app/static RUN mkdir -p /app/static && \
if [ -d "static" ]; then cp -r static/* /app/static/ 2>/dev/null || true; fi
# Copiar arquivos estáticos para o diretório apropriado # Configuração do script de inicialização
COPY static/ /app/static/ RUN chmod +x start.sh && \
dos2unix start.sh && \
apt-get purge -y dos2unix && \
apt-get autoremove -y
# Garantir permissões de execução ao script inicial # Portas da aplicação
COPY start.sh .
RUN chmod +x start.sh
# Converter possíveis caracteres de retorno de carro do Windows
RUN apt-get update && apt-get install -y dos2unix && dos2unix start.sh && apt-get remove -y dos2unix && apt-get autoremove -y && apt-get clean
# Expor as portas usadas pela aplicação
EXPOSE 8005 8501 EXPOSE 8005 8501
# Definir o comando inicial # Valores padrão para Redis
ENV REDIS_HOST=redis-transcrevezap \
REDIS_PORT=6380 \
REDIS_DB=0
# Comando de inicialização
CMD ["/bin/bash", "/app/start.sh"] CMD ["/bin/bash", "/app/start.sh"]

View File

@ -1,6 +1,7 @@
import logging import logging
import redis import redis
import os import os
from utils import create_redis_client
# Configuração de logging com cores e formatação melhorada # Configuração de logging com cores e formatação melhorada
class ColoredFormatter(logging.Formatter): class ColoredFormatter(logging.Formatter):
@ -30,12 +31,7 @@ logger.addHandler(handler)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
# Conexão com o Redis # Conexão com o Redis
redis_client = redis.Redis( redis_client = create_redis_client()
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6380)),
db=0,
decode_responses=True
)
class Settings: class Settings:
"""Classe para gerenciar configurações do sistema.""" """Classe para gerenciar configurações do sistema."""
@ -43,6 +39,8 @@ class Settings:
"""Inicializa as configurações.""" """Inicializa as configurações."""
logger.debug("Carregando configurações do Redis...") logger.debug("Carregando configurações do Redis...")
self.ACTIVE_LLM_PROVIDER = self.get_redis_value("ACTIVE_LLM_PROVIDER", "groq")
self.OPENAI_API_KEY = self.get_redis_value("OPENAI_API_KEY", "")
self.GROQ_API_KEY = self.get_redis_value("GROQ_API_KEY", "gsk_default_key") self.GROQ_API_KEY = self.get_redis_value("GROQ_API_KEY", "gsk_default_key")
self.BUSINESS_MESSAGE = self.get_redis_value("BUSINESS_MESSAGE", "*Impacte AI* Premium Services") self.BUSINESS_MESSAGE = self.get_redis_value("BUSINESS_MESSAGE", "*Impacte AI* Premium Services")
self.PROCESS_GROUP_MESSAGES = self.get_redis_value("PROCESS_GROUP_MESSAGES", "false").lower() == "true" self.PROCESS_GROUP_MESSAGES = self.get_redis_value("PROCESS_GROUP_MESSAGES", "false").lower() == "true"

View File

@ -2,7 +2,7 @@ version: "3.7"
services: services:
tcaudio: tcaudio:
image: impacteai/transcrevezap:latest image: impacteai/transcrevezap:dev
networks: networks:
- sua_rede_externa # Substitua pelo nome da sua rede externa - sua_rede_externa # Substitua pelo nome da sua rede externa
ports: ports:
@ -20,6 +20,10 @@ services:
- MANAGER_PASSWORD=sua_senha_segura # Defina Senha do Manager - MANAGER_PASSWORD=sua_senha_segura # Defina Senha do Manager
- REDIS_HOST=redis-transcrevezap - REDIS_HOST=redis-transcrevezap
- REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP - REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP
- REDIS_DB=0 # Opcional: pode ser removida para usar o valor padrão
# Autenticação Redis (opcional - descomente se necessário, se estiver usando autenticação)
# - REDIS_USERNAME=${REDIS_USERNAME:-} # Nome do usuário definido no comando do Redis
# - REDIS_PASSWORD=${REDIS_PASSWORD:-} # Senha definida no comando do Redis (sem o '>')
depends_on: depends_on:
- redis-transcrevezap - redis-transcrevezap
deploy: deploy:
@ -48,11 +52,30 @@ services:
redis-transcrevezap: redis-transcrevezap:
image: redis:6 image: redis:6
# 1. Configuração SEM autenticação (padrão):
command: redis-server --port 6380 --appendonly yes command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user seuusuario on '>minhasenha' '~*' '+@all'
# # Explicação dos parâmetros:
# # --user seuusuario: nome do usuário
# # on: indica início da configuração do usuário
# # '>minhasenha': senha do usuário (mantenha o '>')
# # '~*': permite acesso a todas as chaves
# # '+@all': concede todas as permissões
volumes: volumes:
- redis_transcrevezap_data:/data - redis_transcrevezap_data:/data
networks: networks:
- sua_rede_externa # Substitua pelo nome da sua rede externa - sua_rede_externa # Substitua pelo nome da sua rede externa
deploy:
mode: replicated
replicas: 1
placement:
constraints:
- node.role == manager
networks: networks:
sua_rede_externa: # Substitua pelo nome da sua rede externa sua_rede_externa: # Substitua pelo nome da sua rede externa

111
groq_handler.py Normal file
View File

@ -0,0 +1,111 @@
import aiohttp
import json
from typing import Optional, Tuple, Any
from datetime import datetime
import logging
from storage import StorageHandler
import asyncio
logger = logging.getLogger("GROQHandler")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
async def test_groq_key(key: str) -> bool:
"""Teste se uma chave GROQ é válida e está funcionando."""
url = "https://api.groq.com/openai/v1/models"
headers = {"Authorization": f"Bearer {key}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return bool(data.get("data"))
return False
except Exception as e:
logger.error(f"Erro ao testar chave GROQ: {e}")
return False
async def validate_transcription_response(response_text: str) -> bool:
"""Valide se a resposta da transcrição é significativa."""
try:
cleaned_text = response_text.strip()
return len(cleaned_text) >= 10
except Exception as e:
logger.error(f"Erro ao validar resposta da transcrição: {e}")
return False
async def get_working_groq_key(storage: StorageHandler) -> Optional[str]:
"""Obtenha uma chave GROQ funcional do pool disponível."""
keys = storage.get_groq_keys()
for _ in range(len(keys)):
key = storage.get_next_groq_key()
if not key:
continue
penalized_until = storage.get_penalized_until(key)
if penalized_until and penalized_until > datetime.utcnow():
continue
if await test_groq_key(key):
return key
else:
storage.penalize_key(key, penalty_duration=300)
storage.add_log("ERROR", "Nenhuma chave GROQ funcional disponível.")
return None
async def handle_groq_request(
url: str,
headers: dict,
data: Any,
storage: StorageHandler,
is_form_data: bool = False
) -> Tuple[bool, dict, str]:
"""Lida com requisições para a API GROQ com suporte a retries e rotação de chaves."""
max_retries = len(storage.get_groq_keys())
for attempt in range(max_retries):
try:
storage.add_log("DEBUG", "Iniciando tentativa de requisição para GROQ", {
"url": url,
"is_form_data": is_form_data,
"attempt": attempt + 1
})
async with aiohttp.ClientSession() as session:
if is_form_data:
async with session.post(url, headers=headers, data=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("text"):
return True, response_data, ""
else:
async with session.post(url, headers=headers, json=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("choices"):
return True, response_data, ""
error_msg = response_data.get("error", {}).get("message", "")
if "organization_restricted" in error_msg or "invalid_api_key" in error_msg:
new_key = await get_working_groq_key(storage)
if new_key:
headers["Authorization"] = f"Bearer {new_key}"
await asyncio.sleep(1)
continue
return False, response_data, error_msg
except Exception as e:
storage.add_log("ERROR", "Erro na requisição", {"error": str(e)})
if attempt < max_retries - 1:
await asyncio.sleep(1)
continue
return False, {}, f"Request failed: {str(e)}"
storage.add_log("ERROR", "Todas as chaves GROQ falharam.")
return False, {}, "All GROQ keys exhausted."

57
main.py
View File

@ -5,12 +5,15 @@ from services import (
send_message_to_whatsapp, send_message_to_whatsapp,
get_audio_base64, get_audio_base64,
summarize_text_if_needed, summarize_text_if_needed,
download_remote_audio,
) )
from models import WebhookRequest from models import WebhookRequest
from config import logger, settings, redis_client from config import logger, settings, redis_client
from storage import StorageHandler from storage import StorageHandler
import traceback import traceback
import os import os
import asyncio
import aiohttp
app = FastAPI() app = FastAPI()
storage = StorageHandler() storage = StorageHandler()
@ -40,12 +43,53 @@ def load_dynamic_settings():
"DEBUG_MODE": get_config("DEBUG_MODE", "false") == "true", "DEBUG_MODE": get_config("DEBUG_MODE", "false") == "true",
} }
async def forward_to_webhooks(body: dict, storage: StorageHandler):
"""Encaminha o payload para todos os webhooks cadastrados."""
webhooks = storage.get_webhook_redirects()
async with aiohttp.ClientSession() as session:
for webhook in webhooks:
try:
# Configura os headers mantendo o payload intacto
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true", # Header para identificação da origem
"X-TranscreveZAP-Webhook-ID": webhook["id"]
}
async with session.post(
webhook["url"],
json=body, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
if response.status in [200, 201, 202]:
storage.update_webhook_stats(webhook["id"], True)
else:
error_text = await response.text()
storage.update_webhook_stats(
webhook["id"],
False,
f"Status {response.status}: {error_text}"
)
# Registra falha para retry posterior
storage.add_failed_delivery(webhook["id"], body)
except Exception as e:
storage.update_webhook_stats(
webhook["id"],
False,
f"Erro ao encaminhar: {str(e)}"
)
# Registra falha para retry posterior
storage.add_failed_delivery(webhook["id"], body)
@app.post("/transcreve-audios") @app.post("/transcreve-audios")
async def transcreve_audios(request: Request): async def transcreve_audios(request: Request):
try: try:
body = await request.json() body = await request.json()
dynamic_settings = load_dynamic_settings() dynamic_settings = load_dynamic_settings()
# Iniciar o encaminhamento em background
asyncio.create_task(forward_to_webhooks(body, storage))
# Log inicial da requisição # Log inicial da requisição
storage.add_log("INFO", "Nova requisição de transcrição recebida", { storage.add_log("INFO", "Nova requisição de transcrição recebida", {
"instance": body.get("instance"), "instance": body.get("instance"),
@ -108,17 +152,14 @@ async def transcreve_audios(request: Request):
# Obter áudio # Obter áudio
try: try:
if "mediaUrl" in body["data"]["message"]: if "mediaUrl" in body["data"]["message"]:
audio_source = body["data"]["message"]["mediaUrl"] media_url = body["data"]["message"]["mediaUrl"]
storage.add_log("DEBUG", "Usando mediaUrl para áudio", { storage.add_log("DEBUG", "Baixando áudio via URL", {"mediaUrl": media_url})
"mediaUrl": audio_source audio_source = await download_remote_audio(media_url) # Baixa o arquivo remoto e retorna o caminho local
})
else: else:
storage.add_log("DEBUG", "Obtendo áudio via base64") storage.add_log("DEBUG", "Obtendo áudio via base64")
base64_audio = await get_audio_base64(server_url, instance, apikey, audio_key) base64_audio = await get_audio_base64(server_url, instance, apikey, audio_key)
audio_source = await convert_base64_to_file(base64_audio) audio_source = await convert_base64_to_file(base64_audio)
storage.add_log("DEBUG", "Áudio convertido", { storage.add_log("DEBUG", "Áudio convertido", {"source": audio_source})
"source": audio_source
})
# Carregar configurações de formatação # Carregar configurações de formatação
output_mode = get_config("output_mode", "both") output_mode = get_config("output_mode", "both")

View File

@ -6,6 +6,7 @@ from storage import StorageHandler
import plotly.express as px import plotly.express as px
import os import os
import redis import redis
from utils import create_redis_client
# 1. Primeiro: Configuração da página # 1. Primeiro: Configuração da página
st.set_page_config( st.set_page_config(
@ -16,11 +17,7 @@ st.set_page_config(
) )
# 2. Depois: Inicialização do Redis # 2. Depois: Inicialização do Redis
redis_client = redis.Redis( redis_client = create_redis_client()
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6380)),
decode_responses=True
)
# 3. Funções de sessão (atualizado para usar st.query_params) # 3. Funções de sessão (atualizado para usar st.query_params)
def init_session(): def init_session():
@ -255,7 +252,7 @@ def login_page():
# Modificar a função de logout no dashboard # Modificar a função de logout no dashboard
def dashboard(): def dashboard():
# Versão do sistema # Versão do sistema
APP_VERSION = "2.3" APP_VERSION = "2.3.3"
show_logo() show_logo()
st.sidebar.markdown('<div class="sidebar-header">TranscreveZAP - Menu</div>', unsafe_allow_html=True) st.sidebar.markdown('<div class="sidebar-header">TranscreveZAP - Menu</div>', unsafe_allow_html=True)
@ -268,7 +265,7 @@ def dashboard():
page = st.sidebar.radio( page = st.sidebar.radio(
"Navegação", "Navegação",
["📊 Painel de Controle", "👥 Gerenciar Grupos", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"] ["📊 Painel de Controle", "👥 Gerenciar Grupos", "🔄 Hub de Redirecionamento", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"]
) )
# Seção de logout com confirmação # Seção de logout com confirmação
@ -303,6 +300,8 @@ def dashboard():
show_statistics() show_statistics()
elif page == "👥 Gerenciar Grupos": elif page == "👥 Gerenciar Grupos":
manage_groups() manage_groups()
elif page == "🔄 Hub de Redirecionamento":
manage_webhooks()
elif page == "🚫 Gerenciar Bloqueios": elif page == "🚫 Gerenciar Bloqueios":
manage_blocks() manage_blocks()
elif page == "⚙️ Configurações": elif page == "⚙️ Configurações":
@ -428,6 +427,165 @@ def manage_groups():
else: else:
st.info("Nenhum grupo permitido.") st.info("Nenhum grupo permitido.")
def manage_webhooks():
st.title("🔄 Hub de Redirecionamento")
st.markdown("""
Configure aqui os webhooks para onde você deseja redirecionar as mensagens recebidas.
Cada webhook receberá uma cópia exata do payload original da Evolution API.
""")
# Adicionar novo webhook
st.subheader("Adicionar Novo Webhook")
with st.form("add_webhook"):
col1, col2 = st.columns([3, 1])
with col1:
webhook_url = st.text_input(
"URL do Webhook",
placeholder="https://seu-sistema.com/webhook"
)
with col2:
if st.form_submit_button("🔍 Testar Conexão"):
if webhook_url:
with st.spinner("Testando webhook..."):
success, message = storage.test_webhook(webhook_url)
if success:
st.success(message)
else:
st.error(message)
else:
st.warning("Por favor, insira uma URL válida")
webhook_description = st.text_input(
"Descrição",
placeholder="Ex: URL de Webhook do N8N, Sistema de CRM, etc."
)
if st.form_submit_button("Adicionar Webhook"):
if webhook_url:
try:
# Testar antes de adicionar
success, message = storage.test_webhook(webhook_url)
if success:
storage.add_webhook_redirect(webhook_url, webhook_description)
st.success("✅ Webhook testado e adicionado com sucesso!")
st.experimental_rerun()
else:
st.error(f"Erro ao adicionar webhook: {message}")
except Exception as e:
st.error(f"Erro ao adicionar webhook: {str(e)}")
else:
st.warning("Por favor, insira uma URL válida")
# Listar webhooks existentes
st.subheader("Webhooks Configurados")
webhooks = storage.get_webhook_redirects()
if not webhooks:
st.info("Nenhum webhook configurado ainda.")
return
for webhook in webhooks:
# Obter métricas de saúde
health = storage.get_webhook_health(webhook["id"])
# Definir cor baseada no status
status_colors = {
"healthy": "🟢",
"warning": "🟡",
"critical": "🔴",
"unknown": ""
}
status_icon = status_colors.get(health["health_status"], "")
with st.expander(
f"{status_icon} {webhook['description'] or webhook['url']}",
expanded=True
):
col1, col2 = st.columns([3, 1])
with col1:
st.text_input(
"URL",
value=webhook["url"],
key=f"url_{webhook['id']}",
disabled=True
)
if webhook["description"]:
st.text_input(
"Descrição",
value=webhook["description"],
key=f"desc_{webhook['id']}",
disabled=True
)
with col2:
# Métricas de saúde
st.metric(
"Taxa de Sucesso",
f"{health['success_rate']:.1f}%"
)
# Alertas baseados na saúde
if health["health_status"] == "critical":
st.error("⚠️ Taxa de erro crítica!")
elif health["health_status"] == "warning":
st.warning("⚠️ Taxa de erro elevada")
# Botões de ação
col1, col2 = st.columns(2)
with col1:
if st.button("🔄 Retry", key=f"retry_{webhook['id']}"):
failed_deliveries = storage.get_failed_deliveries(webhook["id"])
if failed_deliveries:
with st.spinner("Reenviando mensagens..."):
success_count = 0
for delivery in failed_deliveries:
if storage.retry_webhook(webhook["id"], delivery["payload"]):
success_count += 1
st.success(f"Reenviadas {success_count} de {len(failed_deliveries)} mensagens!")
else:
st.info("Não há mensagens pendentes para reenvio")
with col2:
if st.button("🗑️", key=f"remove_{webhook['id']}", help="Remover webhook"):
if st.session_state.get(f"confirm_remove_{webhook['id']}", False):
storage.remove_webhook_redirect(webhook["id"])
st.success("Webhook removido!")
st.experimental_rerun()
else:
st.session_state[f"confirm_remove_{webhook['id']}"] = True
st.warning("Clique novamente para confirmar")
# Estatísticas detalhadas
st.markdown("### Estatísticas")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total de Sucessos", webhook["success_count"])
with col2:
st.metric("Total de Erros", webhook["error_count"])
with col3:
last_success = webhook.get("last_success")
if last_success:
last_success = datetime.fromisoformat(last_success).strftime("%d/%m/%Y %H:%M")
st.metric("Último Sucesso", last_success or "Nunca")
# Exibir último erro (se houver)
if webhook.get("last_error"):
st.error(
f"Último erro: {webhook['last_error']['message']} "
f"({datetime.fromisoformat(webhook['last_error']['timestamp']).strftime('%d/%m/%Y %H:%M')})"
)
# Lista de entregas falhas
failed_deliveries = storage.get_failed_deliveries(webhook["id"])
if failed_deliveries:
st.markdown("### Entregas Pendentes")
st.warning(f"{len(failed_deliveries)} mensagens aguardando reenvio")
if st.button("📋 Ver Detalhes", key=f"details_{webhook['id']}"):
for delivery in failed_deliveries:
st.code(json.dumps(delivery, indent=2))
def manage_blocks(): def manage_blocks():
st.title("🚫 Gerenciar Bloqueios") st.title("🚫 Gerenciar Bloqueios")
st.subheader("Bloquear Usuário") st.subheader("Bloquear Usuário")
@ -570,8 +728,9 @@ def manage_settings():
st.title("⚙️ Configurações") st.title("⚙️ Configurações")
# Criar tabs para melhor organização # Criar tabs para melhor organização
tab1, tab2, tab3, tab4 = st.tabs([ tab1, tab2, tab3, tab4, tab5 = st.tabs([
"🔑 Chaves API", "🔑 Chaves API",
"🤖 Provedor LLM",
"🌐 Configurações Gerais", "🌐 Configurações Gerais",
"📝 Formatação de Mensagens", "📝 Formatação de Mensagens",
"🗣️ Idiomas e Transcrição" "🗣️ Idiomas e Transcrição"
@ -629,6 +788,46 @@ def manage_settings():
pass pass
with tab2: with tab2:
st.subheader("Configuração do Provedor LLM")
# Select provider
current_provider = storage.get_llm_provider()
provider = st.selectbox(
"Provedor de Serviço",
options=["groq", "openai"],
format_func=lambda x: "Groq (Open Source)" if x == "groq" else "OpenAI (API Paga)",
index=0 if current_provider == "groq" else 1
)
if provider == "openai":
st.info("""
A OpenAI é um serviço pago que requer uma chave API válida.
Obtenha sua chave em https://platform.openai.com
""")
# OpenAI Key Management
openai_key = st.text_input(
"OpenAI API Key",
type="password",
help="Chave que começa com 'sk-'"
)
if st.button("Adicionar Chave OpenAI"):
if openai_key and openai_key.startswith("sk-"):
storage.add_openai_key(openai_key)
st.success("✅ Chave OpenAI adicionada com sucesso!")
else:
st.error("Chave inválida! Deve começar com 'sk-'")
# Save provider selection
if st.button("💾 Salvar Configuração do Provedor"):
try:
storage.set_llm_provider(provider)
st.success(f"Provedor alterado para: {provider}")
except Exception as e:
st.error(f"Erro ao salvar provedor: {str(e)}")
with tab3:
st.subheader("Configurações do Sistema") st.subheader("Configurações do Sistema")
# Business Message # Business Message
@ -692,7 +891,7 @@ def manage_settings():
) )
pass pass
with tab3: with tab4:
st.subheader("Formatação de Mensagens") st.subheader("Formatação de Mensagens")
# Headers personalizados # Headers personalizados
@ -777,7 +976,7 @@ def manage_settings():
st.error(f"Erro ao salvar configurações: {str(e)}") st.error(f"Erro ao salvar configurações: {str(e)}")
with tab4: with tab5:
st.subheader("Idiomas e Transcrição") st.subheader("Idiomas e Transcrição")
# Adicionar estatísticas no topo # Adicionar estatísticas no topo

74
openai_handler.py Normal file
View File

@ -0,0 +1,74 @@
import aiohttp
import json
from datetime import datetime
import logging
from storage import StorageHandler
logger = logging.getLogger("OpenAIHandler")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
async def test_openai_key(key: str) -> bool:
"""Test if an OpenAI key is valid and working."""
url = "https://api.openai.com/v1/models"
headers = {"Authorization": f"Bearer {key}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return len(data.get("data", [])) > 0
return False
except Exception as e:
logger.error(f"Error testing OpenAI key: {e}")
return False
async def handle_openai_request(
url: str,
headers: dict,
data: any,
storage: StorageHandler,
is_form_data: bool = False
) -> tuple[bool, dict, str]:
"""Handle requests to OpenAI API with retries."""
max_retries = 3
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
if is_form_data:
async with session.post(url, headers=headers, data=data) as response:
response_data = await response.json()
if response.status == 200:
if is_form_data and response_data.get("text"):
return True, response_data, ""
elif not is_form_data and response_data.get("choices"):
return True, response_data, ""
else:
async with session.post(url, headers=headers, json=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("choices"):
return True, response_data, ""
error_msg = response_data.get("error", {}).get("message", "")
if "invalid_api_key" in error_msg or "invalid authorization" in error_msg.lower():
logger.error(f"OpenAI API key invalid or expired")
return False, response_data, error_msg
if attempt < max_retries - 1:
continue
return False, response_data, error_msg
except Exception as e:
logger.error(f"Error in request: {str(e)}")
if attempt < max_retries - 1:
continue
return False, {}, f"Request failed: {str(e)}"
return False, {}, "All retries failed"

115
readme.md
View File

@ -11,10 +11,12 @@ Uma solução completa para automatizar e gerenciar mensagens de áudio no Whats
- Transcrição automática multilíngue - Transcrição automática multilíngue
- Resumos inteligentes de áudios - Resumos inteligentes de áudios
- Detecção e tradução automática entre idiomas - Detecção e tradução automática entre idiomas
- Seleção de plataforma LLM (GROQ ou OpenAI)
- Interface administrativa completa - Interface administrativa completa
- Sistema de rodízio de chaves API - Sistema de rodízio de chaves API
- Gestão avançada de grupos e usuários - Gestão avançada de grupos e usuários
- Personalização de formatação e saída - Personalização de formatação e saída
- Sistema de Redirecionamento de Webhooks
Contato de email: contato@impacte.ai Contato de email: contato@impacte.ai
([ACESSE NOSSO SITE](https://impacte.ai/)) ([ACESSE NOSSO SITE](https://impacte.ai/))
@ -27,60 +29,97 @@ Antes de começar, certifique-se de ter os seguintes requisitos:
- Python 3.10+ instalado ([Download](https://www.python.org/downloads/)) - Python 3.10+ instalado ([Download](https://www.python.org/downloads/))
- Docker e Docker Compose instalados ([Instruções](https://docs.docker.com/get-docker/)) - Docker e Docker Compose instalados ([Instruções](https://docs.docker.com/get-docker/))
- Uma conta Evolution API com chave válida - Uma conta Evolution API com chave válida
- No mínimo uma conta GROQ API com chave válida (começa com 'gsk_') ([Crie sua CONTA](https://console.groq.com/login)) - Chaves GROQ (começa com `gsk_`) e/ou chaves OpenAI (começa com `sk-`) configuradas ([Crie sua conta GROQ](https://console.groq.com/login))
* Em caso de uso com Proxy Reverso Aponte um Subdomínio para a API e outro para o MANAGER da aplicação * Em caso de uso com Proxy Reverso Aponte um Subdomínio para a API e outro para o MANAGER da aplicação
--- ---
## 🚀 **Novidade: Escolha do Provedor LLM**
Agora você pode escolher entre dois provedores para transcrições e resumos:
1. **GROQ** (open-source): Configuração padrão.
2. **OpenAI** (API paga): Integração com modelos GPT.
### Configuração:
- Acesse: **Configurações > Provedor LLM** na interface administrativa.
- Escolha entre `groq` e `openai`.
- Adicione as chaves correspondentes para cada provedor.
---
## 🚀 **Instalação e Configuração** ## 🚀 **Instalação e Configuração**
### 🐳 Docker Compose ### 🐳 Docker Compose
1. Clone o repositório: 1. Configure o arquivo docker-compose.yaml:
```bash
git clone https://github.com/seu-usuario/transcrevezap.git
cd transcrevezap
```
2. Configure o arquivo docker-compose.yaml:
```yaml ```yaml
version: "3.7" version: "3.7"
services: services:
# Serviço principal do TranscreveZAP
tcaudio: tcaudio:
image: impacteai/transcrevezap:latest image: impacteai/transcrevezap:latest
build: build:
context: . context: .
ports: ports:
- 8005:8005 # Porta para FastAPI - "8005:8005" # API FastAPI - Use esta porta para configurar o webhook
- 8501:8501 # Porta para Streamlit - "8501:8501" # Interface Web Streamlit - Acesse o painel por esta porta
environment: environment:
# Configurações do Servidor
- UVICORN_PORT=8005 - UVICORN_PORT=8005
- UVICORN_HOST=0.0.0.0 - UVICORN_HOST=0.0.0.0
- UVICORN_RELOAD=true - UVICORN_RELOAD=true
- UVICORN_WORKERS=1 - UVICORN_WORKERS=1
- API_DOMAIN=localhost - API_DOMAIN=localhost # Para uso local mantenha localhost
# Modo Debug e Logs
- DEBUG_MODE=false - DEBUG_MODE=false
- LOG_LEVEL=INFO - LOG_LEVEL=INFO
# Credenciais do Painel Admin (ALTERE ESTAS CREDENCIAIS!)
- MANAGER_USER=admin - MANAGER_USER=admin
- MANAGER_PASSWORD=sua_senha_aqui - MANAGER_PASSWORD=sua_senha_aqui
- REDIS_HOST=redis-transcrevezap
- REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP # Configurações do Redis
- REDIS_HOST=redis-transcrevezap # Nome do serviço Redis
- REDIS_PORT=6380 # Porta do Redis
- REDIS_DB=0 # Banco de dados Redis
# Autenticação Redis (opcional - descomente se necessário)
# - REDIS_USERNAME=seu_usuario # Nome do usuário Redis
# - REDIS_PASSWORD=sua_senha # Senha do Redis
depends_on: depends_on:
- redis-transcrevezap - redis-transcrevezap
command: ./start.sh command: ./start.sh
# Serviço Redis para armazenamento de dados
redis-transcrevezap: redis-transcrevezap:
image: redis:6 image: redis:6
command: redis-server --port 6380 --appendonly yes # Escolha UMA das configurações do Redis abaixo:
volumes:
- redis_transcrevezap_data:/data
# 1. Configuração simples SEM autenticação:
command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user admin on '>sua_senha' '~*' '+@all'
volumes:
- redis_transcrevezap_data:/data # Persistência dos dados
# Volumes para persistência
volumes: volumes:
redis_transcrevezap_data: redis_transcrevezap_data:
driver: local driver: local
# Instruções de Uso:
# 1. Salve este arquivo como docker-compose.yml
# 2. Execute com: docker compose up -d
# 3. Acesse o painel em: http://localhost:8501
# 4. Configure o webhook da Evolution API para: http://localhost:8005/transcreve-audios
``` ```
3. Inicie os serviços: 2. Inicie os serviços:
```bash ```bash
docker-compose up -d docker-compose up -d
``` ```
@ -157,7 +196,7 @@ version: "3.7"
services: services:
tcaudio: tcaudio:
image: impacteai/transcrevezap:latest image: impacteai/transcrevezap:dev
networks: networks:
- sua_rede_externa # Substitua pelo nome da sua rede externa - sua_rede_externa # Substitua pelo nome da sua rede externa
ports: ports:
@ -175,6 +214,10 @@ services:
- MANAGER_PASSWORD=sua_senha_segura # Defina Senha do Manager - MANAGER_PASSWORD=sua_senha_segura # Defina Senha do Manager
- REDIS_HOST=redis-transcrevezap - REDIS_HOST=redis-transcrevezap
- REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP - REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP
- REDIS_DB=0 # Opcional: pode ser removida para usar o valor padrão
# Autenticação Redis (opcional - descomente se necessário, se estiver usando autenticação)
# - REDIS_USERNAME=${REDIS_USERNAME:-} # Nome do usuário definido no comando do Redis
# - REDIS_PASSWORD=${REDIS_PASSWORD:-} # Senha definida no comando do Redis (sem o '>')
depends_on: depends_on:
- redis-transcrevezap - redis-transcrevezap
deploy: deploy:
@ -203,11 +246,30 @@ services:
redis-transcrevezap: redis-transcrevezap:
image: redis:6 image: redis:6
# 1. Configuração SEM autenticação (padrão):
command: redis-server --port 6380 --appendonly yes command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user seuusuario on '>minhasenha' '~*' '+@all'
# # Explicação dos parâmetros:
# # --user seuusuario: nome do usuário
# # on: indica início da configuração do usuário
# # '>minhasenha': senha do usuário (mantenha o '>')
# # '~*': permite acesso a todas as chaves
# # '+@all': concede todas as permissões
volumes: volumes:
- redis_transcrevezap_data:/data - redis_transcrevezap_data:/data
networks: networks:
- sua_rede_externa # Substitua pelo nome da sua rede externa - sua_rede_externa # Substitua pelo nome da sua rede externa
deploy:
mode: replicated
replicas: 1
placement:
constraints:
- node.role == manager
networks: networks:
sua_rede_externa: # Substitua pelo nome da sua rede externa sua_rede_externa: # Substitua pelo nome da sua rede externa
@ -240,6 +302,25 @@ Para usar com Traefik, certifique-se de:
- Em produção, recomenda-se DEBUG_MODE=false - Em produção, recomenda-se DEBUG_MODE=false
- Configure LOG_LEVEL=DEBUG apenas para troubleshooting - Configure LOG_LEVEL=DEBUG apenas para troubleshooting
## 🚀 Novo Recurso v2.3.1: Hub de Redirecionamento
O TranscreveZAP agora oferece um sistema robusto para redirecionamento de mensagens, permitindo que você encaminhe os webhooks da Evolution API para múltiplos destinos simultaneamente.
### Principais Recursos
- Interface dedicada para gerenciamento de webhooks
- Redirecionamento sem alteração do payload original
- Monitoramento de saúde dos webhooks em tempo real
- Sistema de retry automático para reenvio de mensagens falhas
- Headers de rastreamento para identificação de origem (`X-TranscreveZAP-Forward`)
- Suporte a descrições personalizadas para cada webhook
- Limpeza automática de dados ao remover webhooks
### Compatibilidade
- Mantém o payload da Evolution API intacto
- Suporta múltiplos endpoints simultaneamente
- Compatível com qualquer sistema que aceite webhooks via POST
- Preserva todos os dados originais da mensagem
## ✨ Novos Recursos na v2.3 ## ✨ Novos Recursos na v2.3
### 🌍 Suporte Multilíngue ### 🌍 Suporte Multilíngue

View File

@ -7,7 +7,8 @@ from storage import StorageHandler
import os import os
import json import json
import tempfile import tempfile
import traceback
from groq_handler import get_working_groq_key, validate_transcription_response, handle_groq_request
# Inicializa o storage handler # Inicializa o storage handler
storage = StorageHandler() storage = StorageHandler()
@ -46,6 +47,7 @@ async def summarize_text_if_needed(text):
storage.add_log("DEBUG", "Iniciando processo de resumo", { storage.add_log("DEBUG", "Iniciando processo de resumo", {
"text_length": len(text) "text_length": len(text)
}) })
provider = storage.get_llm_provider()
# Obter idioma configurado # Obter idioma configurado
language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt" language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
@ -53,10 +55,20 @@ async def summarize_text_if_needed(text):
"language": language, "language": language,
"redis_value": redis_client.get("TRANSCRIPTION_LANGUAGE") "redis_value": redis_client.get("TRANSCRIPTION_LANGUAGE")
}) })
url_completions = "https://api.groq.com/openai/v1/chat/completions"
groq_key = await get_groq_key() if provider == "openai":
api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
headers = { headers = {
"Authorization": f"Bearer {groq_key}", "Authorization": f"Bearer {api_key}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
@ -140,29 +152,33 @@ async def summarize_text_if_needed(text):
"role": "user", "role": "user",
"content": f"{base_prompt}\n\nTexto para resumir: {text}", "content": f"{base_prompt}\n\nTexto para resumir: {text}",
}], }],
"model": "llama-3.3-70b-versatile", "model": model,
} }
try: try:
async with aiohttp.ClientSession() as session: success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
storage.add_log("DEBUG", "Enviando requisição para API GROQ") if not success:
async with session.post(url_completions, headers=headers, json=json_data) as summary_response: raise Exception(error)
if summary_response.status == 200:
summary_result = await summary_response.json() summary_text = response_data["choices"][0]["message"]["content"]
summary_text = summary_result["choices"][0]["message"]["content"] # Validar se o resumo não está vazio
storage.add_log("INFO", "Resumo gerado com sucesso", { if not await validate_transcription_response(summary_text):
"original_length": len(text), storage.add_log("ERROR", "Resumo vazio ou inválido recebido")
"summary_length": len(summary_text), raise Exception("Resumo vazio ou inválido recebido")
"language": language # Validar se o resumo é menor que o texto original
}) if len(summary_text) >= len(text):
return summary_text storage.add_log("WARNING", "Resumo maior que texto original", {
else: "original_length": len(text),
error_text = await summary_response.text() "summary_length": len(summary_text)
storage.add_log("ERROR", "Erro na API GROQ", { })
"error": error_text, storage.add_log("INFO", "Resumo gerado com sucesso", {
"status": summary_response.status "original_length": len(text),
}) "summary_length": len(summary_text),
raise Exception(f"Erro ao resumir o texto: {error_text}") "language": language
})
return summary_text
except Exception as e: except Exception as e:
storage.add_log("ERROR", "Erro no processo de resumo", { storage.add_log("ERROR", "Erro no processo de resumo", {
"error": str(e), "error": str(e),
@ -188,10 +204,20 @@ async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=F
"from_me": from_me, "from_me": from_me,
"remote_jid": remote_jid "remote_jid": remote_jid
}) })
provider = storage.get_llm_provider()
url = "https://api.groq.com/openai/v1/audio/transcriptions" if provider == "openai":
groq_key = await get_groq_key() api_key = storage.get_openai_keys()[0] # Get first OpenAI key
groq_headers = {"Authorization": f"Bearer {groq_key}"} url = "https://api.openai.com/v1/audio/transcriptions"
model = "whisper-1"
else: # groq
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
url = "https://api.groq.com/openai/v1/audio/transcriptions"
model = "whisper-large-v3"
headers = {"Authorization": f"Bearer {api_key}"}
# Inicializar variáveis # Inicializar variáveis
contact_language = None contact_language = None
@ -226,29 +252,28 @@ async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=F
elif not from_me: # Só detecta em mensagens recebidas elif not from_me: # Só detecta em mensagens recebidas
try: try:
# Realizar transcrição inicial sem idioma específico # Realizar transcrição inicial sem idioma específico
data = aiohttp.FormData() with open(audio_source, 'rb') as audio_file:
data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3') data = aiohttp.FormData()
data.add_field('model', 'whisper-large-v3') data.add_field('file', audio_file, filename='audio.mp3')
data.add_field('model', model)
async with aiohttp.ClientSession() as session: success, response_data, error = await handle_groq_request(url, headers, data, storage, is_form_data=True)
async with session.post(url, headers=groq_headers, data=data) as response: if success:
if response.status == 200: initial_text = response_data.get("text", "")
initial_result = await response.json()
initial_text = initial_result.get("text", "")
# Detectar idioma do texto transcrito # Detectar idioma do texto transcrito
detected_lang = await detect_language(initial_text) detected_lang = await detect_language(initial_text)
# Salvar no cache E na configuração do contato # Salvar no cache E na configuração do contato
storage.cache_language_detection(contact_id, detected_lang) storage.cache_language_detection(contact_id, detected_lang)
storage.set_contact_language(contact_id, detected_lang) storage.set_contact_language(contact_id, detected_lang)
contact_language = detected_lang contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e configurado", { storage.add_log("INFO", "Idioma detectado e configurado", {
"language": detected_lang, "language": detected_lang,
"remote_jid": remote_jid, "remote_jid": remote_jid,
"auto_detected": True "auto_detected": True
}) })
except Exception as e: except Exception as e:
storage.add_log("WARNING", "Erro na detecção automática de idioma", { storage.add_log("WARNING", "Erro na detecção automática de idioma", {
"error": str(e), "error": str(e),
@ -300,76 +325,73 @@ async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=F
try: try:
# Realizar transcrição # Realizar transcrição
data = aiohttp.FormData() with open(audio_source, 'rb') as audio_file:
data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3') data = aiohttp.FormData()
data.add_field('model', 'whisper-large-v3') data.add_field('file', audio_file, filename='audio.mp3')
data.add_field('language', transcription_language) data.add_field('model', model)
data.add_field('language', transcription_language)
if use_timestamps: if use_timestamps:
data.add_field('response_format', 'verbose_json') data.add_field('response_format', 'verbose_json')
# Realizar transcrição # Usar handle_groq_request para ter retry e validação
async with aiohttp.ClientSession() as session: success, response_data, error = await handle_groq_request(url, headers, data, storage, is_form_data=True)
async with session.post(url, headers=groq_headers, data=data) as response: if not success:
if response.status != 200: raise Exception(f"Erro na transcrição: {error}")
error_text = await response.text()
storage.add_log("ERROR", "Erro na transcrição", { transcription = format_timestamped_result(response_data) if use_timestamps else response_data.get("text", "")
"error": error_text,
"status": response.status # Validar o conteúdo da transcrição
if not await validate_transcription_response(transcription):
storage.add_log("ERROR", "Transcrição vazia ou inválida recebida")
raise Exception("Transcrição vazia ou inválida recebida")
# Detecção automática para novos contatos
if (is_private and storage.get_auto_language_detection() and
not from_me and not contact_language):
try:
detected_lang = await detect_language(transcription)
storage.cache_language_detection(remote_jid, detected_lang)
contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e cacheado", {
"language": detected_lang,
"remote_jid": remote_jid
}) })
raise Exception(f"Erro na transcrição: {error_text}") except Exception as e:
storage.add_log("WARNING", "Erro na detecção de idioma", {"error": str(e)})
result = await response.json() # Tradução quando necessário
need_translation = (
is_private and contact_language and
(
(from_me and transcription_language != target_language) or
(not from_me and target_language != transcription_language)
)
)
# Processar resposta baseado no formato if need_translation:
transcription = format_timestamped_result(result) if use_timestamps else result.get("text", "") try:
transcription = await translate_text(
# Detecção automática para novos contatos transcription,
if (is_private and storage.get_auto_language_detection() and transcription_language,
not from_me and not contact_language): target_language
try:
detected_lang = await detect_language(transcription)
storage.cache_language_detection(remote_jid, detected_lang)
contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e cacheado", {
"language": detected_lang,
"remote_jid": remote_jid
})
except Exception as e:
storage.add_log("WARNING", "Erro na detecção de idioma", {"error": str(e)})
# Tradução quando necessário
need_translation = (
is_private and contact_language and
(
(from_me and transcription_language != target_language) or
(not from_me and target_language != transcription_language)
) )
) storage.add_log("INFO", "Texto traduzido automaticamente", {
"from": transcription_language,
"to": target_language
})
except Exception as e:
storage.add_log("ERROR", "Erro na tradução", {"error": str(e)})
if need_translation: # Registrar estatísticas de uso
try: used_language = contact_language if contact_language else system_language
transcription = await translate_text( storage.record_language_usage(
transcription, used_language,
transcription_language, from_me,
target_language bool(contact_language and contact_language != system_language)
) )
storage.add_log("INFO", "Texto traduzido automaticamente", {
"from": transcription_language,
"to": target_language
})
except Exception as e:
storage.add_log("ERROR", "Erro na tradução", {"error": str(e)})
# Registrar estatísticas de uso return transcription, use_timestamps
used_language = contact_language if contact_language else system_language
storage.record_language_usage(
used_language,
from_me,
bool(contact_language and contact_language != system_language)
)
return transcription, use_timestamps
except Exception as e: except Exception as e:
storage.add_log("ERROR", "Erro no processo de transcrição", { storage.add_log("ERROR", "Erro no processo de transcrição", {
@ -423,6 +445,7 @@ async def detect_language(text: str) -> str:
Returns: Returns:
str: Código ISO 639-1 do idioma detectado str: Código ISO 639-1 do idioma detectado
""" """
provider = storage.get_llm_provider()
storage.add_log("DEBUG", "Iniciando detecção de idioma", { storage.add_log("DEBUG", "Iniciando detecção de idioma", {
"text_length": len(text) "text_length": len(text)
}) })
@ -432,11 +455,19 @@ async def detect_language(text: str) -> str:
"pt", "en", "es", "fr", "de", "it", "ja", "ko", "pt", "en", "es", "fr", "de", "it", "ja", "ko",
"zh", "ro", "ru", "ar", "hi", "nl", "pl", "tr" "zh", "ro", "ru", "ar", "hi", "nl", "pl", "tr"
} }
if provider == "openai":
api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
url_completions = "https://api.groq.com/openai/v1/chat/completions"
groq_key = await get_groq_key()
headers = { headers = {
"Authorization": f"Bearer {groq_key}", "Authorization": f"Bearer {api_key}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
@ -465,37 +496,30 @@ async def detect_language(text: str) -> str:
"role": "user", "role": "user",
"content": f"{prompt}\n\n{text[:500]}" # Limitando para os primeiros 500 caracteres "content": f"{prompt}\n\n{text[:500]}" # Limitando para os primeiros 500 caracteres
}], }],
"model": "llama-3.3-70b-versatile", "model": model,
"temperature": 0.1 "temperature": 0.1
} }
try: try:
async with aiohttp.ClientSession() as session: success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
storage.add_log("DEBUG", "Enviando requisição para API GROQ - Detecção de idioma") if not success:
async with session.post(url_completions, headers=headers, json=json_data) as response: raise Exception(f"Falha na detecção de idioma: {error}")
if response.status == 200:
result = await response.json()
detected_language = result["choices"][0]["message"]["content"].strip().lower()
# Validar o resultado detected_language = response_data["choices"][0]["message"]["content"].strip().lower()
if detected_language not in SUPPORTED_LANGUAGES:
storage.add_log("WARNING", "Idioma detectado não suportado", { # Validar o resultado
"detected": detected_language, if detected_language not in SUPPORTED_LANGUAGES:
"fallback": "en" storage.add_log("WARNING", "Idioma detectado não suportado", {
}) "detected": detected_language,
detected_language = "en" "fallback": "en"
})
detected_language = "en"
storage.add_log("INFO", "Idioma detectado com sucesso", {
"detected_language": detected_language
})
return detected_language
storage.add_log("INFO", "Idioma detectado com sucesso", {
"detected_language": detected_language
})
return detected_language
else:
error_text = await response.text()
storage.add_log("ERROR", "Erro na detecção de idioma", {
"error": error_text,
"status": response.status
})
raise Exception(f"Erro na detecção de idioma: {error_text}")
except Exception as e: except Exception as e:
storage.add_log("ERROR", "Erro no processo de detecção de idioma", { storage.add_log("ERROR", "Erro no processo de detecção de idioma", {
"error": str(e), "error": str(e),
@ -650,24 +674,33 @@ async def translate_text(text: str, source_language: str, target_language: str)
Returns: Returns:
str: Texto traduzido str: Texto traduzido
""" """
provider = storage.get_llm_provider()
storage.add_log("DEBUG", "Iniciando tradução", { storage.add_log("DEBUG", "Iniciando tradução", {
"source_language": source_language, "source_language": source_language,
"target_language": target_language, "target_language": target_language,
"text_length": len(text) "text_length": len(text)
}) })
# Se os idiomas forem iguais, retorna o texto original # Se os idiomas forem iguais, retorna o texto original
if source_language == target_language: if source_language == target_language:
return text return text
url_completions = "https://api.groq.com/openai/v1/chat/completions" if provider == "openai":
groq_key = await get_groq_key() api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
headers = { headers = {
"Authorization": f"Bearer {groq_key}", "Authorization": f"Bearer {api_key}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
# Prompt melhorado com contexto e instruções específicas
prompt = f""" prompt = f"""
Você é um tradutor profissional especializado em manter o tom e estilo do texto original. Você é um tradutor profissional especializado em manter o tom e estilo do texto original.
@ -692,43 +725,63 @@ async def translate_text(text: str, source_language: str, target_language: str)
"role": "user", "role": "user",
"content": prompt "content": prompt
}], }],
"model": "llama-3.3-70b-versatile", "model": model,
"temperature": 0.3 "temperature": 0.3
} }
try: try:
async with aiohttp.ClientSession() as session: success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
storage.add_log("DEBUG", "Enviando requisição de tradução") if not success:
async with session.post(url_completions, headers=headers, json=json_data) as response: raise Exception(f"Falha na tradução: {error}")
if response.status == 200:
result = await response.json()
translated_text = result["choices"][0]["message"]["content"].strip()
# Verificar se a tradução manteve aproximadamente o mesmo tamanho translated_text = response_data["choices"][0]["message"]["content"].strip()
length_ratio = len(translated_text) / len(text)
if not (0.5 <= length_ratio <= 1.5): # Verificar se a tradução manteve aproximadamente o mesmo tamanho
storage.add_log("WARNING", "Possível erro na tradução - diferença significativa no tamanho", { length_ratio = len(translated_text) / len(text)
"original_length": len(text), if not (0.5 <= length_ratio <= 1.5):
"translated_length": len(translated_text), storage.add_log("WARNING", "Possível erro na tradução - diferença significativa no tamanho", {
"ratio": length_ratio "original_length": len(text),
}) "translated_length": len(translated_text),
"ratio": length_ratio
})
# Validar se a tradução não está vazia
if not await validate_transcription_response(translated_text):
storage.add_log("ERROR", "Tradução vazia ou inválida recebida")
raise Exception("Tradução vazia ou inválida recebida")
storage.add_log("INFO", "Tradução concluída com sucesso", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
return translated_text
storage.add_log("INFO", "Tradução concluída com sucesso", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
return translated_text
else:
error_text = await response.text()
storage.add_log("ERROR", "Erro na tradução", {
"status": response.status,
"error": error_text
})
raise Exception(f"Erro na tradução: {error_text}")
except Exception as e: except Exception as e:
storage.add_log("ERROR", "Erro no processo de tradução", { storage.add_log("ERROR", "Erro no processo de tradução", {
"error": str(e), "error": str(e),
"type": type(e).__name__ "type": type(e).__name__
}) })
raise raise
# Nova função para baixar áudio remoto
async def download_remote_audio(url: str) -> str:
"""
Baixa um arquivo de áudio remoto e salva localmente como um arquivo temporário.
Retorna o caminho para o arquivo salvo.
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
audio_data = await response.read()
# Cria um arquivo temporário para armazenar o áudio (pode ajustar o sufixo caso necessário)
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
temp_file.write(audio_data)
local_path = temp_file.name
return local_path
else:
raise Exception(f"Falha no download, código de status: {response.status}")
except Exception as e:
raise Exception(f"Erro ao baixar áudio remoto: {str(e)}")

View File

@ -1,22 +1,47 @@
#!/bin/bash #!/bin/bash
# Função para inicializar configurações no Redis se não existirem # Função para construir o comando redis-cli com autenticação condicional
build_redis_cli_cmd() {
cmd="redis-cli -h ${REDIS_HOST:-localhost} -p ${REDIS_PORT:-6380}"
if [ ! -z "$REDIS_USERNAME" ]; then
cmd="$cmd --user $REDIS_USERNAME"
fi
if [ ! -z "$REDIS_PASSWORD" ]; then
cmd="$cmd -a $REDIS_PASSWORD"
fi
if [ ! -z "$REDIS_DB" ]; then
cmd="$cmd -n $REDIS_DB"
fi
echo "$cmd"
}
# Função para inicializar configurações no Redis
initialize_redis_config() { initialize_redis_config() {
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET GROQ_API_KEY "sua_api_key_aqui" NX redis_cmd=$(build_redis_cli_cmd)
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET BUSINESS_MESSAGE "*Impacte AI* Premium Services" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET PROCESS_GROUP_MESSAGES "false" NX $redis_cmd SET GROQ_API_KEY "sua_api_key_aqui" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET PROCESS_SELF_MESSAGES "true" NX $redis_cmd SET BUSINESS_MESSAGE "*Impacte AI* Premium Services" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET API_DOMAIN "$API_DOMAIN" NX $redis_cmd SET PROCESS_GROUP_MESSAGES "false" NX
$redis_cmd SET PROCESS_SELF_MESSAGES "true" NX
$redis_cmd SET API_DOMAIN "$API_DOMAIN" NX
} }
# Aguardar o Redis estar pronto # Aguardar o Redis estar pronto
echo "Aguardando o Redis ficar disponível..." echo "Aguardando o Redis ficar disponível..."
until redis-cli -h $REDIS_HOST -p $REDIS_PORT PING; do redis_cmd=$(build_redis_cli_cmd)
until $redis_cmd PING 2>/dev/null; do
echo "Redis não está pronto - aguardando..." echo "Redis não está pronto - aguardando..."
sleep 5 sleep 5
done done
# Inicializar configurações no Redis (apenas se não existirem) echo "Redis disponível!"
# Inicializar configurações
initialize_redis_config initialize_redis_config
# Iniciar o FastAPI em background # Iniciar o FastAPI em background

View File

@ -1,12 +1,18 @@
import json import json
import os import os
from typing import List, Dict from typing import List, Dict, Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta
import traceback import traceback
import logging import logging
import redis import redis
from utils import create_redis_client
import uuid
class StorageHandler: class StorageHandler:
# Chaves Redis para webhooks
WEBHOOK_KEY = "webhook_redirects" # Chave para armazenar os webhooks
WEBHOOK_STATS_KEY = "webhook_stats" # Chave para estatísticas
def __init__(self): def __init__(self):
# Configuração de logger # Configuração de logger
self.logger = logging.getLogger("StorageHandler") self.logger = logging.getLogger("StorageHandler")
@ -20,12 +26,7 @@ class StorageHandler:
self.logger.info("StorageHandler inicializado.") self.logger.info("StorageHandler inicializado.")
# Conexão com o Redis # Conexão com o Redis
self.redis = redis.Redis( self.redis = create_redis_client()
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6380)),
db=0,
decode_responses=True
)
# Retenção de logs e backups # Retenção de logs e backups
self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48)) self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48))
@ -208,6 +209,29 @@ class StorageHandler:
return keys[counter % len(keys)] return keys[counter % len(keys)]
def get_penalized_until(self, key: str) -> Optional[datetime]:
"""
Retorna o timestamp até quando a chave está penalizada, ou None se não estiver penalizada.
"""
penalized_key = self._get_redis_key(f"groq_key_penalized_{key}")
penalized_until = self.redis.get(penalized_key)
if penalized_until:
return datetime.fromisoformat(penalized_until)
return None
def penalize_key(self, key: str, penalty_duration: int):
"""
Penaliza uma chave por um tempo determinado (em segundos).
"""
penalized_key = self._get_redis_key(f"groq_key_penalized_{key}")
penalized_until = datetime.utcnow() + timedelta(seconds=penalty_duration)
self.redis.set(penalized_key, penalized_until.isoformat())
self.redis.expire(penalized_key, penalty_duration) # Expira a chave após o tempo de penalidade
self.add_log("INFO", "Chave GROQ penalizada", {
"key": key,
"penalized_until": penalized_until.isoformat()
})
def get_message_settings(self): def get_message_settings(self):
"""Obtém as configurações de mensagens.""" """Obtém as configurações de mensagens."""
return { return {
@ -407,3 +431,287 @@ class StorageHandler:
return data return data
except: except:
return None return None
def get_webhook_redirects(self) -> List[Dict]:
"""Obtém todos os webhooks de redirecionamento cadastrados."""
webhooks_raw = self.redis.hgetall(self._get_redis_key("webhook_redirects"))
webhooks = []
for webhook_id, data in webhooks_raw.items():
webhook_data = json.loads(data)
webhook_data['id'] = webhook_id
webhooks.append(webhook_data)
return webhooks
def validate_webhook_url(self, url: str) -> bool:
"""Valida se a URL do webhook é acessível."""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
return all([parsed.scheme, parsed.netloc])
except Exception as e:
self.logger.error(f"URL inválida: {url} - {str(e)}")
return False
def add_webhook_redirect(self, url: str, description: str = "") -> str:
"""
Adiciona um novo webhook de redirecionamento.
Retorna o ID do webhook criado.
"""
webhook_id = str(uuid.uuid4())
webhook_data = {
"url": url,
"description": description,
"created_at": datetime.now().isoformat(),
"status": "active",
"error_count": 0,
"success_count": 0,
"last_success": None,
"last_error": None
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
return webhook_id
def clean_webhook_data(self, webhook_id: str):
"""
Remove todos os dados relacionados a um webhook específico do Redis.
Args:
webhook_id: ID do webhook a ser limpo
"""
try:
# Lista de chaves relacionadas ao webhook que precisam ser removidas
keys_to_remove = [
f"webhook_failed_{webhook_id}", # Entregas falhas
f"webhook_stats_{webhook_id}", # Estatísticas específicas
]
# Remove cada chave associada ao webhook
for key in keys_to_remove:
full_key = self._get_redis_key(key)
self.redis.delete(full_key)
self.logger.debug(f"Chave removida: {full_key}")
self.logger.info(f"Dados do webhook {webhook_id} limpos com sucesso")
except Exception as e:
self.logger.error(f"Erro ao limpar dados do webhook {webhook_id}: {str(e)}")
raise
def remove_webhook_redirect(self, webhook_id: str):
"""Remove um webhook de redirecionamento e todos os seus dados associados."""
try:
# Primeiro remove os dados associados
self.clean_webhook_data(webhook_id)
# Depois remove o webhook em si
self.redis.hdel(self._get_redis_key("webhook_redirects"), webhook_id)
self.logger.info(f"Webhook {webhook_id} removido com sucesso")
except Exception as e:
self.logger.error(f"Erro ao remover webhook {webhook_id}: {str(e)}")
raise
def update_webhook_stats(self, webhook_id: str, success: bool, error_message: str = None):
"""Atualiza as estatísticas de um webhook."""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
if success:
webhook_data["success_count"] += 1
webhook_data["last_success"] = datetime.now().isoformat()
else:
webhook_data["error_count"] += 1
webhook_data["last_error"] = {
"timestamp": datetime.now().isoformat(),
"message": error_message
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
except Exception as e:
self.logger.error(f"Erro ao atualizar estatísticas do webhook {webhook_id}: {e}")
def retry_failed_webhooks(self):
"""Tenta reenviar webhooks que falharam nas últimas 24h."""
webhooks = self.get_webhook_redirects()
for webhook in webhooks:
if webhook.get("last_error"):
error_time = datetime.fromisoformat(webhook["last_error"]["timestamp"])
if datetime.now() - error_time < timedelta(hours=24):
# Tentar reenviar
pass
def test_webhook(self, url: str) -> tuple[bool, str]:
"""
Testa um webhook antes de salvá-lo.
Retorna uma tupla (sucesso, mensagem)
"""
try:
import aiohttp
import asyncio
async def _test_webhook():
async with aiohttp.ClientSession() as session:
test_payload = {
"test": True,
"timestamp": datetime.now().isoformat(),
"message": "Teste de conexão do TranscreveZAP"
}
async with session.post(
url,
json=test_payload,
headers={"Content-Type": "application/json"},
timeout=10
) as response:
return response.status, await response.text()
status, response = asyncio.run(_test_webhook())
if status in [200, 201, 202]:
return True, "Webhook testado com sucesso!"
return False, f"Erro no teste: Status {status} - {response}"
except Exception as e:
return False, f"Erro ao testar webhook: {str(e)}"
def get_webhook_health(self, webhook_id: str) -> dict:
"""
Calcula métricas de saúde do webhook
"""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
total_requests = webhook_data["success_count"] + webhook_data["error_count"]
if total_requests == 0:
return {
"health_status": "unknown",
"error_rate": 0,
"success_rate": 0,
"total_requests": 0
}
error_rate = (webhook_data["error_count"] / total_requests) * 100
success_rate = (webhook_data["success_count"] / total_requests) * 100
# Definir status de saúde
if error_rate >= 50:
health_status = "critical"
elif error_rate >= 20:
health_status = "warning"
else:
health_status = "healthy"
return {
"health_status": health_status,
"error_rate": error_rate,
"success_rate": success_rate,
"total_requests": total_requests
}
except Exception as e:
self.logger.error(f"Erro ao calcular saúde do webhook {webhook_id}: {e}")
return None
def retry_webhook(self, webhook_id: str, payload: dict) -> bool:
"""
Tenta reenviar um payload para um webhook específico mantendo o payload original intacto.
Args:
webhook_id: ID do webhook para reenvio
payload: Payload original para reenvio
Returns:
bool: True se o reenvio foi bem sucedido, False caso contrário
"""
try:
import aiohttp
import asyncio
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
async def _retry_webhook():
async with aiohttp.ClientSession() as session:
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true",
"X-TranscreveZAP-Webhook-ID": webhook_id,
"X-TranscreveZAP-Retry": "true"
}
async with session.post(
webhook_data["url"],
json=payload, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
return response.status in [200, 201, 202]
success = asyncio.run(_retry_webhook())
if success:
self.update_webhook_stats(webhook_id, True)
else:
self.update_webhook_stats(webhook_id, False, "Falha no retry")
return success
except Exception as e:
self.logger.error(f"Erro no retry do webhook {webhook_id}: {e}")
return False
def get_failed_deliveries(self, webhook_id: str) -> List[Dict]:
"""
Retorna lista de entregas falhas para um webhook
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed = self.redis.lrange(key, 0, -1)
return [json.loads(x) for x in failed]
def add_failed_delivery(self, webhook_id: str, payload: dict):
"""
Registra uma entrega falha para retry posterior
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed_delivery = {
"timestamp": datetime.now().isoformat(),
"payload": payload,
"retry_count": 0
}
self.redis.lpush(key, json.dumps(failed_delivery))
# Manter apenas as últimas 100 falhas
self.redis.ltrim(key, 0, 99)
def get_llm_provider(self) -> str:
"""Returns active LLM provider (groq or openai)"""
return self.redis.get(self._get_redis_key("active_llm_provider")) or "groq"
def set_llm_provider(self, provider: str):
"""Sets active LLM provider"""
if provider not in ["groq", "openai"]:
raise ValueError("Provider must be 'groq' or 'openai'")
self.redis.set(self._get_redis_key("active_llm_provider"), provider)
def get_openai_keys(self) -> List[str]:
"""Get stored OpenAI API keys"""
return list(self.redis.smembers(self._get_redis_key("openai_keys")))
def add_openai_key(self, key: str):
"""Add OpenAI API key"""
if key and key.startswith("sk-"):
self.redis.sadd(self._get_redis_key("openai_keys"), key)
return True
return False

49
utils.py Normal file
View File

@ -0,0 +1,49 @@
import os
import redis
import logging
logger = logging.getLogger("TranscreveZAP")
def get_redis_connection_params():
"""
Retorna os parâmetros de conexão do Redis baseado nas variáveis de ambiente.
Retira parâmetros de autenticação se não estiverem configurados.
"""
params = {
'host': os.getenv('REDIS_HOST', 'localhost'),
'port': int(os.getenv('REDIS_PORT', 6380)),
'db': int(os.getenv('REDIS_DB', '0')),
'decode_responses': True
}
# Adiciona credenciais apenas se estiverem configuradas
username = os.getenv('REDIS_USERNAME')
password = os.getenv('REDIS_PASSWORD')
if username and username.strip():
params['username'] = username
if password and password.strip():
params['password'] = password
return params
def create_redis_client():
"""
Cria e testa a conexão com o Redis.
Retorna o cliente Redis se bem sucedido.
"""
try:
params = get_redis_connection_params()
client = redis.Redis(**params)
client.ping() # Testa a conexão
logger.info("Conexão com Redis estabelecida com sucesso!")
return client
except redis.exceptions.AuthenticationError:
logger.error("Falha de autenticação no Redis. Verifique as credenciais.")
raise
except redis.exceptions.ConnectionError as e:
logger.error(f"Erro de conexão com Redis: {e}")
raise
except Exception as e:
logger.error(f"Erro ao configurar Redis: {e}")
raise