From af20510b2b22d9b1d7bd3eafd723428667ff5e3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A1bio=20Cavalcanti?= Date: Fri, 24 Jan 2025 21:06:13 -0300 Subject: [PATCH] =?UTF-8?q?corrigido=20sistem=20de=20valida=C3=A7=C3=A3o?= =?UTF-8?q?=20de=20chaves=20groq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- groq_handler.py | 112 ++++++++++++++++-------------- services.py | 177 ++++++++++++++++++++++++------------------------ storage.py | 25 ++++++- 3 files changed, 174 insertions(+), 140 deletions(-) diff --git a/groq_handler.py b/groq_handler.py index 6d53cf9..4a7e0f2 100644 --- a/groq_handler.py +++ b/groq_handler.py @@ -1,13 +1,23 @@ import aiohttp import json -from typing import Optional, Tuple +from typing import Optional, Tuple, Any from datetime import datetime +import logging +from storage import StorageHandler +import asyncio + +logger = logging.getLogger("GROQHandler") +logger.setLevel(logging.DEBUG) +handler = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) async def test_groq_key(key: str) -> bool: """Teste se uma chave GROQ é válida e está funcionando.""" url = "https://api.groq.com/openai/v1/models" headers = {"Authorization": f"Bearer {key}"} - + try: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: @@ -15,89 +25,87 @@ async def test_groq_key(key: str) -> bool: data = await response.json() return bool(data.get("data")) return False - except Exception: + except Exception as e: + logger.error(f"Erro ao testar chave GROQ: {e}") return False async def validate_transcription_response(response_text: str) -> bool: """Valide se a resposta da transcrição é significativa.""" try: - # Remove common whitespace and punctuation cleaned_text = response_text.strip() - # Check minimum content length (adjustable threshold) return len(cleaned_text) >= 10 - except Exception: + except Exception as e: + logger.error(f"Erro ao validar resposta da transcrição: {e}") return False -async def get_working_groq_key(storage) -> Optional[str]: +async def get_working_groq_key(storage: StorageHandler) -> Optional[str]: """Obtenha uma chave GROQ funcional do pool disponível.""" keys = storage.get_groq_keys() - - for _ in range(len(keys)): # Try each key once + + for _ in range(len(keys)): key = storage.get_next_groq_key() - if key and await test_groq_key(key): + if not key: + continue + + penalized_until = storage.get_penalized_until(key) + if penalized_until and penalized_until > datetime.utcnow(): + continue + + if await test_groq_key(key): return key - - storage.add_log("ERROR", "No working GROQ keys available") + else: + storage.penalize_key(key, penalty_duration=300) + + storage.add_log("ERROR", "Nenhuma chave GROQ funcional disponível.") return None -async def handle_groq_request(url: str, headers: dict, data, storage, is_form_data: bool = False) -> Tuple[bool, dict, str]: - """ - Handle GROQ API request with retries and key rotation. - Suporta tanto JSON quanto FormData. - Returns: (success, response_data, error_message) - """ +async def handle_groq_request( + url: str, + headers: dict, + data: Any, + storage: StorageHandler, + is_form_data: bool = False +) -> Tuple[bool, dict, str]: + """Lida com requisições para a API GROQ com suporte a retries e rotação de chaves.""" max_retries = len(storage.get_groq_keys()) for attempt in range(max_retries): try: + storage.add_log("DEBUG", "Iniciando tentativa de requisição para GROQ", { + "url": url, + "is_form_data": is_form_data, + "attempt": attempt + 1 + }) + async with aiohttp.ClientSession() as session: if is_form_data: async with session.post(url, headers=headers, data=data) as response: response_data = await response.json() + if response.status == 200 and response_data.get("text"): + return True, response_data, "" else: async with session.post(url, headers=headers, json=data) as response: response_data = await response.json() - - if response.status == 200: - # Validate response content - if "choices" in response_data and response_data["choices"]: - content = response_data["choices"][0].get("message", {}).get("content") - if content and await validate_transcription_response(content): + if response.status == 200 and response_data.get("choices"): return True, response_data, "" - # Handle specific error cases error_msg = response_data.get("error", {}).get("message", "") + if "organization_restricted" in error_msg or "invalid_api_key" in error_msg: - # Try next key new_key = await get_working_groq_key(storage) if new_key: headers["Authorization"] = f"Bearer {new_key}" - storage.add_log("INFO", "Tentando nova chave GROQ após erro", { - "error": error_msg, - "attempt": attempt + 1 - }) + await asyncio.sleep(1) continue - - return False, {}, f"API Error: {error_msg}" - + + return False, response_data, error_msg + except Exception as e: - # Tratamento específico para erros de conexão - if "Connection" in str(e) and attempt < max_retries - 1: - storage.add_log("WARNING", "Erro de conexão, tentando novamente", { - "error": str(e), - "attempt": attempt + 1 - }) - await asyncio.sleep(1) # Espera 1 segundo antes de retry + storage.add_log("ERROR", "Erro na requisição", {"error": str(e)}) + if attempt < max_retries - 1: + await asyncio.sleep(1) continue - - # Se for última tentativa ou outro tipo de erro - if attempt == max_retries - 1: - storage.add_log("ERROR", "Todas tentativas falharam", { - "error": str(e), - "total_attempts": max_retries - }) - return False, {}, f"Request failed: {str(e)}" - continue - - storage.add_log("ERROR", "Todas as chaves GROQ falharam") - return False, {}, "All GROQ keys exhausted" + return False, {}, f"Request failed: {str(e)}" + + storage.add_log("ERROR", "Todas as chaves GROQ falharam.") + return False, {}, "All GROQ keys exhausted." \ No newline at end of file diff --git a/services.py b/services.py index 8dd9073..40c75bf 100644 --- a/services.py +++ b/services.py @@ -7,6 +7,7 @@ from storage import StorageHandler import os import json import tempfile +import traceback from groq_handler import get_working_groq_key, validate_transcription_response, handle_groq_request # Inicializa o storage handler storage = StorageHandler() @@ -146,7 +147,7 @@ async def summarize_text_if_needed(text): } try: - success, response_data, error = await handle_groq_request(url_completions, headers, json_data, storage) + success, response_data, error = await handle_groq_request(url_completions, headers, json_data, storage, is_form_data=False) if not success: raise Exception(error) @@ -234,27 +235,28 @@ async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=F elif not from_me: # Só detecta em mensagens recebidas try: # Realizar transcrição inicial sem idioma específico - data = aiohttp.FormData() - data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3') - data.add_field('model', 'whisper-large-v3') - - success, response_data, error = await handle_groq_request(url, groq_headers, data, storage) - if success: - initial_text = response_data.get("text", "") - - # Detectar idioma do texto transcrito - detected_lang = await detect_language(initial_text) - - # Salvar no cache E na configuração do contato - storage.cache_language_detection(contact_id, detected_lang) - storage.set_contact_language(contact_id, detected_lang) - - contact_language = detected_lang - storage.add_log("INFO", "Idioma detectado e configurado", { - "language": detected_lang, - "remote_jid": remote_jid, - "auto_detected": True - }) + with open(audio_source, 'rb') as audio_file: + data = aiohttp.FormData() + data.add_field('file', audio_file, filename='audio.mp3') + data.add_field('model', 'whisper-large-v3') + + success, response_data, error = await handle_groq_request(url, groq_headers, data, storage, is_form_data=True) + if success: + initial_text = response_data.get("text", "") + + # Detectar idioma do texto transcrito + detected_lang = await detect_language(initial_text) + + # Salvar no cache E na configuração do contato + storage.cache_language_detection(contact_id, detected_lang) + storage.set_contact_language(contact_id, detected_lang) + + contact_language = detected_lang + storage.add_log("INFO", "Idioma detectado e configurado", { + "language": detected_lang, + "remote_jid": remote_jid, + "auto_detected": True + }) except Exception as e: storage.add_log("WARNING", "Erro na detecção automática de idioma", { "error": str(e), @@ -306,72 +308,73 @@ async def transcribe_audio(audio_source, apikey=None, remote_jid=None, from_me=F try: # Realizar transcrição - data = aiohttp.FormData() - data.add_field('file', open(audio_source, 'rb'), filename='audio.mp3') - data.add_field('model', 'whisper-large-v3') - data.add_field('language', transcription_language) - - if use_timestamps: - data.add_field('response_format', 'verbose_json') - - # Usar handle_groq_request para ter retry e validação - success, response_data, error = await handle_groq_request(url, groq_headers, data, storage) - if not success: - raise Exception(f"Erro na transcrição: {error}") - - transcription = format_timestamped_result(response_data) if use_timestamps else response_data.get("text", "") + with open(audio_source, 'rb') as audio_file: + data = aiohttp.FormData() + data.add_field('file', audio_file, filename='audio.mp3') + data.add_field('model', 'whisper-large-v3') + data.add_field('language', transcription_language) - # Validar o conteúdo da transcrição - if not await validate_transcription_response(transcription): - storage.add_log("ERROR", "Transcrição vazia ou inválida recebida") - raise Exception("Transcrição vazia ou inválida recebida") - - # Detecção automática para novos contatos - if (is_private and storage.get_auto_language_detection() and - not from_me and not contact_language): - try: - detected_lang = await detect_language(transcription) - storage.cache_language_detection(remote_jid, detected_lang) - contact_language = detected_lang - storage.add_log("INFO", "Idioma detectado e cacheado", { - "language": detected_lang, - "remote_jid": remote_jid - }) - except Exception as e: - storage.add_log("WARNING", "Erro na detecção de idioma", {"error": str(e)}) + if use_timestamps: + data.add_field('response_format', 'verbose_json') - # Tradução quando necessário - need_translation = ( - is_private and contact_language and - ( - (from_me and transcription_language != target_language) or - (not from_me and target_language != transcription_language) - ) - ) + # Usar handle_groq_request para ter retry e validação + success, response_data, error = await handle_groq_request(url, groq_headers, data, storage, is_form_data=True) + if not success: + raise Exception(f"Erro na transcrição: {error}") - if need_translation: - try: - transcription = await translate_text( - transcription, - transcription_language, - target_language + transcription = format_timestamped_result(response_data) if use_timestamps else response_data.get("text", "") + + # Validar o conteúdo da transcrição + if not await validate_transcription_response(transcription): + storage.add_log("ERROR", "Transcrição vazia ou inválida recebida") + raise Exception("Transcrição vazia ou inválida recebida") + + # Detecção automática para novos contatos + if (is_private and storage.get_auto_language_detection() and + not from_me and not contact_language): + try: + detected_lang = await detect_language(transcription) + storage.cache_language_detection(remote_jid, detected_lang) + contact_language = detected_lang + storage.add_log("INFO", "Idioma detectado e cacheado", { + "language": detected_lang, + "remote_jid": remote_jid + }) + except Exception as e: + storage.add_log("WARNING", "Erro na detecção de idioma", {"error": str(e)}) + + # Tradução quando necessário + need_translation = ( + is_private and contact_language and + ( + (from_me and transcription_language != target_language) or + (not from_me and target_language != transcription_language) ) - storage.add_log("INFO", "Texto traduzido automaticamente", { - "from": transcription_language, - "to": target_language - }) - except Exception as e: - storage.add_log("ERROR", "Erro na tradução", {"error": str(e)}) + ) - # Registrar estatísticas de uso - used_language = contact_language if contact_language else system_language - storage.record_language_usage( - used_language, - from_me, - bool(contact_language and contact_language != system_language) - ) - - return transcription, use_timestamps + if need_translation: + try: + transcription = await translate_text( + transcription, + transcription_language, + target_language + ) + storage.add_log("INFO", "Texto traduzido automaticamente", { + "from": transcription_language, + "to": target_language + }) + except Exception as e: + storage.add_log("ERROR", "Erro na tradução", {"error": str(e)}) + + # Registrar estatísticas de uso + used_language = contact_language if contact_language else system_language + storage.record_language_usage( + used_language, + from_me, + bool(contact_language and contact_language != system_language) + ) + + return transcription, use_timestamps except Exception as e: storage.add_log("ERROR", "Erro no processo de transcrição", { @@ -475,9 +478,9 @@ async def detect_language(text: str) -> str: } try: - success, response_data, error = await handle_groq_request(url_completions, headers, json_data, storage) + success, response_data, error = await handle_groq_request(url_completions, headers, json_data, storage, is_form_data=False) if not success: - raise Exception(error) + raise Exception(f"Falha na detecção de idioma: {error}") detected_language = response_data["choices"][0]["message"]["content"].strip().lower() @@ -697,9 +700,9 @@ async def translate_text(text: str, source_language: str, target_language: str) } try: - success, response_data, error = await handle_groq_request(url_completions, headers, json_data, storage) + success, response_data, error = await handle_groq_request(url_completions, headers, json_data, storage, is_form_data=False) if not success: - raise Exception(error) + raise Exception(f"Falha na tradução: {error}") translated_text = response_data["choices"][0]["message"]["content"].strip() diff --git a/storage.py b/storage.py index b91f48c..259c8f1 100644 --- a/storage.py +++ b/storage.py @@ -1,6 +1,6 @@ import json import os -from typing import List, Dict +from typing import List, Dict, Optional from datetime import datetime, timedelta import traceback import logging @@ -209,6 +209,29 @@ class StorageHandler: return keys[counter % len(keys)] + def get_penalized_until(self, key: str) -> Optional[datetime]: + """ + Retorna o timestamp até quando a chave está penalizada, ou None se não estiver penalizada. + """ + penalized_key = self._get_redis_key(f"groq_key_penalized_{key}") + penalized_until = self.redis.get(penalized_key) + if penalized_until: + return datetime.fromisoformat(penalized_until) + return None + + def penalize_key(self, key: str, penalty_duration: int): + """ + Penaliza uma chave por um tempo determinado (em segundos). + """ + penalized_key = self._get_redis_key(f"groq_key_penalized_{key}") + penalized_until = datetime.utcnow() + timedelta(seconds=penalty_duration) + self.redis.set(penalized_key, penalized_until.isoformat()) + self.redis.expire(penalized_key, penalty_duration) # Expira a chave após o tempo de penalidade + self.add_log("INFO", "Chave GROQ penalizada", { + "key": key, + "penalized_until": penalized_until.isoformat() + }) + def get_message_settings(self): """Obtém as configurações de mensagens.""" return {