From b86c7ac764b4b22b4e58af1fee94729bf82e18ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A1bio=20Cavalcanti?= Date: Tue, 7 Jan 2025 15:44:59 -0300 Subject: [PATCH] =?UTF-8?q?adicionado=20fun=C3=A7=C3=B5es=20de=20detalhame?= =?UTF-8?q?nto=20de=20idiomas=20e=20comportamento=20de=20tradutor=20automa?= =?UTF-8?q?tico?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 16 +++- manager.py | 209 ++++++++++++++++++++++++++++++++++++++++++++++------ services.py | 195 ++++++++++++++++++++++++++++++++++++++++-------- storage.py | 167 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 533 insertions(+), 54 deletions(-) diff --git a/main.py b/main.py index 7d4e92a..3757914 100644 --- a/main.py +++ b/main.py @@ -126,10 +126,22 @@ async def transcreve_audios(request: Request): transcription_header = get_config("transcription_header", "🔊 *Transcrição do áudio:*") character_limit = int(get_config("character_limit", "500")) + # Verificar se timestamps estão habilitados + use_timestamps = get_config("use_timestamps", "false") == "true" # Transcrever áudio storage.add_log("INFO", "Iniciando transcrição") - transcription_text, _ = await transcribe_audio(audio_source) - + transcription_text, has_timestamps = await transcribe_audio( + audio_source, + apikey=apikey, + remote_jid=remote_jid, + use_timestamps=use_timestamps + ) + # Log do resultado + storage.add_log("INFO", "Transcrição concluída", { + "has_timestamps": has_timestamps, + "text_length": len(transcription_text), + "remote_jid": remote_jid + }) # Determinar se precisa de resumo baseado no modo de saída summary_text = None if output_mode in ["both", "summary_only"] or ( diff --git a/manager.py b/manager.py index d2730df..58fc7fb 100644 --- a/manager.py +++ b/manager.py @@ -11,6 +11,26 @@ import redis # Conectar ao Redis redis_client = redis.Redis(host=os.getenv('REDIS_HOST', 'localhost'), port=int(os.getenv('REDIS_PORT', 6380)), decode_responses=True) +# Dicionário de idiomas em português +IDIOMAS = { + "pt": "Português", + "en": "Inglês", + "es": "Espanhol", + "fr": "Francês", + "de": "Alemão", + "it": "Italiano", + "ja": "Japonês", + "ko": "Coreano", + "zh": "Chinês", + "ro": "Romeno", + "ru": "Russo", + "ar": "Árabe", + "hi": "Hindi", + "nl": "Holandês", + "pl": "Polonês", + "tr": "Turco" +} + # Função para salvar configurações no Redis def save_to_redis(key, value): try: @@ -403,11 +423,65 @@ def message_settings_section(): except Exception as e: st.error(f"Erro ao salvar configurações: {str(e)}") +def show_language_statistics(): + """Exibe estatísticas de uso de idiomas""" + stats = storage.get_language_statistics() + + if not stats: + st.info("Ainda não há estatísticas de uso de idiomas.") + return + + # Resumo geral + st.subheader("📊 Estatísticas de Idiomas") + + # Criar métricas resumidas + total_usage = sum(s.get('total', 0) for s in stats.values()) + auto_detected = sum(s.get('auto_detected', 0) for s in stats.values()) + + col1, col2, col3 = st.columns(3) + with col1: + st.metric("Total de Transcrições", total_usage) + with col2: + st.metric("Detecções Automáticas", auto_detected) + with col3: + st.metric("Idiomas Diferentes", len(stats)) + + # Gráfico de uso por idioma + usage_data = [] + for lang, data in stats.items(): + usage_data.append({ + 'Idioma': IDIOMAS.get(lang, lang), + 'Total': data.get('total', 0), + 'Enviados': data.get('sent', 0), + 'Recebidos': data.get('received', 0), + 'Auto-detectados': data.get('auto_detected', 0) + }) + + if usage_data: + df = pd.DataFrame(usage_data) + + # Gráfico de barras empilhadas + fig = px.bar(df, + x='Idioma', + y=['Enviados', 'Recebidos'], + title='Uso por Idioma', + barmode='stack') + st.plotly_chart(fig, use_container_width=True) + + # Tabela detalhada + st.subheader("📋 Detalhamento por Idioma") + st.dataframe(df.sort_values('Total', ascending=False)) + def manage_settings(): st.title("⚙️ Configurações") # Criar tabs para melhor organização - tab1, tab2, tab3 = st.tabs(["🔑 Chaves API", "🌐 Configurações Gerais", "📝 Formatação de Mensagens"]) + tab1, tab2, tab3, tab4 = st.tabs([ + "🔑 Chaves API", + "🌐 Configurações Gerais", + "📝 Formatação de Mensagens", + "🗣️ Idiomas e Transcrição" + ]) with tab1: st.subheader("Gerenciamento de Chaves GROQ") @@ -510,27 +584,6 @@ def manage_settings(): # Configuração de idioma st.markdown("---") st.subheader("🌐 Idioma") - - # Dicionário de idiomas em português - IDIOMAS = { - "pt": "Português", - "en": "Inglês", - "es": "Espanhol", - "fr": "Francês", - "de": "Alemão", - "it": "Italiano", - "ja": "Japonês", - "ko": "Coreano", - "zh": "Chinês", - "ro": "Romeno", - "ru": "Russo", - "ar": "Árabe", - "hi": "Hindi", - "nl": "Holandês", - "pl": "Polonês", - "tr": "Turco" - } - # Carregar configuração atual de idioma current_language = get_from_redis("TRANSCRIPTION_LANGUAGE", "pt") @@ -629,6 +682,118 @@ def manage_settings(): except Exception as e: st.error(f"Erro ao salvar configurações: {str(e)}") + + with tab4: + st.subheader("Idiomas e Transcrição") + + # Adicionar estatísticas no topo + show_language_statistics() + + # Seção de Detecção Automática + st.markdown("---") + st.markdown("### 🔄 Detecção Automática de Idioma") + + col1, col2 = st.columns(2) + with col1: + auto_detect = st.toggle( + "Ativar detecção automática", + value=storage.get_auto_language_detection(), + help="Detecta e configura automaticamente o idioma dos contatos" + ) + + if auto_detect: + st.info(""" + A detecção automática de idioma: + 1. Analisa o primeiro áudio de cada contato + 2. Configura o idioma automaticamente + 3. Usa cache de 24 horas para otimização + 4. Funciona apenas em conversas privadas + 5. Mantém o idioma global para grupos + 6. Permite tradução automática entre idiomas + """) + + # Seção de Timestamps + st.markdown("---") + st.markdown("### ⏱️ Timestamps na Transcrição") + use_timestamps = st.toggle( + "Incluir timestamps", + value=get_from_redis("use_timestamps", "false") == "true", + help="Adiciona marcadores de tempo em cada trecho" + ) + + if use_timestamps: + st.info("Os timestamps serão mostrados no formato [MM:SS] para cada trecho da transcrição") + + # Seção de Configuração Manual de Idiomas por Contato + st.markdown("---") + st.markdown("### 👥 Idiomas por Contato") + + # Obter contatos configurados + contact_languages = storage.get_all_contact_languages() + + # Adicionar novo contato + with st.expander("➕ Adicionar Novo Contato", expanded=not bool(contact_languages)): + new_contact = st.text_input( + "Número do Contato", + placeholder="Ex: 5521999999999", + help="Digite apenas números, sem símbolos ou @s.whatsapp.net" + ) + + new_language = st.selectbox( + "Idioma do Contato", + options=list(IDIOMAS.keys()), + format_func=lambda x: IDIOMAS[x], + help="Idioma para transcrição dos áudios deste contato" + ) + + if st.button("Adicionar Contato"): + if new_contact and new_contact.isdigit(): + storage.set_contact_language(new_contact, new_language) + st.success(f"✅ Contato configurado com idioma {IDIOMAS[new_language]}") + st.experimental_rerun() + else: + st.error("Por favor, insira um número válido") + + # Listar contatos configurados + if contact_languages: + st.markdown("### Contatos Configurados") + for contact, language in contact_languages.items(): + col1, col2, col3 = st.columns([2, 2, 1]) + with col1: + st.text(f"+{contact}") + with col2: + current_language = st.selectbox( + "Idioma", + options=list(IDIOMAS.keys()), + format_func=lambda x: IDIOMAS[x], + key=f"lang_{contact}", + index=list(IDIOMAS.keys()).index(language) if language in IDIOMAS else 0 + ) + if current_language != language: + storage.set_contact_language(contact, current_language) + with col3: + if st.button("🗑️", key=f"remove_{contact}"): + storage.remove_contact_language(contact) + st.success("Contato removido") + st.experimental_rerun() + + # Botão de Salvar + if st.button("💾 Salvar Configurações de Idioma e Transcrição"): + try: + storage.set_auto_language_detection(auto_detect) + save_to_redis("use_timestamps", str(use_timestamps).lower()) + st.success("✅ Configurações salvas com sucesso!") + + # Mostrar resumo das configurações + st.info(f""" + Configurações atuais: + - Detecção automática: {'Ativada' if auto_detect else 'Desativada'} + - Timestamps: {'Ativados' if use_timestamps else 'Desativados'} + - Contatos configurados: {len(contact_languages)} + """) + except Exception as e: + st.error(f"Erro ao salvar configurações: {str(e)}") + if "authenticated" not in st.session_state: st.session_state.authenticated = False diff --git a/services.py b/services.py index ebe6a3f..c18255b 100644 --- a/services.py +++ b/services.py @@ -170,29 +170,50 @@ async def summarize_text_if_needed(text): }) raise -async def transcribe_audio(audio_source, apikey=None): +async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=False, use_timestamps=False): """ - Transcreve áudio usando a API GROQ com sistema de rodízio de chaves. + Transcreve áudio usando a API GROQ com suporte a cache de idioma e estatísticas. Args: audio_source: Caminho do arquivo de áudio ou URL apikey: Chave da API opcional para download de áudio + remote_jid: ID do remetente/destinatário + use_timestamps: Se True, usa verbose_json para incluir timestamps Returns: - tuple: (texto_transcrito, False) + tuple: (texto_transcrito, has_timestamps) """ storage.add_log("INFO", "Iniciando processo de transcrição") url = "https://api.groq.com/openai/v1/audio/transcriptions" groq_key = await get_groq_key() groq_headers = {"Authorization": f"Bearer {groq_key}"} - # Obter idioma configurado - language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt" + # Determinar idioma baseado no contexto + language = None + auto_detected = False + is_private = remote_jid and "@s.whatsapp.net" in remote_jid + + if is_private: + # Verificar cache primeiro + cached_lang = storage.get_cached_language(remote_jid) + if cached_lang: + language = cached_lang['language'] + storage.add_log("DEBUG", "Usando idioma em cache", cached_lang) + else: + # Verificar configuração manual + language = storage.get_contact_language(remote_jid) + + # Se não houver idioma definido, usar o global + if not language: + language = redis_client.get("TRANSCRIPTION_LANGUAGE") or "pt" + storage.add_log("DEBUG", "Idioma configurado para transcrição", { "language": language, - "redis_value": redis_client.get("TRANSCRIPTION_LANGUAGE") + "remote_jid": remote_jid, + "from_me": from_me, + "auto_detected": auto_detected }) - + try: async with aiohttp.ClientSession() as session: # Se o audio_source for uma URL @@ -219,29 +240,60 @@ async def transcribe_audio(audio_source, apikey=None): "path": audio_source }) - # Preparar dados para transcrição - data = aiohttp.FormData() - data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3') - data.add_field('model', 'whisper-large-v3') - data.add_field('language', language) + # Preparar dados para transcrição + data = aiohttp.FormData() + data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3') + data.add_field('model', 'whisper-large-v3') + data.add_field('language', language) + + if use_timestamps: + data.add_field('response_format', 'verbose_json') - storage.add_log("DEBUG", "Enviando áudio para transcrição") - async with session.post(url, headers=groq_headers, data=data) as response: - if response.status == 200: - result = await response.json() - transcription = result.get("text", "") - storage.add_log("INFO", "Transcrição concluída com sucesso", { - "text_length": len(transcription) - }) - - return transcription, False - else: - error_text = await response.text() - storage.add_log("ERROR", "Erro na transcrição", { - "error": error_text, - "status": response.status - }) - raise Exception(f"Erro na transcrição: {error_text}") + storage.add_log("DEBUG", "Enviando áudio para transcrição") + + # Nova sessão para cada requisição + async with aiohttp.ClientSession() as session: + try: + async with session.post(url, headers=groq_headers, data=data) as response: + if response.status == 200: + result = await response.json() + + # Processar resposta baseado no formato + if use_timestamps: + transcription = format_timestamped_result(result) + else: + transcription = result.get("text", "") + + # Detecção automática de idioma se necessário + if (is_private and storage.get_auto_language_detection() and + not from_me and not cached_lang and not language): + try: + detected_lang = await detect_language(transcription) + storage.cache_language_detection(remote_jid, detected_lang) + auto_detected = True + language = detected_lang + except Exception as e: + storage.add_log("WARNING", "Erro na detecção automática de idioma", { + "error": str(e) + }) + + # Registrar estatísticas + storage.record_language_usage(language, from_me, auto_detected) + + return transcription, use_timestamps + else: + error_text = await response.text() + storage.add_log("ERROR", "Erro na transcrição", { + "error": error_text, + "status": response.status + }) + raise Exception(f"Erro na transcrição: {error_text}") + except Exception as e: + storage.add_log("ERROR", "Erro na requisição HTTP", { + "error": str(e), + "type": type(e).__name__ + }) + raise except Exception as e: storage.add_log("ERROR", "Erro no processo de transcrição", { @@ -253,6 +305,91 @@ async def transcribe_audio(audio_source, apikey=None): # Limpar arquivos temporários if isinstance(audio_source, str) and os.path.exists(audio_source): os.unlink(audio_source) + +def format_timestamped_result(result): + """ + Formata o resultado da transcrição com timestamps + """ + segments = result.get("segments", []) + formatted_lines = [] + + for segment in segments: + start_time = format_timestamp(segment.get("start", 0)) + end_time = format_timestamp(segment.get("end", 0)) + text = segment.get("text", "").strip() + + if text: + formatted_lines.append(f"[{start_time} -> {end_time}] {text}") + + return "\n".join(formatted_lines) + +def format_timestamp(seconds): + """ + Converte segundos em formato MM:SS + """ + minutes = int(seconds // 60) + remaining_seconds = int(seconds % 60) + return f"{minutes:02d}:{remaining_seconds:02d}" + +# Função para detecção de idioma +async def detect_language(text: str) -> str: + """ + Detecta o idioma do texto usando a API GROQ + """ + storage.add_log("DEBUG", "Iniciando detecção de idioma", { + "text_length": len(text) + }) + + url_completions = "https://api.groq.com/openai/v1/chat/completions" + groq_key = await get_groq_key() + headers = { + "Authorization": f"Bearer {groq_key}", + "Content-Type": "application/json", + } + + # Prompt para detecção de idioma que retorna apenas o código ISO 639-1 + prompt = """ + Detecte o idioma principal do texto e retorne APENAS o código ISO 639-1 correspondente. + Exemplos de códigos: pt (português), en (inglês), es (espanhol), fr (francês), etc. + IMPORTANTE: Retorne APENAS o código de 2 letras, nada mais. + + Texto para análise: + """ + + json_data = { + "messages": [{ + "role": "user", + "content": f"{prompt}\n\n{text}", + }], + "model": "llama-3.3-70b-versatile", + "temperature": 0.1, # Baixa temperatura para resposta mais consistente + } + + try: + async with aiohttp.ClientSession() as session: + storage.add_log("DEBUG", "Enviando requisição para API GROQ - Detecção de idioma") + async with session.post(url_completions, headers=headers, json=json_data) as response: + if response.status == 200: + result = await response.json() + detected_language = result["choices"][0]["message"]["content"].strip().lower() + storage.add_log("INFO", "Idioma detectado com sucesso", { + "detected_language": detected_language + }) + return detected_language + else: + error_text = await response.text() + storage.add_log("ERROR", "Erro na detecção de idioma", { + "error": error_text, + "status": response.status + }) + raise Exception(f"Erro na detecção de idioma: {error_text}") + except Exception as e: + storage.add_log("ERROR", "Erro no processo de detecção de idioma", { + "error": str(e), + "type": type(e).__name__ + }) + raise + async def send_message_to_whatsapp(server_url, instance, apikey, message, remote_jid, message_id): """Envia mensagem via WhatsApp""" storage.add_log("DEBUG", "Preparando envio de mensagem", { diff --git a/storage.py b/storage.py index 29c525e..53dde6b 100644 --- a/storage.py +++ b/storage.py @@ -219,4 +219,169 @@ class StorageHandler: """Retorna o modo de processamento configurado""" mode = self.redis.get(self._get_redis_key("process_mode")) or "all" self.logger.debug(f"Modo de processamento atual: {mode}") - return mode \ No newline at end of file + return mode + + def get_contact_language(self, contact_id: str) -> str: + """ + Obtém o idioma configurado para um contato específico. + O contact_id pode vir com ou sem @s.whatsapp.net + """ + # Remover @s.whatsapp.net se presente + contact_id = contact_id.split('@')[0] + return self.redis.hget(self._get_redis_key("contact_languages"), contact_id) + + def set_contact_language(self, contact_id: str, language: str): + """ + Define o idioma para um contato específico + """ + # Remover @s.whatsapp.net se presente + contact_id = contact_id.split('@')[0] + self.redis.hset(self._get_redis_key("contact_languages"), contact_id, language) + self.logger.info(f"Idioma {language} definido para o contato {contact_id}") + + def get_all_contact_languages(self) -> dict: + """ + Retorna um dicionário com todos os contatos e seus idiomas configurados + """ + return self.redis.hgetall(self._get_redis_key("contact_languages")) + + def remove_contact_language(self, contact_id: str): + """ + Remove a configuração de idioma de um contato + """ + contact_id = contact_id.split('@')[0] + self.redis.hdel(self._get_redis_key("contact_languages"), contact_id) + self.logger.info(f"Configuração de idioma removida para o contato {contact_id}") + + def get_auto_language_detection(self) -> bool: + """ + Verifica se a detecção automática de idioma está ativada + """ + return self.redis.get(self._get_redis_key("auto_language_detection")) == "true" + + def set_auto_language_detection(self, enabled: bool): + """ + Ativa ou desativa a detecção automática de idioma + """ + self.redis.set(self._get_redis_key("auto_language_detection"), str(enabled).lower()) + self.logger.info(f"Detecção automática de idioma {'ativada' if enabled else 'desativada'}") + + def get_auto_translation(self) -> bool: + """ + Verifica se a tradução automática está ativada + """ + return self.redis.get(self._get_redis_key("auto_translation")) == "true" + + def set_auto_translation(self, enabled: bool): + """ + Ativa ou desativa a tradução automática + """ + self.redis.set(self._get_redis_key("auto_translation"), str(enabled).lower()) + self.logger.info(f"Tradução automática {'ativada' if enabled else 'desativada'}") + + def record_language_usage(self, language: str, from_me: bool, auto_detected: bool = False): + """ + Registra estatísticas de uso de idiomas + Args: + language: Código do idioma (ex: 'pt', 'en') + from_me: Se o áudio foi enviado por nós + auto_detected: Se o idioma foi detectado automaticamente + """ + try: + # Incrementar contagem total do idioma + self.redis.hincrby( + self._get_redis_key("language_stats"), + f"{language}_total", + 1 + ) + + # Incrementar contagem por direção (enviado/recebido) + self.redis.hincrby( + self._get_redis_key("language_stats"), + f"{language}_{'sent' if from_me else 'received'}", + 1 + ) + + # Se foi detecção automática, registrar + if auto_detected: + self.redis.hincrby( + self._get_redis_key("language_stats"), + f"{language}_auto_detected", + 1 + ) + + # Registrar última utilização + self.redis.hset( + self._get_redis_key("language_stats"), + f"{language}_last_used", + datetime.now().isoformat() + ) + + except Exception as e: + self.logger.error(f"Erro ao registrar uso de idioma: {e}") + + def get_language_statistics(self) -> Dict: + """ + Obtém estatísticas de uso de idiomas + """ + try: + stats_raw = self.redis.hgetall(self._get_redis_key("language_stats")) + + # Organizar estatísticas por idioma + stats = {} + for key, value in stats_raw.items(): + lang, metric = key.split('_', 1) + + if lang not in stats: + stats[lang] = {} + + if metric == 'last_used': + stats[lang][metric] = value + else: + stats[lang][metric] = int(value) + + return stats + except Exception as e: + self.logger.error(f"Erro ao obter estatísticas de idioma: {e}") + return {} + + def cache_language_detection(self, contact_id: str, language: str, confidence: float = 1.0): + """ + Armazena em cache o idioma detectado para um contato + """ + contact_id = contact_id.split('@')[0] + cache_data = { + 'language': language, + 'confidence': confidence, + 'timestamp': datetime.now().isoformat(), + 'auto_detected': True + } + self.redis.hset( + self._get_redis_key("language_detection_cache"), + contact_id, + json.dumps(cache_data) + ) + + def get_cached_language(self, contact_id: str) -> Dict: + """ + Obtém o idioma em cache para um contato + Retorna None se não houver cache ou se estiver expirado + """ + contact_id = contact_id.split('@')[0] + cached = self.redis.hget( + self._get_redis_key("language_detection_cache"), + contact_id + ) + + if not cached: + return None + + try: + data = json.loads(cached) + # Verificar se o cache expirou (24 horas) + cache_time = datetime.fromisoformat(data['timestamp']) + if datetime.now() - cache_time > timedelta(hours=24): + return None + return data + except: + return None \ No newline at end of file