Compare commits

...

46 Commits
2.0 ... main

Author SHA1 Message Date
Fábio Cavalcanti
c42476549e ajuste audio source download 2025-02-22 08:13:38 -03:00
Fábio Cavalcanti
e89787e715 ajuste readme 2025-01-24 22:48:06 -03:00
Fábio Cavalcanti
f63dcc40d1 adicionado sistema de seleção de provedor de llm para GROQ ou Open ai 2025-01-24 22:27:02 -03:00
Fábio Cavalcanti
af20510b2b corrigido sistem de validação de chaves groq 2025-01-24 21:06:13 -03:00
Fábio Cavalcanti
a25dc9c4e7 ajuste para verificar formdata e json na chamada groq 2025-01-23 15:54:21 -03:00
Fábio Cavalcanti
a4ba9d02bc ajuste identacao 2025-01-23 15:38:49 -03:00
Fábio Cavalcanti
4c7d346a3c ajuste identacao 2025-01-23 15:27:35 -03:00
Fábio Cavalcanti
be82707ccc melhoria no sistema de chaves groq 2025-01-23 15:19:39 -03:00
Fábio Cavalcanti
3cd75903fc hub do readme 2025-01-18 13:58:46 -03:00
Fábio Cavalcanti
d43f62c316 implementação de hub de redirecionamentos de webhooks 2025-01-18 13:53:27 -03:00
Fábio Cavalcanti
0745132b98 ajuste da conexão com redis para contemplar usuarios que usam senha no redis ou redis cloud 2025-01-17 19:06:53 -03:00
Fábio Cavalcanti
0b5ad96508 ajuste no storage para o db redis 2025-01-17 17:57:35 -03:00
Fábio Cavalcanti
22217802a2 ajuste de stack docker 2025-01-16 17:51:13 -03:00
Fábio Cavalcanti
facfcb4559 ajuste de docker compose referencia 2025-01-16 17:08:26 -03:00
Fábio Cavalcanti
7d8c91fbc9 definição do banco do redis opcional na variavel de ambiente do docker compose 2025-01-16 17:00:09 -03:00
Fábio Cavalcanti
6fd1ec33e9 ajuste readme 2025-01-10 10:59:37 -03:00
Fábio Cavalcanti
bac9469b05 ajuste docker standalone no readme 2025-01-10 10:27:10 -03:00
Fábio Cavalcanti
943d5be2c8 orientações do readme 2025-01-08 20:52:40 -03:00
Fábio Cavalcanti
ae74686c9b ajuste do logoff 2025-01-08 20:25:04 -03:00
Fábio Cavalcanti
2fadd723dc adicionado sessão de login persistente 2025-01-08 19:06:55 -03:00
Fábio Cavalcanti
2a296d759f correção de detecção automatica 2025-01-08 17:16:39 -03:00
Fábio Cavalcanti
f558542359 ajuste de função de tradução simultanea 2025-01-08 13:26:08 -03:00
Fábio Cavalcanti
6a9ba1f087 ajustes de tradução e fuso horario 2025-01-07 18:10:27 -03:00
Fábio Cavalcanti
b86c7ac764 adicionado funções de detalhamento de idiomas e comportamento de tradutor automatico 2025-01-07 15:44:59 -03:00
Fábio Cavalcanti
9a072aee22 corrigido modo de processamento 2025-01-07 13:50:24 -03:00
Fábio Cavalcanti
eeffecb091 adicionado configuração de modo de processamento de mensagens apenas em grupos ou grupos e privado 2025-01-07 13:35:44 -03:00
Fábio Cavalcanti
69cb3b1965 personalização de comportamento e mensagens de transcrição 2025-01-07 11:36:39 -03:00
Fábio Cavalcanti
ec65839beb correção na lógica de resumos 2025-01-07 10:15:56 -03:00
Fábio Cavalcanti
abc4c4298a romeno no readme 2024-12-19 18:08:40 -03:00
Fábio Cavalcanti
161251a403 adicionado Romeno as linguagens novas 2024-12-19 18:05:59 -03:00
Fábio Cavalcanti
97cc842eb8 Adicionado rotação de chaves groq e seleção de idioma de tradução 2024-12-19 17:34:26 -03:00
Fábio Cavalcanti
ffd916c855 Merge branch 'dev' 2024-12-18 16:14:58 -03:00
Fábio Cavalcanti
fede0057e5 ajuste prompt 2024-12-18 16:11:04 -03:00
Fábio Cavalcanti
345cc26186 ajuste readme 2024-12-12 23:27:53 -03:00
Fábio Cavalcanti
a646e724f6 ajuste readme 2024-12-12 23:26:32 -03:00
Fábio Cavalcanti
c88c014e86 ajuste readme 2024-12-12 23:25:27 -03:00
Fábio Cavalcanti
c72c143609 Ajuste de orientação de uso 2024-12-12 21:04:44 -03:00
Fábio Cavalcanti
64f7f64b17 ajuste de orientação de uso 2024-12-12 21:04:03 -03:00
Fábio Cavalcanti
8e775b7379 Corrigido chamada para buscar grupos 2024-12-12 19:57:04 -03:00
Fábio Cavalcanti
374169e56f ajuste na busca de grupos para permitir transcrição 2024-12-12 19:54:13 -03:00
Fábio Cavalcanti
d6bbb5bc6e correções de variaveis de sistema e redis 2024-12-12 19:28:30 -03:00
Fábio Cavalcanti
d0c3ffca09 readme igual ao do main 2024-12-12 19:01:38 -03:00
Fábio Cavalcanti
1f40b128fa Melhorias de Inicialização do Serviço Docker 2024-12-12 18:58:48 -03:00
Fábio Cavalcanti
b3753e768c ajustes de inicialização, adicionado orientações de uso da API no Manager 2024-12-12 18:52:35 -03:00
Fábio Cavalcanti
b6e3ea8ec3 adicionado orientação de uso da api na interface 2024-12-12 18:30:26 -03:00
Fábio Cavalcanti
bb63c590e4 branch 2.0 no readme 2024-12-12 17:36:20 -03:00
16 changed files with 2845 additions and 275 deletions

View File

@ -12,3 +12,4 @@ __pycache__
*.postman_collection.json *.postman_collection.json
deploy_*.sh deploy_*.sh
manager_atualizar.py manager_atualizar.py
roadmap.md

View File

@ -1,12 +1,54 @@
#-----------------------------------------------
# Configurações do Servidor
#-----------------------------------------------
# Configurações do UVICORN
UVICORN_PORT=8005
UVICORN_HOST=0.0.0.0
UVICORN_RELOAD=true
UVICORN_WORKERS=1
# Domínios da Aplicação
API_DOMAIN=seu.dominio.com # Subdomínio para a API (ex: api.seudominio.com)
MANAGER_DOMAIN=manager.seu.dominio.com # Subdomínio para o Manager (ex: manager.seudominio.com)
# Debug e Logs # Debug e Logs
DEBUG_MODE=false DEBUG_MODE=false
LOG_LEVEL=INFO LOG_LEVEL=INFO
# Credenciais do Gerenciador #-----------------------------------------------
MANAGER_USER=admin # Credenciais de Acesso
MANAGER_PASSWORD=impacteai2024 #-----------------------------------------------
# Credenciais do Painel Administrativo
MANAGER_USER=seu_usuario_admin # Username para acessar o painel admin
MANAGER_PASSWORD=sua_senha_segura # Senha para acessar o painel admin
# Configurações do Servidor #-----------------------------------------------
FASTAPI_PORT=8005 # Configurações do Redis
STREAMLIT_PORT=8501 #-----------------------------------------------
HOST=0.0.0.0 # Configurações Básicas
REDIS_HOST=redis-transcrevezap # Host do Redis (use redis-transcrevezap para docker-compose)
REDIS_PORT=6380 # Porta do Redis
REDIS_DB=0 # Número do banco de dados Redis
# Autenticação Redis (opcional)
REDIS_USERNAME= # Deixe em branco se não usar autenticação
REDIS_PASSWORD= # Deixe em branco se não usar autenticação
#-----------------------------------------------
# Configurações de Rede
#-----------------------------------------------
# Nome da Rede Docker Externa
NETWORK_NAME=sua_rede_externa # Nome da sua rede Docker externa
#-----------------------------------------------
# Configurações do Traefik (se estiver usando)
#-----------------------------------------------
# Certificados SSL
SSL_RESOLVER=letsencryptresolver # Resolvedor SSL do Traefik
SSL_ENTRYPOINT=websecure # Entrypoint SSL do Traefik
#-----------------------------------------------
# Portas da Aplicação
#-----------------------------------------------
API_PORT=8005 # Porta para a API FastAPI
MANAGER_PORT=8501 # Porta para o Streamlit Manager

1
.gitignore vendored
View File

@ -7,3 +7,4 @@ GPT.postman_collection.json
deploy_producao.sh deploy_producao.sh
Dockerfile Dockerfile
manager_atualizar.py manager_atualizar.py
roadmap.md

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.pythonPath": "d:\\Estudando CODE\\ESTUDOS PYTHON\\transcreve-audio-exemplo\\.venv\\Scripts\\python.exe"
}

View File

