diff --git a/.env.example b/.env.example index ff067eeb..7cd11d9f 100644 --- a/.env.example +++ b/.env.example @@ -39,6 +39,10 @@ SENDGRID_API_KEY="your-sendgrid-api-key" EMAIL_FROM="noreply@yourdomain.com" APP_URL="https://yourdomain.com" +LANGFUSE_PUBLIC_KEY="your-langfuse-public-key" +LANGFUSE_SECRET_KEY="your-langfuse-secret-key" +OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel" + # Server settings HOST="0.0.0.0" PORT=8000 diff --git a/pyproject.toml b/pyproject.toml index 60a3f50a..fd057327 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,8 @@ dependencies = [ "jwcrypto==1.5.6", "pyjwt[crypto]==2.9.0", "langgraph==0.4.1", + "opentelemetry-sdk==1.33.0", + "opentelemetry-exporter-otlp==1.33.0", ] [project.optional-dependencies] diff --git a/src/api/chat_routes.py b/src/api/chat_routes.py index f69b73ce..16bce304 100644 --- a/src/api/chat_routes.py +++ b/src/api/chat_routes.py @@ -102,9 +102,8 @@ async def websocket_chat( memory_service=memory_service, db=db, ): - # Send each chunk as a JSON message await websocket.send_json( - {"message": chunk, "turn_complete": False} + {"message": json.loads(chunk), "turn_complete": False} ) # Send signal of complete turn diff --git a/src/config/settings.py b/src/config/settings.py index 37561de6..75550cb7 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -84,6 +84,11 @@ class Settings(BaseSettings): DEMO_PASSWORD: str = os.getenv("DEMO_PASSWORD", "demo123") DEMO_CLIENT_NAME: str = os.getenv("DEMO_CLIENT_NAME", "Demo Client") + # Langfuse / OpenTelemetry settings + LANGFUSE_PUBLIC_KEY: str = os.getenv("LANGFUSE_PUBLIC_KEY", "") + LANGFUSE_SECRET_KEY: str = os.getenv("LANGFUSE_SECRET_KEY", "") + OTEL_EXPORTER_OTLP_ENDPOINT: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") + class Config: env_file = ".env" env_file_encoding = "utf-8" diff --git a/src/main.py b/src/main.py index 9a9ef5de..d4e062e0 100644 --- a/src/main.py +++ b/src/main.py @@ -7,6 +7,7 @@ from fastapi.staticfiles import StaticFiles from src.config.database import engine, Base from src.config.settings import settings from src.utils.logger import setup_logger +from src.utils.otel import init_otel # Necessary for other modules from src.services.service_providers import session_service # noqa: F401 @@ -85,6 +86,9 @@ app.include_router(session_router, prefix=API_PREFIX) app.include_router(agent_router, prefix=API_PREFIX) app.include_router(a2a_router, prefix=API_PREFIX) +# Inicializa o OpenTelemetry para Langfuse +init_otel() + @app.get("/") def read_root(): diff --git a/src/services/agent_runner.py b/src/services/agent_runner.py index e2e93020..aa0822e3 100644 --- a/src/services/agent_runner.py +++ b/src/services/agent_runner.py @@ -10,6 +10,9 @@ from src.services.agent_builder import AgentBuilder from sqlalchemy.orm import Session from typing import Optional, AsyncGenerator import asyncio +import json +from src.utils.otel import get_tracer +from opentelemetry import trace logger = setup_logger(__name__) @@ -25,159 +28,184 @@ async def run_agent( session_id: Optional[str] = None, timeout: float = 60.0, ): - exit_stack = None - try: - logger.info( - f"Starting execution of agent {agent_id} for external_id {external_id}" - ) - logger.info(f"Received message: {message}") + tracer = get_tracer() + with tracer.start_as_current_span( + "run_agent", + attributes={ + "agent_id": agent_id, + "external_id": external_id, + "session_id": session_id or f"{external_id}_{agent_id}", + "message": message, + }, + ): + exit_stack = None + try: + logger.info( + f"Starting execution of agent {agent_id} for external_id {external_id}" + ) + logger.info(f"Received message: {message}") - get_root_agent = get_agent(db, agent_id) - logger.info( - f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" - ) + get_root_agent = get_agent(db, agent_id) + logger.info( + f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" + ) - if get_root_agent is None: - raise AgentNotFoundError(f"Agent with ID {agent_id} not found") + if get_root_agent is None: + raise AgentNotFoundError(f"Agent with ID {agent_id} not found") - # Using the AgentBuilder to create the agent - agent_builder = AgentBuilder(db) - root_agent, exit_stack = await agent_builder.build_agent(get_root_agent) + # Using the AgentBuilder to create the agent + agent_builder = AgentBuilder(db) + root_agent, exit_stack = await agent_builder.build_agent(get_root_agent) - logger.info("Configuring Runner") - agent_runner = Runner( - agent=root_agent, - app_name=agent_id, - session_service=session_service, - artifact_service=artifacts_service, - memory_service=memory_service, - ) - adk_session_id = external_id + "_" + agent_id - if session_id is None: - session_id = adk_session_id + logger.info("Configuring Runner") + agent_runner = Runner( + agent=root_agent, + app_name=agent_id, + session_service=session_service, + artifact_service=artifacts_service, + memory_service=memory_service, + ) + adk_session_id = external_id + "_" + agent_id + if session_id is None: + session_id = adk_session_id - logger.info(f"Searching session for external_id {external_id}") - session = session_service.get_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) - - if session is None: - logger.info(f"Creating new session for external_id {external_id}") - session = session_service.create_session( + logger.info(f"Searching session for external_id {external_id}") + session = session_service.get_session( app_name=agent_id, user_id=external_id, session_id=adk_session_id, ) - content = Content(role="user", parts=[Part(text=message)]) - logger.info("Starting agent execution") + if session is None: + logger.info(f"Creating new session for external_id {external_id}") + session = session_service.create_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) - final_response_text = "No final response captured." - try: - response_queue = asyncio.Queue() - execution_completed = asyncio.Event() - - async def process_events(): - try: - events_async = agent_runner.run_async( - user_id=external_id, - session_id=adk_session_id, - new_message=content, - ) - - last_response = None - all_responses = [] - - async for event in events_async: - if ( - event.content - and event.content.parts - and event.content.parts[0].text - ): - current_text = event.content.parts[0].text - last_response = current_text - all_responses.append(current_text) - - if event.actions and event.actions.escalate: - escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}" - await response_queue.put(escalate_text) - execution_completed.set() - return - - if last_response: - await response_queue.put(last_response) - else: - await response_queue.put("Finished without specific response") - - execution_completed.set() - except Exception as e: - logger.error(f"Error in process_events: {str(e)}") - await response_queue.put(f"Error: {str(e)}") - execution_completed.set() - - task = asyncio.create_task(process_events()) + content = Content(role="user", parts=[Part(text=message)]) + logger.info("Starting agent execution") + final_response_text = "No final response captured." try: - wait_task = asyncio.create_task(execution_completed.wait()) - done, pending = await asyncio.wait({wait_task}, timeout=timeout) + response_queue = asyncio.Queue() + execution_completed = asyncio.Event() - for p in pending: - p.cancel() + async def process_events(): + try: + events_async = agent_runner.run_async( + user_id=external_id, + session_id=adk_session_id, + new_message=content, + ) - if not execution_completed.is_set(): - logger.warning(f"Agent execution timed out after {timeout} seconds") - await response_queue.put( - "The response took too long and was interrupted." - ) + last_response = None + all_responses = [] - final_response_text = await response_queue.get() + async for event in events_async: + if ( + event.content + and event.content.parts + and event.content.parts[0].text + ): + current_text = event.content.parts[0].text + last_response = current_text + all_responses.append(current_text) + + if event.actions and event.actions.escalate: + escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}" + await response_queue.put(escalate_text) + execution_completed.set() + return + + if last_response: + await response_queue.put(last_response) + else: + await response_queue.put( + "Finished without specific response" + ) + + execution_completed.set() + except Exception as e: + logger.error(f"Error in process_events: {str(e)}") + await response_queue.put(f"Error: {str(e)}") + execution_completed.set() + + task = asyncio.create_task(process_events()) + + try: + wait_task = asyncio.create_task(execution_completed.wait()) + done, pending = await asyncio.wait({wait_task}, timeout=timeout) + + for p in pending: + p.cancel() + + if not execution_completed.is_set(): + logger.warning( + f"Agent execution timed out after {timeout} seconds" + ) + await response_queue.put( + "The response took too long and was interrupted." + ) + + final_response_text = await response_queue.get() + + except Exception as e: + logger.error(f"Error waiting for response: {str(e)}") + final_response_text = f"Error processing response: {str(e)}" + + # Add the session to memory after completion + completed_session = session_service.get_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) + + memory_service.add_session_to_memory(completed_session) + + # Cancel the processing task if it is still running + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + logger.info("Task cancelled successfully") + except Exception as e: + logger.error(f"Error cancelling task: {str(e)}") except Exception as e: - logger.error(f"Error waiting for response: {str(e)}") - final_response_text = f"Error processing response: {str(e)}" + logger.error(f"Error processing request: {str(e)}") + raise e - # Add the session to memory after completion - completed_session = session_service.get_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) - - memory_service.add_session_to_memory(completed_session) - - # Cancel the processing task if it is still running - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - logger.info("Task cancelled successfully") - except Exception as e: - logger.error(f"Error cancelling task: {str(e)}") - - except Exception as e: + logger.info("Agent execution completed successfully") + return final_response_text + except AgentNotFoundError as e: logger.error(f"Error processing request: {str(e)}") raise e + except Exception as e: + logger.error(f"Internal error processing request: {str(e)}", exc_info=True) + raise InternalServerError(str(e)) + finally: + # Clean up MCP connection - MUST be executed in the same task + if exit_stack: + logger.info("Closing MCP server connection...") + try: + await exit_stack.aclose() + except Exception as e: + logger.error(f"Error closing MCP connection: {e}") + # Do not raise the exception to not obscure the original error - logger.info("Agent execution completed successfully") - return final_response_text - except AgentNotFoundError as e: - logger.error(f"Error processing request: {str(e)}") - raise e - except Exception as e: - logger.error(f"Internal error processing request: {str(e)}", exc_info=True) - raise InternalServerError(str(e)) - finally: - # Clean up MCP connection - MUST be executed in the same task - if exit_stack: - logger.info("Closing MCP server connection...") - try: - await exit_stack.aclose() - except Exception as e: - logger.error(f"Error closing MCP connection: {e}") - # Do not raise the exception to not obscure the original error + +def convert_sets(obj): + if isinstance(obj, set): + return list(obj) + elif isinstance(obj, dict): + return {k: convert_sets(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [convert_sets(i) for i in obj] + else: + return obj async def run_agent_stream( @@ -190,91 +218,105 @@ async def run_agent_stream( db: Session, session_id: Optional[str] = None, ) -> AsyncGenerator[str, None]: + tracer = get_tracer() + span = tracer.start_span( + "run_agent_stream", + attributes={ + "agent_id": agent_id, + "external_id": external_id, + "session_id": session_id or f"{external_id}_{agent_id}", + "message": message, + }, + ) try: - logger.info( - f"Starting streaming execution of agent {agent_id} for external_id {external_id}" - ) - logger.info(f"Received message: {message}") + with trace.use_span(span, end_on_exit=True): + try: + logger.info( + f"Starting streaming execution of agent {agent_id} for external_id {external_id}" + ) + logger.info(f"Received message: {message}") - get_root_agent = get_agent(db, agent_id) - logger.info( - f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" - ) + get_root_agent = get_agent(db, agent_id) + logger.info( + f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})" + ) - if get_root_agent is None: - raise AgentNotFoundError(f"Agent with ID {agent_id} not found") + if get_root_agent is None: + raise AgentNotFoundError(f"Agent with ID {agent_id} not found") - # Using the AgentBuilder to create the agent - agent_builder = AgentBuilder(db) - root_agent, exit_stack = await agent_builder.build_agent(get_root_agent) + # Using the AgentBuilder to create the agent + agent_builder = AgentBuilder(db) + root_agent, exit_stack = await agent_builder.build_agent(get_root_agent) - logger.info("Configuring Runner") - agent_runner = Runner( - agent=root_agent, - app_name=agent_id, - session_service=session_service, - artifact_service=artifacts_service, - memory_service=memory_service, - ) - adk_session_id = external_id + "_" + agent_id - if session_id is None: - session_id = adk_session_id + logger.info("Configuring Runner") + agent_runner = Runner( + agent=root_agent, + app_name=agent_id, + session_service=session_service, + artifact_service=artifacts_service, + memory_service=memory_service, + ) + adk_session_id = external_id + "_" + agent_id + if session_id is None: + session_id = adk_session_id - logger.info(f"Searching session for external_id {external_id}") - session = session_service.get_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) + logger.info(f"Searching session for external_id {external_id}") + session = session_service.get_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) - if session is None: - logger.info(f"Creating new session for external_id {external_id}") - session = session_service.create_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) + if session is None: + logger.info(f"Creating new session for external_id {external_id}") + session = session_service.create_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) - content = Content(role="user", parts=[Part(text=message)]) - logger.info("Starting agent streaming execution") + content = Content(role="user", parts=[Part(text=message)]) + logger.info("Starting agent streaming execution") - try: - events_async = agent_runner.run_async( - user_id=external_id, - session_id=adk_session_id, - new_message=content, - ) - - async for event in events_async: - if event.content and event.content.parts: - text = event.content.parts[0].text - if text: - yield text - await asyncio.sleep(0) # Allow other tasks to run - - completed_session = session_service.get_session( - app_name=agent_id, - user_id=external_id, - session_id=adk_session_id, - ) - - memory_service.add_session_to_memory(completed_session) - except Exception as e: - logger.error(f"Error processing request: {str(e)}") - raise e - finally: - # Clean up MCP connection - if exit_stack: - logger.info("Closing MCP server connection...") try: - await exit_stack.aclose() - except Exception as e: - logger.error(f"Error closing MCP connection: {e}") + events_async = agent_runner.run_async( + user_id=external_id, + session_id=adk_session_id, + new_message=content, + ) - logger.info("Agent streaming execution completed successfully") - except AgentNotFoundError as e: - logger.error(f"Error processing request: {str(e)}") - raise e - except Exception as e: - logger.error(f"Internal error processing request: {str(e)}", exc_info=True) - raise InternalServerError(str(e)) + async for event in events_async: + event_dict = event.dict() + event_dict = convert_sets(event_dict) + yield json.dumps(event_dict) + + completed_session = session_service.get_session( + app_name=agent_id, + user_id=external_id, + session_id=adk_session_id, + ) + + memory_service.add_session_to_memory(completed_session) + except Exception as e: + logger.error(f"Error processing request: {str(e)}") + raise e + finally: + # Clean up MCP connection + if exit_stack: + logger.info("Closing MCP server connection...") + try: + await exit_stack.aclose() + except Exception as e: + logger.error(f"Error closing MCP connection: {e}") + + logger.info("Agent streaming execution completed successfully") + except AgentNotFoundError as e: + logger.error(f"Error processing request: {str(e)}") + raise e + except Exception as e: + logger.error( + f"Internal error processing request: {str(e)}", exc_info=True + ) + raise InternalServerError(str(e)) + finally: + span.end() diff --git a/src/services/workflow_agent.py b/src/services/workflow_agent.py index a7217182..5571e6c6 100644 --- a/src/services/workflow_agent.py +++ b/src/services/workflow_agent.py @@ -304,7 +304,6 @@ class WorkflowAgent(BaseAgent): "session_id": session_id, } - # Função para message-node async def message_node_function( state: State, node_id: str, node_data: Dict[str, Any] ) -> AsyncGenerator[State, None]: @@ -318,7 +317,6 @@ class WorkflowAgent(BaseAgent): session_id = state.get("session_id", "") conversation_history = state.get("conversation_history", []) - # Adiciona a mensagem como um novo Event do tipo agent new_event = Event( author="agent", content=Content(parts=[Part(text=message_content)]), @@ -750,7 +748,7 @@ class WorkflowAgent(BaseAgent): content=Content(parts=[Part(text=user_message)]), ) - # Se o histórico estiver vazio, adiciona a mensagem do usuário + # If the conversation history is empty, add the user message conversation_history = ctx.session.events or [] if not conversation_history or (len(conversation_history) == 0): conversation_history = [user_event] @@ -768,16 +766,17 @@ class WorkflowAgent(BaseAgent): print("\n🚀 Starting workflow execution:") print(f"Initial content: {user_message[:100]}...") - # Execute the graph with a recursion limit to avoid infinite loops - result = await graph.ainvoke(initial_state, {"recursion_limit": 20}) + sent_events = 0 # Count of events already sent - # 6. Process and return the result - final_content = result.get("content", []) - print(f"\n✅ FINAL RESULT: {final_content[:100]}...") - - for content in final_content: - if content.author != "user": - yield content + async for state in graph.astream(initial_state, {"recursion_limit": 20}): + # The state can be a dict with the node name as a key + for node_state in state.values(): + content = node_state.get("content", []) + # Only send new events + for event in content[sent_events:]: + if event.author != "user": + yield event + sent_events = len(content) # Execute sub-agents for sub_agent in self.sub_agents: diff --git a/src/utils/otel.py b/src/utils/otel.py new file mode 100644 index 00000000..b7776f55 --- /dev/null +++ b/src/utils/otel.py @@ -0,0 +1,41 @@ +import os +import base64 +from src.config.settings import settings + +from opentelemetry import trace +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +_otlp_initialized = False + + +def init_otel(): + global _otlp_initialized + if _otlp_initialized: + return + if not ( + settings.LANGFUSE_PUBLIC_KEY + and settings.LANGFUSE_SECRET_KEY + and settings.OTEL_EXPORTER_OTLP_ENDPOINT + ): + return + + langfuse_auth = base64.b64encode( + f"{settings.LANGFUSE_PUBLIC_KEY}:{settings.LANGFUSE_SECRET_KEY}".encode() + ).decode() + os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = settings.OTEL_EXPORTER_OTLP_ENDPOINT + os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}" + + provider = TracerProvider( + resource=Resource.create({"service.name": "evo_ai_agent"}) + ) + exporter = OTLPSpanExporter() + provider.add_span_processor(BatchSpanProcessor(exporter)) + trace.set_tracer_provider(provider) + _otlp_initialized = True + + +def get_tracer(name: str = "evo_ai_agent"): + return trace.get_tracer(name)