10 Commits
0.0.1 ... 0.0.4

Author          SHA1        Message                                                                                       Date
Davidson Gomes  b21e355ce1  Merge branch 'release/0.0.4'                                                                  2025-05-12 17:51:36 -03:00
Davidson Gomes  0c69df107e  refactor(a2a_task_manager): improve JSON handling and error logging for chunk processing      2025-05-12 17:51:27 -03:00
Davidson Gomes  4800807783  docs: add Langfuse integration section to README for tracing and observability               2025-05-12 17:29:13 -03:00
Davidson Gomes  71ecc8f35b  Merge tag '0.0.3' into develop                                                                2025-05-12 17:20:31 -03:00
Davidson Gomes  782c2aceff  Merge branch 'release/0.0.3'                                                                  2025-05-12 17:20:29 -03:00
Davidson Gomes  ff27fb157c  Merge tag '0.0.2' into develop                                                                2025-05-12 17:12:51 -03:00
Davidson Gomes  bafbd494ed  Merge branch 'release/0.0.2'                                                                  2025-05-12 17:12:48 -03:00
Davidson Gomes  ab1f528a34  feat(otel): integrate OpenTelemetry for Langfuse monitoring and add configuration settings   2025-05-12 17:12:39 -03:00
Davidson Gomes  f319b89806  refactor(chat_routes, agent_runner, workflow_agent): improve JSON handling and clean up code  2025-05-12 16:26:06 -03:00
Davidson Gomes  a1f6b828d5  Merge tag '0.0.1' into develop                                                                2025-05-12 13:20:30 -03:00
10 changed files with 379 additions and 232 deletions

View File

@@ -39,6 +39,10 @@ SENDGRID_API_KEY="your-sendgrid-api-key"
 EMAIL_FROM="noreply@yourdomain.com"
 APP_URL="https://yourdomain.com"
 
+LANGFUSE_PUBLIC_KEY="your-langfuse-public-key"
+LANGFUSE_SECRET_KEY="your-langfuse-secret-key"
+OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"
+
 # Server settings
 HOST="0.0.0.0"
 PORT=8000

View File

@@ -301,6 +301,51 @@ Authorization: Bearer your-token-jwt
 - **LangGraph**: Framework for building stateful, multi-agent workflows
 - **ReactFlow**: Library for building node-based visual workflows
 
+## 📊 Langfuse Integration (Tracing & Observability)
+
+The Evo AI platform natively supports integration with [Langfuse](https://langfuse.com/) for detailed tracing of agent executions, prompts, model responses, and tool calls, using the OpenTelemetry (OTel) standard.
+
+### Why use Langfuse?
+
+- Visual dashboard for agent traces, prompts, and executions
+- Detailed analytics for debugging and evaluating LLM apps
+- Easy integration with Google ADK and other frameworks
+
+### How it works
+
+- Every agent execution (including streaming) is automatically traced via OpenTelemetry spans
+- Data is sent to Langfuse, where it can be visualized and analyzed
+
+### How to configure
+
+1. **Set environment variables in your `.env`:**
+
+   ```env
+   LANGFUSE_PUBLIC_KEY="pk-lf-..."   # Your Langfuse public key
+   LANGFUSE_SECRET_KEY="sk-lf-..."   # Your Langfuse secret key
+   OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"  # (or us.cloud... for the US region)
+   ```
+
+   > **Attention:** Do not swap the keys! `pk-...` is public, `sk-...` is secret.
+
+2. **Automatic initialization**
+
+   - Tracing is initialized automatically when the application starts (`src/main.py`).
+   - Agent execution functions are already instrumented with spans (`src/services/agent_runner.py`).
+
+3. **View in the Langfuse dashboard**
+
+   - Access your Langfuse dashboard to see real-time traces.
+
+### Troubleshooting
+
+- **401 Error (Invalid credentials):**
+  - Check that the keys are correct and not swapped in your `.env`.
+  - Make sure the endpoint matches your region (EU or US).
+- **Context error in async generator:**
+  - The code is already adjusted to avoid OpenTelemetry context issues in async generators.
+- **Questions about integration:**
+  - See the [official Langfuse documentation - Google ADK](https://langfuse.com/docs/integrations/google-adk)
+
 ## 🤖 Agent 2 Agent (A2A) Protocol Support
 
 Evo AI implements Google's Agent 2 Agent (A2A) protocol, enabling seamless communication and interoperability between AI agents. This implementation includes:

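For readers wiring this up by hand: the OTLP exporter authenticates against Langfuse with HTTP Basic auth built from the public/secret key pair, which is exactly what the new `src/utils/otel.py` further down does. A minimal sketch, assuming the two keys are already exported in the environment:

```python
import base64
import os

# Assumes LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are already set (pk-lf-... / sk-lf-...).
public_key = os.environ["LANGFUSE_PUBLIC_KEY"]
secret_key = os.environ["LANGFUSE_SECRET_KEY"]

# Langfuse's OTLP endpoint expects Basic auth of "public:secret", base64-encoded.
auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {auth}"
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://cloud.langfuse.com/api/public/otel"
```

Swapping the two keys produces exactly the 401 error described in the troubleshooting list above.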
View File

@@ -49,6 +49,8 @@ dependencies = [
     "jwcrypto==1.5.6",
     "pyjwt[crypto]==2.9.0",
     "langgraph==0.4.1",
+    "opentelemetry-sdk==1.33.0",
+    "opentelemetry-exporter-otlp==1.33.0",
 ]
 
 [project.optional-dependencies]

View File

@@ -102,9 +102,8 @@ async def websocket_chat(
             memory_service=memory_service,
             db=db,
         ):
-            # Send each chunk as a JSON message
             await websocket.send_json(
-                {"message": chunk, "turn_complete": False}
+                {"message": json.loads(chunk), "turn_complete": False}
             )
 
         # Send signal of complete turn

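For context, `run_agent_stream` (see the agent_runner diff further down) now yields each ADK event as a JSON string, so the route must `json.loads` the chunk before handing it to `send_json`; otherwise the client would receive a doubly encoded string. A small illustration with a hypothetical chunk value:

```python
import json

# Hypothetical chunk as yielded by run_agent_stream: a JSON-serialized event dict.
chunk = '{"author": "agent", "content": {"parts": [{"text": "Hello"}]}}'

# What the WebSocket handler sends after this change: a real dict, not a quoted JSON string.
payload = {"message": json.loads(chunk), "turn_complete": False}
assert isinstance(payload["message"], dict)

# Without json.loads, the client would have to parse payload["message"] a second time.
```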
View File

@@ -84,6 +84,11 @@ class Settings(BaseSettings):
     DEMO_PASSWORD: str = os.getenv("DEMO_PASSWORD", "demo123")
     DEMO_CLIENT_NAME: str = os.getenv("DEMO_CLIENT_NAME", "Demo Client")
 
+    # Langfuse / OpenTelemetry settings
+    LANGFUSE_PUBLIC_KEY: str = os.getenv("LANGFUSE_PUBLIC_KEY", "")
+    LANGFUSE_SECRET_KEY: str = os.getenv("LANGFUSE_SECRET_KEY", "")
+    OTEL_EXPORTER_OTLP_ENDPOINT: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
+
     class Config:
         env_file = ".env"
         env_file_encoding = "utf-8"

View File

@@ -7,6 +7,7 @@ from fastapi.staticfiles import StaticFiles
 from src.config.database import engine, Base
 from src.config.settings import settings
 from src.utils.logger import setup_logger
+from src.utils.otel import init_otel
 
 # Necessary for other modules
 from src.services.service_providers import session_service  # noqa: F401
@@ -85,6 +86,9 @@ app.include_router(session_router, prefix=API_PREFIX)
 app.include_router(agent_router, prefix=API_PREFIX)
 app.include_router(a2a_router, prefix=API_PREFIX)
 
+# Initialize OpenTelemetry for Langfuse
+init_otel()
+
 @app.get("/")
 def read_root():

View File

@@ -3,6 +3,7 @@ import asyncio
 from collections.abc import AsyncIterable
 from typing import Dict, Optional
 from uuid import UUID
+import json
 
 from sqlalchemy.orm import Session
@@ -306,7 +307,7 @@ class A2ATaskManager:
         external_id = task_params.sessionId
         full_response = ""
 
-        # We use the same streaming function used in the WebSocket
+        # Use the same pattern as chat_routes.py: deserialize each chunk
         async for chunk in run_agent_stream(
             agent_id=str(agent.id),
             external_id=external_id,
@@ -316,9 +317,14 @@
             memory_service=memory_service,
             db=self.db,
         ):
-            # Send incremental progress updates
-            update_text_part = {"type": "text", "text": chunk}
-            update_message = Message(role="agent", parts=[update_text_part])
+            try:
+                chunk_data = json.loads(chunk)
+            except Exception as e:
+                logger.warning(f"Invalid chunk received: {chunk} - {e}")
+                continue
+
+            # The chunk_data must be a dict with 'type' and 'text' (or other expected format)
+            update_message = Message(role="agent", parts=[chunk_data])
 
             # Update the task with each intermediate message
             await self.update_store(
@@ -337,24 +343,24 @@
                     final=False,
                 ),
             )
-            full_response += chunk
+            # If it's text, accumulate for the final response
+            if chunk_data.get("type") == "text":
+                full_response += chunk_data.get("text", "")
 
-        # Determine the task state
+        # Determine the final state of the task
         task_state = (
             TaskState.INPUT_REQUIRED
             if "MISSING_INFO:" in full_response
             else TaskState.COMPLETED
         )
 
-        # Create the final response part
+        # Create the final response
         final_text_part = {"type": "text", "text": full_response}
         parts = [final_text_part]
         final_message = Message(role="agent", parts=parts)
 
-        # Create the final artifact from the final response
         final_artifact = Artifact(parts=parts, index=0)
 
-        # Update the task in the store with the final response
+        # Update the task with the final response
         await self.update_store(
             task_params.id,
             TaskStatus(state=task_state, message=final_message),

View File

@@ -10,6 +10,9 @@ from src.services.agent_builder import AgentBuilder
 from sqlalchemy.orm import Session
 from typing import Optional, AsyncGenerator
 import asyncio
+import json
+from src.utils.otel import get_tracer
+from opentelemetry import trace
 
 logger = setup_logger(__name__)
@@ -25,159 +28,184 @@ async def run_agent(
This hunk wraps the entire body of run_agent in an OpenTelemetry span and appends a module-level convert_sets helper; the execution logic itself is essentially unchanged, only re-indented. The new version:

    session_id: Optional[str] = None,
    timeout: float = 60.0,
):
    tracer = get_tracer()
    with tracer.start_as_current_span(
        "run_agent",
        attributes={
            "agent_id": agent_id,
            "external_id": external_id,
            "session_id": session_id or f"{external_id}_{agent_id}",
            "message": message,
        },
    ):
        exit_stack = None
        try:
            logger.info(
                f"Starting execution of agent {agent_id} for external_id {external_id}"
            )
            logger.info(f"Received message: {message}")

            get_root_agent = get_agent(db, agent_id)
            logger.info(
                f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
            )

            if get_root_agent is None:
                raise AgentNotFoundError(f"Agent with ID {agent_id} not found")

            # Using the AgentBuilder to create the agent
            agent_builder = AgentBuilder(db)
            root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)

            logger.info("Configuring Runner")
            agent_runner = Runner(
                agent=root_agent,
                app_name=agent_id,
                session_service=session_service,
                artifact_service=artifacts_service,
                memory_service=memory_service,
            )
            adk_session_id = external_id + "_" + agent_id
            if session_id is None:
                session_id = adk_session_id

            logger.info(f"Searching session for external_id {external_id}")
            session = session_service.get_session(
                app_name=agent_id,
                user_id=external_id,
                session_id=adk_session_id,
            )

            if session is None:
                logger.info(f"Creating new session for external_id {external_id}")
                session = session_service.create_session(
                    app_name=agent_id,
                    user_id=external_id,
                    session_id=adk_session_id,
                )

            content = Content(role="user", parts=[Part(text=message)])
            logger.info("Starting agent execution")

            final_response_text = "No final response captured."
            try:
                response_queue = asyncio.Queue()
                execution_completed = asyncio.Event()

                async def process_events():
                    try:
                        events_async = agent_runner.run_async(
                            user_id=external_id,
                            session_id=adk_session_id,
                            new_message=content,
                        )

                        last_response = None
                        all_responses = []

                        async for event in events_async:
                            if (
                                event.content
                                and event.content.parts
                                and event.content.parts[0].text
                            ):
                                current_text = event.content.parts[0].text
                                last_response = current_text
                                all_responses.append(current_text)

                            if event.actions and event.actions.escalate:
                                escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                                await response_queue.put(escalate_text)
                                execution_completed.set()
                                return

                        if last_response:
                            await response_queue.put(last_response)
                        else:
                            await response_queue.put(
                                "Finished without specific response"
                            )

                        execution_completed.set()
                    except Exception as e:
                        logger.error(f"Error in process_events: {str(e)}")
                        await response_queue.put(f"Error: {str(e)}")
                        execution_completed.set()

                task = asyncio.create_task(process_events())

                try:
                    wait_task = asyncio.create_task(execution_completed.wait())
                    done, pending = await asyncio.wait({wait_task}, timeout=timeout)
                    for p in pending:
                        p.cancel()

                    if not execution_completed.is_set():
                        logger.warning(
                            f"Agent execution timed out after {timeout} seconds"
                        )
                        await response_queue.put(
                            "The response took too long and was interrupted."
                        )

                    final_response_text = await response_queue.get()
                except Exception as e:
                    logger.error(f"Error waiting for response: {str(e)}")
                    final_response_text = f"Error processing response: {str(e)}"

                # Add the session to memory after completion
                completed_session = session_service.get_session(
                    app_name=agent_id,
                    user_id=external_id,
                    session_id=adk_session_id,
                )
                memory_service.add_session_to_memory(completed_session)

                # Cancel the processing task if it is still running
                if not task.done():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        logger.info("Task cancelled successfully")
                    except Exception as e:
                        logger.error(f"Error cancelling task: {str(e)}")
            except Exception as e:
                logger.error(f"Error processing request: {str(e)}")
                raise e

            logger.info("Agent execution completed successfully")
            return final_response_text
        except AgentNotFoundError as e:
            logger.error(f"Error processing request: {str(e)}")
            raise e
        except Exception as e:
            logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
            raise InternalServerError(str(e))
        finally:
            # Clean up MCP connection - MUST be executed in the same task
            if exit_stack:
                logger.info("Closing MCP server connection...")
                try:
                    await exit_stack.aclose()
                except Exception as e:
                    logger.error(f"Error closing MCP connection: {e}")
                    # Do not raise the exception to not obscure the original error


def convert_sets(obj):
    if isinstance(obj, set):
        return list(obj)
    elif isinstance(obj, dict):
        return {k: convert_sets(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_sets(i) for i in obj]
    else:
        return obj


async def run_agent_stream(
@@ -190,91 +218,105 @@ async def run_agent_stream(
This hunk likewise wraps run_agent_stream in a manually managed span (closed in a finally block) and changes the streaming contract: previously the loop yielded each event's text part directly, with an await asyncio.sleep(0) between chunks; now every ADK event is serialized to JSON via event.dict() and convert_sets. The new version:

    db: Session,
    session_id: Optional[str] = None,
) -> AsyncGenerator[str, None]:
    tracer = get_tracer()
    span = tracer.start_span(
        "run_agent_stream",
        attributes={
            "agent_id": agent_id,
            "external_id": external_id,
            "session_id": session_id or f"{external_id}_{agent_id}",
            "message": message,
        },
    )
    try:
        with trace.use_span(span, end_on_exit=True):
            try:
                logger.info(
                    f"Starting streaming execution of agent {agent_id} for external_id {external_id}"
                )
                logger.info(f"Received message: {message}")

                get_root_agent = get_agent(db, agent_id)
                logger.info(
                    f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
                )

                if get_root_agent is None:
                    raise AgentNotFoundError(f"Agent with ID {agent_id} not found")

                # Using the AgentBuilder to create the agent
                agent_builder = AgentBuilder(db)
                root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)

                logger.info("Configuring Runner")
                agent_runner = Runner(
                    agent=root_agent,
                    app_name=agent_id,
                    session_service=session_service,
                    artifact_service=artifacts_service,
                    memory_service=memory_service,
                )
                adk_session_id = external_id + "_" + agent_id
                if session_id is None:
                    session_id = adk_session_id

                logger.info(f"Searching session for external_id {external_id}")
                session = session_service.get_session(
                    app_name=agent_id,
                    user_id=external_id,
                    session_id=adk_session_id,
                )

                if session is None:
                    logger.info(f"Creating new session for external_id {external_id}")
                    session = session_service.create_session(
                        app_name=agent_id,
                        user_id=external_id,
                        session_id=adk_session_id,
                    )

                content = Content(role="user", parts=[Part(text=message)])
                logger.info("Starting agent streaming execution")

                try:
                    events_async = agent_runner.run_async(
                        user_id=external_id,
                        session_id=adk_session_id,
                        new_message=content,
                    )

                    async for event in events_async:
                        event_dict = event.dict()
                        event_dict = convert_sets(event_dict)
                        yield json.dumps(event_dict)

                    completed_session = session_service.get_session(
                        app_name=agent_id,
                        user_id=external_id,
                        session_id=adk_session_id,
                    )
                    memory_service.add_session_to_memory(completed_session)
                except Exception as e:
                    logger.error(f"Error processing request: {str(e)}")
                    raise e
                finally:
                    # Clean up MCP connection
                    if exit_stack:
                        logger.info("Closing MCP server connection...")
                        try:
                            await exit_stack.aclose()
                        except Exception as e:
                            logger.error(f"Error closing MCP connection: {e}")

                logger.info("Agent streaming execution completed successfully")
            except AgentNotFoundError as e:
                logger.error(f"Error processing request: {str(e)}")
                raise e
            except Exception as e:
                logger.error(
                    f"Internal error processing request: {str(e)}", exc_info=True
                )
                raise InternalServerError(str(e))
    finally:
        span.end()

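A note on the new convert_sets helper above: json.dumps raises TypeError on set values, which is presumably what the recursive rewrite of sets into lists guards against before the event dicts are serialized. A quick illustration with made-up data:

```python
import json


def convert_sets(obj):
    # Same recursive rewrite as the helper added in agent_runner.py: sets become lists.
    if isinstance(obj, set):
        return list(obj)
    if isinstance(obj, dict):
        return {k: convert_sets(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_sets(i) for i in obj]
    return obj


# Made-up event-like dict; the set would make a plain json.dumps call fail with TypeError.
event_dict = {"author": "agent", "tags": {"tool_call", "final"}}

print(json.dumps(convert_sets(event_dict)))  # e.g. {"author": "agent", "tags": ["final", "tool_call"]}
```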
View File

@@ -304,7 +304,6 @@ class WorkflowAgent(BaseAgent):
                 "session_id": session_id,
             }
 
-            # Função para message-node
             async def message_node_function(
                 state: State, node_id: str, node_data: Dict[str, Any]
             ) -> AsyncGenerator[State, None]:
@@ -318,7 +317,6 @@ class WorkflowAgent(BaseAgent):
             session_id = state.get("session_id", "")
             conversation_history = state.get("conversation_history", [])
 
-            # Adiciona a mensagem como um novo Event do tipo agent
             new_event = Event(
                 author="agent",
                 content=Content(parts=[Part(text=message_content)]),
@@ -750,7 +748,7 @@ class WorkflowAgent(BaseAgent):
                 content=Content(parts=[Part(text=user_message)]),
             )
 
-            # Se o histórico estiver vazio, adiciona a mensagem do usuário
+            # If the conversation history is empty, add the user message
             conversation_history = ctx.session.events or []
             if not conversation_history or (len(conversation_history) == 0):
                 conversation_history = [user_event]
@@ -768,16 +766,17 @@ class WorkflowAgent(BaseAgent):
         print("\n🚀 Starting workflow execution:")
         print(f"Initial content: {user_message[:100]}...")
 
-        # Execute the graph with a recursion limit to avoid infinite loops
-        result = await graph.ainvoke(initial_state, {"recursion_limit": 20})
-
-        # 6. Process and return the result
-        final_content = result.get("content", [])
-        print(f"\n✅ FINAL RESULT: {final_content[:100]}...")
-
-        for content in final_content:
-            if content.author != "user":
-                yield content
+        sent_events = 0  # Count of events already sent
+
+        async for state in graph.astream(initial_state, {"recursion_limit": 20}):
+            # The state can be a dict with the node name as a key
+            for node_state in state.values():
+                content = node_state.get("content", [])
+                # Only send new events
+                for event in content[sent_events:]:
+                    if event.author != "user":
+                        yield event
+                sent_events = len(content)
 
         # Execute sub-agents
         for sub_agent in self.sub_agents:

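The switch from graph.ainvoke to graph.astream in the last hunk means partial results arrive as each node finishes; in LangGraph's default stream mode each item is a dict keyed by the node that just ran, which is why the code iterates state.values() and tracks how many events it has already yielded. A minimal sketch of that bookkeeping against a fake stream (names here are illustrative):

```python
from typing import AsyncIterator, Dict, List


async def fake_updates() -> AsyncIterator[Dict[str, Dict[str, List[str]]]]:
    # Mimics the shape described above: {node_name: node_state} per step.
    yield {"message-node": {"content": ["event-1"]}}
    yield {"agent-node": {"content": ["event-1", "event-2", "event-3"]}}


async def forward_new_events() -> None:
    sent_events = 0  # count of events already forwarded downstream
    async for state in fake_updates():
        for node_state in state.values():
            content = node_state.get("content", [])
            for event in content[sent_events:]:
                print("send:", event)  # stands in for `yield event`
            sent_events = len(content)
```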
src/utils/otel.py (new file, 41 lines)
View File

@@ -0,0 +1,41 @@
import os
import base64

from src.config.settings import settings

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

_otlp_initialized = False


def init_otel():
    global _otlp_initialized
    if _otlp_initialized:
        return
    if not (
        settings.LANGFUSE_PUBLIC_KEY
        and settings.LANGFUSE_SECRET_KEY
        and settings.OTEL_EXPORTER_OTLP_ENDPOINT
    ):
        return

    langfuse_auth = base64.b64encode(
        f"{settings.LANGFUSE_PUBLIC_KEY}:{settings.LANGFUSE_SECRET_KEY}".encode()
    ).decode()

    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = settings.OTEL_EXPORTER_OTLP_ENDPOINT
    os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}"

    provider = TracerProvider(
        resource=Resource.create({"service.name": "evo_ai_agent"})
    )
    exporter = OTLPSpanExporter()
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)

    _otlp_initialized = True


def get_tracer(name: str = "evo_ai_agent"):
    return trace.get_tracer(name)
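
A minimal usage sketch for this module, mirroring how it is wired in `src/main.py` and `src/services/agent_runner.py` (the span name and attributes below are illustrative):

```python
from src.utils.otel import init_otel, get_tracer

# Called once at startup (src/main.py does this); it is a no-op when the
# Langfuse keys or the OTLP endpoint are not configured.
init_otel()

tracer = get_tracer()

# Any span created after init_otel() is exported to Langfuse over OTLP.
with tracer.start_as_current_span("run_agent", attributes={"agent_id": "example"}):
    ...  # agent execution happens here
```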