Merge branch 'release/0.0.2'

This commit is contained in:
Davidson Gomes 2025-05-12 17:12:48 -03:00
commit bafbd494ed
8 changed files with 318 additions and 222 deletions

View File

@ -39,6 +39,10 @@ SENDGRID_API_KEY="your-sendgrid-api-key"
EMAIL_FROM="noreply@yourdomain.com"
APP_URL="https://yourdomain.com"
LANGFUSE_PUBLIC_KEY="your-langfuse-public-key"
LANGFUSE_SECRET_KEY="your-langfuse-secret-key"
OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"
# Server settings
HOST="0.0.0.0"
PORT=8000

View File

@ -49,6 +49,8 @@ dependencies = [
"jwcrypto==1.5.6",
"pyjwt[crypto]==2.9.0",
"langgraph==0.4.1",
"opentelemetry-sdk==1.33.0",
"opentelemetry-exporter-otlp==1.33.0",
]
[project.optional-dependencies]

View File

@ -102,9 +102,8 @@ async def websocket_chat(
memory_service=memory_service,
db=db,
):
# Send each chunk as a JSON message
await websocket.send_json(
{"message": chunk, "turn_complete": False}
{"message": json.loads(chunk), "turn_complete": False}
)
# Send signal of complete turn

View File

@ -84,6 +84,11 @@ class Settings(BaseSettings):
DEMO_PASSWORD: str = os.getenv("DEMO_PASSWORD", "demo123")
DEMO_CLIENT_NAME: str = os.getenv("DEMO_CLIENT_NAME", "Demo Client")
# Langfuse / OpenTelemetry settings
LANGFUSE_PUBLIC_KEY: str = os.getenv("LANGFUSE_PUBLIC_KEY", "")
LANGFUSE_SECRET_KEY: str = os.getenv("LANGFUSE_SECRET_KEY", "")
OTEL_EXPORTER_OTLP_ENDPOINT: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
class Config:
env_file = ".env"
env_file_encoding = "utf-8"

View File

@ -7,6 +7,7 @@ from fastapi.staticfiles import StaticFiles
from src.config.database import engine, Base
from src.config.settings import settings
from src.utils.logger import setup_logger
from src.utils.otel import init_otel
# Necessary for other modules
from src.services.service_providers import session_service # noqa: F401
@ -85,6 +86,9 @@ app.include_router(session_router, prefix=API_PREFIX)
app.include_router(agent_router, prefix=API_PREFIX)
app.include_router(a2a_router, prefix=API_PREFIX)
# Initialize OpenTelemetry for Langfuse
init_otel()
@app.get("/")
def read_root():

View File