@ -1,31 +1,46 @@
# Usar uma imagem oficial do Python como base # Imagem base do Python 3.10-slim
FROM python:3.10-slim FROM python:3.10-slim
# Instalar dependências do sistema # Configuração básica de timezone
RUN apt-get update && apt-get install -y --no-install-recommends \ ENV TZ=America/Sao_Paulo
&& apt-get clean && rm -rf /var/lib/apt/lists/*
# Definir o diretório de trabalho # Instalação de dependências mínimas necessárias
RUN apt-get update && apt-get install -y --no-install-recommends \
redis-tools \
tzdata \
dos2unix \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* \
&& ln -snf /usr/share/zoneinfo/$TZ /etc/localtime \
&& echo $TZ > /etc/timezone
# Configuração do ambiente de trabalho
WORKDIR /app WORKDIR /app
# Copiar o arquivo requirements.txt e instalar dependências # Instalação das dependências Python
COPY requirements.txt . COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Copiar todo o código da aplicação # Copia dos arquivos da aplicação
COPY . . COPY . .
# Garantir que o diretório static existe # Preparação do diretório de estáticos
RUN mkdir -p /app/static RUN mkdir -p /app/static && \
if [ -d "static" ]; then cp -r static/* /app/static/ 2>/dev/null || true; fi
# Copiar arquivos estáticos para o diretório apropriado # Configuração do script de inicialização
COPY static/ /app/static/ RUN chmod +x start.sh && \
dos2unix start.sh && \
apt-get purge -y dos2unix && \
apt-get autoremove -y
# Garantir permissões de execução ao script inicial # Portas da aplicação
RUN chmod +x start.sh
# Expor as portas usadas pela aplicação
EXPOSE 8005 8501 EXPOSE 8005 8501
# Definir o comando inicial # Valores padrão para Redis
CMD ["./start.sh"] ENV REDIS_HOST=redis-transcrevezap \
REDIS_PORT=6380 \
REDIS_DB=0
# Comando de inicialização
CMD ["/bin/bash", "/app/start.sh"]

View File

@ -1,6 +1,7 @@
import logging import logging
import redis import redis
import os import os
from utils import create_redis_client
# Configuração de logging com cores e formatação melhorada # Configuração de logging com cores e formatação melhorada
class ColoredFormatter(logging.Formatter): class ColoredFormatter(logging.Formatter):
@ -30,12 +31,7 @@ logger.addHandler(handler)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
# Conexão com o Redis # Conexão com o Redis
redis_client = redis.Redis( redis_client = create_redis_client()
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6380)),
db=0,
decode_responses=True
)
class Settings: class Settings:
"""Classe para gerenciar configurações do sistema.""" """Classe para gerenciar configurações do sistema."""
@ -43,11 +39,15 @@ class Settings:
"""Inicializa as configurações.""" """Inicializa as configurações."""
logger.debug("Carregando configurações do Redis...") logger.debug("Carregando configurações do Redis...")
self.ACTIVE_LLM_PROVIDER = self.get_redis_value("ACTIVE_LLM_PROVIDER", "groq")
self.OPENAI_API_KEY = self.get_redis_value("OPENAI_API_KEY", "")
self.GROQ_API_KEY = self.get_redis_value("GROQ_API_KEY", "gsk_default_key") self.GROQ_API_KEY = self.get_redis_value("GROQ_API_KEY", "gsk_default_key")
self.BUSINESS_MESSAGE = self.get_redis_value("BUSINESS_MESSAGE", "*Impacte AI* Premium Services") self.BUSINESS_MESSAGE = self.get_redis_value("BUSINESS_MESSAGE", "*Impacte AI* Premium Services")
self.PROCESS_GROUP_MESSAGES = self.get_redis_value("PROCESS_GROUP_MESSAGES", "false").lower() == "true" self.PROCESS_GROUP_MESSAGES = self.get_redis_value("PROCESS_GROUP_MESSAGES", "false").lower() == "true"
self.PROCESS_SELF_MESSAGES = self.get_redis_value("PROCESS_SELF_MESSAGES", "true").lower() == "true" self.PROCESS_SELF_MESSAGES = self.get_redis_value("PROCESS_SELF_MESSAGES", "true").lower() == "true"
self.LOG_LEVEL = self.get_redis_value("LOG_LEVEL", "INFO").upper() self.DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true"
self.LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
self.TRANSCRIPTION_LANGUAGE = self.get_redis_value("TRANSCRIPTION_LANGUAGE", "pt")
# Mascarar chave ao logar # Mascarar chave ao logar
if self.GROQ_API_KEY: if self.GROQ_API_KEY:

View File

@ -2,9 +2,9 @@ version: "3.7"
services: services:
tcaudio: tcaudio:
image: impacteai/transcrevezap:latest image: impacteai/transcrevezap:dev
networks: networks:
- transcrevezap_network - sua_rede_externa # Substitua pelo nome da sua rede externa
ports: ports:
- 8005:8005 # Porta para FastAPI - 8005:8005 # Porta para FastAPI
- 8501:8501 # Porta para Streamlit - 8501:8501 # Porta para Streamlit
@ -13,12 +13,17 @@ services:
- UVICORN_HOST=0.0.0.0 - UVICORN_HOST=0.0.0.0
- UVICORN_RELOAD=true - UVICORN_RELOAD=true
- UVICORN_WORKERS=1 - UVICORN_WORKERS=1
- API_DOMAIN=seu.dominio.com #coloque seu subdominio da API apontado aqui
- DEBUG_MODE=false - DEBUG_MODE=false
- LOG_LEVEL=INFO - LOG_LEVEL=INFO
- MANAGER_USER=seu_usuario_admin - MANAGER_USER=seu_usuario_admin # Defina Usuário do Manager
- MANAGER_PASSWORD=sua_senha_segura - MANAGER_PASSWORD=sua_senha_segura # Defina Senha do Manager
- REDIS_HOST=redis-transcrevezap - REDIS_HOST=redis-transcrevezap
- REDIS_PORT=6380 - REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP
- REDIS_DB=0 # Opcional: pode ser removida para usar o valor padrão
# Autenticação Redis (opcional - descomente se necessário, se estiver usando autenticação)
# - REDIS_USERNAME=${REDIS_USERNAME:-} # Nome do usuário definido no comando do Redis
# - REDIS_PASSWORD=${REDIS_PASSWORD:-} # Senha definida no comando do Redis (sem o '>')
depends_on: depends_on:
- redis-transcrevezap - redis-transcrevezap
deploy: deploy:
@ -29,7 +34,7 @@ services:
- node.role == manager - node.role == manager
labels: labels:
- traefik.enable=true - traefik.enable=true
- traefik.http.routers.tcaudio.rule=Host(`seu.dominio.com`) - traefik.http.routers.tcaudio.rule=Host(`seu.dominio.com`) #coloque seu subdominio da API apontado aqui
- traefik.http.routers.tcaudio.entrypoints=websecure - traefik.http.routers.tcaudio.entrypoints=websecure
- traefik.http.routers.tcaudio.tls.certresolver=letsencryptresolver - traefik.http.routers.tcaudio.tls.certresolver=letsencryptresolver
- traefik.http.services.tcaudio.loadbalancer.server.port=8005 - traefik.http.services.tcaudio.loadbalancer.server.port=8005
@ -38,7 +43,7 @@ services:
- traefik.http.middlewares.traefik-compress.compress=true - traefik.http.middlewares.traefik-compress.compress=true
- traefik.http.routers.tcaudio.middlewares=traefik-compress - traefik.http.routers.tcaudio.middlewares=traefik-compress
# Configuração do Streamlit # Configuração do Streamlit
- traefik.http.routers.tcaudio-manager.rule=Host(`manager.seu.dominio.com`) - traefik.http.routers.tcaudio-manager.rule=Host(`manager.seu.dominio.com`) #coloque seu subdominio do Manager apontado aqui
- traefik.http.routers.tcaudio-manager.entrypoints=websecure - traefik.http.routers.tcaudio-manager.entrypoints=websecure
- traefik.http.routers.tcaudio-manager.tls.certresolver=letsencryptresolver - traefik.http.routers.tcaudio-manager.tls.certresolver=letsencryptresolver
- traefik.http.services.tcaudio-manager.loadbalancer.server.port=8501 - traefik.http.services.tcaudio-manager.loadbalancer.server.port=8501
@ -47,14 +52,33 @@ services:
redis-transcrevezap: redis-transcrevezap:
image: redis:6 image: redis:6
# 1. Configuração SEM autenticação (padrão):
command: redis-server --port 6380 --appendonly yes command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user seuusuario on '>minhasenha' '~*' '+@all'
# # Explicação dos parâmetros:
# # --user seuusuario: nome do usuário
# # on: indica início da configuração do usuário
# # '>minhasenha': senha do usuário (mantenha o '>')
# # '~*': permite acesso a todas as chaves
# # '+@all': concede todas as permissões
volumes: volumes:
- redis_transcrevezap_data:/data - redis_transcrevezap_data:/data
networks: networks:
- transcrevezap_network - sua_rede_externa # Substitua pelo nome da sua rede externa
deploy:
mode: replicated
replicas: 1
placement:
constraints:
- node.role == manager
networks: networks:
transcrevezap_network: sua_rede_externa: # Substitua pelo nome da sua rede externa
external: true external: true
name: sua_rede_externa # Substitua pelo nome da sua rede externa name: sua_rede_externa # Substitua pelo nome da sua rede externa

111
groq_handler.py Normal file
View File

@ -0,0 +1,111 @@
import aiohttp
import json
from typing import Optional, Tuple, Any
from datetime import datetime
import logging
from storage import StorageHandler
import asyncio
logger = logging.getLogger("GROQHandler")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
async def test_groq_key(key: str) -> bool:
"""Teste se uma chave GROQ é válida e está funcionando."""
url = "https://api.groq.com/openai/v1/models"
headers = {"Authorization": f"Bearer {key}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return bool(data.get("data"))
return False
except Exception as e:
logger.error(f"Erro ao testar chave GROQ: {e}")
return False
async def validate_transcription_response(response_text: str) -> bool:
"""Valide se a resposta da transcrição é significativa."""
try:
cleaned_text = response_text.strip()
return len(cleaned_text) >= 10
except Exception as e:
logger.error(f"Erro ao validar resposta da transcrição: {e}")
return False
async def get_working_groq_key(storage: StorageHandler) -> Optional[str]:
"""Obtenha uma chave GROQ funcional do pool disponível."""
keys = storage.get_groq_keys()
for _ in range(len(keys)):
key = storage.get_next_groq_key()
if not key:
continue
penalized_until = storage.get_penalized_until(key)
if penalized_until and penalized_until > datetime.utcnow():
continue
if await test_groq_key(key):
return key
else:
storage.penalize_key(key, penalty_duration=300)
storage.add_log("ERROR", "Nenhuma chave GROQ funcional disponível.")
return None
async def handle_groq_request(
url: str,
headers: dict,
data: Any,
storage: StorageHandler,
is_form_data: bool = False
) -> Tuple[bool, dict, str]:
"""Lida com requisições para a API GROQ com suporte a retries e rotação de chaves."""
max_retries = len(storage.get_groq_keys())
for attempt in range(max_retries):
try:
storage.add_log("DEBUG", "Iniciando tentativa de requisição para GROQ", {
"url": url,
"is_form_data": is_form_data,
"attempt": attempt + 1
})
async with aiohttp.ClientSession() as session:
if is_form_data:
async with session.post(url, headers=headers, data=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("text"):
return True, response_data, ""
else:
async with session.post(url, headers=headers, json=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("choices"):
return True, response_data, ""
error_msg = response_data.get("error", {}).get("message", "")
if "organization_restricted" in error_msg or "invalid_api_key" in error_msg:
new_key = await get_working_groq_key(storage)
if new_key:
headers["Authorization"] = f"Bearer {new_key}"
await asyncio.sleep(1)
continue
return False, response_data, error_msg
except Exception as e:
storage.add_log("ERROR", "Erro na requisição", {"error": str(e)})
if attempt < max_retries - 1:
await asyncio.sleep(1)
continue
return False, {}, f"Request failed: {str(e)}"
storage.add_log("ERROR", "Todas as chaves GROQ falharam.")
return False, {}, "All GROQ keys exhausted."

139
main.py
View File

@ -5,16 +5,22 @@ from services import (
send_message_to_whatsapp, send_message_to_whatsapp,
get_audio_base64, get_audio_base64,
summarize_text_if_needed, summarize_text_if_needed,
download_remote_audio,
) )
from models import WebhookRequest from models import WebhookRequest
from config import logger, settings, redis_client from config import logger, settings, redis_client
from storage import StorageHandler from storage import StorageHandler
import traceback import traceback
import os import os
import asyncio
import aiohttp
app = FastAPI() app = FastAPI()
storage = StorageHandler() storage = StorageHandler()
@app.on_event("startup")
async def startup_event():
api_domain = os.getenv("API_DOMAIN", "seu.dominio.com")
redis_client.set("API_DOMAIN", api_domain)
# Função para buscar configurações do Redis com fallback para valores padrão # Função para buscar configurações do Redis com fallback para valores padrão
def get_config(key, default=None): def get_config(key, default=None):
try: try:
@ -37,12 +43,53 @@ def load_dynamic_settings():
"DEBUG_MODE": get_config("DEBUG_MODE", "false") == "true", "DEBUG_MODE": get_config("DEBUG_MODE", "false") == "true",
} }
async def forward_to_webhooks(body: dict, storage: StorageHandler):
"""Encaminha o payload para todos os webhooks cadastrados."""
webhooks = storage.get_webhook_redirects()
async with aiohttp.ClientSession() as session:
for webhook in webhooks:
try:
# Configura os headers mantendo o payload intacto
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true", # Header para identificação da origem
"X-TranscreveZAP-Webhook-ID": webhook["id"]
}
async with session.post(
webhook["url"],
json=body, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
if response.status in [200, 201, 202]:
storage.update_webhook_stats(webhook["id"], True)
else:
error_text = await response.text()
storage.update_webhook_stats(
webhook["id"],
False,
f"Status {response.status}: {error_text}"
)
# Registra falha para retry posterior
storage.add_failed_delivery(webhook["id"], body)
except Exception as e:
storage.update_webhook_stats(
webhook["id"],
False,
f"Erro ao encaminhar: {str(e)}"
)
# Registra falha para retry posterior
storage.add_failed_delivery(webhook["id"], body)
@app.post("/transcreve-audios") @app.post("/transcreve-audios")
async def transcreve_audios(request: Request): async def transcreve_audios(request: Request):
try: try:
body = await request.json() body = await request.json()
dynamic_settings = load_dynamic_settings() dynamic_settings = load_dynamic_settings()
# Iniciar o encaminhamento em background
asyncio.create_task(forward_to_webhooks(body, storage))
# Log inicial da requisição # Log inicial da requisição
storage.add_log("INFO", "Nova requisição de transcrição recebida", { storage.add_log("INFO", "Nova requisição de transcrição recebida", {
"instance": body.get("instance"), "instance": body.get("instance"),
@ -84,6 +131,18 @@ async def transcreve_audios(request: Request):
) )
return {"message": "Mensagem não autorizada para processamento"} return {"message": "Mensagem não autorizada para processamento"}
# Verificação do modo de processamento (grupos/todos)
process_mode = storage.get_process_mode()
is_group = "@g.us" in remote_jid
if process_mode == "groups_only" and not is_group:
storage.add_log("INFO", "Mensagem ignorada - modo apenas grupos ativo", {
"remote_jid": remote_jid,
"process_mode": process_mode,
"is_group": is_group
})
return {"message": "Modo apenas grupos ativo - mensagens privadas ignoradas"}
if from_me and not dynamic_settings["PROCESS_SELF_MESSAGES"]: if from_me and not dynamic_settings["PROCESS_SELF_MESSAGES"]:
storage.add_log("INFO", "Mensagem própria ignorada", { storage.add_log("INFO", "Mensagem própria ignorada", {
"remote_jid": remote_jid "remote_jid": remote_jid
@ -93,33 +152,71 @@ async def transcreve_audios(request: Request):
# Obter áudio # Obter áudio
try: try:
if "mediaUrl" in body["data"]["message"]: if "mediaUrl" in body["data"]["message"]:
audio_source = body["data"]["message"]["mediaUrl"] media_url = body["data"]["message"]["mediaUrl"]
storage.add_log("DEBUG", "Usando mediaUrl para áudio", { storage.add_log("DEBUG", "Baixando áudio via URL", {"mediaUrl": media_url})
"mediaUrl": audio_source audio_source = await download_remote_audio(media_url) # Baixa o arquivo remoto e retorna o caminho local
})
else: else:
storage.add_log("DEBUG", "Obtendo áudio via base64") storage.add_log("DEBUG", "Obtendo áudio via base64")
base64_audio = await get_audio_base64(server_url, instance, apikey, audio_key) base64_audio = await get_audio_base64(server_url, instance, apikey, audio_key)
audio_source = await convert_base64_to_file(base64_audio) audio_source = await convert_base64_to_file(base64_audio)
storage.add_log("DEBUG", "Áudio convertido", { storage.add_log("DEBUG", "Áudio convertido", {"source": audio_source})
"source": audio_source
# Carregar configurações de formatação
output_mode = get_config("output_mode", "both")
summary_header = get_config("summary_header", "🤖 *Resumo do áudio:*")
transcription_header = get_config("transcription_header", "🔊 *Transcrição do áudio:*")
character_limit = int(get_config("character_limit", "500"))
# Verificar se timestamps estão habilitados
use_timestamps = get_config("use_timestamps", "false") == "true"
storage.add_log("DEBUG", "Informações da mensagem", {
"from_me": from_me,
"remote_jid": remote_jid,
"is_group": is_group
}) })
# Transcrever áudio # Transcrever áudio
storage.add_log("INFO", "Iniciando transcrição") storage.add_log("INFO", "Iniciando transcrição")
transcription_text, _ = await transcribe_audio(audio_source) transcription_text, has_timestamps = await transcribe_audio(
audio_source,
# Resumir se necessário apikey=apikey,
remote_jid=remote_jid,
from_me=from_me,
use_timestamps=use_timestamps
)
# Log do resultado
storage.add_log("INFO", "Transcrição concluída", {
"has_timestamps": has_timestamps,
"text_length": len(transcription_text),
"remote_jid": remote_jid
})
# Determinar se precisa de resumo baseado no modo de saída
summary_text = None
if output_mode in ["both", "summary_only"] or (
output_mode == "smart" and len(transcription_text) > character_limit
):
summary_text = await summarize_text_if_needed(transcription_text) summary_text = await summarize_text_if_needed(transcription_text)
# Formatar mensagem # Construir mensagem baseada no modo de saída
summary_message = ( message_parts = []
f"🤖 *Resumo do áudio:*\n\n"
f"{summary_text}\n\n" if output_mode == "smart":
f"🔊 *Transcrição do áudio:*\n\n" if len(transcription_text) > character_limit:
f"{transcription_text}\n\n" message_parts.append(f"{summary_header}\n\n{summary_text}")
f"{dynamic_settings['BUSINESS_MESSAGE']}" else:
) message_parts.append(f"{transcription_header}\n\n{transcription_text}")
else:
if output_mode in ["both", "summary_only"] and summary_text:
message_parts.append(f"{summary_header}\n\n{summary_text}")
if output_mode in ["both", "transcription_only"]:
message_parts.append(f"{transcription_header}\n\n{transcription_text}")
# Adicionar mensagem de negócio
message_parts.append(dynamic_settings['BUSINESS_MESSAGE'])
# Juntar todas as partes da mensagem
summary_message = "\n\n".join(message_parts)
# Enviar resposta # Enviar resposta
await send_message_to_whatsapp( await send_message_to_whatsapp(
@ -135,8 +232,8 @@ async def transcreve_audios(request: Request):
storage.record_processing(remote_jid) storage.record_processing(remote_jid)
storage.add_log("INFO", "Áudio processado com sucesso", { storage.add_log("INFO", "Áudio processado com sucesso", {
"remote_jid": remote_jid, "remote_jid": remote_jid,
"transcription_length": len(transcription_text), "transcription_length": len(transcription_text) if transcription_text else 0,
"summary_length": len(summary_text) "summary_length": len(summary_text) if summary_text else 0 # Adiciona verificação
}) })
return {"message": "Áudio transcrito e resposta enviada com sucesso"} return {"message": "Áudio transcrito e resposta enviada com sucesso"}

View File

@ -6,44 +6,9 @@ from storage import StorageHandler
import plotly.express as px import plotly.express as px
import os import os
import redis import redis
from utils import create_redis_client
# 1. Primeiro: Configuração da página
# Conectar ao Redis
redis_client = redis.Redis(host=os.getenv('REDIS_HOST', 'localhost'), port=int(os.getenv('REDIS_PORT', 6380)), decode_responses=True)
# Função para salvar configurações no Redis
def save_to_redis(key, value):
try:
redis_client.set(key, value)
st.success(f"Configuração {key} salva com sucesso!")
except Exception as e:
st.error(f"Erro ao salvar no Redis: {key} -> {e}")
# Função para buscar configurações no Redis
def get_from_redis(key, default=None):
try:
value = redis_client.get(key)
return value if value is not None else default
except Exception as e:
st.error(f"Erro ao buscar no Redis: {key} -> {e}")
return default
# Função para buscar grupos do Whatsapp
def fetch_whatsapp_groups(server_url, instance, api_key):
url = f"{server_url}/group/fetchAllGroups/{instance}"
headers = {"apikey": api_key}
params = {"getParticipants": "false"} # Adicionando o parâmetro de query
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
st.error(f"Erro ao buscar grupos: {str(e)}")
if response.text:
st.error(f"Resposta do servidor: {response.text}")
return []
# Configuração da página
st.set_page_config( st.set_page_config(
page_title="TranscreveZAP by Impacte AI", page_title="TranscreveZAP by Impacte AI",
page_icon="🎙️", page_icon="🎙️",
@ -51,6 +16,63 @@ st.set_page_config(
initial_sidebar_state="expanded", initial_sidebar_state="expanded",
) )
# 2. Depois: Inicialização do Redis
redis_client = create_redis_client()
# 3. Funções de sessão (atualizado para usar st.query_params)
def init_session():
"""Inicializa o sistema de sessão"""
if 'session_id' not in st.session_state:
# Verificar se existe uma sessão válida no Redis
session_token = st.query_params.get('session', None)
if session_token:
session_data = redis_client.get(f"session:{session_token}")
if session_data:
st.session_state.session_id = session_token
st.session_state.authenticated = True
return
# Se não houver sessão válida, gerar um novo ID
st.session_state.session_id = None
st.session_state.authenticated = False
# Garantir que init_session seja chamado antes de qualquer coisa
init_session()
def create_session():
"""Cria uma nova sessão no Redis"""
import uuid
session_id = str(uuid.uuid4())
expiry = 7 * 24 * 60 * 60 # 7 dias em segundos
# Salvar sessão no Redis
redis_client.setex(f"session:{session_id}", expiry, "active")
# Atualizar estado da sessão
st.session_state.session_id = session_id
st.session_state.authenticated = True
# Adicionar session_id como parâmetro de URL
st.query_params['session'] = session_id
def end_session():
"""Encerra a sessão atual"""
if 'session_id' in st.session_state and st.session_state.session_id:
# Remover sessão do Redis
redis_client.delete(f"session:{st.session_state.session_id}")
# Limpar todos os estados relevantes
for key in ['session_id', 'authenticated', 'username']:
if key in st.session_state:
del st.session_state[key]
# Remover parâmetro de sessão da URL
if 'session' in st.query_params:
del st.query_params['session']
# 4. Inicializar a sessão
init_session()
# Estilos CSS personalizados # Estilos CSS personalizados
st.markdown(""" st.markdown("""
<style> <style>
@ -100,6 +122,64 @@ st.markdown("""
# Configuração do storage # Configuração do storage
storage = StorageHandler() storage = StorageHandler()
# Dicionário de idiomas em português
IDIOMAS = {
"pt": "Português",
"en": "Inglês",
"es": "Espanhol",
"fr": "Francês",
"de": "Alemão",
"it": "Italiano",
"ja": "Japonês",
"ko": "Coreano",
"zh": "Chinês",
"ro": "Romeno",
"ru": "Russo",
"ar": "Árabe",
"hi": "Hindi",
"nl": "Holandês",
"pl": "Polonês",
"tr": "Turco"
}
# Função para salvar configurações no Redis
def save_to_redis(key, value):
try:
redis_client.set(key, value)
st.success(f"Configuração {key} salva com sucesso!")
except Exception as e:
st.error(f"Erro ao salvar no Redis: {key} -> {e}")
# Função para buscar configurações no Redis
def get_from_redis(key, default=None):
try:
value = redis_client.get(key)
return value if value is not None else default
except Exception as e:
st.error(f"Erro ao buscar no Redis: {key} -> {e}")
return default
# Função para buscar grupos do Whatsapp
def fetch_whatsapp_groups(server_url, instance, api_key):
url = f"{server_url}/group/fetchAllGroups/{instance}"
headers = {"apikey": api_key}
params = {"getParticipants": "false"} # Adicionando o parâmetro de query
try:
st.write(f"Requisição para URL: {url}") # Debug para URL
st.write(f"Cabeçalhos: {headers}") # Debug para headers
st.write(f"Parâmetros: {params}") # Debug para parâmetros
response = requests.get(url, headers=headers, params=params)
st.write(f"Status Code: {response.status_code}") # Debug para status HTTP
response.raise_for_status() # Levanta exceções HTTP
return response.json() # Retorna o JSON da resposta
except requests.RequestException as e:
st.error(f"Erro ao buscar grupos: {str(e)}")
if response.text:
st.error(f"Resposta do servidor: {response.text}")
return []
# Função para carregar configurações do Redis para o Streamlit # Função para carregar configurações do Redis para o Streamlit
def load_settings(): def load_settings():
try: try:
@ -108,6 +188,7 @@ def load_settings():
"BUSINESS_MESSAGE": get_from_redis("BUSINESS_MESSAGE", "*Impacte AI* Premium Services"), "BUSINESS_MESSAGE": get_from_redis("BUSINESS_MESSAGE", "*Impacte AI* Premium Services"),
"PROCESS_GROUP_MESSAGES": get_from_redis("PROCESS_GROUP_MESSAGES", "false"), "PROCESS_GROUP_MESSAGES": get_from_redis("PROCESS_GROUP_MESSAGES", "false"),
"PROCESS_SELF_MESSAGES": get_from_redis("PROCESS_SELF_MESSAGES", "true"), "PROCESS_SELF_MESSAGES": get_from_redis("PROCESS_SELF_MESSAGES", "true"),
"TRANSCRIPTION_LANGUAGE": get_from_redis("TRANSCRIPTION_LANGUAGE", "pt"),
} }
except Exception as e: except Exception as e:
st.error(f"Erro ao carregar configurações do Redis: {e}") st.error(f"Erro ao carregar configurações do Redis: {e}")
@ -162,26 +243,65 @@ def login_page():
submit_button = st.form_submit_button('Entrar') submit_button = st.form_submit_button('Entrar')
if submit_button: if submit_button:
if username == os.getenv('MANAGER_USER') and password == os.getenv('MANAGER_PASSWORD'): if username == os.getenv('MANAGER_USER') and password == os.getenv('MANAGER_PASSWORD'):
st.session_state.authenticated = True create_session()
st.success("Login realizado com sucesso!")
st.experimental_rerun() st.experimental_rerun()
else: else:
st.error('Credenciais inválidas') st.error('Credenciais inválidas')
# Modificar a função de logout no dashboard
def dashboard(): def dashboard():
# Versão do sistema
APP_VERSION = "2.3.3"
show_logo() show_logo()
st.sidebar.markdown('<div class="sidebar-header">TranscreveZAP - Menu</div>', unsafe_allow_html=True) st.sidebar.markdown('<div class="sidebar-header">TranscreveZAP - Menu</div>', unsafe_allow_html=True)
st.sidebar.markdown(f'<div style="text-align: center; color: gray; font-size: 0.8em;">versão {APP_VERSION}</div>', unsafe_allow_html=True)
# Mostrar nome do usuário logado (se disponível)
if hasattr(st.session_state, 'session_id'):
st.sidebar.markdown("---")
st.sidebar.markdown("👤 **Usuário Conectado**")
page = st.sidebar.radio( page = st.sidebar.radio(
"Navegação", "Navegação",
["📊 Painel de Controle", "👥 Gerenciar Grupos", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"] ["📊 Painel de Controle", "👥 Gerenciar Grupos", "🔄 Hub de Redirecionamento", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"]
) )
if st.sidebar.button("Sair"):
st.session_state.authenticated = False # Seção de logout com confirmação
st.sidebar.markdown("---")
logout_container = st.sidebar.container()
# Verifica se já existe um estado para confirmação de logout
if 'logout_confirmation' not in st.session_state:
st.session_state.logout_confirmation = False
# Botão principal de logout
if not st.session_state.logout_confirmation:
if logout_container.button("🚪 Sair da Conta"):
st.session_state.logout_confirmation = True
st.experimental_rerun() st.experimental_rerun()
# Botões de confirmação
if st.session_state.logout_confirmation:
col1, col2 = st.sidebar.columns(2)
if col1.button("✅ Confirmar"):
st.session_state.logout_confirmation = False
end_session()
st.experimental_rerun()
if col2.button("❌ Cancelar"):
st.session_state.logout_confirmation = False
st.experimental_rerun()
# Renderiza a página selecionada
if page == "📊 Painel de Controle": if page == "📊 Painel de Controle":
show_statistics() show_statistics()
elif page == "👥 Gerenciar Grupos": elif page == "👥 Gerenciar Grupos":
manage_groups() manage_groups()
elif page == "🔄 Hub de Redirecionamento":
manage_webhooks()
elif page == "🚫 Gerenciar Bloqueios": elif page == "🚫 Gerenciar Bloqueios":
manage_blocks() manage_blocks()
elif page == "⚙️ Configurações": elif page == "⚙️ Configurações":
@ -209,6 +329,24 @@ def show_statistics():
st.plotly_chart(fig, use_container_width=True) st.plotly_chart(fig, use_container_width=True)
else: else:
st.info("Ainda não há dados de processamento disponíveis.") st.info("Ainda não há dados de processamento disponíveis.")
# Adicionar informações sobre o endpoint da API
st.subheader("Endpoint da API")
api_domain = get_from_redis("API_DOMAIN", "seu.dominio.com")
api_endpoint = f"https://{api_domain}/transcreve-audios"
st.code(api_endpoint, language="text")
if st.button(" Instruções de Uso"):
st.info(
"Para utilizar o serviço de transcrição, siga estas etapas:\n\n"
"1. Copie a URL completa acima.\n"
"2. Na configuração de webhook da Evolution API:\n"
" - Cole a URL no campo apropriado.\n"
" - Ative o webhook.\n"
" - Marque as opções 'Webhook Base64' e o Evento 'MESSAGES_UPSERT'.\n\n"
"Isso permitirá que a Evolution API envie as mensagens de áudio para o nosso serviço de transcrição."
)
except Exception as e: except Exception as e:
st.error(f"Erro ao carregar estatísticas: {e}") st.error(f"Erro ao carregar estatísticas: {e}")
@ -289,6 +427,165 @@ def manage_groups():
else: else:
st.info("Nenhum grupo permitido.") st.info("Nenhum grupo permitido.")
def manage_webhooks():
st.title("🔄 Hub de Redirecionamento")
st.markdown("""
Configure aqui os webhooks para onde você deseja redirecionar as mensagens recebidas.
Cada webhook receberá uma cópia exata do payload original da Evolution API.
""")
# Adicionar novo webhook
st.subheader("Adicionar Novo Webhook")
with st.form("add_webhook"):
col1, col2 = st.columns([3, 1])
with col1:
webhook_url = st.text_input(
"URL do Webhook",
placeholder="https://seu-sistema.com/webhook"
)
with col2:
if st.form_submit_button("🔍 Testar Conexão"):
if webhook_url:
with st.spinner("Testando webhook..."):
success, message = storage.test_webhook(webhook_url)
if success:
st.success(message)
else:
st.error(message)
else:
st.warning("Por favor, insira uma URL válida")
webhook_description = st.text_input(
"Descrição",
placeholder="Ex: URL de Webhook do N8N, Sistema de CRM, etc."
)
if st.form_submit_button("Adicionar Webhook"):
if webhook_url:
try:
# Testar antes de adicionar
success, message = storage.test_webhook(webhook_url)
if success:
storage.add_webhook_redirect(webhook_url, webhook_description)
st.success("✅ Webhook testado e adicionado com sucesso!")
st.experimental_rerun()
else:
st.error(f"Erro ao adicionar webhook: {message}")
except Exception as e:
st.error(f"Erro ao adicionar webhook: {str(e)}")
else:
st.warning("Por favor, insira uma URL válida")
# Listar webhooks existentes
st.subheader("Webhooks Configurados")
webhooks = storage.get_webhook_redirects()
if not webhooks:
st.info("Nenhum webhook configurado ainda.")
return
for webhook in webhooks:
# Obter métricas de saúde
health = storage.get_webhook_health(webhook["id"])
# Definir cor baseada no status
status_colors = {
"healthy": "🟢",
"warning": "🟡",
"critical": "🔴",
"unknown": ""
}
status_icon = status_colors.get(health["health_status"], "")
with st.expander(
f"{status_icon} {webhook['description'] or webhook['url']}",
expanded=True
):
col1, col2 = st.columns([3, 1])
with col1:
st.text_input(
"URL",
value=webhook["url"],
key=f"url_{webhook['id']}",
disabled=True
)
if webhook["description"]:
st.text_input(
"Descrição",
value=webhook["description"],
key=f"desc_{webhook['id']}",
disabled=True
)
with col2:
# Métricas de saúde
st.metric(
"Taxa de Sucesso",
f"{health['success_rate']:.1f}%"
)
# Alertas baseados na saúde
if health["health_status"] == "critical":
st.error("⚠️ Taxa de erro crítica!")
elif health["health_status"] == "warning":
st.warning("⚠️ Taxa de erro elevada")
# Botões de ação
col1, col2 = st.columns(2)
with col1:
if st.button("🔄 Retry", key=f"retry_{webhook['id']}"):
failed_deliveries = storage.get_failed_deliveries(webhook["id"])
if failed_deliveries:
with st.spinner("Reenviando mensagens..."):
success_count = 0
for delivery in failed_deliveries:
if storage.retry_webhook(webhook["id"], delivery["payload"]):
success_count += 1
st.success(f"Reenviadas {success_count} de {len(failed_deliveries)} mensagens!")
else:
st.info("Não há mensagens pendentes para reenvio")
with col2:
if st.button("🗑️", key=f"remove_{webhook['id']}", help="Remover webhook"):
if st.session_state.get(f"confirm_remove_{webhook['id']}", False):
storage.remove_webhook_redirect(webhook["id"])
st.success("Webhook removido!")
st.experimental_rerun()
else:
st.session_state[f"confirm_remove_{webhook['id']}"] = True
st.warning("Clique novamente para confirmar")
# Estatísticas detalhadas
st.markdown("### Estatísticas")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total de Sucessos", webhook["success_count"])
with col2:
st.metric("Total de Erros", webhook["error_count"])
with col3:
last_success = webhook.get("last_success")
if last_success:
last_success = datetime.fromisoformat(last_success).strftime("%d/%m/%Y %H:%M")
st.metric("Último Sucesso", last_success or "Nunca")
# Exibir último erro (se houver)
if webhook.get("last_error"):
st.error(
f"Último erro: {webhook['last_error']['message']} "
f"({datetime.fromisoformat(webhook['last_error']['timestamp']).strftime('%d/%m/%Y %H:%M')})"
)
# Lista de entregas falhas
failed_deliveries = storage.get_failed_deliveries(webhook["id"])
if failed_deliveries:
st.markdown("### Entregas Pendentes")
st.warning(f"{len(failed_deliveries)} mensagens aguardando reenvio")
if st.button("📋 Ver Detalhes", key=f"details_{webhook['id']}"):
for delivery in failed_deliveries:
st.code(json.dumps(delivery, indent=2))
def manage_blocks(): def manage_blocks():
st.title("🚫 Gerenciar Bloqueios") st.title("🚫 Gerenciar Bloqueios")
st.subheader("Bloquear Usuário") st.subheader("Bloquear Usuário")
@ -317,19 +614,484 @@ def manage_blocks():
else: else:
st.info("Nenhum usuário bloqueado.") st.info("Nenhum usuário bloqueado.")
# manager.py - Adicionar na seção de configurações
def message_settings_section():
st.subheader("📝 Configurações de Mensagem")
# Carregar configurações atuais
message_settings = storage.get_message_settings()
# Headers personalizados
col1, col2 = st.columns(2)
with col1:
summary_header = st.text_input(
"Cabeçalho do Resumo",
value=message_settings["summary_header"],
help="Formato do cabeçalho para o resumo do áudio"
)
with col2:
transcription_header = st.text_input(
"Cabeçalho da Transcrição",
value=message_settings["transcription_header"],
help="Formato do cabeçalho para a transcrição do áudio"
)
# Modo de saída
output_mode = st.selectbox(
"Modo de Saída",
options=["both", "summary_only", "transcription_only", "smart"],
format_func=lambda x: {
"both": "Resumo e Transcrição",
"summary_only": "Apenas Resumo",
"transcription_only": "Apenas Transcrição",
"smart": "Modo Inteligente (baseado no tamanho)"
}[x],
value=message_settings["output_mode"]
)
# Configuração do limite de caracteres (visível apenas no modo inteligente)
if output_mode == "smart":
character_limit = st.number_input(
"Limite de Caracteres para Modo Inteligente",
min_value=100,
max_value=5000,
value=int(message_settings["character_limit"]),
help="Se a transcrição exceder este limite, será enviado apenas o resumo"
)
else:
character_limit = message_settings["character_limit"]
# Botão de salvar
if st.button("💾 Salvar Configurações de Mensagem"):
try:
new_settings = {
"summary_header": summary_header,
"transcription_header": transcription_header,
"output_mode": output_mode,
"character_limit": character_limit
}
storage.save_message_settings(new_settings)
st.success("Configurações de mensagem salvas com sucesso!")
except Exception as e:
st.error(f"Erro ao salvar configurações: {str(e)}")
def show_language_statistics():
"""Exibe estatísticas de uso de idiomas"""
stats = storage.get_language_statistics()
if not stats:
st.info("Ainda não há estatísticas de uso de idiomas.")
return
# Resumo geral
st.subheader("📊 Estatísticas de Idiomas")
# Criar métricas resumidas
total_usage = sum(s.get('total', 0) for s in stats.values())
auto_detected = sum(s.get('auto_detected', 0) for s in stats.values())
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total de Transcrições", total_usage)
with col2:
st.metric("Detecções Automáticas", auto_detected)
with col3:
st.metric("Idiomas Diferentes", len(stats))
# Gráfico de uso por idioma
usage_data = []
for lang, data in stats.items():
usage_data.append({
'Idioma': IDIOMAS.get(lang, lang),
'Total': data.get('total', 0),
'Enviados': data.get('sent', 0),
'Recebidos': data.get('received', 0),
'Auto-detectados': data.get('auto_detected', 0)
})
if usage_data:
df = pd.DataFrame(usage_data)
# Gráfico de barras empilhadas
fig = px.bar(df,
x='Idioma',
y=['Enviados', 'Recebidos'],
title='Uso por Idioma',
barmode='stack')
st.plotly_chart(fig, use_container_width=True)
# Tabela detalhada
st.subheader("📋 Detalhamento por Idioma")
st.dataframe(df.sort_values('Total', ascending=False))
def manage_settings(): def manage_settings():
st.title("⚙️ Configurações") st.title("⚙️ Configurações")
# Criar tabs para melhor organização
tab1, tab2, tab3, tab4, tab5 = st.tabs([
"🔑 Chaves API",
"🤖 Provedor LLM",
"🌐 Configurações Gerais",
"📝 Formatação de Mensagens",
"🗣️ Idiomas e Transcrição"
])
with tab1:
st.subheader("Gerenciamento de Chaves GROQ")
# Campo para gerenciamento de chaves GROQ
main_key = st.text_input(
"GROQ API Key Principal",
value=st.session_state.settings["GROQ_API_KEY"],
key="groq_api_key",
type="password",
help="Chave GROQ principal do sistema"
)
# Seção de chaves adicionais
st.markdown("---")
st.subheader("Chaves GROQ Adicionais (Sistema de Rodízio)")
# Exibir chaves existentes
groq_keys = storage.get_groq_keys()
if groq_keys:
st.write("Chaves configuradas para rodízio:")
for key in groq_keys:
col1, col2 = st.columns([4, 1])
with col1:
masked_key = f"{key[:10]}...{key[-4:]}"
st.code(masked_key, language=None)
with col2:
if st.button("🗑️", key=f"remove_{key}", help="Remover esta chave"):
storage.remove_groq_key(key)
st.success(f"Chave removida do rodízio!")
st.experimental_rerun()
# Adicionar nova chave
new_key = st.text_input(
"Adicionar Nova Chave GROQ",
key="new_groq_key",
type="password",
help="Insira uma nova chave GROQ para adicionar ao sistema de rodízio"
)
col1, col2 = st.columns([4, 1])
with col1:
if st.button(" Adicionar ao Rodízio", help="Adicionar esta chave ao sistema de rodízio"):
if new_key:
if new_key.startswith("gsk_"):
storage.add_groq_key(new_key)
st.success("Nova chave adicionada ao sistema de rodízio!")
st.experimental_rerun()
else:
st.error("Chave inválida! A chave deve começar com 'gsk_'")
else:
st.warning("Por favor, insira uma chave válida")
pass
with tab2:
st.subheader("Configuração do Provedor LLM")
# Select provider
current_provider = storage.get_llm_provider()
provider = st.selectbox(
"Provedor de Serviço",
options=["groq", "openai"],
format_func=lambda x: "Groq (Open Source)" if x == "groq" else "OpenAI (API Paga)",
index=0 if current_provider == "groq" else 1
)
if provider == "openai":
st.info("""
A OpenAI é um serviço pago que requer uma chave API válida.
Obtenha sua chave em https://platform.openai.com
""")
# OpenAI Key Management
openai_key = st.text_input(
"OpenAI API Key",
type="password",
help="Chave que começa com 'sk-'"
)
if st.button("Adicionar Chave OpenAI"):
if openai_key and openai_key.startswith("sk-"):
storage.add_openai_key(openai_key)
st.success("✅ Chave OpenAI adicionada com sucesso!")
else:
st.error("Chave inválida! Deve começar com 'sk-'")
# Save provider selection
if st.button("💾 Salvar Configuração do Provedor"):
try:
storage.set_llm_provider(provider)
st.success(f"Provedor alterado para: {provider}")
except Exception as e:
st.error(f"Erro ao salvar provedor: {str(e)}")
with tab3:
st.subheader("Configurações do Sistema") st.subheader("Configurações do Sistema")
st.text_input("GROQ_API_KEY", value=st.session_state.settings["GROQ_API_KEY"], key="groq_api_key")
st.text_input("Mensagem de Serviço no Rodapé", value=st.session_state.settings["BUSINESS_MESSAGE"], key="business_message") # Business Message
st.selectbox("Processar Mensagens em Grupos", options=["true", "false"], index=["true", "false"].index(st.session_state.settings["PROCESS_GROUP_MESSAGES"]), key="process_group_messages") st.text_input(
st.selectbox("Processar Mensagens Próprias", options=["true", "false"], index=["true", "false"].index(st.session_state.settings["PROCESS_SELF_MESSAGES"]), key="process_self_messages") "Mensagem de Serviço no Rodapé",
if st.button("Salvar Configurações"): value=st.session_state.settings["BUSINESS_MESSAGE"],
key="business_message"
)
# Process Group Messages
st.selectbox(
"Processar Mensagens em Grupos",
options=["true", "false"],
index=["true", "false"].index(st.session_state.settings["PROCESS_GROUP_MESSAGES"]),
key="process_group_messages"
)
# Process Self Messages
st.selectbox(
"Processar Mensagens Próprias",
options=["true", "false"],
index=["true", "false"].index(st.session_state.settings["PROCESS_SELF_MESSAGES"]),
key="process_self_messages"
)
st.subheader("🔄 Modo de Processamento")
# Obter o modo atual do Redis
current_mode = storage.get_process_mode()
# Definir as opções e seus rótulos
mode_options = ["all", "groups_only"]
mode_labels = {
"all": "Todos (Grupos e Privado)",
"groups_only": "Apenas Grupos"
}
# Calcular o índice atual baseado no valor do Redis
current_index = mode_options.index(current_mode) if current_mode in mode_options else 0
process_mode = st.selectbox(
"Processar mensagens de:",
options=mode_options,
format_func=lambda x: mode_labels[x],
index=current_index,
key="process_mode",
help="Escolha se deseja processar mensagens de todos os contatos ou apenas de grupos"
)
# Configuração de idioma
st.markdown("---")
st.subheader("🌐 Idioma")
# Carregar configuração atual de idioma
current_language = get_from_redis("TRANSCRIPTION_LANGUAGE", "pt")
# Seleção de idioma
selected_language = st.selectbox(
"Idioma para Transcrição e Resumo",
options=list(IDIOMAS.keys()),
format_func=lambda x: IDIOMAS[x],
index=list(IDIOMAS.keys()).index(current_language) if current_language in IDIOMAS else 0,
help="Selecione o idioma para transcrição dos áudios e geração dos resumos",
key="transcription_language"
)
pass
with tab4:
st.subheader("Formatação de Mensagens")
# Headers personalizados
col1, col2 = st.columns(2)
with col1:
summary_header = st.text_input(
"Cabeçalho do Resumo",
value=get_from_redis("summary_header", "🤖 *Resumo do áudio:*"),
key="summary_header",
help="Formato do cabeçalho para o resumo do áudio"
)
with col2:
transcription_header = st.text_input(
"Cabeçalho da Transcrição",
value=get_from_redis("transcription_header", "🔊 *Transcrição do áudio:*"),
key="transcription_header",
help="Formato do cabeçalho para a transcrição do áudio"
)
# Modo de saída - Corrigido para usar index
output_modes = ["both", "summary_only", "transcription_only", "smart"]
output_mode_labels = {
"both": "Resumo e Transcrição",
"summary_only": "Apenas Resumo",
"transcription_only": "Apenas Transcrição",
"smart": "Modo Inteligente (baseado no tamanho)"
}
current_mode = get_from_redis("output_mode", "both")
mode_index = output_modes.index(current_mode) if current_mode in output_modes else 0
output_mode = st.selectbox(
"Modo de Saída",
options=output_modes,
format_func=lambda x: output_mode_labels[x],
index=mode_index,
key="output_mode",
help="Selecione como deseja que as mensagens sejam enviadas"
)
if output_mode == "smart":
character_limit = st.number_input(
"Limite de Caracteres para Modo Inteligente",
min_value=100,
max_value=5000,
value=int(get_from_redis("character_limit", "500")),
help="Se a transcrição exceder este limite, será enviado apenas o resumo"
)
# Botão de salvar unificado
if st.button("💾 Salvar Todas as Configurações"):
try:
# Salvar configurações existentes
save_settings() save_settings()
if "authenticated" not in st.session_state: # Salvar novas configurações de mensagem
st.session_state.authenticated = False save_to_redis("summary_header", summary_header)
save_to_redis("transcription_header", transcription_header)
save_to_redis("output_mode", output_mode)
if output_mode == "smart":
save_to_redis("character_limit", str(character_limit))
# Se há uma chave principal, adicionar ao sistema de rodízio
if main_key and main_key.startswith("gsk_"):
storage.add_groq_key(main_key)
# Salvar configuração de idioma
save_to_redis("TRANSCRIPTION_LANGUAGE", selected_language)
# Salvamento do modo de processamento
storage.redis.set(storage._get_redis_key("process_mode"), process_mode)
st.success("✅ Todas as configurações foram salvas com sucesso!")
# Mostrar resumo
total_keys = len(storage.get_groq_keys())
st.info(f"""Sistema configurado com {total_keys} chave(s) GROQ no rodízio
Idioma definido: {IDIOMAS[selected_language]}
Modo de saída: {output_mode_labels[output_mode]}""")
except Exception as e:
st.error(f"Erro ao salvar configurações: {str(e)}")
with tab5:
st.subheader("Idiomas e Transcrição")
# Adicionar estatísticas no topo
show_language_statistics()
# Seção de Detecção Automática
st.markdown("---")
st.markdown("### 🔄 Detecção Automática de Idioma")
col1, col2 = st.columns(2)
with col1:
auto_detect = st.toggle(
"Ativar detecção automática",
value=storage.get_auto_language_detection(),
help="Detecta e configura automaticamente o idioma dos contatos"
)
if auto_detect:
st.info("""
A detecção automática de idioma:
1. Analisa o primeiro áudio de cada contato
2. Configura o idioma automaticamente
3. Usa cache de 24 horas para otimização
4. Funciona apenas em conversas privadas
5. Mantém o idioma global para grupos
6. Permite tradução automática entre idiomas
""")
# Seção de Timestamps
st.markdown("---")
st.markdown("### ⏱️ Timestamps na Transcrição")
use_timestamps = st.toggle(
"Incluir timestamps",
value=get_from_redis("use_timestamps", "false") == "true",
help="Adiciona marcadores de tempo em cada trecho"
)
if use_timestamps:
st.info("Os timestamps serão mostrados no formato [MM:SS] para cada trecho da transcrição")
# Seção de Configuração Manual de Idiomas por Contato
st.markdown("---")
st.markdown("### 👥 Idiomas por Contato")
# Obter contatos configurados
contact_languages = storage.get_all_contact_languages()
# Adicionar novo contato
with st.expander(" Adicionar Novo Contato", expanded=not bool(contact_languages)):
new_contact = st.text_input(
"Número do Contato",
placeholder="Ex: 5521999999999",
help="Digite apenas números, sem símbolos ou @s.whatsapp.net"
)
new_language = st.selectbox(
"Idioma do Contato",
options=list(IDIOMAS.keys()),
format_func=lambda x: IDIOMAS[x],
help="Idioma para transcrição dos áudios deste contato"
)
if st.button("Adicionar Contato"):
if new_contact and new_contact.isdigit():
storage.set_contact_language(new_contact, new_language)
st.success(f"✅ Contato configurado com idioma {IDIOMAS[new_language]}")
st.experimental_rerun()
else:
st.error("Por favor, insira um número válido")
# Listar contatos configurados
if contact_languages:
st.markdown("### Contatos Configurados")
for contact, language in contact_languages.items():
col1, col2, col3 = st.columns([2, 2, 1])
with col1:
st.text(f"+{contact}")
with col2:
current_language = st.selectbox(
"Idioma",
options=list(IDIOMAS.keys()),
format_func=lambda x: IDIOMAS[x],
key=f"lang_{contact}",
index=list(IDIOMAS.keys()).index(language) if language in IDIOMAS else 0
)
if current_language != language:
storage.set_contact_language(contact, current_language)
with col3:
if st.button("🗑️", key=f"remove_{contact}"):
storage.remove_contact_language(contact)
st.success("Contato removido")
st.experimental_rerun()
# Botão de Salvar
if st.button("💾 Salvar Configurações de Idioma e Transcrição"):
try:
storage.set_auto_language_detection(auto_detect)
save_to_redis("use_timestamps", str(use_timestamps).lower())
st.success("✅ Configurações salvas com sucesso!")
# Mostrar resumo das configurações
st.info(f"""
Configurações atuais:
- Detecção automática: {'Ativada' if auto_detect else 'Desativada'}
- Timestamps: {'Ativados' if use_timestamps else 'Desativados'}
- Contatos configurados: {len(contact_languages)}
""")
except Exception as e:
st.error(f"Erro ao salvar configurações: {str(e)}")
# Adicionar no início da execução principal
if __name__ == "__main__":
init_session()
# Modificar a parte final do código
if st.session_state.authenticated: if st.session_state.authenticated:
dashboard() dashboard()
else: else:

74
openai_handler.py Normal file
View File

@ -0,0 +1,74 @@
import aiohttp
import json
from datetime import datetime
import logging
from storage import StorageHandler
logger = logging.getLogger("OpenAIHandler")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
async def test_openai_key(key: str) -> bool:
"""Test if an OpenAI key is valid and working."""
url = "https://api.openai.com/v1/models"
headers = {"Authorization": f"Bearer {key}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return len(data.get("data", [])) > 0
return False
except Exception as e:
logger.error(f"Error testing OpenAI key: {e}")
return False
async def handle_openai_request(
url: str,
headers: dict,
data: any,
storage: StorageHandler,
is_form_data: bool = False
) -> tuple[bool, dict, str]:
"""Handle requests to OpenAI API with retries."""
max_retries = 3
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
if is_form_data:
async with session.post(url, headers=headers, data=data) as response:
response_data = await response.json()
if response.status == 200:
if is_form_data and response_data.get("text"):
return True, response_data, ""
elif not is_form_data and response_data.get("choices"):
return True, response_data, ""
else:
async with session.post(url, headers=headers, json=data) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("choices"):
return True, response_data, ""
error_msg = response_data.get("error", {}).get("message", "")
if "invalid_api_key" in error_msg or "invalid authorization" in error_msg.lower():
logger.error(f"OpenAI API key invalid or expired")
return False, response_data, error_msg
if attempt < max_retries - 1:
continue
return False, response_data, error_msg
except Exception as e:
logger.error(f"Error in request: {str(e)}")
if attempt < max_retries - 1:
continue
return False, {}, f"Request failed: {str(e)}"
return False, {}, "All retries failed"

371
readme.md
View File

@ -1,9 +1,22 @@
# TranscreveZAP 2.0
## Transcrição e Resumo de Áudios no WhatsApp usando Python com interface em Streamlit
![ImpacteAI](./fluxo.png) ![ImpacteAI](./fluxo.png)
# TranscreveZAP 2.3- Plataforma de Gestão e Automação de Áudios do WhatsApp
Este projeto permite transcrever e resumir áudios enviados pelo WhatsApp usando inteligência artificial e integração com APIs. Ideal para automatizar o processamento de mensagens de áudio, oferecendo um resumo claro e prático. ### Sistema Inteligente de Transcrição, Resumo e Tradução Automática de Áudios para WhatsApp
*Desenvolvido com Python, FastAPI e Streamlit*
---
Uma solução completa para automatizar e gerenciar mensagens de áudio no WhatsApp, oferecendo:
- Transcrição automática multilíngue
- Resumos inteligentes de áudios
- Detecção e tradução automática entre idiomas
- Seleção de plataforma LLM (GROQ ou OpenAI)
- Interface administrativa completa
- Sistema de rodízio de chaves API
- Gestão avançada de grupos e usuários
- Personalização de formatação e saída
- Sistema de Redirecionamento de Webhooks
Contato de email: contato@impacte.ai Contato de email: contato@impacte.ai
([ACESSE NOSSO SITE](https://impacte.ai/)) ([ACESSE NOSSO SITE](https://impacte.ai/))
@ -16,47 +29,97 @@ Antes de começar, certifique-se de ter os seguintes requisitos:
- Python 3.10+ instalado ([Download](https://www.python.org/downloads/)) - Python 3.10+ instalado ([Download](https://www.python.org/downloads/))
- Docker e Docker Compose instalados ([Instruções](https://docs.docker.com/get-docker/)) - Docker e Docker Compose instalados ([Instruções](https://docs.docker.com/get-docker/))
- Uma conta Evolution API com chave válida - Uma conta Evolution API com chave válida
- Uma conta GROQ API com chave válida (começa com 'gsk_') ([Crie sua CONTA](https://console.groq.com/login)) - Chaves GROQ (começa com `gsk_`) e/ou chaves OpenAI (começa com `sk-`) configuradas ([Crie sua conta GROQ](https://console.groq.com/login))
* Em caso de uso com Proxy Reverso Aponte um Subdomínio para a API e outro para o MANAGER da aplicação
--- ---
## 🚀 **Novidade: Escolha do Provedor LLM**
Agora você pode escolher entre dois provedores para transcrições e resumos:
1. **GROQ** (open-source): Configuração padrão.
2. **OpenAI** (API paga): Integração com modelos GPT.
### Configuração:
- Acesse: **Configurações > Provedor LLM** na interface administrativa.
- Escolha entre `groq` e `openai`.
- Adicione as chaves correspondentes para cada provedor.
---
## 🚀 **Instalação e Configuração** ## 🚀 **Instalação e Configuração**
### 🐳 Docker Compose ### 🐳 Docker Compose
1. Clone o repositório: 1. Configure o arquivo docker-compose.yaml:
```bash
git clone https://github.com/seu-usuario/transcrevezap.git
cd transcrevezap
```
2. Configure o arquivo docker-compose.yaml:
```yaml ```yaml
version: "3.7" version: "3.7"
services: services:
# Serviço principal do TranscreveZAP
tcaudio: tcaudio:
image: impacteai/transcrevezap:latest image: impacteai/transcrevezap:latest
build:
context: .
ports: ports:
- 8005:8005 # Porta para FastAPI - "8005:8005" # API FastAPI - Use esta porta para configurar o webhook
- 8501:8501 # Porta para Streamlit - "8501:8501" # Interface Web Streamlit - Acesse o painel por esta porta
environment: environment:
- REDIS_HOST=redis # Configurações do Servidor
- REDIS_PORT=6380 - UVICORN_PORT=8005
- UVICORN_HOST=0.0.0.0
- UVICORN_RELOAD=true
- UVICORN_WORKERS=1
- API_DOMAIN=localhost # Para uso local mantenha localhost
# Modo Debug e Logs
- DEBUG_MODE=false
- LOG_LEVEL=INFO
# Credenciais do Painel Admin (ALTERE ESTAS CREDENCIAIS!)
- MANAGER_USER=admin - MANAGER_USER=admin
- MANAGER_PASSWORD=sua_senha_aqui - MANAGER_PASSWORD=sua_senha_aqui
# Configurações do Redis
- REDIS_HOST=redis-transcrevezap # Nome do serviço Redis
- REDIS_PORT=6380 # Porta do Redis
- REDIS_DB=0 # Banco de dados Redis
# Autenticação Redis (opcional - descomente se necessário)
# - REDIS_USERNAME=seu_usuario # Nome do usuário Redis
# - REDIS_PASSWORD=sua_senha # Senha do Redis
depends_on: depends_on:
- redis - redis-transcrevezap
command: ./start.sh
redis: # Serviço Redis para armazenamento de dados
redis-transcrevezap:
image: redis:6 image: redis:6
command: redis-server --port 6380 --appendonly yes # Escolha UMA das configurações do Redis abaixo:
volumes:
- redis_data:/data
# 1. Configuração simples SEM autenticação:
command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user admin on '>sua_senha' '~*' '+@all'
volumes: volumes:
redis_data: - redis_transcrevezap_data:/data # Persistência dos dados
# Volumes para persistência
volumes:
redis_transcrevezap_data:
driver: local
# Instruções de Uso:
# 1. Salve este arquivo como docker-compose.yml
# 2. Execute com: docker compose up -d
# 3. Acesse o painel em: http://localhost:8501
# 4. Configure o webhook da Evolution API para: http://localhost:8005/transcreve-audios
``` ```
3. Inicie os serviços: 2. Inicie os serviços:
```bash ```bash
docker-compose up -d docker-compose up -d
``` ```
@ -67,11 +130,10 @@ Acesse a interface de gerenciamento em http://seu-ip:8501.
Faça login com as credenciais definidas em MANAGER_USER e MANAGER_PASSWORD. Faça login com as credenciais definidas em MANAGER_USER e MANAGER_PASSWORD.
Na seção "Configurações", defina: Na seção "Configurações", defina:
GROQ_API_KEY: Sua chave da API GROQ 1. GROQ_API_KEY: Sua chave da API GROQ
BUSINESS_MESSAGE: Mensagem de rodapé após transcrição 2. BUSINESS_MESSAGE: Mensagem de rodapé após transcrição
PROCESS_GROUP_MESSAGES: Habilitar processamento de mensagens em grupos 3. PROCESS_GROUP_MESSAGES: Habilitar processamento de mensagens em grupos
PROCESS_SELF_MESSAGES: Habilitar processamento de mensagens próprias 4. PROCESS_SELF_MESSAGES: Habilitar processamento de mensagens próprias
## 🔧 Uso ## 🔧 Uso
Endpoint para Webhook da Evolution API Endpoint para Webhook da Evolution API
@ -125,7 +187,8 @@ uvicorn main:app --host 0.0.0.0 --port 8005
```bash ```bash
http://127.0.0.1:8005/transcreve-audios http://127.0.0.1:8005/transcreve-audios
``` ```
1. Aponte um subomínio com o IP do seu servidor para a API da TranscreveZAP
2. Aponte um subomínio com o IP do seu servidor para o MANAGER da TranscreveZAP
### 🌟 Docker Swarm com Traefik ### 🌟 Docker Swarm com Traefik
```yaml ```yaml
@ -133,9 +196,9 @@ version: "3.7"
services: services:
tcaudio: tcaudio:
image: impacteai/transcrevezap:latest image: impacteai/transcrevezap:dev
networks: networks:
- transcrevezap_network - sua_rede_externa # Substitua pelo nome da sua rede externa
ports: ports:
- 8005:8005 # Porta para FastAPI - 8005:8005 # Porta para FastAPI
- 8501:8501 # Porta para Streamlit - 8501:8501 # Porta para Streamlit
@ -144,12 +207,17 @@ services:
- UVICORN_HOST=0.0.0.0 - UVICORN_HOST=0.0.0.0
- UVICORN_RELOAD=true - UVICORN_RELOAD=true
- UVICORN_WORKERS=1 - UVICORN_WORKERS=1
- API_DOMAIN=seu.dominio.com #coloque seu subdominio da API apontado aqui
- DEBUG_MODE=false - DEBUG_MODE=false
- LOG_LEVEL=INFO - LOG_LEVEL=INFO
- MANAGER_USER=seu_usuario_admin - MANAGER_USER=seu_usuario_admin # Defina Usuário do Manager
- MANAGER_PASSWORD=sua_senha_segura - MANAGER_PASSWORD=sua_senha_segura # Defina Senha do Manager
- REDIS_HOST=redis-transcrevezap - REDIS_HOST=redis-transcrevezap
- REDIS_PORT=6380 - REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP
- REDIS_DB=0 # Opcional: pode ser removida para usar o valor padrão
# Autenticação Redis (opcional - descomente se necessário, se estiver usando autenticação)
# - REDIS_USERNAME=${REDIS_USERNAME:-} # Nome do usuário definido no comando do Redis
# - REDIS_PASSWORD=${REDIS_PASSWORD:-} # Senha definida no comando do Redis (sem o '>')
depends_on: depends_on:
- redis-transcrevezap - redis-transcrevezap
deploy: deploy:
@ -160,7 +228,7 @@ services:
- node.role == manager - node.role == manager
labels: labels:
- traefik.enable=true - traefik.enable=true
- traefik.http.routers.tcaudio.rule=Host(`seu.dominio.com`) - traefik.http.routers.tcaudio.rule=Host(`seu.dominio.com`) #coloque seu subdominio da API apontado aqui
- traefik.http.routers.tcaudio.entrypoints=websecure - traefik.http.routers.tcaudio.entrypoints=websecure
- traefik.http.routers.tcaudio.tls.certresolver=letsencryptresolver - traefik.http.routers.tcaudio.tls.certresolver=letsencryptresolver
- traefik.http.services.tcaudio.loadbalancer.server.port=8005 - traefik.http.services.tcaudio.loadbalancer.server.port=8005
@ -169,7 +237,7 @@ services:
- traefik.http.middlewares.traefik-compress.compress=true - traefik.http.middlewares.traefik-compress.compress=true
- traefik.http.routers.tcaudio.middlewares=traefik-compress - traefik.http.routers.tcaudio.middlewares=traefik-compress
# Configuração do Streamlit # Configuração do Streamlit
- traefik.http.routers.tcaudio-manager.rule=Host(`manager.seu.dominio.com`) - traefik.http.routers.tcaudio-manager.rule=Host(`manager.seu.dominio.com`) #coloque seu subdominio do Manager apontado aqui
- traefik.http.routers.tcaudio-manager.entrypoints=websecure - traefik.http.routers.tcaudio-manager.entrypoints=websecure
- traefik.http.routers.tcaudio-manager.tls.certresolver=letsencryptresolver - traefik.http.routers.tcaudio-manager.tls.certresolver=letsencryptresolver
- traefik.http.services.tcaudio-manager.loadbalancer.server.port=8501 - traefik.http.services.tcaudio-manager.loadbalancer.server.port=8501
@ -178,14 +246,33 @@ services:
redis-transcrevezap: redis-transcrevezap:
image: redis:6 image: redis:6
# 1. Configuração SEM autenticação (padrão):
command: redis-server --port 6380 --appendonly yes command: redis-server --port 6380 --appendonly yes
# 2. Configuração COM autenticação (descomente e ajuste se necessário):
# command: >
# redis-server
# --port 6380
# --appendonly yes
# --user seuusuario on '>minhasenha' '~*' '+@all'
# # Explicação dos parâmetros:
# # --user seuusuario: nome do usuário
# # on: indica início da configuração do usuário
# # '>minhasenha': senha do usuário (mantenha o '>')
# # '~*': permite acesso a todas as chaves
# # '+@all': concede todas as permissões
volumes: volumes:
- redis_transcrevezap_data:/data - redis_transcrevezap_data:/data
networks: networks:
- transcrevezap_network - sua_rede_externa # Substitua pelo nome da sua rede externa
deploy:
mode: replicated
replicas: 1
placement:
constraints:
- node.role == manager
networks: networks:
transcrevezap_network: sua_rede_externa: # Substitua pelo nome da sua rede externa
external: true external: true
name: sua_rede_externa # Substitua pelo nome da sua rede externa name: sua_rede_externa # Substitua pelo nome da sua rede externa
@ -203,9 +290,10 @@ https://transcricaoaudio.seudominio.com.br/transcreve-audios
Para usar com Traefik, certifique-se de: Para usar com Traefik, certifique-se de:
1. Ter o Traefik configurado em seu ambiente Docker Swarm 1. Ter o Traefik configurado em seu ambiente Docker Swarm
2. Configurar o DNS do seu domínio para apontar para o servidor 2. Configurar 2 DNS do seu domínio para apontar para a API e para o MANAGER
3. Ajustar as labels do Traefik conforme seu ambiente 3. Ajustar as labels do Traefik conforme seu ambiente
4. Verificar se a rede externa existe no Docker Swarm 4. Verificar se a rede externa existe no Docker Swarm
5. Utilize a stack de exemplo contida no projeto para guiar a instalação
## 📝 **Notas Importantes** ## 📝 **Notas Importantes**
- A GROQ_API_KEY deve começar com 'gsk_' - A GROQ_API_KEY deve começar com 'gsk_'
@ -214,6 +302,176 @@ Para usar com Traefik, certifique-se de:
- Em produção, recomenda-se DEBUG_MODE=false - Em produção, recomenda-se DEBUG_MODE=false
- Configure LOG_LEVEL=DEBUG apenas para troubleshooting - Configure LOG_LEVEL=DEBUG apenas para troubleshooting
## 🚀 Novo Recurso v2.3.1: Hub de Redirecionamento
O TranscreveZAP agora oferece um sistema robusto para redirecionamento de mensagens, permitindo que você encaminhe os webhooks da Evolution API para múltiplos destinos simultaneamente.
### Principais Recursos
- Interface dedicada para gerenciamento de webhooks
- Redirecionamento sem alteração do payload original
- Monitoramento de saúde dos webhooks em tempo real
- Sistema de retry automático para reenvio de mensagens falhas
- Headers de rastreamento para identificação de origem (`X-TranscreveZAP-Forward`)
- Suporte a descrições personalizadas para cada webhook
- Limpeza automática de dados ao remover webhooks
### Compatibilidade
- Mantém o payload da Evolution API intacto
- Suporta múltiplos endpoints simultaneamente
- Compatível com qualquer sistema que aceite webhooks via POST
- Preserva todos os dados originais da mensagem
## ✨ Novos Recursos na v2.3
### 🌍 Suporte Multilíngue
- Transcrição e resumo com suporte para 16 idiomas principais
- Mudança instantânea de idioma
- Interface intuitiva para seleção de idioma
- Mantém consistência entre transcrição e resumo
- Configuração manual de idioma por contato
- Detecção automática de idioma
- Tradução automática integrada
### 🔄 Sistema de Cache para Idiomas
Implementação de cache inteligente para otimizar a detecção e processamento de idiomas.
### 🔄 Sistema Inteligente de Rodízio de Chaves
- Suporte a múltiplas chaves GROQ
- Balanceamento automático de carga
- Maior redundância e disponibilidade
- Gestão simplificada de chaves via interface
### ⏱️ Timestamps em Transcrições
Nova funcionalidade de timestamps que adiciona marcadores de tempo precisos em cada trecho da transcrição.
## 📋 Detalhamento das Funcionalidades
### 🌍 Sistema de Idiomas
O TranscreveZAP suporta transcrição e resumo em múltiplos idiomas. Na seção "Configurações", você pode:
1. Selecionar o idioma principal para transcrição e resumo
2. O sistema manterá Português como padrão se nenhum outro for selecionado
3. A mudança de idioma é aplicada instantaneamente após salvar
Idiomas suportados:
- 🇩🇪 Alemão
- 🇸🇦 Árabe
- 🇨🇳 Chinês
- 🇰🇷 Coreano
- 🇪🇸 Espanhol
- 🇫🇷 Francês
- 🇮🇳 Hindi
- 🇳🇱 Holandês
- 🇬🇧 Inglês
- 🇮🇹 Italiano
- 🇯🇵 Japonês
- 🇵🇱 Polonês
- 🇧🇷 Português (padrão)
- 🇷🇴 Romeno
- 🇷🇺 Russo
- 🇹🇷 Turco
### 🌐 Gestão de Idiomas por Contato
#### Configuração Manual
```markdown
1. Acesse o Manager > Configurações > Idiomas e Transcrição
2. Expanda "Adicionar Novo Contato"
3. Digite o número do contato (formato: 5521999999999)
4. Selecione o idioma desejado
5. Clique em "Adicionar Contato"
```
### 🔄 Detecção Automática de Idioma
Nova funcionalidade que detecta automaticamente o idioma do contato:
- Ativação via Manager > Configurações > Idiomas e Transcrição
- Analisa o primeiro áudio de cada contato
- Cache inteligente de 24 horas
- Funciona apenas em conversas privadas
- Mantém configuração global para grupos
### ⚡ Tradução Automática
Sistema inteligente de tradução que:
- Traduz automaticamente áudios recebidos para seu idioma principal
- Mantém o contexto e estilo original da mensagem
- Preserva formatações especiais (emojis, negrito, itálico)
- Otimizado para comunicação natural
### ⏱️ Sistema de Timestamps
Nova funcionalidade que adiciona marcadores de tempo:
- Formato [MM:SS] no início de cada trecho
- Ativação via Manager > Configurações > Idiomas e Transcrição
- Precisão de segundos
- Ideal para referência e navegação em áudios longos
#### Exemplo de Saída com Timestamps:
```
[00:00] Bom dia pessoal
[00:02] Hoje vamos falar sobre
[00:05] O novo sistema de timestamps
```
## 🔧 Configuração e Uso
### Configuração de Idiomas
1. **Configuração Global**
- Defina o idioma padrão do sistema
- Acesse: Manager > Configurações > Configurações Gerais
- Selecione o idioma principal em "Idioma para Transcrição e Resumo"
2. **Configuração por Contato**
- Acesse: Manager > Configurações > Idiomas e Transcrição
- Use "Adicionar Novo Contato" ou gerencie contatos existentes
- Cada contato pode ter seu próprio idioma configurado
3. **Detecção Automática**
- Ative/Desative a detecção automática
- Configure o tempo de cache
- Gerencie exceções e configurações manuais
### Configuração de Timestamps
1. Acesse: Manager > Configurações > Idiomas e Transcrição
2. Localize a seção "Timestamps na Transcrição"
3. Use o toggle para ativar/desativar
4. As mudanças são aplicadas imediatamente
## 📊 Monitoramento e Estatísticas
### Estatísticas de Idiomas
O sistema agora oferece estatísticas detalhadas:
- Total de transcrições por idioma
- Número de detecções automáticas
- Divisão entre mensagens enviadas/recebidas
- Histórico de uso por idioma
### Visualização de Dados
- Gráficos de uso por idioma
- Distribuição de idiomas
- Estatísticas de tradução
- Performance do sistema
## 🔄 Sistema de Rodízio de Chaves GROQ
O TranscreveZAP suporta múltiplas chaves GROQ com sistema de rodízio automático para melhor distribuição de carga e redundância.
### Funcionalidades:
1. Adicione múltiplas chaves GROQ para distribuição de carga
2. O sistema alterna automaticamente entre as chaves disponíveis
3. Se uma chave falhar, o sistema usa a próxima disponível
4. Visualize todas as chaves configuradas no painel
5. Adicione ou remova chaves sem interromper o serviço
### Como Configurar:
1. Acesse a seção "Configurações"
2. Na área "🔑 Gerenciamento de Chaves GROQ":
- Adicione a chave principal
- Use "Adicionar Nova Chave GROQ" para incluir chaves adicionais
- O sistema começará a usar todas as chaves em rodízio automaticamente
### Boas Práticas:
- Mantenha pelo menos duas chaves ativas para redundância
- Monitore o uso das chaves pelo painel administrativo
- Remova chaves expiradas ou inválidas
- Todas as chaves devem começar com 'gsk_'
## 🔍 **Troubleshooting** ## 🔍 **Troubleshooting**
Se encontrar problemas: Se encontrar problemas:
1. Verifique se todas as variáveis obrigatórias estão configuradas 1. Verifique se todas as variáveis obrigatórias estão configuradas
@ -221,6 +479,41 @@ Se encontrar problemas:
3. Verifique os logs do container 3. Verifique os logs do container
4. Certifique-se que as APIs estão acessíveis 4. Certifique-se que as APIs estão acessíveis
### Problemas com Múltiplas Chaves GROQ:
1. Verifique se todas as chaves começam com 'gsk_'
2. Confirme se as chaves estão ativas na console GROQ
3. Monitore os logs para identificar falhas específicas de chaves
4. Mantenha pelo menos uma chave válida no sistema
### Problemas com Idiomas:
1. Verifique se o idioma está corretamente selecionado nas configurações
2. Confirme se a configuração foi salva com sucesso
3. Reinicie o serviço se as alterações não forem aplicadas
4. Verifique os logs para confirmar o idioma em uso
## 📝 Notas Adicionais
### Recomendações de Uso
- Configure idiomas manualmente para contatos frequentes
- Use detecção automática como fallback
- Monitore estatísticas de uso
- Faça backups regulares das configurações
### Limitações Conhecidas
- Detecção automática requer primeiro áudio
- Cache limitado a 24 horas
- Timestamps podem variar em áudios muito longos
## 🤝 Contribuição
Agradecemos feedback e contribuições! Reporte issues e sugira melhorias em nosso GitHub.
---
### 📞 Suporte
Para suporte adicional ou dúvidas:
- WhatsApp: [Entre no GRUPO](https://chat.whatsapp.com/L9jB1SlcmQFIVxzN71Y6KG)
- Email: contato@impacte.ai
- Site: [impacte.ai](https://impacte.ai)
## 📄 **Licença** ## 📄 **Licença**
Este projeto está licenciado sob a Licença MIT - veja o arquivo [LICENSE](LICENSE) para detalhes. Este projeto está licenciado sob a Licença MIT - veja o arquivo [LICENSE](LICENSE) para detalhes.

View File

@ -7,7 +7,8 @@ from storage import StorageHandler
import os import os
import json import json
import tempfile import tempfile
import traceback
from groq_handler import get_working_groq_key, validate_transcription_response, handle_groq_request
# Inicializa o storage handler # Inicializa o storage handler
storage = StorageHandler() storage = StorageHandler()
@ -31,55 +32,153 @@ async def convert_base64_to_file(base64_data):
}) })
raise raise
async def get_groq_key():
"""Obtém a próxima chave GROQ do sistema de rodízio."""
key = storage.get_next_groq_key()
if not key:
raise HTTPException(
status_code=500,
detail="Nenhuma chave GROQ configurada. Configure pelo menos uma chave no painel administrativo."
)
return key
async def summarize_text_if_needed(text): async def summarize_text_if_needed(text):
"""Resumir texto usando a API GROQ""" """Resumir texto usando a API GROQ com sistema de rodízio de chaves"""
storage.add_log("DEBUG", "Iniciando processo de resumo", { storage.add_log("DEBUG", "Iniciando processo de resumo", {
"text_length": len(text) "text_length": len(text)
}) })
provider = storage.get_llm_provider()
# Obter idioma configurado
language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
storage.add_log("DEBUG", "Idioma configurado para resumo", {
"language": language,
"redis_value": redis_client.get("TRANSCRIPTION_LANGUAGE")
})
if provider == "openai":
api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
url_completions = "https://api.groq.com/openai/v1/chat/completions"
headers = { headers = {
"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Authorization": f"Bearer {api_key}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
# Adaptar o prompt para considerar o idioma
prompt_by_language = {
"pt": """
Entenda o contexto desse áudio e faça um resumo super enxuto sobre o que se trata.
Esse áudio foi enviado pelo whatsapp, de alguém, para Fabio.
Escreva APENAS o resumo do áudio como se fosse você que estivesse enviando
essa mensagem! Não cumprimente, não de oi, não escreva nada antes nem depois
do resumo, responda apenas um resumo enxuto do que foi falado no áudio.
""",
"en": """
Understand the context of this audio and make a very concise summary of what it's about.
This audio was sent via WhatsApp, from someone, to Fabio.
Write ONLY the summary of the audio as if you were sending this message yourself!
Don't greet, don't say hi, don't write anything before or after the summary,
respond with just a concise summary of what was said in the audio.
""",
"es": """
Entiende el contexto de este audio y haz un resumen muy conciso sobre de qué se trata.
Este audio fue enviado por WhatsApp, de alguien, para Fabio.
Escribe SOLO el resumen del audio como si estuvieras enviando este mensaje.
No saludes, no escribas nada antes ni después del resumen, responde únicamente un resumen conciso de lo dicho en el audio.
""",
"fr": """
Comprenez le contexte de cet audio et faites un résumé très concis de ce dont il s'agit.
Cet audio a été envoyé via WhatsApp, par quelqu'un, à Fabio.
Écrivez UNIQUEMENT le résumé de l'audio comme si c'était vous qui envoyiez ce message.
Ne saluez pas, n'écrivez rien avant ou après le résumé, répondez seulement par un résumé concis de ce qui a été dit dans l'audio.
""",
"de": """
Verstehen Sie den Kontext dieses Audios und erstellen Sie eine sehr kurze Zusammenfassung, worum es geht.
Dieses Audio wurde über WhatsApp von jemandem an Fabio gesendet.
Schreiben Sie NUR die Zusammenfassung des Audios, als ob Sie diese Nachricht senden würden.
Grüßen Sie nicht, schreiben Sie nichts vor oder nach der Zusammenfassung, antworten Sie nur mit einer kurzen Zusammenfassung dessen, was im Audio gesagt wurde.
""",
"it": """
Comprendi il contesto di questo audio e fai un riassunto molto conciso di cosa si tratta.
Questo audio è stato inviato tramite WhatsApp, da qualcuno, a Fabio.
Scrivi SOLO il riassunto dell'audio come se fossi tu a inviare questo messaggio.
Non salutare, non scrivere nulla prima o dopo il riassunto, rispondi solo con un riassunto conciso di ciò che è stato detto nell'audio.
""",
"ja": """
この音声の内容を理解しそれが何について話されているのかを非常に簡潔に要約してください
この音声は誰かがWhatsAppでファビオに送ったものです
あなたがそのメッセージを送っているように音声の要約だけを記述してください
挨拶や前置き後書きは書かず音声で話された内容の簡潔な要約のみを返信してください
""",
"ko": """
오디오의 맥락을 이해하고, 무엇에 관한 것인지 매우 간략하게 요약하세요.
오디오는 누군가가 WhatsApp을 통해 Fabio에게 보낸 것입니다.
마치 당신이 메시지를 보내는 것처럼 오디오의 요약만 작성하세요.
인사하거나, 요약 전후로 아무것도 쓰지 말고, 오디오에서 말한 내용을 간략하게 요약한 답변만 하세요.
""",
"zh": """
理解这个音频的上下文并简洁地总结它的内容
这个音频是某人通过WhatsApp发送给Fabio的
请仅以摘要的形式回答就好像是你在发送这条消息
不要问候也不要在摘要前后写任何内容只需用一句简短的话总结音频中所说的内容
""",
"ro": """
Înțelege contextul acestui audio și creează un rezumat foarte concis despre ce este vorba.
Acest audio a fost trimis prin WhatsApp, de cineva, către Fabio.
Scrie DOAR rezumatul audio-ului ca și cum tu ai trimite acest mesaj.
Nu saluta, nu scrie nimic înainte sau după rezumat, răspunde doar cu un rezumat concis despre ce s-a spus în audio.
""",
"ru": """
Поймите контекст этого аудио и сделайте очень краткое резюме, о чем идет речь.
Это аудио было отправлено через WhatsApp кем-то Фабио.
Напишите ТОЛЬКО резюме аудио, как будто вы отправляете это сообщение.
Не приветствуйте, не пишите ничего до или после резюме, ответьте только кратким резюме того, что говорилось в аудио.
"""
}
# Usar o prompt do idioma configurado ou fallback para português
base_prompt = prompt_by_language.get(language, prompt_by_language["pt"])
json_data = { json_data = {
"messages": [{ "messages": [{
"role": "user", "role": "user",
"content": f""" "content": f"{base_prompt}\n\nTexto para resumir: {text}",
Entenda o contexto desse áudio e faça um resumo super enxuto sobre o que se trata, coloque os pontos relevantes e mais importantes no resumo de forma muito curta.
Esse áudio foi enviado pelo whatsapp, de alguém, para Gabriel.
Escreva APENAS o resumo do áudio como se fosse você que estivesse enviando
essa mensagem! Não comprimente, não de oi, não escreva nada antes nem depois
do resumo, responda apenas um resumo enxuto do que foi falado no áudio.
IMPORTANTE: Não faça esse resumo como se fosse um áudio que uma terceira
pessoa enviou, não diga coisas como 'a pessoa está falando...' etc.
Escreva o resumo com base nessa mensagem do áudio,
como se você estivesse escrevendo esse resumo e enviando em
texto pelo whatsapp: {text}""",
}], }],
"model": "llama-3.3-70b-versatile", "model": model,
} }
try: try:
async with aiohttp.ClientSession() as session: success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
storage.add_log("DEBUG", "Enviando requisição para API GROQ") if not success:
async with session.post(url_completions, headers=headers, json=json_data) as summary_response: raise Exception(error)
if summary_response.status == 200:
summary_result = await summary_response.json() summary_text = response_data["choices"][0]["message"]["content"]
summary_text = summary_result["choices"][0]["message"]["content"] # Validar se o resumo não está vazio
storage.add_log("INFO", "Resumo gerado com sucesso", { if not await validate_transcription_response(summary_text):
storage.add_log("ERROR", "Resumo vazio ou inválido recebido")
raise Exception("Resumo vazio ou inválido recebido")
# Validar se o resumo é menor que o texto original
if len(summary_text) >= len(text):
storage.add_log("WARNING", "Resumo maior que texto original", {
"original_length": len(text), "original_length": len(text),
"summary_length": len(summary_text) "summary_length": len(summary_text)
}) })
return summary_text storage.add_log("INFO", "Resumo gerado com sucesso", {
else: "original_length": len(text),
error_text = await summary_response.text() "summary_length": len(summary_text),
storage.add_log("ERROR", "Erro na API GROQ", { "language": language
"error": error_text,
"status": summary_response.status
}) })
raise Exception(f"Erro ao resumir o texto: {error_text}")
return summary_text
except Exception as e: except Exception as e:
storage.add_log("ERROR", "Erro no processo de resumo", { storage.add_log("ERROR", "Erro no processo de resumo", {
"error": str(e), "error": str(e),
@ -87,69 +186,212 @@ async def summarize_text_if_needed(text):
}) })
raise raise
async def transcribe_audio(audio_source, apikey=None): async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=False, use_timestamps=False):
"""Transcreve áudio usando a API GROQ""" """
storage.add_log("INFO", "Iniciando processo de transcrição") Transcreve áudio com suporte a detecção de idioma e tradução automática.
Args:
audio_source: Caminho do arquivo de áudio ou URL
apikey: Chave da API opcional para download de áudio
remote_jid: ID do remetente/destinatário
from_me: Se o áudio foi enviado pelo próprio usuário
use_timestamps: Se True, usa verbose_json para incluir timestamps
Returns:
tuple: (texto_transcrito, has_timestamps)
"""
storage.add_log("INFO", "Iniciando processo de transcrição", {
"from_me": from_me,
"remote_jid": remote_jid
})
provider = storage.get_llm_provider()
if provider == "openai":
api_key = storage.get_openai_keys()[0] # Get first OpenAI key
url = "https://api.openai.com/v1/audio/transcriptions"
model = "whisper-1"
else: # groq
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
url = "https://api.groq.com/openai/v1/audio/transcriptions" url = "https://api.groq.com/openai/v1/audio/transcriptions"
groq_headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}"} model = "whisper-large-v3"
headers = {"Authorization": f"Bearer {api_key}"}
# Inicializar variáveis
contact_language = None
system_language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
is_private = remote_jid and "@s.whatsapp.net" in remote_jid
# Determinar idioma do contato em conversas privadas
if is_private:
# Remover @s.whatsapp.net do ID para buscar no cache
contact_id = remote_jid.split('@')[0]
# 1. Primeiro tentar obter idioma configurado manualmente
contact_language = storage.get_contact_language(contact_id)
if contact_language:
storage.add_log("DEBUG", "Usando idioma configurado manualmente", {
"contact_language": contact_language,
"from_me": from_me,
"remote_jid": remote_jid,
"is_private": is_private
})
# 2. Se não houver configuração manual e detecção automática estiver ativa
elif storage.get_auto_language_detection():
# Verificar cache primeiro
cached_lang = storage.get_cached_language(contact_id)
if cached_lang:
contact_language = cached_lang.get('language')
storage.add_log("DEBUG", "Usando idioma do cache", {
"contact_language": contact_language,
"auto_detected": True
})
# Se não há cache ou está expirado, fazer detecção
elif not from_me: # Só detecta em mensagens recebidas
try:
# Realizar transcrição inicial sem idioma específico
with open(audio_source, 'rb') as audio_file:
data = aiohttp.FormData()
data.add_field('file', audio_file, filename='audio.mp3')
data.add_field('model', model)
success, response_data, error = await handle_groq_request(url, headers, data, storage, is_form_data=True)
if success:
initial_text = response_data.get("text", "")
# Detectar idioma do texto transcrito
detected_lang = await detect_language(initial_text)
# Salvar no cache E na configuração do contato
storage.cache_language_detection(contact_id, detected_lang)
storage.set_contact_language(contact_id, detected_lang)
contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e configurado", {
"language": detected_lang,
"remote_jid": remote_jid,
"auto_detected": True
})
except Exception as e:
storage.add_log("WARNING", "Erro na detecção automática de idioma", {
"error": str(e),
"remote_jid": remote_jid
})
if not contact_language:
storage.add_log("DEBUG", "Usando idioma padrão do sistema", {
"from_me": from_me,
"remote_jid": remote_jid,
"is_private": is_private,
"system_language": system_language
})
# Definir idioma de transcrição e tradução baseado no contexto
if is_private and contact_language:
if from_me:
# Se estou enviando para um contato com idioma configurado
transcription_language = contact_language # Transcrever no idioma do contato
target_language = contact_language # Não precisa traduzir
storage.add_log("DEBUG", "Usando idioma do contato para áudio enviado", {
"transcription_language": transcription_language,
"target_language": target_language
})
else:
# Se estou recebendo
transcription_language = contact_language # Transcrever no idioma do contato
target_language = system_language # Traduzir para o idioma do sistema
storage.add_log("DEBUG", "Processando áudio recebido com tradução", {
"transcription_language": transcription_language,
"target_language": target_language
})
else:
# Caso padrão: usar idioma do sistema
transcription_language = system_language
target_language = system_language
storage.add_log("DEBUG", "Usando idioma do sistema", {
"transcription_language": transcription_language,
"target_language": target_language
})
storage.add_log("DEBUG", "Configuração de idiomas definida", {
"transcription_language": transcription_language,
"target_language": target_language,
"from_me": from_me,
"is_private": is_private,
"contact_language": contact_language
})
try: try:
async with aiohttp.ClientSession() as session: # Realizar transcrição
# Se o audio_source for uma URL with open(audio_source, 'rb') as audio_file:
if isinstance(audio_source, str) and audio_source.startswith('http'):
storage.add_log("DEBUG", "Baixando áudio da URL", {
"url": audio_source
})
download_headers = {"apikey": apikey} if apikey else {}
async with session.get(audio_source, headers=download_headers) as response:
if response.status != 200:
error_text = await response.text()
storage.add_log("ERROR", "Erro no download do áudio", {
"status": response.status,
"error": error_text
})
raise Exception(f"Erro ao baixar áudio: {error_text}")
audio_data = await response.read()
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
temp_file.write(audio_data)
audio_source = temp_file.name
storage.add_log("DEBUG", "Áudio salvo temporariamente", {
"path": audio_source
})
# Preparar dados para transcrição
data = aiohttp.FormData() data = aiohttp.FormData()
data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3') data.add_field('file', audio_file, filename='audio.mp3')
data.add_field('model', 'whisper-large-v3') data.add_field('model', model)
data.add_field('language', 'pt') data.add_field('language', transcription_language)
storage.add_log("DEBUG", "Enviando áudio para transcrição") if use_timestamps:
async with session.post(url, headers=groq_headers, data=data) as response: data.add_field('response_format', 'verbose_json')
if response.status == 200:
result = await response.json()
message = result.get("text", "")
storage.add_log("INFO", "Transcrição concluída com sucesso", {
"text_length": len(message)
})
is_summary = False # Usar handle_groq_request para ter retry e validação
if len(message) > 1000: success, response_data, error = await handle_groq_request(url, headers, data, storage, is_form_data=True)
storage.add_log("DEBUG", "Texto longo detectado, iniciando resumo", { if not success:
"text_length": len(message) raise Exception(f"Erro na transcrição: {error}")
})
is_summary = True
message = await summarize_text_if_needed(message)
return message, is_summary transcription = format_timestamped_result(response_data) if use_timestamps else response_data.get("text", "")
else:
error_text = await response.text() # Validar o conteúdo da transcrição
storage.add_log("ERROR", "Erro na transcrição", { if not await validate_transcription_response(transcription):
"error": error_text, storage.add_log("ERROR", "Transcrição vazia ou inválida recebida")
"status": response.status raise Exception("Transcrição vazia ou inválida recebida")
# Detecção automática para novos contatos
if (is_private and storage.get_auto_language_detection() and
not from_me and not contact_language):
try:
detected_lang = await detect_language(transcription)
storage.cache_language_detection(remote_jid, detected_lang)
contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e cacheado", {
"language": detected_lang,
"remote_jid": remote_jid
}) })
raise Exception(f"Erro na transcrição: {error_text}") except Exception as e:
storage.add_log("WARNING", "Erro na detecção de idioma", {"error": str(e)})
# Tradução quando necessário
need_translation = (
is_private and contact_language and
(
(from_me and transcription_language != target_language) or
(not from_me and target_language != transcription_language)
)
)
if need_translation:
try:
transcription = await translate_text(
transcription,
transcription_language,
target_language
)
storage.add_log("INFO", "Texto traduzido automaticamente", {
"from": transcription_language,
"to": target_language
})
except Exception as e:
storage.add_log("ERROR", "Erro na tradução", {"error": str(e)})
# Registrar estatísticas de uso
used_language = contact_language if contact_language else system_language
storage.record_language_usage(
used_language,
from_me,
bool(contact_language and contact_language != system_language)
)
return transcription, use_timestamps
except Exception as e: except Exception as e:
storage.add_log("ERROR", "Erro no processo de transcrição", { storage.add_log("ERROR", "Erro no processo de transcrição", {
@ -160,7 +402,130 @@ async def transcribe_audio(audio_source, apikey=None):
finally: finally:
# Limpar arquivos temporários # Limpar arquivos temporários
if isinstance(audio_source, str) and os.path.exists(audio_source): if isinstance(audio_source, str) and os.path.exists(audio_source):
try:
os.unlink(audio_source) os.unlink(audio_source)
except Exception as e:
storage.add_log("WARNING", "Erro ao remover arquivo temporário", {
"error": str(e)
})
def format_timestamped_result(result):
"""
Formata o resultado da transcrição com timestamps
"""
segments = result.get("segments", [])
formatted_lines = []
for segment in segments:
start_time = format_timestamp(segment.get("start", 0))
end_time = format_timestamp(segment.get("end", 0))
text = segment.get("text", "").strip()
if text:
formatted_lines.append(f"[{start_time} -> {end_time}] {text}")
return "\n".join(formatted_lines)
def format_timestamp(seconds):
"""
Converte segundos em formato MM:SS
"""
minutes = int(seconds // 60)
remaining_seconds = int(seconds % 60)
return f"{minutes:02d}:{remaining_seconds:02d}"
# Função para detecção de idioma
async def detect_language(text: str) -> str:
"""
Detecta o idioma do texto usando a API GROQ
Args:
text: Texto para detectar idioma
Returns:
str: Código ISO 639-1 do idioma detectado
"""
provider = storage.get_llm_provider()
storage.add_log("DEBUG", "Iniciando detecção de idioma", {
"text_length": len(text)
})
# Lista de idiomas suportados
SUPPORTED_LANGUAGES = {
"pt", "en", "es", "fr", "de", "it", "ja", "ko",
"zh", "ro", "ru", "ar", "hi", "nl", "pl", "tr"
}
if provider == "openai":
api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
# Prompt melhorado com exemplos e restrições
prompt = """
Analise o texto e retorne APENAS o código ISO 639-1 do idioma principal.
Regras:
1. Retorne APENAS o código de 2 letras
2. Use somente códigos permitidos: pt, en, es, fr, de, it, ja, ko, zh, ro, ru, ar, hi, nl, pl, tr
3. Se não tiver certeza ou o idioma não estiver na lista, retorne "en"
4. Não inclua pontuação, espaços extras ou explicações
Exemplos corretos:
"Hello world" -> en
"Bonjour le monde" -> fr
"Olá mundo" -> pt
Texto para análise:
"""
json_data = {
"messages": [{
"role": "system",
"content": "Você é um detector de idiomas preciso que retorna apenas códigos ISO 639-1."
}, {
"role": "user",
"content": f"{prompt}\n\n{text[:500]}" # Limitando para os primeiros 500 caracteres
}],
"model": model,
"temperature": 0.1
}
try:
success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
if not success:
raise Exception(f"Falha na detecção de idioma: {error}")
detected_language = response_data["choices"][0]["message"]["content"].strip().lower()
# Validar o resultado
if detected_language not in SUPPORTED_LANGUAGES:
storage.add_log("WARNING", "Idioma detectado não suportado", {
"detected": detected_language,
"fallback": "en"
})
detected_language = "en"
storage.add_log("INFO", "Idioma detectado com sucesso", {
"detected_language": detected_language
})
return detected_language
except Exception as e:
storage.add_log("ERROR", "Erro no processo de detecção de idioma", {
"error": str(e),
"type": type(e).__name__
})
raise
async def send_message_to_whatsapp(server_url, instance, apikey, message, remote_jid, message_id): async def send_message_to_whatsapp(server_url, instance, apikey, message, remote_jid, message_id):
"""Envia mensagem via WhatsApp""" """Envia mensagem via WhatsApp"""
@ -265,3 +630,158 @@ async def get_audio_base64(server_url, instance, apikey, message_id):
"message_id": message_id "message_id": message_id
}) })
raise raise
async def format_message(transcription_text, summary_text=None):
"""Formata a mensagem baseado nas configurações."""
settings = storage.get_message_settings()
message_parts = []
# Determinar modo de saída
output_mode = settings["output_mode"]
char_limit = int(settings["character_limit"])
if output_mode == "smart":
# Modo inteligente baseado no tamanho
if len(transcription_text) > char_limit:
if summary_text:
message_parts.append(f"{settings['summary_header']}\n\n{summary_text}")
else:
message_parts.append(f"{settings['transcription_header']}\n\n{transcription_text}")
elif output_mode == "summary_only":
if summary_text:
message_parts.append(f"{settings['summary_header']}\n\n{summary_text}")
elif output_mode == "transcription_only":
message_parts.append(f"{settings['transcription_header']}\n\n{transcription_text}")
else: # both
if summary_text:
message_parts.append(f"{settings['summary_header']}\n\n{summary_text}")
message_parts.append(f"{settings['transcription_header']}\n\n{transcription_text}")
# Adicionar mensagem de negócio
message_parts.append(dynamic_settings['BUSINESS_MESSAGE'])
return "\n\n".join(message_parts)
async def translate_text(text: str, source_language: str, target_language: str) -> str:
"""
Traduz o texto usando a API GROQ
Args:
text: Texto para traduzir
source_language: Código ISO 639-1 do idioma de origem
target_language: Código ISO 639-1 do idioma de destino
Returns:
str: Texto traduzido
"""
provider = storage.get_llm_provider()
storage.add_log("DEBUG", "Iniciando tradução", {
"source_language": source_language,
"target_language": target_language,
"text_length": len(text)
})
# Se os idiomas forem iguais, retorna o texto original
if source_language == target_language:
return text
if provider == "openai":
api_key = storage.get_openai_keys()[0]
url = "https://api.openai.com/v1/chat/completions"
model = "gpt-4o-mini"
else: # groq
url = "https://api.groq.com/openai/v1/chat/completions"
api_key = await get_working_groq_key(storage)
if not api_key:
raise Exception("Nenhuma chave GROQ disponível")
model = "llama-3.3-70b-versatile"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
prompt = f"""
Você é um tradutor profissional especializado em manter o tom e estilo do texto original.
Instruções:
1. Traduza o texto de {source_language} para {target_language}
2. Preserve todas as formatações (negrito, itálico, emojis)
3. Mantenha os mesmos parágrafos e quebras de linha
4. Preserve números, datas e nomes próprios
5. Não adicione ou remova informações
6. Não inclua notas ou explicações
7. Mantenha o mesmo nível de formalidade
Texto para tradução:
{text}
"""
json_data = {
"messages": [{
"role": "system",
"content": "Você é um tradutor profissional que mantém o estilo e formatação do texto original."
}, {
"role": "user",
"content": prompt
}],
"model": model,
"temperature": 0.3
}
try:
success, response_data, error = await handle_groq_request(url, headers, json_data, storage, is_form_data=False)
if not success:
raise Exception(f"Falha na tradução: {error}")
translated_text = response_data["choices"][0]["message"]["content"].strip()
# Verificar se a tradução manteve aproximadamente o mesmo tamanho
length_ratio = len(translated_text) / len(text)
if not (0.5 <= length_ratio <= 1.5):
storage.add_log("WARNING", "Possível erro na tradução - diferença significativa no tamanho", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
# Validar se a tradução não está vazia
if not await validate_transcription_response(translated_text):
storage.add_log("ERROR", "Tradução vazia ou inválida recebida")
raise Exception("Tradução vazia ou inválida recebida")
storage.add_log("INFO", "Tradução concluída com sucesso", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
return translated_text
except Exception as e:
storage.add_log("ERROR", "Erro no processo de tradução", {
"error": str(e),
"type": type(e).__name__
})
raise
# Nova função para baixar áudio remoto
async def download_remote_audio(url: str) -> str:
"""
Baixa um arquivo de áudio remoto e salva localmente como um arquivo temporário.
Retorna o caminho para o arquivo salvo.
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
audio_data = await response.read()
# Cria um arquivo temporário para armazenar o áudio (pode ajustar o sufixo caso necessário)
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
temp_file.write(audio_data)
local_path = temp_file.name
return local_path
else:
raise Exception(f"Falha no download, código de status: {response.status}")
except Exception as e:
raise Exception(f"Erro ao baixar áudio remoto: {str(e)}")

View File

@ -1,15 +1,47 @@
#!/bin/bash #!/bin/bash
# Função para construir o comando redis-cli com autenticação condicional
build_redis_cli_cmd() {
cmd="redis-cli -h ${REDIS_HOST:-localhost} -p ${REDIS_PORT:-6380}"
if [ ! -z "$REDIS_USERNAME" ]; then
cmd="$cmd --user $REDIS_USERNAME"
fi
if [ ! -z "$REDIS_PASSWORD" ]; then
cmd="$cmd -a $REDIS_PASSWORD"
fi
if [ ! -z "$REDIS_DB" ]; then
cmd="$cmd -n $REDIS_DB"
fi
echo "$cmd"
}
# Função para inicializar configurações no Redis # Função para inicializar configurações no Redis
initialize_redis_config() { initialize_redis_config() {
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET GROQ_API_KEY "sua_api_key_aqui" redis_cmd=$(build_redis_cli_cmd)
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET BUSINESS_MESSAGE "*Impacte AI* Premium Services"
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET PROCESS_GROUP_MESSAGES "false" $redis_cmd SET GROQ_API_KEY "sua_api_key_aqui" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET PROCESS_SELF_MESSAGES "true" $redis_cmd SET BUSINESS_MESSAGE "*Impacte AI* Premium Services" NX
redis-cli -h $REDIS_HOST -p $REDIS_PORT SET DEBUG_MODE "false" $redis_cmd SET PROCESS_GROUP_MESSAGES "false" NX
$redis_cmd SET PROCESS_SELF_MESSAGES "true" NX
$redis_cmd SET API_DOMAIN "$API_DOMAIN" NX
} }
# Inicializar configurações no Redis # Aguardar o Redis estar pronto
echo "Aguardando o Redis ficar disponível..."
redis_cmd=$(build_redis_cli_cmd)
until $redis_cmd PING 2>/dev/null; do
echo "Redis não está pronto - aguardando..."
sleep 5
done
echo "Redis disponível!"
# Inicializar configurações
initialize_redis_config initialize_redis_config
# Iniciar o FastAPI em background # Iniciar o FastAPI em background

View File

@ -1,12 +1,18 @@
import json import json
import os import os
from typing import List, Dict from typing import List, Dict, Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta
import traceback import traceback
import logging import logging
import redis import redis
from utils import create_redis_client
import uuid
class StorageHandler: class StorageHandler:
# Chaves Redis para webhooks
WEBHOOK_KEY = "webhook_redirects" # Chave para armazenar os webhooks
WEBHOOK_STATS_KEY = "webhook_stats" # Chave para estatísticas
def __init__(self): def __init__(self):
# Configuração de logger # Configuração de logger
self.logger = logging.getLogger("StorageHandler") self.logger = logging.getLogger("StorageHandler")
@ -20,17 +26,19 @@ class StorageHandler:
self.logger.info("StorageHandler inicializado.") self.logger.info("StorageHandler inicializado.")
# Conexão com o Redis # Conexão com o Redis
self.redis = redis.Redis( self.redis = create_redis_client()
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6380)),
db=0,
decode_responses=True
)
# Retenção de logs e backups # Retenção de logs e backups
self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48)) self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48))
self.backup_retention_days = int(os.getenv('BACKUP_RETENTION_DAYS', 7)) self.backup_retention_days = int(os.getenv('BACKUP_RETENTION_DAYS', 7))
# Garantir valores padrão para configurações de idioma
if not self.redis.exists(self._get_redis_key("auto_translation")):
self.redis.set(self._get_redis_key("auto_translation"), "false")
if not self.redis.exists(self._get_redis_key("auto_language_detection")):
self.redis.set(self._get_redis_key("auto_language_detection"), "false")
def _get_redis_key(self, key): def _get_redis_key(self, key):
return f"transcrevezap:{key}" return f"transcrevezap:{key}"
@ -169,3 +177,541 @@ class StorageHandler:
self.redis.delete(key) self.redis.delete(key)
except Exception as e: except Exception as e:
self.logger.error(f"Erro ao limpar backups antigos: {e}") self.logger.error(f"Erro ao limpar backups antigos: {e}")
# Método de rotação de chaves groq
def get_groq_keys(self) -> List[str]:
"""Obtém todas as chaves GROQ armazenadas."""
return list(self.redis.smembers(self._get_redis_key("groq_keys")))
def add_groq_key(self, key: str):
"""Adiciona uma nova chave GROQ ao conjunto."""
if key and key.startswith("gsk_"):
self.redis.sadd(self._get_redis_key("groq_keys"), key)
return True
return False
def remove_groq_key(self, key: str):
"""Remove uma chave GROQ do conjunto."""
self.redis.srem(self._get_redis_key("groq_keys"), key)
def get_next_groq_key(self) -> str:
"""
Obtém a próxima chave GROQ no sistema de rodízio.
Utiliza um contador no Redis para controlar a rotação.
"""
keys = self.get_groq_keys()
if not keys:
return None
# Obtém e incrementa o contador de rodízio
counter = int(self.redis.get(self._get_redis_key("groq_key_counter")) or "0")
next_counter = (counter + 1) % len(keys)
self.redis.set(self._get_redis_key("groq_key_counter"), str(next_counter))
return keys[counter % len(keys)]
def get_penalized_until(self, key: str) -> Optional[datetime]:
"""
Retorna o timestamp até quando a chave está penalizada, ou None se não estiver penalizada.
"""
penalized_key = self._get_redis_key(f"groq_key_penalized_{key}")
penalized_until = self.redis.get(penalized_key)
if penalized_until:
return datetime.fromisoformat(penalized_until)
return None
def penalize_key(self, key: str, penalty_duration: int):
"""
Penaliza uma chave por um tempo determinado (em segundos).
"""
penalized_key = self._get_redis_key(f"groq_key_penalized_{key}")
penalized_until = datetime.utcnow() + timedelta(seconds=penalty_duration)
self.redis.set(penalized_key, penalized_until.isoformat())
self.redis.expire(penalized_key, penalty_duration) # Expira a chave após o tempo de penalidade
self.add_log("INFO", "Chave GROQ penalizada", {
"key": key,
"penalized_until": penalized_until.isoformat()
})
def get_message_settings(self):
"""Obtém as configurações de mensagens."""
return {
"summary_header": self.redis.get(self._get_redis_key("summary_header")) or "🤖 *Resumo do áudio:*",
"transcription_header": self.redis.get(self._get_redis_key("transcription_header")) or "🔊 *Transcrição do áudio:*",
"output_mode": self.redis.get(self._get_redis_key("output_mode")) or "both",
"character_limit": int(self.redis.get(self._get_redis_key("character_limit")) or "500"),
}
def save_message_settings(self, settings: dict):
"""Salva as configurações de mensagens."""
for key, value in settings.items():
self.redis.set(self._get_redis_key(key), str(value))
def get_process_mode(self):
"""Retorna o modo de processamento configurado"""
mode = self.redis.get(self._get_redis_key("process_mode")) or "all"
self.logger.debug(f"Modo de processamento atual: {mode}")
return mode
def get_contact_language(self, contact_id: str) -> str:
"""
Obtém o idioma configurado para um contato específico.
O contact_id pode vir com ou sem @s.whatsapp.net
"""
# Remover @s.whatsapp.net se presente
contact_id = contact_id.split('@')[0]
return self.redis.hget(self._get_redis_key("contact_languages"), contact_id)
def set_contact_language(self, contact_id: str, language: str):
"""
Define o idioma para um contato específico
"""
# Remover @s.whatsapp.net se presente
contact_id = contact_id.split('@')[0]
self.redis.hset(self._get_redis_key("contact_languages"), contact_id, language)
self.logger.info(f"Idioma {language} definido para o contato {contact_id}")
def get_all_contact_languages(self) -> dict:
"""
Retorna um dicionário com todos os contatos e seus idiomas configurados
"""
return self.redis.hgetall(self._get_redis_key("contact_languages"))
def remove_contact_language(self, contact_id: str):
"""
Remove a configuração de idioma de um contato
"""
contact_id = contact_id.split('@')[0]
self.redis.hdel(self._get_redis_key("contact_languages"), contact_id)
self.logger.info(f"Configuração de idioma removida para o contato {contact_id}")
def get_auto_language_detection(self) -> bool:
"""
Verifica se a detecção automática de idioma está ativada
"""
return self.redis.get(self._get_redis_key("auto_language_detection")) == "true"
def set_auto_language_detection(self, enabled: bool):
"""
Ativa ou desativa a detecção automática de idioma
"""
self.redis.set(self._get_redis_key("auto_language_detection"), str(enabled).lower())
self.logger.info(f"Detecção automática de idioma {'ativada' if enabled else 'desativada'}")
def get_auto_translation(self) -> bool:
"""
Verifica se a tradução automática está ativada
"""
return self.redis.get(self._get_redis_key("auto_translation")) == "true"
def set_auto_translation(self, enabled: bool):
"""
Ativa ou desativa a tradução automática
"""
self.redis.set(self._get_redis_key("auto_translation"), str(enabled).lower())
self.logger.info(f"Tradução automática {'ativada' if enabled else 'desativada'}")
def record_language_usage(self, language: str, from_me: bool, auto_detected: bool = False):
"""
Registra estatísticas de uso de idiomas
Args:
language: Código do idioma (ex: 'pt', 'en')
from_me: Se o áudio foi enviado por nós
auto_detected: Se o idioma foi detectado automaticamente
"""
try:
# Validar idioma
if not language:
self.add_log("WARNING", "Tentativa de registrar uso sem idioma definido")
return
# Incrementar contagem total do idioma
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_total",
1
)
# Incrementar contagem por direção (enviado/recebido)
direction = 'sent' if from_me else 'received'
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_{direction}",
1
)
# Se foi detecção automática, registrar
if auto_detected:
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_auto_detected",
1
)
# Registrar última utilização
self.redis.hset(
self._get_redis_key("language_stats"),
f"{language}_last_used",
datetime.now().isoformat()
)
# Log detalhado
self.add_log("DEBUG", "Uso de idioma registrado", {
"language": language,
"direction": direction,
"auto_detected": auto_detected
})
except Exception as e:
self.add_log("ERROR", "Erro ao registrar uso de idioma", {
"error": str(e),
"type": type(e).__name__
})
def get_language_statistics(self) -> Dict:
"""
Obtém estatísticas de uso de idiomas
"""
try:
stats_raw = self.redis.hgetall(self._get_redis_key("language_stats"))
# Organizar estatísticas por idioma
stats = {}
for key, value in stats_raw.items():
lang, metric = key.split('_', 1)
if lang not in stats:
stats[lang] = {}
if metric == 'last_used':
stats[lang][metric] = value
else:
stats[lang][metric] = int(value)
return stats
except Exception as e:
self.logger.error(f"Erro ao obter estatísticas de idioma: {e}")
return {}
def cache_language_detection(self, contact_id: str, language: str, confidence: float = 1.0):
"""
Armazena em cache o idioma detectado para um contato
"""
contact_id = contact_id.split('@')[0]
cache_data = {
'language': language,
'confidence': confidence,
'timestamp': datetime.now().isoformat(),
'auto_detected': True
}
self.redis.hset(
self._get_redis_key("language_detection_cache"),
contact_id,
json.dumps(cache_data)
)
def get_cached_language(self, contact_id: str) -> Dict:
"""
Obtém o idioma em cache para um contato
Retorna None se não houver cache ou se estiver expirado
"""
contact_id = contact_id.split('@')[0]
cached = self.redis.hget(
self._get_redis_key("language_detection_cache"),
contact_id
)
if not cached:
return None
try:
data = json.loads(cached)
# Verificar se o cache expirou (24 horas)
cache_time = datetime.fromisoformat(data['timestamp'])
if datetime.now() - cache_time > timedelta(hours=24):
return None
return data
except:
return None
def get_webhook_redirects(self) -> List[Dict]:
"""Obtém todos os webhooks de redirecionamento cadastrados."""
webhooks_raw = self.redis.hgetall(self._get_redis_key("webhook_redirects"))
webhooks = []
for webhook_id, data in webhooks_raw.items():
webhook_data = json.loads(data)
webhook_data['id'] = webhook_id
webhooks.append(webhook_data)
return webhooks
def validate_webhook_url(self, url: str) -> bool:
"""Valida se a URL do webhook é acessível."""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
return all([parsed.scheme, parsed.netloc])
except Exception as e:
self.logger.error(f"URL inválida: {url} - {str(e)}")
return False
def add_webhook_redirect(self, url: str, description: str = "") -> str:
"""
Adiciona um novo webhook de redirecionamento.
Retorna o ID do webhook criado.
"""
webhook_id = str(uuid.uuid4())
webhook_data = {
"url": url,
"description": description,
"created_at": datetime.now().isoformat(),
"status": "active",
"error_count": 0,
"success_count": 0,
"last_success": None,
"last_error": None
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
return webhook_id
def clean_webhook_data(self, webhook_id: str):
"""
Remove todos os dados relacionados a um webhook específico do Redis.
Args:
webhook_id: ID do webhook a ser limpo
"""
try:
# Lista de chaves relacionadas ao webhook que precisam ser removidas
keys_to_remove = [
f"webhook_failed_{webhook_id}", # Entregas falhas
f"webhook_stats_{webhook_id}", # Estatísticas específicas
]
# Remove cada chave associada ao webhook
for key in keys_to_remove:
full_key = self._get_redis_key(key)
self.redis.delete(full_key)
self.logger.debug(f"Chave removida: {full_key}")
self.logger.info(f"Dados do webhook {webhook_id} limpos com sucesso")
except Exception as e:
self.logger.error(f"Erro ao limpar dados do webhook {webhook_id}: {str(e)}")
raise
def remove_webhook_redirect(self, webhook_id: str):
"""Remove um webhook de redirecionamento e todos os seus dados associados."""
try:
# Primeiro remove os dados associados
self.clean_webhook_data(webhook_id)
# Depois remove o webhook em si
self.redis.hdel(self._get_redis_key("webhook_redirects"), webhook_id)
self.logger.info(f"Webhook {webhook_id} removido com sucesso")
except Exception as e:
self.logger.error(f"Erro ao remover webhook {webhook_id}: {str(e)}")
raise
def update_webhook_stats(self, webhook_id: str, success: bool, error_message: str = None):
"""Atualiza as estatísticas de um webhook."""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
if success:
webhook_data["success_count"] += 1
webhook_data["last_success"] = datetime.now().isoformat()
else:
webhook_data["error_count"] += 1
webhook_data["last_error"] = {
"timestamp": datetime.now().isoformat(),
"message": error_message
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
except Exception as e:
self.logger.error(f"Erro ao atualizar estatísticas do webhook {webhook_id}: {e}")
def retry_failed_webhooks(self):
"""Tenta reenviar webhooks que falharam nas últimas 24h."""
webhooks = self.get_webhook_redirects()
for webhook in webhooks:
if webhook.get("last_error"):
error_time = datetime.fromisoformat(webhook["last_error"]["timestamp"])
if datetime.now() - error_time < timedelta(hours=24):
# Tentar reenviar
pass
def test_webhook(self, url: str) -> tuple[bool, str]:
"""
Testa um webhook antes de salvá-lo.
Retorna uma tupla (sucesso, mensagem)
"""
try:
import aiohttp
import asyncio
async def _test_webhook():
async with aiohttp.ClientSession() as session:
test_payload = {
"test": True,
"timestamp": datetime.now().isoformat(),
"message": "Teste de conexão do TranscreveZAP"
}
async with session.post(
url,
json=test_payload,
headers={"Content-Type": "application/json"},
timeout=10
) as response:
return response.status, await response.text()
status, response = asyncio.run(_test_webhook())
if status in [200, 201, 202]:
return True, "Webhook testado com sucesso!"
return False, f"Erro no teste: Status {status} - {response}"
except Exception as e:
return False, f"Erro ao testar webhook: {str(e)}"
def get_webhook_health(self, webhook_id: str) -> dict:
"""
Calcula métricas de saúde do webhook
"""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
total_requests = webhook_data["success_count"] + webhook_data["error_count"]
if total_requests == 0:
return {
"health_status": "unknown",
"error_rate": 0,
"success_rate": 0,
"total_requests": 0
}
error_rate = (webhook_data["error_count"] / total_requests) * 100
success_rate = (webhook_data["success_count"] / total_requests) * 100
# Definir status de saúde
if error_rate >= 50:
health_status = "critical"
elif error_rate >= 20:
health_status = "warning"
else:
health_status = "healthy"
return {
"health_status": health_status,
"error_rate": error_rate,
"success_rate": success_rate,
"total_requests": total_requests
}
except Exception as e:
self.logger.error(f"Erro ao calcular saúde do webhook {webhook_id}: {e}")
return None
def retry_webhook(self, webhook_id: str, payload: dict) -> bool:
"""
Tenta reenviar um payload para um webhook específico mantendo o payload original intacto.
Args:
webhook_id: ID do webhook para reenvio
payload: Payload original para reenvio
Returns:
bool: True se o reenvio foi bem sucedido, False caso contrário
"""
try:
import aiohttp
import asyncio
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
async def _retry_webhook():
async with aiohttp.ClientSession() as session:
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true",
"X-TranscreveZAP-Webhook-ID": webhook_id,
"X-TranscreveZAP-Retry": "true"
}
async with session.post(
webhook_data["url"],
json=payload, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
return response.status in [200, 201, 202]
success = asyncio.run(_retry_webhook())
if success:
self.update_webhook_stats(webhook_id, True)
else:
self.update_webhook_stats(webhook_id, False, "Falha no retry")
return success
except Exception as e:
self.logger.error(f"Erro no retry do webhook {webhook_id}: {e}")
return False
def get_failed_deliveries(self, webhook_id: str) -> List[Dict]:
"""
Retorna lista de entregas falhas para um webhook
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed = self.redis.lrange(key, 0, -1)
return [json.loads(x) for x in failed]
def add_failed_delivery(self, webhook_id: str, payload: dict):
"""
Registra uma entrega falha para retry posterior
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed_delivery = {
"timestamp": datetime.now().isoformat(),
"payload": payload,
"retry_count": 0
}
self.redis.lpush(key, json.dumps(failed_delivery))
# Manter apenas as últimas 100 falhas
self.redis.ltrim(key, 0, 99)
def get_llm_provider(self) -> str:
"""Returns active LLM provider (groq or openai)"""
return self.redis.get(self._get_redis_key("active_llm_provider")) or "groq"
def set_llm_provider(self, provider: str):
"""Sets active LLM provider"""
if provider not in ["groq", "openai"]:
raise ValueError("Provider must be 'groq' or 'openai'")
self.redis.set(self._get_redis_key("active_llm_provider"), provider)
def get_openai_keys(self) -> List[str]:
"""Get stored OpenAI API keys"""
return list(self.redis.smembers(self._get_redis_key("openai_keys")))
def add_openai_key(self, key: str):
"""Add OpenAI API key"""
if key and key.startswith("sk-"):
self.redis.sadd(self._get_redis_key("openai_keys"), key)
return True
return False

49
utils.py Normal file
View File

@ -0,0 +1,49 @@
import os
import redis
import logging
logger = logging.getLogger("TranscreveZAP")
def get_redis_connection_params():
"""
Retorna os parâmetros de conexão do Redis baseado nas variáveis de ambiente.
Retira parâmetros de autenticação se não estiverem configurados.
"""
params = {
'host': os.getenv('REDIS_HOST', 'localhost'),
'port': int(os.getenv('REDIS_PORT', 6380)),
'db': int(os.getenv('REDIS_DB', '0')),
'decode_responses': True
}
# Adiciona credenciais apenas se estiverem configuradas
username = os.getenv('REDIS_USERNAME')
password = os.getenv('REDIS_PASSWORD')
if username and username.strip():
params['username'] = username
if password and password.strip():
params['password'] = password
return params
def create_redis_client():
"""
Cria e testa a conexão com o Redis.
Retorna o cliente Redis se bem sucedido.
"""
try:
params = get_redis_connection_params()
client = redis.Redis(**params)
client.ping() # Testa a conexão
logger.info("Conexão com Redis estabelecida com sucesso!")
return client
except redis.exceptions.AuthenticationError:
logger.error("Falha de autenticação no Redis. Verifique as credenciais.")
raise
except redis.exceptions.ConnectionError as e:
logger.error(f"Erro de conexão com Redis: {e}")
raise
except Exception as e:
logger.error(f"Erro ao configurar Redis: {e}")
raise