diff --git a/README.md b/README.md index 4c9a3008..8b2e485c 100644 --- a/README.md +++ b/README.md @@ -301,6 +301,51 @@ Authorization: Bearer your-token-jwt - **LangGraph**: Framework for building stateful, multi-agent workflows - **ReactFlow**: Library for building node-based visual workflows +## 📊 Langfuse Integration (Tracing & Observability) + +Evo AI platform natively supports integration with [Langfuse](https://langfuse.com/) for detailed tracing of agent executions, prompts, model responses, and tool calls, using the OpenTelemetry (OTel) standard. + +### Why use Langfuse? + +- Visual dashboard for agent traces, prompts, and executions +- Detailed analytics for debugging and evaluating LLM apps +- Easy integration with Google ADK and other frameworks + +### How it works + +- Every agent execution (including streaming) is automatically traced via OpenTelemetry spans +- Data is sent to Langfuse, where it can be visualized and analyzed + +### How to configure + +1. **Set environment variables in your `.env`:** + + ```env + LANGFUSE_PUBLIC_KEY="pk-lf-..." # Your Langfuse public key + LANGFUSE_SECRET_KEY="sk-lf-..." # Your Langfuse secret key + OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel" # (or us.cloud... for US region) + ``` + + > **Attention:** Do not swap the keys! `pk-...` is public, `sk-...` is secret. + +2. **Automatic initialization** + + - Tracing is automatically initialized when the application starts (`src/main.py`). + - Agent execution functions are already instrumented with spans (`src/services/agent_runner.py`). + +3. **View in the Langfuse dashboard** + - Access your Langfuse dashboard to see real-time traces. + +### Troubleshooting + +- **401 Error (Invalid credentials):** + - Check if the keys are correct and not swapped in your `.env`. + - Make sure the endpoint matches your region (EU or US). +- **Context error in async generator:** + - The code is already adjusted to avoid OpenTelemetry context issues in async generators. +- **Questions about integration:** + - See the [official Langfuse documentation - Google ADK](https://langfuse.com/docs/integrations/google-adk) + ## 🤖 Agent 2 Agent (A2A) Protocol Support Evo AI implements the Google's Agent 2 Agent (A2A) protocol, enabling seamless communication and interoperability between AI agents. This implementation includes: diff --git a/src/services/a2a_task_manager.py b/src/services/a2a_task_manager.py index 6f74aef7..617171e6 100644 --- a/src/services/a2a_task_manager.py +++ b/src/services/a2a_task_manager.py @@ -3,6 +3,7 @@ import asyncio from collections.abc import AsyncIterable from typing import Dict, Optional from uuid import UUID +import json from sqlalchemy.orm import Session @@ -306,7 +307,7 @@ class A2ATaskManager: external_id = task_params.sessionId full_response = "" - # We use the same streaming function used in the WebSocket + # Use the same pattern as chat_routes.py: deserialize each chunk async for chunk in run_agent_stream( agent_id=str(agent.id), external_id=external_id, @@ -316,9 +317,14 @@ class A2ATaskManager: memory_service=memory_service, db=self.db, ): - # Send incremental progress updates - update_text_part = {"type": "text", "text": chunk} - update_message = Message(role="agent", parts=[update_text_part]) + try: + chunk_data = json.loads(chunk) + except Exception as e: + logger.warning(f"Invalid chunk received: {chunk} - {e}") + continue + + # The chunk_data must be a dict with 'type' and 'text' (or other expected format) + update_message = Message(role="agent", parts=[chunk_data]) # Update the task with each intermediate message await self.update_store( @@ -337,24 +343,24 @@ class A2ATaskManager: final=False, ), ) - full_response += chunk + # If it's text, accumulate for the final response + if chunk_data.get("type") == "text": + full_response += chunk_data.get("text", "") - # Determine the task state + # Determine the final state of the task task_state = ( TaskState.INPUT_REQUIRED if "MISSING_INFO:" in full_response else TaskState.COMPLETED ) - # Create the final response part + # Create the final response final_text_part = {"type": "text", "text": full_response} parts = [final_text_part] final_message = Message(role="agent", parts=parts) - - # Create the final artifact from the final response final_artifact = Artifact(parts=parts, index=0) - # Update the task in the store with the final response + # Update the task with the final response await self.update_store( task_params.id, TaskStatus(state=task_state, message=final_message),