@ -10,6 +10,9 @@ from src.services.agent_builder import AgentBuilder
from sqlalchemy.orm import Session
from typing import Optional, AsyncGenerator
import asyncio
import json
from src.utils.otel import get_tracer
from opentelemetry import trace
logger = setup_logger(__name__)
@ -25,159 +28,184 @@ async def run_agent(
session_id: Optional[str] = None,
timeout: float = 60.0,
):
exit_stack = None
try:
logger.info(
f"Starting execution of agent {agent_id} for external_id {external_id}"
)
logger.info(f"Received message: {message}")
tracer = get_tracer()
with tracer.start_as_current_span(
"run_agent",
attributes={
"agent_id": agent_id,
"external_id": external_id,
"session_id": session_id or f"{external_id}_{agent_id}",
"message": message,
},
):
exit_stack = None
try:
logger.info(
f"Starting execution of agent {agent_id} for external_id {external_id}"
)
logger.info(f"Received message: {message}")
get_root_agent = get_agent(db, agent_id)
logger.info(
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
)
get_root_agent = get_agent(db, agent_id)
logger.info(
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
)
if get_root_agent is None:
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
if get_root_agent is None:
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
# Using the AgentBuilder to create the agent
agent_builder = AgentBuilder(db)
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
# Using the AgentBuilder to create the agent
agent_builder = AgentBuilder(db)
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
logger.info("Configuring Runner")
agent_runner = Runner(
agent=root_agent,
app_name=agent_id,
session_service=session_service,
artifact_service=artifacts_service,
memory_service=memory_service,
)
adk_session_id = external_id + "_" + agent_id
if session_id is None:
session_id = adk_session_id
logger.info("Configuring Runner")
agent_runner = Runner(
agent=root_agent,
app_name=agent_id,
session_service=session_service,
artifact_service=artifacts_service,
memory_service=memory_service,
)
adk_session_id = external_id + "_" + agent_id
if session_id is None:
session_id = adk_session_id
logger.info(f"Searching session for external_id {external_id}")
session = session_service.get_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
if session is None:
logger.info(f"Creating new session for external_id {external_id}")
session = session_service.create_session(
logger.info(f"Searching session for external_id {external_id}")
session = session_service.get_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
content = Content(role="user", parts=[Part(text=message)])
logger.info("Starting agent execution")
if session is None:
logger.info(f"Creating new session for external_id {external_id}")
session = session_service.create_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
final_response_text = "No final response captured."
try:
response_queue = asyncio.Queue()
execution_completed = asyncio.Event()
async def process_events():
try:
events_async = agent_runner.run_async(
user_id=external_id,
session_id=adk_session_id,
new_message=content,
)
last_response = None
all_responses = []
async for event in events_async:
if (
event.content
and event.content.parts
and event.content.parts[0].text
):
current_text = event.content.parts[0].text
last_response = current_text
all_responses.append(current_text)
if event.actions and event.actions.escalate:
escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
await response_queue.put(escalate_text)
execution_completed.set()
return
if last_response:
await response_queue.put(last_response)
else:
await response_queue.put("Finished without specific response")
execution_completed.set()
except Exception as e:
logger.error(f"Error in process_events: {str(e)}")
await response_queue.put(f"Error: {str(e)}")
execution_completed.set()
task = asyncio.create_task(process_events())
content = Content(role="user", parts=[Part(text=message)])
logger.info("Starting agent execution")
final_response_text = "No final response captured."
try:
wait_task = asyncio.create_task(execution_completed.wait())
done, pending = await asyncio.wait({wait_task}, timeout=timeout)
response_queue = asyncio.Queue()
execution_completed = asyncio.Event()
for p in pending:
p.cancel()
async def process_events():
try:
events_async = agent_runner.run_async(
user_id=external_id,
session_id=adk_session_id,
new_message=content,
)
if not execution_completed.is_set():
logger.warning(f"Agent execution timed out after {timeout} seconds")
await response_queue.put(
"The response took too long and was interrupted."
)
last_response = None
all_responses = []
final_response_text = await response_queue.get()
async for event in events_async:
if (
event.content
and event.content.parts
and event.content.parts[0].text
):
current_text = event.content.parts[0].text
last_response = current_text
all_responses.append(current_text)
if event.actions and event.actions.escalate:
escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
await response_queue.put(escalate_text)
execution_completed.set()
return
if last_response:
await response_queue.put(last_response)
else:
await response_queue.put(
"Finished without specific response"
)
execution_completed.set()
except Exception as e:
logger.error(f"Error in process_events: {str(e)}")
await response_queue.put(f"Error: {str(e)}")
execution_completed.set()
task = asyncio.create_task(process_events())
try:
wait_task = asyncio.create_task(execution_completed.wait())
done, pending = await asyncio.wait({wait_task}, timeout=timeout)
for p in pending:
p.cancel()
if not execution_completed.is_set():
logger.warning(
f"Agent execution timed out after {timeout} seconds"
)
await response_queue.put(
"The response took too long and was interrupted."
)
final_response_text = await response_queue.get()
except Exception as e:
logger.error(f"Error waiting for response: {str(e)}")
final_response_text = f"Error processing response: {str(e)}"
# Add the session to memory after completion
completed_session = session_service.get_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
memory_service.add_session_to_memory(completed_session)
# Cancel the processing task if it is still running
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
logger.info("Task cancelled successfully")
except Exception as e:
logger.error(f"Error cancelling task: {str(e)}")
except Exception as e:
logger.error(f"Error waiting for response: {str(e)}")
final_response_text = f"Error processing response: {str(e)}"
logger.error(f"Error processing request: {str(e)}")
raise e
# Add the session to memory after completion
completed_session = session_service.get_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
memory_service.add_session_to_memory(completed_session)
# Cancel the processing task if it is still running
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
logger.info("Task cancelled successfully")
except Exception as e:
logger.error(f"Error cancelling task: {str(e)}")
except Exception as e:
logger.info("Agent execution completed successfully")
return final_response_text
except AgentNotFoundError as e:
logger.error(f"Error processing request: {str(e)}")
raise e
except Exception as e:
logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
raise InternalServerError(str(e))
finally:
# Clean up MCP connection - MUST be executed in the same task
if exit_stack:
logger.info("Closing MCP server connection...")
try:
await exit_stack.aclose()
except Exception as e:
logger.error(f"Error closing MCP connection: {e}")
# Do not raise the exception to not obscure the original error
logger.info("Agent execution completed successfully")
return final_response_text
except AgentNotFoundError as e:
logger.error(f"Error processing request: {str(e)}")
raise e
except Exception as e:
logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
raise InternalServerError(str(e))
finally:
# Clean up MCP connection - MUST be executed in the same task
if exit_stack:
logger.info("Closing MCP server connection...")
try:
await exit_stack.aclose()
except Exception as e:
logger.error(f"Error closing MCP connection: {e}")
# Do not raise the exception to not obscure the original error
def convert_sets(obj):
    """Recursively replace sets with lists so the result is JSON-friendly.

    Walks dicts, lists, tuples, sets and frozensets. Every ``set`` or
    ``frozenset`` encountered becomes a ``list`` (element order follows the
    set's iteration order); dicts, lists and tuples keep their container type
    but have their items converted. Any other value is returned unchanged.

    Args:
        obj: Arbitrary, possibly nested, Python value.

    Returns:
        A structure equal to ``obj`` except that no ``set``/``frozenset``
        remains anywhere inside the walked containers.
    """
    if isinstance(obj, (set, frozenset)):
        # Set members may themselves be frozensets or tuples holding
        # frozensets, so the members must be converted too.
        return [convert_sets(item) for item in obj]
    if isinstance(obj, dict):
        return {key: convert_sets(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_sets(item) for item in obj]
    if isinstance(obj, tuple):
        # Tuples stay tuples, but a set hidden inside one would otherwise
        # slip through and break json.dumps later.
        return tuple(convert_sets(item) for item in obj)
    return obj
async def run_agent_stream(
@ -190,91 +218,105 @@ async def run_agent_stream(
db: Session,
session_id: Optional[str] = None,
) -> AsyncGenerator[str, None]:
tracer = get_tracer()
span = tracer.start_span(
"run_agent_stream",
attributes={
"agent_id": agent_id,
"external_id": external_id,
"session_id": session_id or f"{external_id}_{agent_id}",
"message": message,
},
)
try:
logger.info(
f"Starting streaming execution of agent {agent_id} for external_id {external_id}"
)
logger.info(f"Received message: {message}")
with trace.use_span(span, end_on_exit=True):
try:
logger.info(
f"Starting streaming execution of agent {agent_id} for external_id {external_id}"
)
logger.info(f"Received message: {message}")
get_root_agent = get_agent(db, agent_id)
logger.info(
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
)
get_root_agent = get_agent(db, agent_id)
logger.info(
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
)
if get_root_agent is None:
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
if get_root_agent is None:
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
# Using the AgentBuilder to create the agent
agent_builder = AgentBuilder(db)
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
# Using the AgentBuilder to create the agent
agent_builder = AgentBuilder(db)
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
logger.info("Configuring Runner")
agent_runner = Runner(
agent=root_agent,
app_name=agent_id,
session_service=session_service,
artifact_service=artifacts_service,
memory_service=memory_service,
)
adk_session_id = external_id + "_" + agent_id
if session_id is None:
session_id = adk_session_id
logger.info("Configuring Runner")
agent_runner = Runner(
agent=root_agent,
app_name=agent_id,
session_service=session_service,
artifact_service=artifacts_service,
memory_service=memory_service,
)
adk_session_id = external_id + "_" + agent_id
if session_id is None:
session_id = adk_session_id
logger.info(f"Searching session for external_id {external_id}")
session = session_service.get_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
logger.info(f"Searching session for external_id {external_id}")
session = session_service.get_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
if session is None:
logger.info(f"Creating new session for external_id {external_id}")
session = session_service.create_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
if session is None:
logger.info(f"Creating new session for external_id {external_id}")
session = session_service.create_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
content = Content(role="user", parts=[Part(text=message)])
logger.info("Starting agent streaming execution")
content = Content(role="user", parts=[Part(text=message)])
logger.info("Starting agent streaming execution")
try:
events_async = agent_runner.run_async(
user_id=external_id,
session_id=adk_session_id,
new_message=content,
)
async for event in events_async:
if event.content and event.content.parts:
text = event.content.parts[0].text
if text:
yield text
await asyncio.sleep(0) # Allow other tasks to run
completed_session = session_service.get_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
memory_service.add_session_to_memory(completed_session)
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
raise e
finally:
# Clean up MCP connection
if exit_stack:
logger.info("Closing MCP server connection...")
try:
await exit_stack.aclose()
except Exception as e:
logger.error(f"Error closing MCP connection: {e}")
events_async = agent_runner.run_async(
user_id=external_id,
session_id=adk_session_id,
new_message=content,
)
logger.info("Agent streaming execution completed successfully")
except AgentNotFoundError as e:
logger.error(f"Error processing request: {str(e)}")
raise e
except Exception as e:
logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
raise InternalServerError(str(e))
async for event in events_async:
event_dict = event.dict()
event_dict = convert_sets(event_dict)
yield json.dumps(event_dict)
completed_session = session_service.get_session(
app_name=agent_id,
user_id=external_id,
session_id=adk_session_id,
)
memory_service.add_session_to_memory(completed_session)
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
raise e
finally:
# Clean up MCP connection
if exit_stack:
logger.info("Closing MCP server connection...")
try:
await exit_stack.aclose()
except Exception as e:
logger.error(f"Error closing MCP connection: {e}")
logger.info("Agent streaming execution completed successfully")
except AgentNotFoundError as e:
logger.error(f"Error processing request: {str(e)}")
raise e
except Exception as e:
logger.error(
f"Internal error processing request: {str(e)}", exc_info=True
)
raise InternalServerError(str(e))
finally:
span.end()

View File

@ -304,7 +304,6 @@ class WorkflowAgent(BaseAgent):
"session_id": session_id,
}
# Função para message-node
async def message_node_function(
state: State, node_id: str, node_data: Dict[str, Any]
) -> AsyncGenerator[State, None]:
@ -318,7 +317,6 @@ class WorkflowAgent(BaseAgent):
session_id = state.get("session_id", "")
conversation_history = state.get("conversation_history", [])
# Adiciona a mensagem como um novo Event do tipo agent
new_event = Event(
author="agent",
content=Content(parts=[Part(text=message_content)]),
@ -750,7 +748,7 @@ class WorkflowAgent(BaseAgent):
content=Content(parts=[Part(text=user_message)]),
)
# Se o histórico estiver vazio, adiciona a mensagem do usuário
# If the conversation history is empty, add the user message
conversation_history = ctx.session.events or []
if not conversation_history or (len(conversation_history) == 0):
conversation_history = [user_event]
@ -768,16 +766,17 @@ class WorkflowAgent(BaseAgent):
print("\n🚀 Starting workflow execution:")
print(f"Initial content: {user_message[:100]}...")
# Execute the graph with a recursion limit to avoid infinite loops
result = await graph.ainvoke(initial_state, {"recursion_limit": 20})
sent_events = 0 # Count of events already sent
# 6. Process and return the result
final_content = result.get("content", [])
print(f"\n✅ FINAL RESULT: {final_content[:100]}...")
for content in final_content:
if content.author != "user":
yield content
async for state in graph.astream(initial_state, {"recursion_limit": 20}):
# The state can be a dict with the node name as a key
for node_state in state.values():
content = node_state.get("content", [])
# Only send new events
for event in content[sent_events:]:
if event.author != "user":
yield event
sent_events = len(content)
# Execute sub-agents
for sub_agent in self.sub_agents:

41
src/utils/otel.py Normal file
View File

@ -0,0 +1,41 @@
import os
import base64
from src.config.settings import settings
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
_otlp_initialized = False
def init_otel():
    """Configure the global OpenTelemetry tracer provider for Langfuse export.

    Does nothing when it has already completed once in this process, or when
    any of ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY`` or
    ``OTEL_EXPORTER_OTLP_ENDPOINT`` in ``settings`` is empty.

    Otherwise it writes the OTLP endpoint and a Basic-auth header (built from
    the two Langfuse keys) into ``os.environ``, creates a ``TracerProvider``
    with a batching OTLP/HTTP span exporter, installs it as the global
    provider, and marks initialization as done.
    """
    global _otlp_initialized

    if _otlp_initialized:
        return

    public_key = settings.LANGFUSE_PUBLIC_KEY
    secret_key = settings.LANGFUSE_SECRET_KEY
    endpoint = settings.OTEL_EXPORTER_OTLP_ENDPOINT
    if not all((public_key, secret_key, endpoint)):
        # Tracing is optional: without complete credentials we stay silent.
        return

    credentials = f"{public_key}:{secret_key}".encode()
    basic_token = base64.b64encode(credentials).decode()

    # The exporter below is built without arguments, so it picks up its
    # endpoint and headers from these environment variables.
    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = endpoint
    os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {basic_token}"

    service_resource = Resource.create({"service.name": "evo_ai_agent"})
    tracer_provider = TracerProvider(resource=service_resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(tracer_provider)

    _otlp_initialized = True
def get_tracer(name: str = "evo_ai_agent"):
    """Return a tracer from the currently installed global tracer provider.

    Args:
        name: Instrumentation name passed through to ``trace.get_tracer``.
    """
    tracer = trace.get_tracer(name)
    return tracer