220 lines
9.3 KiB
Python
220 lines
9.3 KiB
Python
import json
|
|
import os
|
|
from typing import List, Dict
|
|
from datetime import datetime, timedelta
|
|
import traceback
|
|
import logging
|
|
import redis
|
|
|
|
class StorageHandler:
|
|
def __init__(self):
|
|
# Configuração de logger
|
|
self.logger = logging.getLogger("StorageHandler")
|
|
handler = logging.StreamHandler()
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
handler.setFormatter(formatter)
|
|
self.logger.addHandler(handler)
|
|
self.logger.setLevel(logging.DEBUG)
|
|
self.logger.info("StorageHandler inicializado.")
|
|
|
|
# Conexão com o Redis
|
|
self.redis = redis.Redis(
|
|
host=os.getenv('REDIS_HOST', 'localhost'),
|
|
port=int(os.getenv('REDIS_PORT', 6380)),
|
|
db=0,
|
|
decode_responses=True
|
|
)
|
|
|
|
# Retenção de logs e backups
|
|
self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48))
|
|
self.backup_retention_days = int(os.getenv('BACKUP_RETENTION_DAYS', 7))
|
|
|
|
def _get_redis_key(self, key):
|
|
return f"transcrevezap:{key}"
|
|
|
|
def add_log(self, level: str, message: str, metadata: dict = None):
|
|
log_entry = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"level": level,
|
|
"message": message,
|
|
"metadata": json.dumps(metadata) if metadata else None
|
|
}
|
|
self.redis.lpush(self._get_redis_key("logs"), json.dumps(log_entry))
|
|
self.redis.ltrim(self._get_redis_key("logs"), 0, 999) # Manter apenas os últimos 1000 logs
|
|
self.logger.log(getattr(logging, level.upper(), logging.INFO), f"{message} | Metadata: {metadata}")
|
|
|
|
def get_allowed_groups(self) -> List[str]:
|
|
return self.redis.smembers(self._get_redis_key("allowed_groups"))
|
|
|
|
def add_allowed_group(self, group: str):
|
|
self.redis.sadd(self._get_redis_key("allowed_groups"), group)
|
|
|
|
def remove_allowed_group(self, group: str):
|
|
self.redis.srem(self._get_redis_key("allowed_groups"), group)
|
|
|
|
def get_blocked_users(self) -> List[str]:
|
|
return self.redis.smembers(self._get_redis_key("blocked_users"))
|
|
|
|
def add_blocked_user(self, user: str):
|
|
self.redis.sadd(self._get_redis_key("blocked_users"), user)
|
|
|
|
def remove_blocked_user(self, user: str):
|
|
self.redis.srem(self._get_redis_key("blocked_users"), user)
|
|
|
|
def get_statistics(self) -> Dict:
|
|
total_processed = int(self.redis.get(self._get_redis_key("total_processed")) or 0)
|
|
last_processed = self.redis.get(self._get_redis_key("last_processed"))
|
|
daily_count = json.loads(self.redis.get(self._get_redis_key("daily_count")) or "{}")
|
|
group_count = json.loads(self.redis.get(self._get_redis_key("group_count")) or "{}")
|
|
user_count = json.loads(self.redis.get(self._get_redis_key("user_count")) or "{}")
|
|
error_count = int(self.redis.get(self._get_redis_key("error_count")) or 0)
|
|
success_rate = float(self.redis.get(self._get_redis_key("success_rate")) or 100.0)
|
|
|
|
return {
|
|
"total_processed": total_processed,
|
|
"last_processed": last_processed,
|
|
"stats": {
|
|
"daily_count": daily_count,
|
|
"group_count": group_count,
|
|
"user_count": user_count,
|
|
"error_count": error_count,
|
|
"success_rate": success_rate,
|
|
}
|
|
}
|
|
|
|
def can_process_message(self, remote_jid):
|
|
try:
|
|
allowed_groups = self.get_allowed_groups()
|
|
blocked_users = self.get_blocked_users()
|
|
|
|
if remote_jid in blocked_users:
|
|
return False
|
|
if "@g.us" in remote_jid and remote_jid not in allowed_groups:
|
|
return False
|
|
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f"Erro ao verificar se pode processar mensagem: {e}")
|
|
return False
|
|
|
|
def record_processing(self, remote_jid):
|
|
try:
|
|
# Incrementar total processado
|
|
self.redis.incr(self._get_redis_key("total_processed"))
|
|
|
|
# Atualizar último processamento
|
|
self.redis.set(self._get_redis_key("last_processed"), datetime.now().isoformat())
|
|
|
|
# Atualizar contagem diária
|
|
today = datetime.now().strftime("%Y-%m-%d")
|
|
daily_count = json.loads(self.redis.get(self._get_redis_key("daily_count")) or "{}")
|
|
daily_count[today] = daily_count.get(today, 0) + 1
|
|
self.redis.set(self._get_redis_key("daily_count"), json.dumps(daily_count))
|
|
|
|
# Atualizar contagem de grupo ou usuário
|
|
if "@g.us" in remote_jid:
|
|
group_count = json.loads(self.redis.get(self._get_redis_key("group_count")) or "{}")
|
|
group_count[remote_jid] = group_count.get(remote_jid, 0) + 1
|
|
self.redis.set(self._get_redis_key("group_count"), json.dumps(group_count))
|
|
else:
|
|
user_count = json.loads(self.redis.get(self._get_redis_key("user_count")) or "{}")
|
|
user_count[remote_jid] = user_count.get(remote_jid, 0) + 1
|
|
self.redis.set(self._get_redis_key("user_count"), json.dumps(user_count))
|
|
|
|
# Atualizar taxa de sucesso
|
|
total = int(self.redis.get(self._get_redis_key("total_processed")) or 0)
|
|
errors = int(self.redis.get(self._get_redis_key("error_count")) or 0)
|
|
success_rate = ((total - errors) / total) * 100 if total > 0 else 100
|
|
self.redis.set(self._get_redis_key("success_rate"), success_rate)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Erro ao registrar processamento: {e}")
|
|
|
|
def record_error(self):
|
|
self.redis.incr(self._get_redis_key("error_count"))
|
|
|
|
def clean_old_logs(self):
|
|
try:
|
|
cutoff_time = datetime.now() - timedelta(hours=self.log_retention_hours)
|
|
logs = self.redis.lrange(self._get_redis_key("logs"), 0, -1)
|
|
for log in logs:
|
|
log_entry = json.loads(log)
|
|
if datetime.fromisoformat(log_entry["timestamp"]) < cutoff_time:
|
|
self.redis.lrem(self._get_redis_key("logs"), 0, log)
|
|
else:
|
|
break # Assumindo que os logs estão ordenados por tempo
|
|
except Exception as e:
|
|
self.logger.error(f"Erro ao limpar logs antigos: {e}")
|
|
|
|
def backup_data(self):
|
|
try:
|
|
data = {
|
|
"allowed_groups": list(self.get_allowed_groups()),
|
|
"blocked_users": list(self.get_blocked_users()),
|
|
"statistics": self.get_statistics(),
|
|
}
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
backup_key = f"backup:{timestamp}"
|
|
self.redis.set(backup_key, json.dumps(data))
|
|
self.redis.expire(backup_key, self.backup_retention_days * 24 * 60 * 60) # Expira após os dias de retenção
|
|
except Exception as e:
|
|
self.logger.error(f"Erro ao criar backup: {e}")
|
|
|
|
def clean_old_backups(self):
|
|
try:
|
|
for key in self.redis.scan_iter("backup:*"):
|
|
if self.redis.ttl(key) <= 0:
|
|
self.redis.delete(key)
|
|
except Exception as e:
|
|
self.logger.error(f"Erro ao limpar backups antigos: {e}")
|
|
|
|
# Método de rotação de chaves groq
|
|
def get_groq_keys(self) -> List[str]:
|
|
"""Obtém todas as chaves GROQ armazenadas."""
|
|
return list(self.redis.smembers(self._get_redis_key("groq_keys")))
|
|
|
|
def add_groq_key(self, key: str):
|
|
"""Adiciona uma nova chave GROQ ao conjunto."""
|
|
if key and key.startswith("gsk_"):
|
|
self.redis.sadd(self._get_redis_key("groq_keys"), key)
|
|
return True
|
|
return False
|
|
|
|
def remove_groq_key(self, key: str):
|
|
"""Remove uma chave GROQ do conjunto."""
|
|
self.redis.srem(self._get_redis_key("groq_keys"), key)
|
|
|
|
def get_next_groq_key(self) -> str:
|
|
"""
|
|
Obtém a próxima chave GROQ no sistema de rodízio.
|
|
Utiliza um contador no Redis para controlar a rotação.
|
|
"""
|
|
keys = self.get_groq_keys()
|
|
if not keys:
|
|
return None
|
|
# Obtém e incrementa o contador de rodízio
|
|
counter = int(self.redis.get(self._get_redis_key("groq_key_counter")) or "0")
|
|
next_counter = (counter + 1) % len(keys)
|
|
self.redis.set(self._get_redis_key("groq_key_counter"), str(next_counter))
|
|
|
|
return keys[counter % len(keys)]
|
|
|
|
def get_message_settings(self):
|
|
"""Obtém as configurações de mensagens."""
|
|
return {
|
|
"summary_header": self.redis.get(self._get_redis_key("summary_header")) or "🤖 *Resumo do áudio:*",
|
|
"transcription_header": self.redis.get(self._get_redis_key("transcription_header")) or "🔊 *Transcrição do áudio:*",
|
|
"output_mode": self.redis.get(self._get_redis_key("output_mode")) or "both",
|
|
"character_limit": int(self.redis.get(self._get_redis_key("character_limit")) or "500"),
|
|
}
|
|
|
|
def save_message_settings(self, settings: dict):
|
|
"""Salva as configurações de mensagens."""
|
|
for key, value in settings.items():
|
|
self.redis.set(self._get_redis_key(key), str(value))
|
|
|
|
def get_process_mode(self):
|
|
"""Retorna o modo de processamento configurado"""
|
|
return self.redis.get(self._get_redis_key("process_mode")) or "all" |