10 Commits
2.2 ... 2.3

Author SHA1 Message Date
Fábio Cavalcanti
bac9469b05 ajuste docker standalone no readme 2025-01-10 10:27:10 -03:00
Fábio Cavalcanti
943d5be2c8 orientações do readme 2025-01-08 20:52:40 -03:00
Fábio Cavalcanti
ae74686c9b ajuste do logoff 2025-01-08 20:25:04 -03:00
Fábio Cavalcanti
2fadd723dc adicionado sessão de login persistente 2025-01-08 19:06:55 -03:00
Fábio Cavalcanti
2a296d759f correção de detecção automatica 2025-01-08 17:16:39 -03:00
Fábio Cavalcanti
f558542359 ajuste de função de tradução simultanea 2025-01-08 13:26:08 -03:00
Fábio Cavalcanti
6a9ba1f087 ajustes de tradução e fuso horario 2025-01-07 18:10:27 -03:00
Fábio Cavalcanti
b86c7ac764 adicionado funções de detalhamento de idiomas e comportamento de tradutor automatico 2025-01-07 15:44:59 -03:00
Fábio Cavalcanti
9a072aee22 corrigido modo de processamento 2025-01-07 13:50:24 -03:00
Fábio Cavalcanti
eeffecb091 adicionado configuração de modo de processamento de mensagens apenas em grupos ou grupos e privado 2025-01-07 13:35:44 -03:00
6 changed files with 1154 additions and 161 deletions

View File

@@ -1,11 +1,16 @@
# Usar uma imagem oficial do Python como base
FROM python:3.10-slim
# Instalar dependências do sistema, incluindo redis-tools
# Instalar dependências do sistema, incluindo redis-tools e tzdata para fuso horário
RUN apt-get update && apt-get install -y --no-install-recommends \
redis-tools \
tzdata \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
# Configurar o fuso horário
ENV TZ=America/Sao_Paulo
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Definir o diretório de trabalho
WORKDIR /app

40
main.py
View File

