diff --git a/.env.example b/.env.example index ff067eeb..7cd11d9f 100644 --- a/.env.example +++ b/.env.example @@ -39,6 +39,10 @@ SENDGRID_API_KEY="your-sendgrid-api-key" EMAIL_FROM="noreply@yourdomain.com" APP_URL="https://yourdomain.com" +LANGFUSE_PUBLIC_KEY="your-langfuse-public-key" +LANGFUSE_SECRET_KEY="your-langfuse-secret-key" +OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel" + # Server settings HOST="0.0.0.0" PORT=8000 diff --git a/pyproject.toml b/pyproject.toml index 60a3f50a..fd057327 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,8 @@ dependencies = [ "jwcrypto==1.5.6", "pyjwt[crypto]==2.9.0", "langgraph==0.4.1", + "opentelemetry-sdk==1.33.0", + "opentelemetry-exporter-otlp==1.33.0", ] [project.optional-dependencies] diff --git a/src/config/settings.py b/src/config/settings.py index 37561de6..75550cb7 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -84,6 +84,11 @@ class Settings(BaseSettings): DEMO_PASSWORD: str = os.getenv("DEMO_PASSWORD", "demo123") DEMO_CLIENT_NAME: str = os.getenv("DEMO_CLIENT_NAME", "Demo Client") + # Langfuse / OpenTelemetry settings + LANGFUSE_PUBLIC_KEY: str = os.getenv("LANGFUSE_PUBLIC_KEY", "") + LANGFUSE_SECRET_KEY: str = os.getenv("LANGFUSE_SECRET_KEY", "") + OTEL_EXPORTER_OTLP_ENDPOINT: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") + class Config: env_file = ".env" env_file_encoding = "utf-8" diff --git a/src/main.py b/src/main.py index 9a9ef5de..d4e062e0 100644 --- a/src/main.py +++ b/src/main.py @@ -7,6 +7,7 @@ from fastapi.staticfiles import StaticFiles from src.config.database import engine, Base from src.config.settings import settings from src.utils.logger import setup_logger +from src.utils.otel import init_otel # Necessary for other modules from src.services.service_providers import session_service # noqa: F401 @@ -85,6 +86,9 @@ app.include_router(session_router, prefix=API_PREFIX) app.include_router(agent_router, prefix=API_PREFIX) app.include_router(a2a_router, prefix=API_PREFIX) +# Inicializa o OpenTelemetry para Langfuse +init_otel() + @app.get("/") def read_root(): diff --git a/src/services/agent_runner.py b/src/services/agent_runner.py index 727fb4d1..aa0822e3 100644 --- a/src/services/agent_runner.py +++ b/src/services/agent_runner.py @@ -11,6 +11,8 @@ from sqlalchemy.orm import Session from typing import Optional, AsyncGenerator import asyncio import json +from src.utils.otel import get_tracer +from opentelemetry import trace logger = setup_logger(__name__) @@ -26,159 +28,173 @@ async def run_agent( session_id: Optional[str] = None, timeout: float = 60.0, ): - exit_stack = None - try: - logger.info( - f"Starting execution of agent {agent_id} for external_id {external_id}" - ) - logger.info(f"Received message: {message}") + tracer = get_tracer() + with tracer.start_as_current_span( + "run_agent", + attributes={ + "agent_id": agent_id, + "external_id": external_id, + "session_id": session_id or f"{external_id}_{agent_id}", + "message": message, + }, + ): + exit_stack = None + try: + logger.info( + f"Starting execution of agent {agent_id} for external_id {external_id}" + ) + logger.info(f"Received message: {message}") - get_root_agent = get_agent(db, agent_id) - logger.info( - f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" - ) + get_root_agent = get_agent(db, agent_id) + logger.info( + f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" + ) - if get_root_agent is None: - raise AgentNotFoundError(f"Agent with ID {agent_id} not found") + if get_root_agent is None: + raise AgentNotFoundError(f"Agent with ID {agent_id} not found") - # Using the AgentBuilder to create the agent - agent_builder = AgentBuilder(db) - root_agent, exit_stack = await agent_builder.build_agent(get_root_agent) + # Using the AgentBuilder to create the agent + agent_builder = AgentBuilder(db) + root_agent, exit_stack = await agent_builder.build_agent(get_root_agent) - logger.info("Configuring Runner") - agent_runner = Runner( - agent=root_agent, - app_name=agent_id, - session_service=session_service, - artifact_service=artifacts_service, - memory_service=memory_service, - ) - adk_session_id = external_id + "_" + agent_id - if session_id is None: - session_id = adk_session_id + logger.info("Configuring Runner") + agent_runner = Runner( + agent=root_agent, + app_name=agent_id, + session_service=session_service, + artifact_service=artifacts_service, + memory_service=memory_service, + ) + adk_session_id = external_id + "_" + agent_id + if session_id is None: + session_id = adk_session_id - logger.info(f"Searching session for external_id {external_id}") - session = session_service.get_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) - - if session is None: - logger.info(f"Creating new session for external_id {external_id}") - session = session_service.create_session( + logger.info(f"Searching session for external_id {external_id}") + session = session_service.get_session( app_name=agent_id, user_id=external_id, session_id=adk_session_id, ) - content = Content(role="user", parts=[Part(text=message)]) - logger.info("Starting agent execution") + if session is None: + logger.info(f"Creating new session for external_id {external_id}") + session = session_service.create_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) - final_response_text = "No final response captured." - try: - response_queue = asyncio.Queue() - execution_completed = asyncio.Event() - - async def process_events(): - try: - events_async = agent_runner.run_async( - user_id=external_id, - session_id=adk_session_id, - new_message=content, - ) - - last_response = None - all_responses = [] - - async for event in events_async: - if ( - event.content - and event.content.parts - and event.content.parts[0].text - ): - current_text = event.content.parts[0].text - last_response = current_text - all_responses.append(current_text) - - if event.actions and event.actions.escalate: - escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}" - await response_queue.put(escalate_text) - execution_completed.set() - return - - if last_response: - await response_queue.put(last_response) - else: - await response_queue.put("Finished without specific response") - - execution_completed.set() - except Exception as e: - logger.error(f"Error in process_events: {str(e)}") - await response_queue.put(f"Error: {str(e)}") - execution_completed.set() - - task = asyncio.create_task(process_events()) + content = Content(role="user", parts=[Part(text=message)]) + logger.info("Starting agent execution") + final_response_text = "No final response captured." try: - wait_task = asyncio.create_task(execution_completed.wait()) - done, pending = await asyncio.wait({wait_task}, timeout=timeout) + response_queue = asyncio.Queue() + execution_completed = asyncio.Event() - for p in pending: - p.cancel() + async def process_events(): + try: + events_async = agent_runner.run_async( + user_id=external_id, + session_id=adk_session_id, + new_message=content, + ) - if not execution_completed.is_set(): - logger.warning(f"Agent execution timed out after {timeout} seconds") - await response_queue.put( - "The response took too long and was interrupted." - ) + last_response = None + all_responses = [] - final_response_text = await response_queue.get() + async for event in events_async: + if ( + event.content + and event.content.parts + and event.content.parts[0].text + ): + current_text = event.content.parts[0].text + last_response = current_text + all_responses.append(current_text) + + if event.actions and event.actions.escalate: + escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}" + await response_queue.put(escalate_text) + execution_completed.set() + return + + if last_response: + await response_queue.put(last_response) + else: + await response_queue.put( + "Finished without specific response" + ) + + execution_completed.set() + except Exception as e: + logger.error(f"Error in process_events: {str(e)}") + await response_queue.put(f"Error: {str(e)}") + execution_completed.set() + + task = asyncio.create_task(process_events()) + + try: + wait_task = asyncio.create_task(execution_completed.wait()) + done, pending = await asyncio.wait({wait_task}, timeout=timeout) + + for p in pending: + p.cancel() + + if not execution_completed.is_set(): + logger.warning( + f"Agent execution timed out after {timeout} seconds" + ) + await response_queue.put( + "The response took too long and was interrupted." + ) + + final_response_text = await response_queue.get() + + except Exception as e: + logger.error(f"Error waiting for response: {str(e)}") + final_response_text = f"Error processing response: {str(e)}" + + # Add the session to memory after completion + completed_session = session_service.get_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) + + memory_service.add_session_to_memory(completed_session) + + # Cancel the processing task if it is still running + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + logger.info("Task cancelled successfully") + except Exception as e: + logger.error(f"Error cancelling task: {str(e)}") except Exception as e: - logger.error(f"Error waiting for response: {str(e)}") - final_response_text = f"Error processing response: {str(e)}" + logger.error(f"Error processing request: {str(e)}") + raise e - # Add the session to memory after completion - completed_session = session_service.get_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) - - memory_service.add_session_to_memory(completed_session) - - # Cancel the processing task if it is still running - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - logger.info("Task cancelled successfully") - except Exception as e: - logger.error(f"Error cancelling task: {str(e)}") - - except Exception as e: + logger.info("Agent execution completed successfully") + return final_response_text + except AgentNotFoundError as e: logger.error(f"Error processing request: {str(e)}") raise e - - logger.info("Agent execution completed successfully") - return final_response_text - except AgentNotFoundError as e: - logger.error(f"Error processing request: {str(e)}") - raise e - except Exception as e: - logger.error(f"Internal error processing request: {str(e)}", exc_info=True) - raise InternalServerError(str(e)) - finally: - # Clean up MCP connection - MUST be executed in the same task - if exit_stack: - logger.info("Closing MCP server connection...") - try: - await exit_stack.aclose() - except Exception as e: - logger.error(f"Error closing MCP connection: {e}") - # Do not raise the exception to not obscure the original error + except Exception as e: + logger.error(f"Internal error processing request: {str(e)}", exc_info=True) + raise InternalServerError(str(e)) + finally: + # Clean up MCP connection - MUST be executed in the same task + if exit_stack: + logger.info("Closing MCP server connection...") + try: + await exit_stack.aclose() + except Exception as e: + logger.error(f"Error closing MCP connection: {e}") + # Do not raise the exception to not obscure the original error def convert_sets(obj): @@ -202,89 +218,105 @@ async def run_agent_stream( db: Session, session_id: Optional[str] = None, ) -> AsyncGenerator[str, None]: + tracer = get_tracer() + span = tracer.start_span( + "run_agent_stream", + attributes={ + "agent_id": agent_id, + "external_id": external_id, + "session_id": session_id or f"{external_id}_{agent_id}", + "message": message, + }, + ) try: - logger.info( - f"Starting streaming execution of agent {agent_id} for external_id {external_id}" - ) - logger.info(f"Received message: {message}") + with trace.use_span(span, end_on_exit=True): + try: + logger.info( + f"Starting streaming execution of agent {agent_id} for external_id {external_id}" + ) + logger.info(f"Received message: {message}") - get_root_agent = get_agent(db, agent_id) - logger.info( - f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" - ) + get_root_agent = get_agent(db, agent_id) + logger.info( + f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" + ) - if get_root_agent is None: - raise AgentNotFoundError(f"Agent with ID {agent_id} not found") + if get_root_agent is None: + raise AgentNotFoundError(f"Agent with ID {agent_id} not found") - # Using the AgentBuilder to create the agent - agent_builder = AgentBuilder(db) - root_agent, exit_stack = await agent_builder.build_agent(get_root_agent) + # Using the AgentBuilder to create the agent + agent_builder = AgentBuilder(db) + root_agent, exit_stack = await agent_builder.build_agent(get_root_agent) - logger.info("Configuring Runner") - agent_runner = Runner( - agent=root_agent, - app_name=agent_id, - session_service=session_service, - artifact_service=artifacts_service, - memory_service=memory_service, - ) - adk_session_id = external_id + "_" + agent_id - if session_id is None: - session_id = adk_session_id + logger.info("Configuring Runner") + agent_runner = Runner( + agent=root_agent, + app_name=agent_id, + session_service=session_service, + artifact_service=artifacts_service, + memory_service=memory_service, + ) + adk_session_id = external_id + "_" + agent_id + if session_id is None: + session_id = adk_session_id - logger.info(f"Searching session for external_id {external_id}") - session = session_service.get_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) + logger.info(f"Searching session for external_id {external_id}") + session = session_service.get_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) - if session is None: - logger.info(f"Creating new session for external_id {external_id}") - session = session_service.create_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) + if session is None: + logger.info(f"Creating new session for external_id {external_id}") + session = session_service.create_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) - content = Content(role="user", parts=[Part(text=message)]) - logger.info("Starting agent streaming execution") + content = Content(role="user", parts=[Part(text=message)]) + logger.info("Starting agent streaming execution") - try: - events_async = agent_runner.run_async( - user_id=external_id, - session_id=adk_session_id, - new_message=content, - ) - - async for event in events_async: - event_dict = event.dict() - event_dict = convert_sets(event_dict) - yield json.dumps(event_dict) - - completed_session = session_service.get_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) - - memory_service.add_session_to_memory(completed_session) - except Exception as e: - logger.error(f"Error processing request: {str(e)}") - raise e - finally: - # Clean up MCP connection - if exit_stack: - logger.info("Closing MCP server connection...") try: - await exit_stack.aclose() - except Exception as e: - logger.error(f"Error closing MCP connection: {e}") + events_async = agent_runner.run_async( + user_id=external_id, + session_id=adk_session_id, + new_message=content, + ) - logger.info("Agent streaming execution completed successfully") - except AgentNotFoundError as e: - logger.error(f"Error processing request: {str(e)}") - raise e - except Exception as e: - logger.error(f"Internal error processing request: {str(e)}", exc_info=True) - raise InternalServerError(str(e)) + async for event in events_async: + event_dict = event.dict() + event_dict = convert_sets(event_dict) + yield json.dumps(event_dict) + + completed_session = session_service.get_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) + + memory_service.add_session_to_memory(completed_session) + except Exception as e: + logger.error(f"Error processing request: {str(e)}") + raise e + finally: + # Clean up MCP connection + if exit_stack: + logger.info("Closing MCP server connection...") + try: + await exit_stack.aclose() + except Exception as e: + logger.error(f"Error closing MCP connection: {e}") + + logger.info("Agent streaming execution completed successfully") + except AgentNotFoundError as e: + logger.error(f"Error processing request: {str(e)}") + raise e + except Exception as e: + logger.error( + f"Internal error processing request: {str(e)}", exc_info=True + ) + raise InternalServerError(str(e)) + finally: + span.end() diff --git a/src/utils/otel.py b/src/utils/otel.py new file mode 100644 index 00000000..b7776f55 --- /dev/null +++ b/src/utils/otel.py @@ -0,0 +1,41 @@ +import os +import base64 +from src.config.settings import settings + +from opentelemetry import trace +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +_otlp_initialized = False + + +def init_otel(): + global _otlp_initialized + if _otlp_initialized: + return + if not ( + settings.LANGFUSE_PUBLIC_KEY + and settings.LANGFUSE_SECRET_KEY + and settings.OTEL_EXPORTER_OTLP_ENDPOINT + ): + return + + langfuse_auth = base64.b64encode( + f"{settings.LANGFUSE_PUBLIC_KEY}:{settings.LANGFUSE_SECRET_KEY}".encode() + ).decode() + os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = settings.OTEL_EXPORTER_OTLP_ENDPOINT + os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}" + + provider = TracerProvider( + resource=Resource.create({"service.name": "evo_ai_agent"}) + ) + exporter = OTLPSpanExporter() + provider.add_span_processor(BatchSpanProcessor(exporter)) + trace.set_tracer_provider(provider) + _otlp_initialized = True + + +def get_tracer(name: str = "evo_ai_agent"): + return trace.get_tracer(name)