structure saas with tools and mcps

This commit is contained in:
Davidson Gomes 2025-04-25 19:10:26 -03:00
parent d413984c5b
commit b3f87abce1
16 changed files with 299 additions and 148 deletions

.env
View File

@@ -1,3 +1,14 @@
 OPENAI_API_KEY=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
 POSTGRES_CONNECTION_STRING=postgresql://postgres:root@localhost:5432/google-a2a-saas
+TENANT_ID=45cffb85-51c8-41ed-aa8d-710970a7ce50
+KNOWLEDGE_API_URL=http://localhost:5540
+KNOWLEDGE_API_KEY=79405047-7a5e-4b18-b25a-4af149d747dc
+REDIS_HOST=localhost
+REDIS_PORT=6379
+REDIS_DB=3
+REDIS_PASSWORD=
+LOG_LEVEL=DEBUG

View File

@@ -9,19 +9,19 @@ from src.schemas.schemas import (
     Client, ClientCreate,
     Contact, ContactCreate,
     Agent, AgentCreate,
-    Message, MessageCreate
 )
 from src.services import (
     client_service,
     contact_service,
     agent_service,
-    message_service
 )
 from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse
 from src.services.agent_runner import run_agent
-from src.core.exceptions import AgentNotFoundError, InternalServerError
+from src.core.exceptions import AgentNotFoundError
 from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
 from google.adk.sessions import DatabaseSessionService
+from google.adk.memory import InMemoryMemoryService
 from src.config.settings import settings

 router = APIRouter()
@@ -32,6 +32,7 @@ POSTGRES_CONNECTION_STRING = settings.POSTGRES_CONNECTION_STRING
 # Initialize the services globally
 session_service = DatabaseSessionService(db_url=POSTGRES_CONNECTION_STRING)
 artifacts_service = InMemoryArtifactService()
+memory_service = InMemoryMemoryService()

 @router.post("/chat", response_model=ChatResponse, responses={
     400: {"model": ErrorResponse},
@@ -46,6 +47,7 @@ async def chat(request: ChatRequest, db: Session = Depends(get_db)):
             request.message,
             session_service,
             artifacts_service,
+            memory_service,
             db
         )
@@ -146,32 +148,3 @@ def delete_agent(agent_id: uuid.UUID, db: Session = Depends(get_db)):
     if not agent_service.delete_agent(db, agent_id):
         raise HTTPException(status_code=404, detail="Agent not found")
     return {"message": "Agent deleted successfully"}
-
-# Routes for Messages
-@router.post("/messages/", response_model=Message)
-def create_message(message: MessageCreate, db: Session = Depends(get_db)):
-    return message_service.create_message(db, message)
-
-@router.get("/messages/{session_id}", response_model=List[Message])
-def read_messages(session_id: uuid.UUID, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
-    return message_service.get_messages_by_session(db, session_id, skip, limit)
-
-@router.get("/message/{message_id}", response_model=Message)
-def read_message(message_id: int, db: Session = Depends(get_db)):
-    db_message = message_service.get_message(db, message_id)
-    if db_message is None:
-        raise HTTPException(status_code=404, detail="Message not found")
-    return db_message
-
-@router.put("/message/{message_id}", response_model=Message)
-def update_message(message_id: int, message: MessageCreate, db: Session = Depends(get_db)):
-    db_message = message_service.update_message(db, message_id, message)
-    if db_message is None:
-        raise HTTPException(status_code=404, detail="Message not found")
-    return db_message
-
-@router.delete("/message/{message_id}")
-def delete_message(message_id: int, db: Session = Depends(get_db)):
-    if not message_service.delete_message(db, message_id):
-        raise HTTPException(status_code=404, detail="Message not found")
-    return {"message": "Message deleted successfully"}
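For quick manual testing of the updated route, a minimal client call might look like the sketch below. The base URL, port, and the exact ChatRequest fields (agent_id, message) are assumptions inferred from run_agent's signature, not confirmed by this diff.

import requests

# Hypothetical smoke test for the /chat endpoint; URL and field names are assumptions.
payload = {
    "agent_id": "00000000-0000-0000-0000-000000000000",  # UUID of an existing agent
    "message": "Hello, which tools do you have access to?",
}
response = requests.post("http://localhost:8000/chat", json=payload, timeout=60)
print(response.status_code, response.json())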

View File

@@ -28,6 +28,20 @@ class Settings(BaseSettings):
     LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
     LOG_DIR: str = "logs"

+    # Knowledge API settings
+    KNOWLEDGE_API_URL: str = os.getenv("KNOWLEDGE_API_URL", "http://localhost:5540")
+    KNOWLEDGE_API_KEY: str = os.getenv("KNOWLEDGE_API_KEY", "")
+    TENANT_ID: str = os.getenv("TENANT_ID", "")
+
+    # Redis settings
+    REDIS_HOST: str = os.getenv("REDIS_HOST", "localhost")
+    REDIS_PORT: int = int(os.getenv("REDIS_PORT", 6379))
+    REDIS_DB: int = int(os.getenv("REDIS_DB", 0))
+    REDIS_PASSWORD: Optional[str] = os.getenv("REDIS_PASSWORD")
+
+    # Tools cache TTL in seconds (1 hour)
+    TOOLS_CACHE_TTL: int = int(os.getenv("TOOLS_CACHE_TTL", 3600))
+
     class Config:
         env_file = ".env"
         case_sensitive = True
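The Redis and TOOLS_CACHE_TTL settings have no consumer in this commit yet; the sketch below shows how they could back a per-agent tools cache. Only the settings fields come from this diff; the key naming and helper functions are assumptions.

import json
import redis

from src.config.settings import settings

# Hypothetical tools cache keyed per agent; the TTL comes from TOOLS_CACHE_TTL.
redis_client = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DB,
    password=settings.REDIS_PASSWORD or None,
)

def get_cached_tools(agent_id: str):
    cached = redis_client.get(f"tools:{agent_id}")
    return json.loads(cached) if cached else None

def set_cached_tools(agent_id: str, tools: list) -> None:
    redis_client.set(f"tools:{agent_id}", json.dumps(tools), ex=settings.TOOLS_CACHE_TTL)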

View File

@ -25,6 +25,7 @@ class Agent(Base):
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
client_id = Column(UUID(as_uuid=True), ForeignKey("clients.id", ondelete="CASCADE")) client_id = Column(UUID(as_uuid=True), ForeignKey("clients.id", ondelete="CASCADE"))
name = Column(String, nullable=False) name = Column(String, nullable=False)
description = Column(Text, nullable=True)
type = Column(String, nullable=False) type = Column(String, nullable=False)
model = Column(String, nullable=False) model = Column(String, nullable=False)
api_key = Column(String, nullable=False) api_key = Column(String, nullable=False)
@@ -34,12 +35,3 @@ class Agent(Base):
     __table_args__ = (
         CheckConstraint("type IN ('llm', 'sequential', 'parallel', 'loop')", name='check_agent_type'),
     )
-
-class Message(Base):
-    __tablename__ = "messages"
-
-    id = Column(BigInteger, primary_key=True, autoincrement=True)
-    session_id = Column(UUID(as_uuid=True))
-    sender = Column(String)
-    content = Column(Text)
-    created_at = Column(DateTime(timezone=True), server_default=func.now())

View File

@@ -48,18 +48,3 @@ class Agent(AgentBase):
     class Config:
         from_attributes = True
-
-class MessageBase(BaseModel):
-    session_id: UUID
-    sender: str
-    content: str
-
-class MessageCreate(MessageBase):
-    pass
-
-class Message(MessageBase):
-    id: int
-    created_at: datetime
-
-    class Config:
-        from_attributes = True

View File

@@ -1,6 +1,7 @@
 from typing import List, Optional, Tuple
 from google.adk.agents.llm_agent import LlmAgent
 from google.adk.agents import SequentialAgent, ParallelAgent, LoopAgent
+from google.adk.memory import InMemoryMemoryService
 from google.adk.models.lite_llm import LiteLlm
 from src.utils.logger import setup_logger
 from src.core.exceptions import AgentNotFoundError
@@ -9,16 +10,166 @@ from src.services.custom_tools import CustomToolBuilder
 from src.services.mcp_service import MCPService
 from sqlalchemy.orm import Session
 from contextlib import AsyncExitStack
+from google.adk.agents.callback_context import CallbackContext
+from google.adk.models import LlmResponse, LlmRequest
+from typing import Optional
+import logging
+import os
+import requests
+from datetime import datetime

 logger = setup_logger(__name__)

+
+def before_model_callback(
+    callback_context: CallbackContext, llm_request: LlmRequest
+) -> Optional[LlmResponse]:
+    """
+    Callback executed before the model generates a response.
+    Always runs the knowledge-base search before proceeding.
+    """
+    try:
+        agent_name = callback_context.agent_name
+        logger.debug(f"🔄 Before model call for agent: {agent_name}")
+
+        # Extract the last user message
+        last_user_message = ""
+        if llm_request.contents and llm_request.contents[-1].role == "user":
+            if llm_request.contents[-1].parts:
+                last_user_message = llm_request.contents[-1].parts[0].text
+        logger.debug(f"📝 Last user message: {last_user_message}")
+
+        # Extract and format the message history
+        history = []
+        for content in llm_request.contents:
+            if content.parts and content.parts[0].text:
+                # Replace 'model' with 'assistant' in the role
+                role = "assistant" if content.role == "model" else content.role
+                history.append(
+                    {
+                        "role": role,
+                        "content": {
+                            "type": "text",
+                            "text": content.parts[0].text,
+                        },
+                    }
+                )
+
+        # Log the message history
+        logger.debug(f"📝 Message history: {history}")
+
+        if last_user_message:
+            logger.info("🔍 Running knowledge base search")
+            # Run the knowledge base search synchronously
+            search_results = search_knowledge_base_function_sync(
+                last_user_message, history
+            )
+
+            if search_results:
+                logger.info("✅ Results found, adding them to the context")
+
+                # Get the original system instruction
+                original_instruction = llm_request.config.system_instruction or ""
+
+                # Add the search results and the history to the system context
+                modified_text = (
+                    original_instruction
+                    + "\n\n<knowledge_context>\n"
+                    + str(search_results)
+                    + "\n</knowledge_context>\n\n<history>\n"
+                    + str(history)
+                    + "\n</history>"
+                )
+                llm_request.config.system_instruction = modified_text
+
+                logger.debug(
+                    f"📝 System instruction updated with search results and history"
+                )
+            else:
+                logger.warning("⚠️ No results found in the search")
+        else:
+            logger.warning("⚠️ No user message found")
+
+        logger.info("✅ before_model_callback finished")
+        return None
+    except Exception as e:
+        logger.error(f"❌ Error in before_model_callback: {str(e)}", exc_info=True)
+        return None
+
+
+def search_knowledge_base_function_sync(query: str, history=[]):
+    """
+    Searches the knowledge base synchronously.
+
+    Args:
+        query (str): The search query, with user message and history messages, all in one string
+
+    Returns:
+        dict: The search results
+    """
+    try:
+        logger.info("🔍 Starting knowledge base search")
+        logger.debug(f"Received query: {query}")
+
+        # url = os.getenv("KNOWLEDGE_API_URL") + "/api/v1/search"
+        url = os.getenv("KNOWLEDGE_API_URL") + "/api/v1/knowledge"
+        tenant_id = os.getenv("TENANT_ID")
+        url = url + "?tenant_id=" + tenant_id
+        logger.debug(f"API URL: {url}")
+        logger.debug(f"Tenant ID: {tenant_id}")
+
+        headers = {
+            "x-api-key": f"{os.getenv('KNOWLEDGE_API_KEY')}",
+            "Content-Type": "application/json",
+        }
+        logger.debug(f"Configured headers: {headers}")
+
+        payload = {
+            "gemini_api_key": os.getenv("GOOGLE_API_KEY"),
+            "gemini_model": "gemini-2.0-flash-lite-001",
+            "gemini_temperature": 0.7,
+            "query": query,
+            "tenant_id": tenant_id,
+            "history": history,
+        }
+        logger.debug(f"Request payload: {payload}")
+
+        # Use requests to make the synchronous request with a timeout
+        logger.info("🔄 Making synchronous request to the knowledge API")
+        # response = requests.post(url, headers=headers, json=payload)
+        response = requests.get(url, headers=headers, timeout=10)
+
+        if response.status_code == 200:
+            logger.info("✅ Search completed successfully")
+            result = response.json()
+            logger.debug(f"Search result: {result}")
+            return result
+        else:
+            logger.error(
+                f"❌ Error performing search. Status code: {response.status_code}"
+            )
+            return None
+    except requests.exceptions.Timeout:
+        logger.error("❌ Timeout while searching the knowledge base")
+        return None
+    except requests.exceptions.RequestException as e:
+        logger.error(f"❌ Request error: {str(e)}", exc_info=True)
+        return None
+    except Exception as e:
+        logger.error(f"❌ Error performing search: {str(e)}", exc_info=True)
+        return None
+
+
 class AgentBuilder:
-    def __init__(self, db: Session):
+    def __init__(self, db: Session, memory_service: InMemoryMemoryService):
         self.db = db
         self.custom_tool_builder = CustomToolBuilder()
         self.mcp_service = MCPService()
+        self.memory_service = memory_service

-    async def _create_llm_agent(self, agent) -> Tuple[LlmAgent, Optional[AsyncExitStack]]:
+    async def _create_llm_agent(
+        self, agent
+    ) -> Tuple[LlmAgent, Optional[AsyncExitStack]]:
         """Creates an LLM agent from the agent data."""
         # Get custom tools from the configuration
         custom_tools = []
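To make the callback's effect concrete, the sketch below shows the shape of the history it builds from llm_request.contents and the augmented system instruction it writes back; all values are invented for illustration.

# Example history entries ('model' roles are rewritten as 'assistant'):
history = [
    {"role": "user", "content": {"type": "text", "text": "What is the refund policy?"}},
    {"role": "assistant", "content": {"type": "text", "text": "Let me check our knowledge base."}},
]

# The callback appends the search results and the history to the system instruction:
system_instruction = (
    "You are a support agent."
    + "\n\n<knowledge_context>\n" + str({"results": ["Refunds are accepted within 30 days."]})
    + "\n</knowledge_context>\n\n<history>\n" + str(history) + "\n</history>"
)
print(system_instruction)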
@@ -34,15 +185,40 @@ class AgentBuilder:
         # Combine all the tools
         all_tools = custom_tools + mcp_tools

-        return LlmAgent(
-            name=agent.name,
-            model=LiteLlm(model=agent.model, api_key=agent.api_key),
-            instruction=agent.instruction,
-            description=agent.config.get("description", ""),
-            tools=all_tools,
-        ), mcp_exit_stack
-
-    async def _get_sub_agents(self, sub_agent_ids: List[str]) -> List[Tuple[LlmAgent, Optional[AsyncExitStack]]]:
+        # Check whether load_memory is enabled
+        before_model_callback_func = None
+        if agent.config.get("load_memory") == True:
+            before_model_callback_func = before_model_callback
+
+        now = datetime.now()
+        current_datetime = now.strftime("%d/%m/%Y %H:%M")
+        current_day_of_week = now.strftime("%A")
+        current_date_iso = now.strftime("%Y-%m-%d")
+        current_time = now.strftime("%H:%M")
+
+        # Substitute the variables in the prompt
+        formatted_prompt = agent.instruction.format(
+            current_datetime=current_datetime,
+            current_day_of_week=current_day_of_week,
+            current_date_iso=current_date_iso,
+            current_time=current_time,
+        )
+
+        return (
+            LlmAgent(
+                name=agent.name,
+                model=LiteLlm(model=agent.model, api_key=agent.api_key),
+                instruction=formatted_prompt,
+                description=agent.description,
+                tools=all_tools,
+                before_model_callback=before_model_callback_func,
+            ),
+            mcp_exit_stack,
+        )
+
+    async def _get_sub_agents(
+        self, sub_agent_ids: List[str]
+    ) -> List[Tuple[LlmAgent, Optional[AsyncExitStack]]]:
         """Fetches and creates the LLM sub-agents."""
         sub_agents = []
         for sub_agent_id in sub_agent_ids:
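As an illustration of the new _create_llm_agent behavior, the hypothetical values below show how an agent's config and instruction interact with it: load_memory enables before_model_callback, and the instruction placeholders are filled with str.format before the LlmAgent is built. Only load_memory and sub_agents are keys confirmed by this diff; the rest is made up.

from datetime import datetime

# Illustrative agent data (not taken from this commit):
agent_config = {
    "load_memory": True,   # enables before_model_callback for knowledge retrieval
    "sub_agents": [],      # optional list of sub-agent IDs
}

instruction_template = (
    "You are a scheduling assistant. "
    "Today is {current_day_of_week} ({current_date_iso}) and the time is {current_time}; "
    "full timestamp: {current_datetime}."
)

# Equivalent of the formatting step performed in _create_llm_agent:
now = datetime.now()
print(instruction_template.format(
    current_datetime=now.strftime("%d/%m/%Y %H:%M"),
    current_day_of_week=now.strftime("%A"),
    current_date_iso=now.strftime("%Y-%m-%d"),
    current_time=now.strftime("%H:%M"),
))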
@@ -52,20 +228,26 @@ class AgentBuilder:
                 raise AgentNotFoundError(f"Agent with ID {sub_agent_id} not found")

             if agent.type != "llm":
-                raise ValueError(f"Agent {agent.name} (ID: {agent.id}) is not an LLM agent")
+                raise ValueError(
+                    f"Agent {agent.name} (ID: {agent.id}) is not an LLM agent"
+                )

             sub_agent, exit_stack = await self._create_llm_agent(agent)
             sub_agents.append((sub_agent, exit_stack))

         return sub_agents

-    async def build_llm_agent(self, root_agent) -> Tuple[LlmAgent, Optional[AsyncExitStack]]:
+    async def build_llm_agent(
+        self, root_agent
+    ) -> Tuple[LlmAgent, Optional[AsyncExitStack]]:
         """Builds an LLM agent with its sub-agents."""
         logger.info("Creating LLM agent")

         sub_agents = []
         if root_agent.config.get("sub_agents"):
-            sub_agents_with_stacks = await self._get_sub_agents(root_agent.config.get("sub_agents"))
+            sub_agents_with_stacks = await self._get_sub_agents(
+                root_agent.config.get("sub_agents")
+            )
             sub_agents = [agent for agent, _ in sub_agents_with_stacks]

         root_llm_agent, exit_stack = await self._create_llm_agent(root_agent)
@@ -74,39 +256,56 @@ class AgentBuilder:
         return root_llm_agent, exit_stack

-    async def build_composite_agent(self, root_agent) -> Tuple[SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]]:
+    async def build_composite_agent(
+        self, root_agent
+    ) -> Tuple[SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]]:
         """Builds a composite agent (Sequential, Parallel or Loop) with its sub-agents."""
         logger.info(f"Processing sub-agents for {root_agent.type} agent")

-        sub_agents_with_stacks = await self._get_sub_agents(root_agent.config.get("sub_agents", []))
+        sub_agents_with_stacks = await self._get_sub_agents(
+            root_agent.config.get("sub_agents", [])
+        )
         sub_agents = [agent for agent, _ in sub_agents_with_stacks]

         if root_agent.type == "sequential":
             logger.info("Creating SequentialAgent")
-            return SequentialAgent(
-                name=root_agent.name,
-                sub_agents=sub_agents,
-                description=root_agent.config.get("description", ""),
-            ), None
+            return (
+                SequentialAgent(
+                    name=root_agent.name,
+                    sub_agents=sub_agents,
+                    description=root_agent.config.get("description", ""),
+                ),
+                None,
+            )
         elif root_agent.type == "parallel":
             logger.info("Creating ParallelAgent")
-            return ParallelAgent(
-                name=root_agent.name,
-                sub_agents=sub_agents,
-                description=root_agent.config.get("description", ""),
-            ), None
+            return (
+                ParallelAgent(
+                    name=root_agent.name,
+                    sub_agents=sub_agents,
+                    description=root_agent.config.get("description", ""),
+                ),
+                None,
+            )
         elif root_agent.type == "loop":
             logger.info("Creating LoopAgent")
-            return LoopAgent(
-                name=root_agent.name,
-                sub_agents=sub_agents,
-                description=root_agent.config.get("description", ""),
-                max_iterations=root_agent.config.get("max_iterations", 5),
-            ), None
+            return (
+                LoopAgent(
+                    name=root_agent.name,
+                    sub_agents=sub_agents,
+                    description=root_agent.config.get("description", ""),
+                    max_iterations=root_agent.config.get("max_iterations", 5),
+                ),
+                None,
+            )
         else:
             raise ValueError(f"Invalid agent type: {root_agent.type}")

-    async def build_agent(self, root_agent) -> Tuple[LlmAgent | SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]]:
+    async def build_agent(
+        self, root_agent
+    ) -> Tuple[
+        LlmAgent | SequentialAgent | ParallelAgent | LoopAgent, Optional[AsyncExitStack]
+    ]:
         """Builds the appropriate agent based on the root agent's type."""
         if root_agent.type == "llm":
             return await self.build_llm_agent(root_agent)

View File

@@ -6,6 +6,7 @@ from google.adk.agents import SequentialAgent, ParallelAgent, LoopAgent
 from google.adk.runners import Runner
 from google.genai.types import Content, Part
 from google.adk.sessions import DatabaseSessionService
+from google.adk.memory import InMemoryMemoryService
 from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
 from src.utils.logger import setup_logger
 from src.core.exceptions import AgentNotFoundError, InternalServerError
@@ -23,6 +24,7 @@ async def run_agent(
     message: str,
     session_service: DatabaseSessionService,
     artifacts_service: InMemoryArtifactService,
+    memory_service: InMemoryMemoryService,
     db: Session,
 ):
     try:
@@ -40,7 +42,7 @@ async def run_agent(
             raise AgentNotFoundError(f"Agent with ID {agent_id} not found")

         # Use AgentBuilder to create the agent
-        agent_builder = AgentBuilder(db)
+        agent_builder = AgentBuilder(db, memory_service)
         root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)

         logger.info("Configuring Runner")

View File

@@ -1,35 +0,0 @@
-from sqlalchemy.orm import Session
-from src.models.models import Message
-from src.schemas.schemas import MessageCreate
-from typing import List
-import uuid
-
-def get_message(db: Session, message_id: int) -> Message:
-    return db.query(Message).filter(Message.id == message_id).first()
-
-def get_messages_by_session(db: Session, session_id: uuid.UUID, skip: int = 0, limit: int = 100) -> List[Message]:
-    return db.query(Message).filter(Message.session_id == session_id).offset(skip).limit(limit).all()
-
-def create_message(db: Session, message: MessageCreate) -> Message:
-    db_message = Message(**message.model_dump())
-    db.add(db_message)
-    db.commit()
-    db.refresh(db_message)
-    return db_message
-
-def update_message(db: Session, message_id: int, message: MessageCreate) -> Message:
-    db_message = db.query(Message).filter(Message.id == message_id).first()
-    if db_message:
-        for key, value in message.model_dump().items():
-            setattr(db_message, key, value)
-        db.commit()
-        db.refresh(db_message)
-    return db_message
-
-def delete_message(db: Session, message_id: int) -> bool:
-    db_message = db.query(Message).filter(Message.id == message_id).first()
-    if db_message:
-        db.delete(db_message)
-        db.commit()
-        return True
-    return False

View File

@ -1,6 +1,8 @@
import logging import logging
import os
import sys import sys
from typing import Optional from typing import Optional
from src.config.settings import settings
class CustomFormatter(logging.Formatter): class CustomFormatter(logging.Formatter):
"""Formatação personalizada para logs""" """Formatação personalizada para logs"""
@@ -26,25 +28,33 @@ class CustomFormatter(logging.Formatter):
         formatter = logging.Formatter(log_fmt)
         return formatter.format(record)

-def setup_logger(name: str, level: Optional[int] = logging.INFO) -> logging.Logger:
+def setup_logger(name: str) -> logging.Logger:
     """
     Configures a custom logger

     Args:
         name: Logger name
-        level: Log level (default: INFO)

     Returns:
         logging.Logger: Configured logger
     """
     logger = logging.getLogger(name)
-    logger.setLevel(level)

-    # Avoid duplicating handlers
-    if not logger.handlers:
-        # Console handler
-        console_handler = logging.StreamHandler(sys.stdout)
-        console_handler.setFormatter(CustomFormatter())
-        logger.addHandler(console_handler)
+    # Remove existing handlers to avoid duplication
+    if logger.handlers:
+        logger.handlers.clear()
+
+    # Set the logger level from the environment variable or settings
+    log_level = getattr(logging, os.getenv("LOG_LEVEL", settings.LOG_LEVEL).upper())
+    logger.setLevel(log_level)
+
+    # Console handler
+    console_handler = logging.StreamHandler(sys.stdout)
+    console_handler.setFormatter(CustomFormatter())
+    console_handler.setLevel(log_level)
+    logger.addHandler(console_handler)
+
+    # Prevent logs from propagating to the root logger
+    logger.propagate = False

     return logger
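A short usage sketch of the reworked logger: the level now comes from LOG_LEVEL (environment or settings) rather than a per-call argument, and propagation to the root logger is disabled.

from src.utils.logger import setup_logger

logger = setup_logger(__name__)
logger.debug("Visible only when LOG_LEVEL=DEBUG")  # e.g. the DEBUG value set in .env above
logger.info("Handlers are re-created on each setup_logger call, so no duplicate output")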