@@ -87,6 +87,18 @@ async def transcreve_audios(request: Request):
)
return {"message": "Mensagem não autorizada para processamento"}
# Verificação do modo de processamento (grupos/todos)
process_mode = storage.get_process_mode()
is_group = "@g.us" in remote_jid
if process_mode == "groups_only" and not is_group:
storage.add_log("INFO", "Mensagem ignorada - modo apenas grupos ativo", {
"remote_jid": remote_jid,
"process_mode": process_mode,
"is_group": is_group
})
return {"message": "Modo apenas grupos ativo - mensagens privadas ignoradas"}
if from_me and not dynamic_settings["PROCESS_SELF_MESSAGES"]:
storage.add_log("INFO", "Mensagem própria ignorada", {
"remote_jid": remote_jid
@@ -114,10 +126,30 @@ async def transcreve_audios(request: Request):
transcription_header = get_config("transcription_header", "🔊 *Transcrição do áudio:*")
character_limit = int(get_config("character_limit", "500"))
# Verificar se timestamps estão habilitados
use_timestamps = get_config("use_timestamps", "false") == "true"
storage.add_log("DEBUG", "Informações da mensagem", {
"from_me": from_me,
"remote_jid": remote_jid,
"is_group": is_group
})
# Transcrever áudio
storage.add_log("INFO", "Iniciando transcrição")
transcription_text, _ = await transcribe_audio(audio_source)
transcription_text, has_timestamps = await transcribe_audio(
audio_source,
apikey=apikey,
remote_jid=remote_jid,
from_me=from_me,
use_timestamps=use_timestamps
)
# Log do resultado
storage.add_log("INFO", "Transcrição concluída", {
"has_timestamps": has_timestamps,
"text_length": len(transcription_text),
"remote_jid": remote_jid
})
# Determinar se precisa de resumo baseado no modo de saída
summary_text = None
if output_mode in ["both", "summary_only"] or (
@@ -159,8 +191,8 @@ async def transcreve_audios(request: Request):
storage.record_processing(remote_jid)
storage.add_log("INFO", "Áudio processado com sucesso", {
"remote_jid": remote_jid,
"transcription_length": len(transcription_text),
"summary_length": len(summary_text)
"transcription_length": len(transcription_text) if transcription_text else 0,
"summary_length": len(summary_text) if summary_text else 0 # Adiciona verificação
})
return {"message": "Áudio transcrito e resposta enviada com sucesso"}

View File

@@ -7,49 +7,7 @@ import plotly.express as px
import os
import redis
# Conectar ao Redis
redis_client = redis.Redis(host=os.getenv('REDIS_HOST', 'localhost'), port=int(os.getenv('REDIS_PORT', 6380)), decode_responses=True)
# Função para salvar configurações no Redis
def save_to_redis(key, value):
try:
redis_client.set(key, value)
st.success(f"Configuração {key} salva com sucesso!")
except Exception as e:
st.error(f"Erro ao salvar no Redis: {key} -> {e}")
# Função para buscar configurações no Redis
def get_from_redis(key, default=None):
try:
value = redis_client.get(key)
return value if value is not None else default
except Exception as e:
st.error(f"Erro ao buscar no Redis: {key} -> {e}")
return default
# Função para buscar grupos do Whatsapp
def fetch_whatsapp_groups(server_url, instance, api_key):
url = f"{server_url}/group/fetchAllGroups/{instance}"
headers = {"apikey": api_key}
params = {"getParticipants": "false"} # Adicionando o parâmetro de query
try:
st.write(f"Requisição para URL: {url}") # Debug para URL
st.write(f"Cabeçalhos: {headers}") # Debug para headers
st.write(f"Parâmetros: {params}") # Debug para parâmetros
response = requests.get(url, headers=headers, params=params)
st.write(f"Status Code: {response.status_code}") # Debug para status HTTP
response.raise_for_status() # Levanta exceções HTTP
return response.json() # Retorna o JSON da resposta
except requests.RequestException as e:
st.error(f"Erro ao buscar grupos: {str(e)}")
if response.text:
st.error(f"Resposta do servidor: {response.text}")
return []
# Configuração da página
# 1. Primeiro: Configuração da página
st.set_page_config(
page_title="TranscreveZAP by Impacte AI",
page_icon="🎙️",
@@ -57,6 +15,67 @@ st.set_page_config(
initial_sidebar_state="expanded",
)
# 2. Depois: Inicialização do Redis
redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6380)),
decode_responses=True
)
# 3. Funções de sessão (atualizado para usar st.query_params)
def init_session():
"""Inicializa o sistema de sessão"""
if 'session_id' not in st.session_state:
# Verificar se existe uma sessão válida no Redis
session_token = st.query_params.get('session', None)
if session_token:
session_data = redis_client.get(f"session:{session_token}")
if session_data:
st.session_state.session_id = session_token
st.session_state.authenticated = True
return
# Se não houver sessão válida, gerar um novo ID
st.session_state.session_id = None
st.session_state.authenticated = False
# Garantir que init_session seja chamado antes de qualquer coisa
init_session()
def create_session():
"""Cria uma nova sessão no Redis"""
import uuid
session_id = str(uuid.uuid4())
expiry = 7 * 24 * 60 * 60 # 7 dias em segundos
# Salvar sessão no Redis
redis_client.setex(f"session:{session_id}", expiry, "active")
# Atualizar estado da sessão
st.session_state.session_id = session_id
st.session_state.authenticated = True
# Adicionar session_id como parâmetro de URL
st.query_params['session'] = session_id
def end_session():
"""Encerra a sessão atual"""
if 'session_id' in st.session_state and st.session_state.session_id:
# Remover sessão do Redis
redis_client.delete(f"session:{st.session_state.session_id}")
# Limpar todos os estados relevantes
for key in ['session_id', 'authenticated', 'username']:
if key in st.session_state:
del st.session_state[key]
# Remover parâmetro de sessão da URL
if 'session' in st.query_params:
del st.query_params['session']
# 4. Inicializar a sessão
init_session()
# Estilos CSS personalizados
st.markdown("""
<style>
@@ -106,6 +125,64 @@ st.markdown("""
# Configuração do storage
storage = StorageHandler()
# Dicionário de idiomas em português
IDIOMAS = {
"pt": "Português",
"en": "Inglês",
"es": "Espanhol",
"fr": "Francês",
"de": "Alemão",
"it": "Italiano",
"ja": "Japonês",
"ko": "Coreano",
"zh": "Chinês",
"ro": "Romeno",
"ru": "Russo",
"ar": "Árabe",
"hi": "Hindi",
"nl": "Holandês",
"pl": "Polonês",
"tr": "Turco"
}
# Função para salvar configurações no Redis
def save_to_redis(key, value):
try:
redis_client.set(key, value)
st.success(f"Configuração {key} salva com sucesso!")
except Exception as e:
st.error(f"Erro ao salvar no Redis: {key} -> {e}")
# Função para buscar configurações no Redis
def get_from_redis(key, default=None):
try:
value = redis_client.get(key)
return value if value is not None else default
except Exception as e:
st.error(f"Erro ao buscar no Redis: {key} -> {e}")
return default
# Função para buscar grupos do Whatsapp
def fetch_whatsapp_groups(server_url, instance, api_key):
url = f"{server_url}/group/fetchAllGroups/{instance}"
headers = {"apikey": api_key}
params = {"getParticipants": "false"} # Adicionando o parâmetro de query
try:
st.write(f"Requisição para URL: {url}") # Debug para URL
st.write(f"Cabeçalhos: {headers}") # Debug para headers
st.write(f"Parâmetros: {params}") # Debug para parâmetros
response = requests.get(url, headers=headers, params=params)
st.write(f"Status Code: {response.status_code}") # Debug para status HTTP
response.raise_for_status() # Levanta exceções HTTP
return response.json() # Retorna o JSON da resposta
except requests.RequestException as e:
st.error(f"Erro ao buscar grupos: {str(e)}")
if response.text:
st.error(f"Resposta do servidor: {response.text}")
return []
# Função para carregar configurações do Redis para o Streamlit
def load_settings():
try:
@@ -169,22 +246,59 @@ def login_page():
submit_button = st.form_submit_button('Entrar')
if submit_button:
if username == os.getenv('MANAGER_USER') and password == os.getenv('MANAGER_PASSWORD'):
st.session_state.authenticated = True
create_session()
st.success("Login realizado com sucesso!")
st.experimental_rerun()
else:
st.error('Credenciais inválidas')
# Modificar a função de logout no dashboard
def dashboard():
# Versão do sistema
APP_VERSION = "2.3"
show_logo()
st.sidebar.markdown('<div class="sidebar-header">TranscreveZAP - Menu</div>', unsafe_allow_html=True)
st.sidebar.markdown(f'<div style="text-align: center; color: gray; font-size: 0.8em;">versão {APP_VERSION}</div>', unsafe_allow_html=True)
# Mostrar nome do usuário logado (se disponível)
if hasattr(st.session_state, 'session_id'):
st.sidebar.markdown("---")
st.sidebar.markdown("👤 **Usuário Conectado**")
page = st.sidebar.radio(
"Navegação",
["📊 Painel de Controle", "👥 Gerenciar Grupos", "🚫 Gerenciar Bloqueios", "⚙️ Configurações"]
)
if st.sidebar.button("Sair"):
st.session_state.authenticated = False
st.experimental_rerun()
# Seção de logout com confirmação
st.sidebar.markdown("---")
logout_container = st.sidebar.container()
# Verifica se já existe um estado para confirmação de logout
if 'logout_confirmation' not in st.session_state:
st.session_state.logout_confirmation = False
# Botão principal de logout
if not st.session_state.logout_confirmation:
if logout_container.button("🚪 Sair da Conta"):
st.session_state.logout_confirmation = True
st.experimental_rerun()
# Botões de confirmação
if st.session_state.logout_confirmation:
col1, col2 = st.sidebar.columns(2)
if col1.button("✅ Confirmar"):
st.session_state.logout_confirmation = False
end_session()
st.experimental_rerun()
if col2.button("❌ Cancelar"):
st.session_state.logout_confirmation = False
st.experimental_rerun()
# Renderiza a página selecionada
if page == "📊 Painel de Controle":
show_statistics()
elif page == "👥 Gerenciar Grupos":
@@ -403,11 +517,65 @@ def message_settings_section():
except Exception as e:
st.error(f"Erro ao salvar configurações: {str(e)}")
def show_language_statistics():
"""Exibe estatísticas de uso de idiomas"""
stats = storage.get_language_statistics()
if not stats:
st.info("Ainda não há estatísticas de uso de idiomas.")
return
# Resumo geral
st.subheader("📊 Estatísticas de Idiomas")
# Criar métricas resumidas
total_usage = sum(s.get('total', 0) for s in stats.values())
auto_detected = sum(s.get('auto_detected', 0) for s in stats.values())
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total de Transcrições", total_usage)
with col2:
st.metric("Detecções Automáticas", auto_detected)
with col3:
st.metric("Idiomas Diferentes", len(stats))
# Gráfico de uso por idioma
usage_data = []
for lang, data in stats.items():
usage_data.append({
'Idioma': IDIOMAS.get(lang, lang),
'Total': data.get('total', 0),
'Enviados': data.get('sent', 0),
'Recebidos': data.get('received', 0),
'Auto-detectados': data.get('auto_detected', 0)
})
if usage_data:
df = pd.DataFrame(usage_data)
# Gráfico de barras empilhadas
fig = px.bar(df,
x='Idioma',
y=['Enviados', 'Recebidos'],
title='Uso por Idioma',
barmode='stack')
st.plotly_chart(fig, use_container_width=True)
# Tabela detalhada
st.subheader("📋 Detalhamento por Idioma")
st.dataframe(df.sort_values('Total', ascending=False))
def manage_settings():
st.title("⚙️ Configurações")
# Criar tabs para melhor organização
tab1, tab2, tab3 = st.tabs(["🔑 Chaves API", "🌐 Configurações Gerais", "📝 Formatação de Mensagens"])
tab1, tab2, tab3, tab4 = st.tabs([
"🔑 Chaves API",
"🌐 Configurações Gerais",
"📝 Formatação de Mensagens",
"🗣️ Idiomas e Transcrição"
])
with tab1:
st.subheader("Gerenciamento de Chaves GROQ")
@@ -486,30 +654,30 @@ def manage_settings():
key="process_self_messages"
)
st.subheader("🔄 Modo de Processamento")
# Obter o modo atual do Redis
current_mode = storage.get_process_mode()
# Definir as opções e seus rótulos
mode_options = ["all", "groups_only"]
mode_labels = {
"all": "Todos (Grupos e Privado)",
"groups_only": "Apenas Grupos"
}
# Calcular o índice atual baseado no valor do Redis
current_index = mode_options.index(current_mode) if current_mode in mode_options else 0
process_mode = st.selectbox(
"Processar mensagens de:",
options=mode_options,
format_func=lambda x: mode_labels[x],
index=current_index,
key="process_mode",
help="Escolha se deseja processar mensagens de todos os contatos ou apenas de grupos"
)
# Configuração de idioma
st.markdown("---")
st.subheader("🌐 Idioma")
# Dicionário de idiomas em português
IDIOMAS = {
"pt": "Português",
"en": "Inglês",
"es": "Espanhol",
"fr": "Francês",
"de": "Alemão",
"it": "Italiano",
"ja": "Japonês",
"ko": "Coreano",
"zh": "Chinês",
"ro": "Romeno",
"ru": "Russo",
"ar": "Árabe",
"hi": "Hindi",
"nl": "Holandês",
"pl": "Polonês",
"tr": "Turco"
}
# Carregar configuração atual de idioma
current_language = get_from_redis("TRANSCRIPTION_LANGUAGE", "pt")
@@ -594,6 +762,9 @@ def manage_settings():
# Salvar configuração de idioma
save_to_redis("TRANSCRIPTION_LANGUAGE", selected_language)
# Salvamento do modo de processamento
storage.redis.set(storage._get_redis_key("process_mode"), process_mode)
st.success("✅ Todas as configurações foram salvas com sucesso!")
# Mostrar resumo
@@ -605,9 +776,123 @@ def manage_settings():
except Exception as e:
st.error(f"Erro ao salvar configurações: {str(e)}")
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
with tab4:
st.subheader("Idiomas e Transcrição")
# Adicionar estatísticas no topo
show_language_statistics()
# Seção de Detecção Automática
st.markdown("---")
st.markdown("### 🔄 Detecção Automática de Idioma")
col1, col2 = st.columns(2)
with col1:
auto_detect = st.toggle(
"Ativar detecção automática",
value=storage.get_auto_language_detection(),
help="Detecta e configura automaticamente o idioma dos contatos"
)
if auto_detect:
st.info("""
A detecção automática de idioma:
1. Analisa o primeiro áudio de cada contato
2. Configura o idioma automaticamente
3. Usa cache de 24 horas para otimização
4. Funciona apenas em conversas privadas
5. Mantém o idioma global para grupos
6. Permite tradução automática entre idiomas
""")
# Seção de Timestamps
st.markdown("---")
st.markdown("### ⏱️ Timestamps na Transcrição")
use_timestamps = st.toggle(
"Incluir timestamps",
value=get_from_redis("use_timestamps", "false") == "true",
help="Adiciona marcadores de tempo em cada trecho"
)
if use_timestamps:
st.info("Os timestamps serão mostrados no formato [MM:SS] para cada trecho da transcrição")
# Seção de Configuração Manual de Idiomas por Contato
st.markdown("---")
st.markdown("### 👥 Idiomas por Contato")
# Obter contatos configurados
contact_languages = storage.get_all_contact_languages()
# Adicionar novo contato
with st.expander(" Adicionar Novo Contato", expanded=not bool(contact_languages)):
new_contact = st.text_input(
"Número do Contato",
placeholder="Ex: 5521999999999",
help="Digite apenas números, sem símbolos ou @s.whatsapp.net"
)
new_language = st.selectbox(
"Idioma do Contato",
options=list(IDIOMAS.keys()),
format_func=lambda x: IDIOMAS[x],
help="Idioma para transcrição dos áudios deste contato"
)
if st.button("Adicionar Contato"):
if new_contact and new_contact.isdigit():
storage.set_contact_language(new_contact, new_language)
st.success(f"✅ Contato configurado com idioma {IDIOMAS[new_language]}")
st.experimental_rerun()
else:
st.error("Por favor, insira um número válido")
# Listar contatos configurados
if contact_languages:
st.markdown("### Contatos Configurados")
for contact, language in contact_languages.items():
col1, col2, col3 = st.columns([2, 2, 1])
with col1:
st.text(f"+{contact}")
with col2:
current_language = st.selectbox(
"Idioma",
options=list(IDIOMAS.keys()),
format_func=lambda x: IDIOMAS[x],
key=f"lang_{contact}",
index=list(IDIOMAS.keys()).index(language) if language in IDIOMAS else 0
)
if current_language != language:
storage.set_contact_language(contact, current_language)
with col3:
if st.button("🗑️", key=f"remove_{contact}"):
storage.remove_contact_language(contact)
st.success("Contato removido")
st.experimental_rerun()
# Botão de Salvar
if st.button("💾 Salvar Configurações de Idioma e Transcrição"):
try:
storage.set_auto_language_detection(auto_detect)
save_to_redis("use_timestamps", str(use_timestamps).lower())
st.success("✅ Configurações salvas com sucesso!")
# Mostrar resumo das configurações
st.info(f"""
Configurações atuais:
- Detecção automática: {'Ativada' if auto_detect else 'Desativada'}
- Timestamps: {'Ativados' if use_timestamps else 'Desativados'}
- Contatos configurados: {len(contact_languages)}
""")
except Exception as e:
st.error(f"Erro ao salvar configurações: {str(e)}")
# Adicionar no início da execução principal
if __name__ == "__main__":
init_session()
# Modificar a parte final do código
if st.session_state.authenticated:
dashboard()
else:

201
readme.md
View File

@@ -1,9 +1,20 @@
# TranscreveZAP 2.0
## Transcrição e Resumo de Áudios no WhatsApp usando Python com interface em Streamlit
![ImpacteAI](./fluxo.png)
# TranscreveZAP 2.3- Plataforma de Gestão e Automação de Áudios do WhatsApp
Este projeto permite transcrever e resumir áudios enviados pelo WhatsApp usando inteligência artificial e integração com APIs. Ideal para automatizar o processamento de mensagens de áudio, oferecendo um resumo claro e prático.
### Sistema Inteligente de Transcrição, Resumo e Tradução Automática de Áudios para WhatsApp
*Desenvolvido com Python, FastAPI e Streamlit*
---
Uma solução completa para automatizar e gerenciar mensagens de áudio no WhatsApp, oferecendo:
- Transcrição automática multilíngue
- Resumos inteligentes de áudios
- Detecção e tradução automática entre idiomas
- Interface administrativa completa
- Sistema de rodízio de chaves API
- Gestão avançada de grupos e usuários
- Personalização de formatação e saída
Contato de email: contato@impacte.ai
([ACESSE NOSSO SITE](https://impacte.ai/))
@@ -16,7 +27,7 @@ Antes de começar, certifique-se de ter os seguintes requisitos:
- Python 3.10+ instalado ([Download](https://www.python.org/downloads/))
- Docker e Docker Compose instalados ([Instruções](https://docs.docker.com/get-docker/))
- Uma conta Evolution API com chave válida
- Uma conta GROQ API com chave válida (começa com 'gsk_') ([Crie sua CONTA](https://console.groq.com/login))
- No mínimo uma conta GROQ API com chave válida (começa com 'gsk_') ([Crie sua CONTA](https://console.groq.com/login))
* Em caso de uso com Proxy Reverso Aponte um Subdomínio para a API e outro para o MANAGER da aplicação
---
@@ -31,32 +42,42 @@ Antes de começar, certifique-se de ter os seguintes requisitos:
2. Configure o arquivo docker-compose.yaml:
```yaml
version: "3.7"
services:
tcaudio:
image: impacteai/transcrevezap:latest
ports:
- 8005:8005 # Porta para FastAPI
- 8501:8501 # Porta para Streamlit
environment:
- REDIS_HOST=redis
- REDIS_PORT=6380
- API_DOMAIN=seu-ip
- DEBUG_MODE=false
- LOG_LEVEL=INFO
- MANAGER_USER=admin
- MANAGER_PASSWORD=sua_senha_aqui
depends_on:
- redis
redis:
image: redis:6
command: redis-server --port 6380 --appendonly yes
volumes:
- redis_data:/data
version: "3.7"
services:
tcaudio:
image: impacteai/transcrevezap:latest
build:
context: .
ports:
- 8005:8005 # Porta para FastAPI
- 8501:8501 # Porta para Streamlit
environment:
- UVICORN_PORT=8005
- UVICORN_HOST=0.0.0.0
- UVICORN_RELOAD=true
- UVICORN_WORKERS=1
- API_DOMAIN=localhost
- DEBUG_MODE=false
- LOG_LEVEL=INFO
- MANAGER_USER=admin
- MANAGER_PASSWORD=sua_senha_aqui
- REDIS_HOST=redis-transcrevezap
- REDIS_PORT=6380 # Porta personalizada para o Redis do TranscreveZAP
depends_on:
- redis-transcrevezap
command: ./start.sh
redis-transcrevezap:
image: redis:6
command: redis-server --port 6380 --appendonly yes
volumes:
redis_data:
- redis_transcrevezap_data:/data
volumes:
redis_transcrevezap_data:
driver: local
```
3. Inicie os serviços:
@@ -219,13 +240,19 @@ Para usar com Traefik, certifique-se de:
- Em produção, recomenda-se DEBUG_MODE=false
- Configure LOG_LEVEL=DEBUG apenas para troubleshooting
## ✨ Novos Recursos na v2.1
## ✨ Novos Recursos na v2.3
### 🌍 Suporte Multilíngue
- Transcrição e resumo em 10+ idiomas
- Transcrição e resumo com suporte para 16 idiomas principais
- Mudança instantânea de idioma
- Interface intuitiva para seleção de idioma
- Mantém consistência entre transcrição e resumo
- Configuração manual de idioma por contato
- Detecção automática de idioma
- Tradução automática integrada
### 🔄 Sistema de Cache para Idiomas
Implementação de cache inteligente para otimizar a detecção e processamento de idiomas.
### 🔄 Sistema Inteligente de Rodízio de Chaves
- Suporte a múltiplas chaves GROQ
@@ -233,8 +260,13 @@ Para usar com Traefik, certifique-se de:
- Maior redundância e disponibilidade
- Gestão simplificada de chaves via interface
## 🌍 Sistema de Idiomas
O TranscreveZAP agora suporta transcrição e resumo em múltiplos idiomas. Na seção "Configurações", você pode:
### ⏱️ Timestamps em Transcrições
Nova funcionalidade de timestamps que adiciona marcadores de tempo precisos em cada trecho da transcrição.
## 📋 Detalhamento das Funcionalidades
### 🌍 Sistema de Idiomas
O TranscreveZAP suporta transcrição e resumo em múltiplos idiomas. Na seção "Configurações", você pode:
1. Selecionar o idioma principal para transcrição e resumo
2. O sistema manterá Português como padrão se nenhum outro for selecionado
@@ -258,8 +290,86 @@ Idiomas suportados:
- 🇷🇺 Russo
- 🇹🇷 Turco
### 🌐 Gestão de Idiomas por Contato
#### Configuração Manual
```markdown
1. Acesse o Manager > Configurações > Idiomas e Transcrição
2. Expanda "Adicionar Novo Contato"
3. Digite o número do contato (formato: 5521999999999)
4. Selecione o idioma desejado
5. Clique em "Adicionar Contato"
```
### 🔄 Detecção Automática de Idioma
Nova funcionalidade que detecta automaticamente o idioma do contato:
- Ativação via Manager > Configurações > Idiomas e Transcrição
- Analisa o primeiro áudio de cada contato
- Cache inteligente de 24 horas
- Funciona apenas em conversas privadas
- Mantém configuração global para grupos
### ⚡ Tradução Automática
Sistema inteligente de tradução que:
- Traduz automaticamente áudios recebidos para seu idioma principal
- Mantém o contexto e estilo original da mensagem
- Preserva formatações especiais (emojis, negrito, itálico)
- Otimizado para comunicação natural
### ⏱️ Sistema de Timestamps
Nova funcionalidade que adiciona marcadores de tempo:
- Formato [MM:SS] no início de cada trecho
- Ativação via Manager > Configurações > Idiomas e Transcrição
- Precisão de segundos
- Ideal para referência e navegação em áudios longos
#### Exemplo de Saída com Timestamps:
```
[00:00] Bom dia pessoal
[00:02] Hoje vamos falar sobre
[00:05] O novo sistema de timestamps
```
## 🔧 Configuração e Uso
### Configuração de Idiomas
1. **Configuração Global**
- Defina o idioma padrão do sistema
- Acesse: Manager > Configurações > Configurações Gerais
- Selecione o idioma principal em "Idioma para Transcrição e Resumo"
2. **Configuração por Contato**
- Acesse: Manager > Configurações > Idiomas e Transcrição
- Use "Adicionar Novo Contato" ou gerencie contatos existentes
- Cada contato pode ter seu próprio idioma configurado
3. **Detecção Automática**
- Ative/Desative a detecção automática
- Configure o tempo de cache
- Gerencie exceções e configurações manuais
### Configuração de Timestamps
1. Acesse: Manager > Configurações > Idiomas e Transcrição
2. Localize a seção "Timestamps na Transcrição"
3. Use o toggle para ativar/desativar
4. As mudanças são aplicadas imediatamente
## 📊 Monitoramento e Estatísticas
### Estatísticas de Idiomas
O sistema agora oferece estatísticas detalhadas:
- Total de transcrições por idioma
- Número de detecções automáticas
- Divisão entre mensagens enviadas/recebidas
- Histórico de uso por idioma
### Visualização de Dados
- Gráficos de uso por idioma
- Distribuição de idiomas
- Estatísticas de tradução
- Performance do sistema
## 🔄 Sistema de Rodízio de Chaves GROQ
O TranscreveZAP agora suporta múltiplas chaves GROQ com sistema de rodízio automático para melhor distribuição de carga e redundância.
O TranscreveZAP suporta múltiplas chaves GROQ com sistema de rodízio automático para melhor distribuição de carga e redundância.
### Funcionalidades:
1. Adicione múltiplas chaves GROQ para distribuição de carga
@@ -300,6 +410,29 @@ Se encontrar problemas:
3. Reinicie o serviço se as alterações não forem aplicadas
4. Verifique os logs para confirmar o idioma em uso
## 📝 Notas Adicionais
### Recomendações de Uso
- Configure idiomas manualmente para contatos frequentes
- Use detecção automática como fallback
- Monitore estatísticas de uso
- Faça backups regulares das configurações
### Limitações Conhecidas
- Detecção automática requer primeiro áudio
- Cache limitado a 24 horas
- Timestamps podem variar em áudios muito longos
## 🤝 Contribuição
Agradecemos feedback e contribuições! Reporte issues e sugira melhorias em nosso GitHub.
---
### 📞 Suporte
Para suporte adicional ou dúvidas:
- WhatsApp: [Entre no GRUPO](https://chat.whatsapp.com/L9jB1SlcmQFIVxzN71Y6KG)
- Email: contato@impacte.ai
- Site: [impacte.ai](https://impacte.ai)
## 📄 **Licença**
Este projeto está licenciado sob a Licença MIT - veja o arquivo [LICENSE](LICENSE) para detalhes.

View File

@@ -170,78 +170,206 @@ async def summarize_text_if_needed(text):
})
raise
async def transcribe_audio(audio_source, apikey=None):
async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=False, use_timestamps=False):
"""
Transcreve áudio usando a API GROQ com sistema de rodízio de chaves.
Transcreve áudio com suporte a detecção de idioma e tradução automática.
Args:
audio_source: Caminho do arquivo de áudio ou URL
apikey: Chave da API opcional para download de áudio
remote_jid: ID do remetente/destinatário
from_me: Se o áudio foi enviado pelo próprio usuário
use_timestamps: Se True, usa verbose_json para incluir timestamps
Returns:
tuple: (texto_transcrito, False)
tuple: (texto_transcrito, has_timestamps)
"""
storage.add_log("INFO", "Iniciando processo de transcrição")
storage.add_log("INFO", "Iniciando processo de transcrição", {
"from_me": from_me,
"remote_jid": remote_jid
})
url = "https://api.groq.com/openai/v1/audio/transcriptions"
groq_key = await get_groq_key()
groq_headers = {"Authorization": f"Bearer {groq_key}"}
# Obter idioma configurado
language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
storage.add_log("DEBUG", "Idioma configurado para transcrição", {
"language": language,
"redis_value": redis_client.get("TRANSCRIPTION_LANGUAGE")
})
try:
async with aiohttp.ClientSession() as session:
# Se o audio_source for uma URL
if isinstance(audio_source, str) and audio_source.startswith('http'):
storage.add_log("DEBUG", "Baixando áudio da URL", {
"url": audio_source
# Inicializar variáveis
contact_language = None
system_language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
is_private = remote_jid and "@s.whatsapp.net" in remote_jid
# Determinar idioma do contato em conversas privadas
if is_private:
# Remover @s.whatsapp.net do ID para buscar no cache
contact_id = remote_jid.split('@')[0]
# 1. Primeiro tentar obter idioma configurado manualmente
contact_language = storage.get_contact_language(contact_id)
if contact_language:
storage.add_log("DEBUG", "Usando idioma configurado manualmente", {
"contact_language": contact_language,
"from_me": from_me,
"remote_jid": remote_jid,
"is_private": is_private
})
# 2. Se não houver configuração manual e detecção automática estiver ativa
elif storage.get_auto_language_detection():
# Verificar cache primeiro
cached_lang = storage.get_cached_language(contact_id)
if cached_lang:
contact_language = cached_lang.get('language')
storage.add_log("DEBUG", "Usando idioma do cache", {
"contact_language": contact_language,
"auto_detected": True
})
download_headers = {"apikey": apikey} if apikey else {}
async with session.get(audio_source, headers=download_headers) as response:
if response.status != 200:
error_text = await response.text()
storage.add_log("ERROR", "Erro no download do áudio", {
"status": response.status,
"error": error_text
})
raise Exception(f"Erro ao baixar áudio: {error_text}")
# Se não há cache ou está expirado, fazer detecção
elif not from_me: # Só detecta em mensagens recebidas
try:
# Realizar transcrição inicial sem idioma específico
data = aiohttp.FormData()
data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3')
data.add_field('model', 'whisper-large-v3')
audio_data = await response.read()
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
temp_file.write(audio_data)
audio_source = temp_file.name
storage.add_log("DEBUG", "Áudio salvo temporariamente", {
"path": audio_source
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=groq_headers, data=data) as response:
if response.status == 200:
initial_result = await response.json()
initial_text = initial_result.get("text", "")
# Detectar idioma do texto transcrito
detected_lang = await detect_language(initial_text)
# Salvar no cache E na configuração do contato
storage.cache_language_detection(contact_id, detected_lang)
storage.set_contact_language(contact_id, detected_lang)
contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e configurado", {
"language": detected_lang,
"remote_jid": remote_jid,
"auto_detected": True
})
except Exception as e:
storage.add_log("WARNING", "Erro na detecção automática de idioma", {
"error": str(e),
"remote_jid": remote_jid
})
# Preparar dados para transcrição
data = aiohttp.FormData()
data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3')
data.add_field('model', 'whisper-large-v3')
data.add_field('language', language)
if not contact_language:
storage.add_log("DEBUG", "Usando idioma padrão do sistema", {
"from_me": from_me,
"remote_jid": remote_jid,
"is_private": is_private,
"system_language": system_language
})
storage.add_log("DEBUG", "Enviando áudio para transcrição")
# Definir idioma de transcrição e tradução baseado no contexto
if is_private and contact_language:
if from_me:
# Se estou enviando para um contato com idioma configurado
transcription_language = contact_language # Transcrever no idioma do contato
target_language = contact_language # Não precisa traduzir
storage.add_log("DEBUG", "Usando idioma do contato para áudio enviado", {
"transcription_language": transcription_language,
"target_language": target_language
})
else:
# Se estou recebendo
transcription_language = contact_language # Transcrever no idioma do contato
target_language = system_language # Traduzir para o idioma do sistema
storage.add_log("DEBUG", "Processando áudio recebido com tradução", {
"transcription_language": transcription_language,
"target_language": target_language
})
else:
# Caso padrão: usar idioma do sistema
transcription_language = system_language
target_language = system_language
storage.add_log("DEBUG", "Usando idioma do sistema", {
"transcription_language": transcription_language,
"target_language": target_language
})
storage.add_log("DEBUG", "Configuração de idiomas definida", {
"transcription_language": transcription_language,
"target_language": target_language,
"from_me": from_me,
"is_private": is_private,
"contact_language": contact_language
})
try:
# Realizar transcrição
data = aiohttp.FormData()
data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3')
data.add_field('model', 'whisper-large-v3')
data.add_field('language', transcription_language)
if use_timestamps:
data.add_field('response_format', 'verbose_json')
# Realizar transcrição
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=groq_headers, data=data) as response:
if response.status == 200:
result = await response.json()
transcription = result.get("text", "")
storage.add_log("INFO", "Transcrição concluída com sucesso", {
"text_length": len(transcription)
})
return transcription, False
else:
if response.status != 200:
error_text = await response.text()
storage.add_log("ERROR", "Erro na transcrição", {
"error": error_text,
"status": response.status
})
raise Exception(f"Erro na transcrição: {error_text}")
result = await response.json()
# Processar resposta baseado no formato
transcription = format_timestamped_result(result) if use_timestamps else result.get("text", "")
# Detecção automática para novos contatos
if (is_private and storage.get_auto_language_detection() and
not from_me and not contact_language):
try:
detected_lang = await detect_language(transcription)
storage.cache_language_detection(remote_jid, detected_lang)
contact_language = detected_lang
storage.add_log("INFO", "Idioma detectado e cacheado", {
"language": detected_lang,
"remote_jid": remote_jid
})
except Exception as e:
storage.add_log("WARNING", "Erro na detecção de idioma", {"error": str(e)})
# Tradução quando necessário
need_translation = (
is_private and contact_language and
(
(from_me and transcription_language != target_language) or
(not from_me and target_language != transcription_language)
)
)
if need_translation:
try:
transcription = await translate_text(
transcription,
transcription_language,
target_language
)
storage.add_log("INFO", "Texto traduzido automaticamente", {
"from": transcription_language,
"to": target_language
})
except Exception as e:
storage.add_log("ERROR", "Erro na tradução", {"error": str(e)})
# Registrar estatísticas de uso
used_language = contact_language if contact_language else system_language
storage.record_language_usage(
used_language,
from_me,
bool(contact_language and contact_language != system_language)
)
return transcription, use_timestamps
except Exception as e:
storage.add_log("ERROR", "Erro no processo de transcrição", {
@@ -252,7 +380,129 @@ async def transcribe_audio(audio_source, apikey=None):
finally:
# Limpar arquivos temporários
if isinstance(audio_source, str) and os.path.exists(audio_source):
os.unlink(audio_source)
try:
os.unlink(audio_source)
except Exception as e:
storage.add_log("WARNING", "Erro ao remover arquivo temporário", {
"error": str(e)
})
def format_timestamped_result(result):
"""
Formata o resultado da transcrição com timestamps
"""
segments = result.get("segments", [])
formatted_lines = []
for segment in segments:
start_time = format_timestamp(segment.get("start", 0))
end_time = format_timestamp(segment.get("end", 0))
text = segment.get("text", "").strip()
if text:
formatted_lines.append(f"[{start_time} -> {end_time}] {text}")
return "\n".join(formatted_lines)
def format_timestamp(seconds):
"""
Converte segundos em formato MM:SS
"""
minutes = int(seconds // 60)
remaining_seconds = int(seconds % 60)
return f"{minutes:02d}:{remaining_seconds:02d}"
# Função para detecção de idioma
async def detect_language(text: str) -> str:
"""
Detecta o idioma do texto usando a API GROQ
Args:
text: Texto para detectar idioma
Returns:
str: Código ISO 639-1 do idioma detectado
"""
storage.add_log("DEBUG", "Iniciando detecção de idioma", {
"text_length": len(text)
})
# Lista de idiomas suportados
SUPPORTED_LANGUAGES = {
"pt", "en", "es", "fr", "de", "it", "ja", "ko",
"zh", "ro", "ru", "ar", "hi", "nl", "pl", "tr"
}
url_completions = "https://api.groq.com/openai/v1/chat/completions"
groq_key = await get_groq_key()
headers = {
"Authorization": f"Bearer {groq_key}",
"Content-Type": "application/json",
}
# Prompt melhorado com exemplos e restrições
prompt = """
Analise o texto e retorne APENAS o código ISO 639-1 do idioma principal.
Regras:
1. Retorne APENAS o código de 2 letras
2. Use somente códigos permitidos: pt, en, es, fr, de, it, ja, ko, zh, ro, ru, ar, hi, nl, pl, tr
3. Se não tiver certeza ou o idioma não estiver na lista, retorne "en"
4. Não inclua pontuação, espaços extras ou explicações
Exemplos corretos:
"Hello world" -> en
"Bonjour le monde" -> fr
"Olá mundo" -> pt
Texto para análise:
"""
json_data = {
"messages": [{
"role": "system",
"content": "Você é um detector de idiomas preciso que retorna apenas códigos ISO 639-1."
}, {
"role": "user",
"content": f"{prompt}\n\n{text[:500]}" # Limitando para os primeiros 500 caracteres
}],
"model": "llama-3.3-70b-versatile",
"temperature": 0.1
}
try:
async with aiohttp.ClientSession() as session:
storage.add_log("DEBUG", "Enviando requisição para API GROQ - Detecção de idioma")
async with session.post(url_completions, headers=headers, json=json_data) as response:
if response.status == 200:
result = await response.json()
detected_language = result["choices"][0]["message"]["content"].strip().lower()
# Validar o resultado
if detected_language not in SUPPORTED_LANGUAGES:
storage.add_log("WARNING", "Idioma detectado não suportado", {
"detected": detected_language,
"fallback": "en"
})
detected_language = "en"
storage.add_log("INFO", "Idioma detectado com sucesso", {
"detected_language": detected_language
})
return detected_language
else:
error_text = await response.text()
storage.add_log("ERROR", "Erro na detecção de idioma", {
"error": error_text,
"status": response.status
})
raise Exception(f"Erro na detecção de idioma: {error_text}")
except Exception as e:
storage.add_log("ERROR", "Erro no processo de detecção de idioma", {
"error": str(e),
"type": type(e).__name__
})
raise
async def send_message_to_whatsapp(server_url, instance, apikey, message, remote_jid, message_id):
"""Envia mensagem via WhatsApp"""
storage.add_log("DEBUG", "Preparando envio de mensagem", {
@@ -386,4 +636,99 @@ async def format_message(transcription_text, summary_text=None):
# Adicionar mensagem de negócio
message_parts.append(dynamic_settings['BUSINESS_MESSAGE'])
return "\n\n".join(message_parts)
return "\n\n".join(message_parts)
async def translate_text(text: str, source_language: str, target_language: str) -> str:
"""
Traduz o texto usando a API GROQ
Args:
text: Texto para traduzir
source_language: Código ISO 639-1 do idioma de origem
target_language: Código ISO 639-1 do idioma de destino
Returns:
str: Texto traduzido
"""
storage.add_log("DEBUG", "Iniciando tradução", {
"source_language": source_language,
"target_language": target_language,
"text_length": len(text)
})
# Se os idiomas forem iguais, retorna o texto original
if source_language == target_language:
return text
url_completions = "https://api.groq.com/openai/v1/chat/completions"
groq_key = await get_groq_key()
headers = {
"Authorization": f"Bearer {groq_key}",
"Content-Type": "application/json",
}
# Prompt melhorado com contexto e instruções específicas
prompt = f"""
Você é um tradutor profissional especializado em manter o tom e estilo do texto original.
Instruções:
1. Traduza o texto de {source_language} para {target_language}
2. Preserve todas as formatações (negrito, itálico, emojis)
3. Mantenha os mesmos parágrafos e quebras de linha
4. Preserve números, datas e nomes próprios
5. Não adicione ou remova informações
6. Não inclua notas ou explicações
7. Mantenha o mesmo nível de formalidade
Texto para tradução:
{text}
"""
json_data = {
"messages": [{
"role": "system",
"content": "Você é um tradutor profissional que mantém o estilo e formatação do texto original."
}, {
"role": "user",
"content": prompt
}],
"model": "llama-3.3-70b-versatile",
"temperature": 0.3
}
try:
async with aiohttp.ClientSession() as session:
storage.add_log("DEBUG", "Enviando requisição de tradução")
async with session.post(url_completions, headers=headers, json=json_data) as response:
if response.status == 200:
result = await response.json()
translated_text = result["choices"][0]["message"]["content"].strip()
# Verificar se a tradução manteve aproximadamente o mesmo tamanho
length_ratio = len(translated_text) / len(text)
if not (0.5 <= length_ratio <= 1.5):
storage.add_log("WARNING", "Possível erro na tradução - diferença significativa no tamanho", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
storage.add_log("INFO", "Tradução concluída com sucesso", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
return translated_text
else:
error_text = await response.text()
storage.add_log("ERROR", "Erro na tradução", {
"status": response.status,
"error": error_text
})
raise Exception(f"Erro na tradução: {error_text}")
except Exception as e:
storage.add_log("ERROR", "Erro no processo de tradução", {
"error": str(e),
"type": type(e).__name__
})
raise

View File

@@ -31,6 +31,13 @@ class StorageHandler:
self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48))
self.backup_retention_days = int(os.getenv('BACKUP_RETENTION_DAYS', 7))
# Garantir valores padrão para configurações de idioma
if not self.redis.exists(self._get_redis_key("auto_translation")):
self.redis.set(self._get_redis_key("auto_translation"), "false")
if not self.redis.exists(self._get_redis_key("auto_language_detection")):
self.redis.set(self._get_redis_key("auto_language_detection"), "false")
def _get_redis_key(self, key):
return f"transcrevezap:{key}"
@@ -213,4 +220,190 @@ class StorageHandler:
def save_message_settings(self, settings: dict):
"""Salva as configurações de mensagens."""
for key, value in settings.items():
self.redis.set(self._get_redis_key(key), str(value))
self.redis.set(self._get_redis_key(key), str(value))
def get_process_mode(self):
"""Retorna o modo de processamento configurado"""
mode = self.redis.get(self._get_redis_key("process_mode")) or "all"
self.logger.debug(f"Modo de processamento atual: {mode}")
return mode
def get_contact_language(self, contact_id: str) -> str:
"""
Obtém o idioma configurado para um contato específico.
O contact_id pode vir com ou sem @s.whatsapp.net
"""
# Remover @s.whatsapp.net se presente
contact_id = contact_id.split('@')[0]
return self.redis.hget(self._get_redis_key("contact_languages"), contact_id)
def set_contact_language(self, contact_id: str, language: str):
"""
Define o idioma para um contato específico
"""
# Remover @s.whatsapp.net se presente
contact_id = contact_id.split('@')[0]
self.redis.hset(self._get_redis_key("contact_languages"), contact_id, language)
self.logger.info(f"Idioma {language} definido para o contato {contact_id}")
def get_all_contact_languages(self) -> dict:
"""
Retorna um dicionário com todos os contatos e seus idiomas configurados
"""
return self.redis.hgetall(self._get_redis_key("contact_languages"))
def remove_contact_language(self, contact_id: str):
"""
Remove a configuração de idioma de um contato
"""
contact_id = contact_id.split('@')[0]
self.redis.hdel(self._get_redis_key("contact_languages"), contact_id)
self.logger.info(f"Configuração de idioma removida para o contato {contact_id}")
def get_auto_language_detection(self) -> bool:
"""
Verifica se a detecção automática de idioma está ativada
"""
return self.redis.get(self._get_redis_key("auto_language_detection")) == "true"
def set_auto_language_detection(self, enabled: bool):
"""
Ativa ou desativa a detecção automática de idioma
"""
self.redis.set(self._get_redis_key("auto_language_detection"), str(enabled).lower())
self.logger.info(f"Detecção automática de idioma {'ativada' if enabled else 'desativada'}")
def get_auto_translation(self) -> bool:
"""
Verifica se a tradução automática está ativada
"""
return self.redis.get(self._get_redis_key("auto_translation")) == "true"
def set_auto_translation(self, enabled: bool):
"""
Ativa ou desativa a tradução automática
"""
self.redis.set(self._get_redis_key("auto_translation"), str(enabled).lower())
self.logger.info(f"Tradução automática {'ativada' if enabled else 'desativada'}")
def record_language_usage(self, language: str, from_me: bool, auto_detected: bool = False):
"""
Registra estatísticas de uso de idiomas
Args:
language: Código do idioma (ex: 'pt', 'en')
from_me: Se o áudio foi enviado por nós
auto_detected: Se o idioma foi detectado automaticamente
"""
try:
# Validar idioma
if not language:
self.add_log("WARNING", "Tentativa de registrar uso sem idioma definido")
return
# Incrementar contagem total do idioma
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_total",
1
)
# Incrementar contagem por direção (enviado/recebido)
direction = 'sent' if from_me else 'received'
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_{direction}",
1
)
# Se foi detecção automática, registrar
if auto_detected:
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_auto_detected",
1
)
# Registrar última utilização
self.redis.hset(
self._get_redis_key("language_stats"),
f"{language}_last_used",
datetime.now().isoformat()
)
# Log detalhado
self.add_log("DEBUG", "Uso de idioma registrado", {
"language": language,
"direction": direction,
"auto_detected": auto_detected
})
except Exception as e:
self.add_log("ERROR", "Erro ao registrar uso de idioma", {
"error": str(e),
"type": type(e).__name__
})
def get_language_statistics(self) -> Dict:
"""
Obtém estatísticas de uso de idiomas
"""
try:
stats_raw = self.redis.hgetall(self._get_redis_key("language_stats"))
# Organizar estatísticas por idioma
stats = {}
for key, value in stats_raw.items():
lang, metric = key.split('_', 1)
if lang not in stats:
stats[lang] = {}
if metric == 'last_used':
stats[lang][metric] = value
else:
stats[lang][metric] = int(value)
return stats
except Exception as e:
self.logger.error(f"Erro ao obter estatísticas de idioma: {e}")
return {}
def cache_language_detection(self, contact_id: str, language: str, confidence: float = 1.0):
"""
Armazena em cache o idioma detectado para um contato
"""
contact_id = contact_id.split('@')[0]
cache_data = {
'language': language,
'confidence': confidence,
'timestamp': datetime.now().isoformat(),
'auto_detected': True
}
self.redis.hset(
self._get_redis_key("language_detection_cache"),
contact_id,
json.dumps(cache_data)
)
def get_cached_language(self, contact_id: str) -> Dict:
"""
Obtém o idioma em cache para um contato
Retorna None se não houver cache ou se estiver expirado
"""
contact_id = contact_id.split('@')[0]
cached = self.redis.hget(
self._get_redis_key("language_detection_cache"),
contact_id
)
if not cached:
return None
try:
data = json.loads(cached)
# Verificar se o cache expirou (24 horas)
cache_time = datetime.fromisoformat(data['timestamp'])
if datetime.now() - cache_time > timedelta(hours=24):
return None
return data
except:
return None