ajustes de tradução e fuso horario

This commit is contained in:
Fábio Cavalcanti 2025-01-07 18:10:27 -03:00
parent b86c7ac764
commit 6a9ba1f087
4 changed files with 262 additions and 96 deletions

View File

@ -1,11 +1,16 @@
# Usar uma imagem oficial do Python como base
FROM python:3.10-slim
# Instalar dependências do sistema, incluindo redis-tools
# Instalar dependências do sistema, incluindo redis-tools e tzdata para fuso horário
RUN apt-get update && apt-get install -y --no-install-recommends \
redis-tools \
tzdata \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
# Configurar o fuso horário
ENV TZ=America/Sao_Paulo
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Definir o diretório de trabalho
WORKDIR /app

View File

@ -183,8 +183,8 @@ async def transcreve_audios(request: Request):
storage.record_processing(remote_jid)
storage.add_log("INFO", "Áudio processado com sucesso", {
"remote_jid": remote_jid,
"transcription_length": len(transcription_text),
"summary_length": len(summary_text)
"transcription_length": len(transcription_text) if transcription_text else 0,
"summary_length": len(summary_text) if summary_text else 0 # Adiciona verificação
})
return {"message": "Áudio transcrito e resposta enviada com sucesso"}

View File

@ -172,12 +172,13 @@ async def summarize_text_if_needed(text):
async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=False, use_timestamps=False):
"""
Transcreve áudio usando a API GROQ com suporte a cache de idioma e estatísticas.
Transcreve áudio com suporte a detecção de idioma e tradução automática.
Args:
audio_source: Caminho do arquivo de áudio ou URL
apikey: Chave da API opcional para download de áudio
remote_jid: ID do remetente/destinatário
from_me: Se o áudio foi enviado pelo próprio usuário
use_timestamps: Se True, usa verbose_json para incluir timestamps
Returns:
@ -188,112 +189,132 @@ async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=F
groq_key = await get_groq_key()
groq_headers = {"Authorization": f"Bearer {groq_key}"}
# Determinar idioma baseado no contexto
# Inicializar variáveis
language = None
transcription_language = None
auto_detected = False
is_private = remote_jid and "@s.whatsapp.net" in remote_jid
system_language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
# Determinar idioma baseado no contexto
if is_private:
# Verificar cache primeiro
cached_lang = storage.get_cached_language(remote_jid)
if cached_lang:
language = cached_lang['language']
storage.add_log("DEBUG", "Usando idioma em cache", cached_lang)
# 1. Verificar configuração manual primeiro
manual_language = storage.get_contact_language(remote_jid)
if manual_language:
language = manual_language
storage.add_log("DEBUG", "Usando idioma configurado manualmente", {
"language": manual_language,
"remote_jid": remote_jid,
"from_me": from_me
})
else:
# Verificar configuração manual
language = storage.get_contact_language(remote_jid)
# 2. Verificar cache
cached_lang = storage.get_cached_language(remote_jid)
if cached_lang:
language = cached_lang['language']
auto_detected = cached_lang.get('auto_detected', False)
storage.add_log("DEBUG", "Usando idioma em cache", {
**cached_lang,
"from_me": from_me
})
# Se não houver idioma definido, usar o global
if not language:
language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt"
storage.add_log("DEBUG", "Idioma configurado para transcrição", {
"language": language,
"remote_jid": remote_jid,
# Definir idioma para transcrição e tradução
if from_me and is_private:
# Se o áudio é meu e destinado a um contato privado:
# - Transcreve no idioma do sistema
# - Traduz para o idioma do contato (se configurado)
transcription_language = system_language
target_language = language if language else system_language
else:
# Se o áudio é recebido:
# - Transcreve no idioma detectado/configurado do contato
# - Traduz para o idioma do sistema
transcription_language = language if language else system_language
target_language = system_language
storage.add_log("DEBUG", "Configuração de idiomas definida", {
"transcription_language": transcription_language,
"target_language": target_language,
"from_me": from_me,
"is_private": is_private,
"auto_detected": auto_detected
})
try:
async with aiohttp.ClientSession() as session:
# Se o audio_source for uma URL
if isinstance(audio_source, str) and audio_source.startswith('http'):
storage.add_log("DEBUG", "Baixando áudio da URL", {
"url": audio_source
})
download_headers = {"apikey": apikey} if apikey else {}
async with session.get(audio_source, headers=download_headers) as response:
if response.status != 200:
error_text = await response.text()
storage.add_log("ERROR", "Erro no download do áudio", {
"status": response.status,
"error": error_text
})
raise Exception(f"Erro ao baixar áudio: {error_text}")
audio_data = await response.read()
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
temp_file.write(audio_data)
audio_source = temp_file.name
storage.add_log("DEBUG", "Áudio salvo temporariamente", {
"path": audio_source
})
# [Código existente para download do áudio se for URL]...
# Preparar dados para transcrição
data = aiohttp.FormData()
data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3')
data.add_field('model', 'whisper-large-v3')
data.add_field('language', language)
data.add_field('language', transcription_language)
if use_timestamps:
data.add_field('response_format', 'verbose_json')
storage.add_log("DEBUG", "Enviando áudio para transcrição")
# Nova sessão para cada requisição
# Realizar transcrição
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, headers=groq_headers, data=data) as response:
if response.status == 200:
result = await response.json()
# Processar resposta baseado no formato
if use_timestamps:
transcription = format_timestamped_result(result)
else:
transcription = result.get("text", "")
# Detecção automática de idioma se necessário
if (is_private and storage.get_auto_language_detection() and
not from_me and not cached_lang and not language):
try:
detected_lang = await detect_language(transcription)
storage.cache_language_detection(remote_jid, detected_lang)
auto_detected = True
language = detected_lang
except Exception as e:
storage.add_log("WARNING", "Erro na detecção automática de idioma", {
"error": str(e)
})
# Registrar estatísticas
storage.record_language_usage(language, from_me, auto_detected)
return transcription, use_timestamps
else:
error_text = await response.text()
storage.add_log("ERROR", "Erro na transcrição", {
"error": error_text,
"status": response.status
async with session.post(url, headers=groq_headers, data=data) as response:
if response.status != 200:
error_text = await response.text()
storage.add_log("ERROR", "Erro na transcrição", {
"error": error_text,
"status": response.status
})
raise Exception(f"Erro na transcrição: {error_text}")
result = await response.json()
# Processar resposta baseado no formato
transcription = format_timestamped_result(result) if use_timestamps else result.get("text", "")
# Detecção automática de idioma para novos contatos privados
if (is_private and storage.get_auto_language_detection() and
not from_me and not manual_language and not cached_lang):
try:
detected_lang = await detect_language(transcription)
storage.cache_language_detection(remote_jid, detected_lang)
auto_detected = True
language = detected_lang
storage.add_log("INFO", "Idioma detectado e cacheado", {
"language": detected_lang,
"remote_jid": remote_jid
})
raise Exception(f"Erro na transcrição: {error_text}")
except Exception as e:
storage.add_log("ERROR", "Erro na requisição HTTP", {
"error": str(e),
"type": type(e).__name__
})
raise
except Exception as e:
storage.add_log("WARNING", "Erro na detecção de idioma", {"error": str(e)})
# Tradução automática
need_translation = False
if from_me and is_private and language:
# Se for meu áudio para um contato, traduz para o idioma do contato
need_translation = transcription_language != target_language
elif not from_me and storage.get_auto_translation():
# Se for áudio recebido e tradução automática ativada, traduz para idioma do sistema
need_translation = language != system_language
if need_translation:
try:
transcription = await translate_text(
transcription,
transcription_language,
target_language
)
storage.add_log("INFO", "Texto traduzido automaticamente", {
"from": transcription_language,
"to": target_language,
"from_me": from_me
})
except Exception as e:
storage.add_log("ERROR", "Erro na tradução", {"error": str(e)})
# Registrar estatísticas
storage.record_language_usage(
language if language else system_language,
from_me,
auto_detected
)
return transcription, use_timestamps
except Exception as e:
storage.add_log("ERROR", "Erro no processo de transcrição", {
@ -304,7 +325,13 @@ async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=F
finally:
# Limpar arquivos temporários
if isinstance(audio_source, str) and os.path.exists(audio_source):
os.unlink(audio_source)
try:
os.unlink(audio_source)
except Exception as e:
storage.add_log("WARNING", "Erro ao remover arquivo temporário", {
"error": str(e),
"file": audio_source
})
def format_timestamped_result(result):
"""
@ -335,11 +362,23 @@ def format_timestamp(seconds):
async def detect_language(text: str) -> str:
"""
Detecta o idioma do texto usando a API GROQ
Args:
text: Texto para detectar idioma
Returns:
str: Código ISO 639-1 do idioma detectado
"""
storage.add_log("DEBUG", "Iniciando detecção de idioma", {
"text_length": len(text)
})
# Lista de idiomas suportados
SUPPORTED_LANGUAGES = {
"pt", "en", "es", "fr", "de", "it", "ja", "ko",
"zh", "ro", "ru", "ar", "hi", "nl", "pl", "tr"
}
url_completions = "https://api.groq.com/openai/v1/chat/completions"
groq_key = await get_groq_key()
headers = {
@ -347,22 +386,33 @@ async def detect_language(text: str) -> str:
"Content-Type": "application/json",
}
# Prompt para detecção de idioma que retorna apenas o código ISO 639-1
# Prompt melhorado com exemplos e restrições
prompt = """
Detecte o idioma principal do texto e retorne APENAS o código ISO 639-1 correspondente.
Exemplos de códigos: pt (português), en (inglês), es (espanhol), fr (francês), etc.
IMPORTANTE: Retorne APENAS o código de 2 letras, nada mais.
Analise o texto e retorne APENAS o código ISO 639-1 do idioma principal.
Regras:
1. Retorne APENAS o código de 2 letras
2. Use somente códigos permitidos: pt, en, es, fr, de, it, ja, ko, zh, ro, ru, ar, hi, nl, pl, tr
3. Se não tiver certeza ou o idioma não estiver na lista, retorne "en"
4. Não inclua pontuação, espaços extras ou explicações
Exemplos corretos:
"Hello world" -> en
"Bonjour le monde" -> fr
"Olá mundo" -> pt
Texto para análise:
"""
json_data = {
"messages": [{
"role": "system",
"content": "Você é um detector de idiomas preciso que retorna apenas códigos ISO 639-1."
}, {
"role": "user",
"content": f"{prompt}\n\n{text}",
"content": f"{prompt}\n\n{text[:500]}" # Limitando para os primeiros 500 caracteres
}],
"model": "llama-3.3-70b-versatile",
"temperature": 0.1, # Baixa temperatura para resposta mais consistente
"temperature": 0.1
}
try:
@ -372,6 +422,15 @@ async def detect_language(text: str) -> str:
if response.status == 200:
result = await response.json()
detected_language = result["choices"][0]["message"]["content"].strip().lower()
# Validar o resultado
if detected_language not in SUPPORTED_LANGUAGES:
storage.add_log("WARNING", "Idioma detectado não suportado", {
"detected": detected_language,
"fallback": "en"
})
detected_language = "en"
storage.add_log("INFO", "Idioma detectado com sucesso", {
"detected_language": detected_language
})
@ -523,4 +582,99 @@ async def format_message(transcription_text, summary_text=None):
# Adicionar mensagem de negócio
message_parts.append(dynamic_settings['BUSINESS_MESSAGE'])
return "\n\n".join(message_parts)
return "\n\n".join(message_parts)
async def translate_text(text: str, source_language: str, target_language: str) -> str:
"""
Traduz o texto usando a API GROQ
Args:
text: Texto para traduzir
source_language: Código ISO 639-1 do idioma de origem
target_language: Código ISO 639-1 do idioma de destino
Returns:
str: Texto traduzido
"""
storage.add_log("DEBUG", "Iniciando tradução", {
"source_language": source_language,
"target_language": target_language,
"text_length": len(text)
})
# Se os idiomas forem iguais, retorna o texto original
if source_language == target_language:
return text
url_completions = "https://api.groq.com/openai/v1/chat/completions"
groq_key = await get_groq_key()
headers = {
"Authorization": f"Bearer {groq_key}",
"Content-Type": "application/json",
}
# Prompt melhorado com contexto e instruções específicas
prompt = f"""
Você é um tradutor profissional especializado em manter o tom e estilo do texto original.
Instruções:
1. Traduza o texto de {source_language} para {target_language}
2. Preserve todas as formatações (negrito, itálico, emojis)
3. Mantenha os mesmos parágrafos e quebras de linha
4. Preserve números, datas e nomes próprios
5. Não adicione ou remova informações
6. Não inclua notas ou explicações
7. Mantenha o mesmo nível de formalidade
Texto para tradução:
{text}
"""
json_data = {
"messages": [{
"role": "system",
"content": "Você é um tradutor profissional que mantém o estilo e formatação do texto original."
}, {
"role": "user",
"content": prompt
}],
"model": "llama-3.3-70b-versatile",
"temperature": 0.3
}
try:
async with aiohttp.ClientSession() as session:
storage.add_log("DEBUG", "Enviando requisição de tradução")
async with session.post(url_completions, headers=headers, json=json_data) as response:
if response.status == 200:
result = await response.json()
translated_text = result["choices"][0]["message"]["content"].strip()
# Verificar se a tradução manteve aproximadamente o mesmo tamanho
length_ratio = len(translated_text) / len(text)
if not (0.5 <= length_ratio <= 1.5):
storage.add_log("WARNING", "Possível erro na tradução - diferença significativa no tamanho", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
storage.add_log("INFO", "Tradução concluída com sucesso", {
"original_length": len(text),
"translated_length": len(translated_text),
"ratio": length_ratio
})
return translated_text
else:
error_text = await response.text()
storage.add_log("ERROR", "Erro na tradução", {
"status": response.status,
"error": error_text
})
raise Exception(f"Erro na tradução: {error_text}")
except Exception as e:
storage.add_log("ERROR", "Erro no processo de tradução", {
"error": str(e),
"type": type(e).__name__
})
raise

View File

@ -31,6 +31,13 @@ class StorageHandler:
self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48))
self.backup_retention_days = int(os.getenv('BACKUP_RETENTION_DAYS', 7))
# Garantir valores padrão para configurações de idioma
if not self.redis.exists(self._get_redis_key("auto_translation")):
self.redis.set(self._get_redis_key("auto_translation"), "false")
if not self.redis.exists(self._get_redis_key("auto_language_detection")):
self.redis.set(self._get_redis_key("auto_language_detection"), "false")
def _get_redis_key(self, key):
return f"transcrevezap:{key}"