feat(otel): integrate OpenTelemetry for Langfuse monitoring and add configuration settings
This commit is contained in:
@@ -11,6 +11,8 @@ from sqlalchemy.orm import Session
|
||||
from typing import Optional, AsyncGenerator
|
||||
import asyncio
|
||||
import json
|
||||
from src.utils.otel import get_tracer
|
||||
from opentelemetry import trace
|
||||
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
@@ -26,159 +28,173 @@ async def run_agent(
|
||||
session_id: Optional[str] = None,
|
||||
timeout: float = 60.0,
|
||||
):
|
||||
exit_stack = None
|
||||
try:
|
||||
logger.info(
|
||||
f"Starting execution of agent {agent_id} for external_id {external_id}"
|
||||
)
|
||||
logger.info(f"Received message: {message}")
|
||||
tracer = get_tracer()
|
||||
with tracer.start_as_current_span(
|
||||
"run_agent",
|
||||
attributes={
|
||||
"agent_id": agent_id,
|
||||
"external_id": external_id,
|
||||
"session_id": session_id or f"{external_id}_{agent_id}",
|
||||
"message": message,
|
||||
},
|
||||
):
|
||||
exit_stack = None
|
||||
try:
|
||||
logger.info(
|
||||
f"Starting execution of agent {agent_id} for external_id {external_id}"
|
||||
)
|
||||
logger.info(f"Received message: {message}")
|
||||
|
||||
get_root_agent = get_agent(db, agent_id)
|
||||
logger.info(
|
||||
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
|
||||
)
|
||||
get_root_agent = get_agent(db, agent_id)
|
||||
logger.info(
|
||||
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
|
||||
)
|
||||
|
||||
if get_root_agent is None:
|
||||
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
|
||||
if get_root_agent is None:
|
||||
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
|
||||
|
||||
# Using the AgentBuilder to create the agent
|
||||
agent_builder = AgentBuilder(db)
|
||||
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
|
||||
# Using the AgentBuilder to create the agent
|
||||
agent_builder = AgentBuilder(db)
|
||||
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
|
||||
|
||||
logger.info("Configuring Runner")
|
||||
agent_runner = Runner(
|
||||
agent=root_agent,
|
||||
app_name=agent_id,
|
||||
session_service=session_service,
|
||||
artifact_service=artifacts_service,
|
||||
memory_service=memory_service,
|
||||
)
|
||||
adk_session_id = external_id + "_" + agent_id
|
||||
if session_id is None:
|
||||
session_id = adk_session_id
|
||||
logger.info("Configuring Runner")
|
||||
agent_runner = Runner(
|
||||
agent=root_agent,
|
||||
app_name=agent_id,
|
||||
session_service=session_service,
|
||||
artifact_service=artifacts_service,
|
||||
memory_service=memory_service,
|
||||
)
|
||||
adk_session_id = external_id + "_" + agent_id
|
||||
if session_id is None:
|
||||
session_id = adk_session_id
|
||||
|
||||
logger.info(f"Searching session for external_id {external_id}")
|
||||
session = session_service.get_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
if session is None:
|
||||
logger.info(f"Creating new session for external_id {external_id}")
|
||||
session = session_service.create_session(
|
||||
logger.info(f"Searching session for external_id {external_id}")
|
||||
session = session_service.get_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
content = Content(role="user", parts=[Part(text=message)])
|
||||
logger.info("Starting agent execution")
|
||||
if session is None:
|
||||
logger.info(f"Creating new session for external_id {external_id}")
|
||||
session = session_service.create_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
final_response_text = "No final response captured."
|
||||
try:
|
||||
response_queue = asyncio.Queue()
|
||||
execution_completed = asyncio.Event()
|
||||
|
||||
async def process_events():
|
||||
try:
|
||||
events_async = agent_runner.run_async(
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
new_message=content,
|
||||
)
|
||||
|
||||
last_response = None
|
||||
all_responses = []
|
||||
|
||||
async for event in events_async:
|
||||
if (
|
||||
event.content
|
||||
and event.content.parts
|
||||
and event.content.parts[0].text
|
||||
):
|
||||
current_text = event.content.parts[0].text
|
||||
last_response = current_text
|
||||
all_responses.append(current_text)
|
||||
|
||||
if event.actions and event.actions.escalate:
|
||||
escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
|
||||
await response_queue.put(escalate_text)
|
||||
execution_completed.set()
|
||||
return
|
||||
|
||||
if last_response:
|
||||
await response_queue.put(last_response)
|
||||
else:
|
||||
await response_queue.put("Finished without specific response")
|
||||
|
||||
execution_completed.set()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in process_events: {str(e)}")
|
||||
await response_queue.put(f"Error: {str(e)}")
|
||||
execution_completed.set()
|
||||
|
||||
task = asyncio.create_task(process_events())
|
||||
content = Content(role="user", parts=[Part(text=message)])
|
||||
logger.info("Starting agent execution")
|
||||
|
||||
final_response_text = "No final response captured."
|
||||
try:
|
||||
wait_task = asyncio.create_task(execution_completed.wait())
|
||||
done, pending = await asyncio.wait({wait_task}, timeout=timeout)
|
||||
response_queue = asyncio.Queue()
|
||||
execution_completed = asyncio.Event()
|
||||
|
||||
for p in pending:
|
||||
p.cancel()
|
||||
async def process_events():
|
||||
try:
|
||||
events_async = agent_runner.run_async(
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
new_message=content,
|
||||
)
|
||||
|
||||
if not execution_completed.is_set():
|
||||
logger.warning(f"Agent execution timed out after {timeout} seconds")
|
||||
await response_queue.put(
|
||||
"The response took too long and was interrupted."
|
||||
)
|
||||
last_response = None
|
||||
all_responses = []
|
||||
|
||||
final_response_text = await response_queue.get()
|
||||
async for event in events_async:
|
||||
if (
|
||||
event.content
|
||||
and event.content.parts
|
||||
and event.content.parts[0].text
|
||||
):
|
||||
current_text = event.content.parts[0].text
|
||||
last_response = current_text
|
||||
all_responses.append(current_text)
|
||||
|
||||
if event.actions and event.actions.escalate:
|
||||
escalate_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
|
||||
await response_queue.put(escalate_text)
|
||||
execution_completed.set()
|
||||
return
|
||||
|
||||
if last_response:
|
||||
await response_queue.put(last_response)
|
||||
else:
|
||||
await response_queue.put(
|
||||
"Finished without specific response"
|
||||
)
|
||||
|
||||
execution_completed.set()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in process_events: {str(e)}")
|
||||
await response_queue.put(f"Error: {str(e)}")
|
||||
execution_completed.set()
|
||||
|
||||
task = asyncio.create_task(process_events())
|
||||
|
||||
try:
|
||||
wait_task = asyncio.create_task(execution_completed.wait())
|
||||
done, pending = await asyncio.wait({wait_task}, timeout=timeout)
|
||||
|
||||
for p in pending:
|
||||
p.cancel()
|
||||
|
||||
if not execution_completed.is_set():
|
||||
logger.warning(
|
||||
f"Agent execution timed out after {timeout} seconds"
|
||||
)
|
||||
await response_queue.put(
|
||||
"The response took too long and was interrupted."
|
||||
)
|
||||
|
||||
final_response_text = await response_queue.get()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error waiting for response: {str(e)}")
|
||||
final_response_text = f"Error processing response: {str(e)}"
|
||||
|
||||
# Add the session to memory after completion
|
||||
completed_session = session_service.get_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
memory_service.add_session_to_memory(completed_session)
|
||||
|
||||
# Cancel the processing task if it is still running
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Task cancelled successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Error cancelling task: {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error waiting for response: {str(e)}")
|
||||
final_response_text = f"Error processing response: {str(e)}"
|
||||
logger.error(f"Error processing request: {str(e)}")
|
||||
raise e
|
||||
|
||||
# Add the session to memory after completion
|
||||
completed_session = session_service.get_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
memory_service.add_session_to_memory(completed_session)
|
||||
|
||||
# Cancel the processing task if it is still running
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Task cancelled successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Error cancelling task: {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
logger.info("Agent execution completed successfully")
|
||||
return final_response_text
|
||||
except AgentNotFoundError as e:
|
||||
logger.error(f"Error processing request: {str(e)}")
|
||||
raise e
|
||||
|
||||
logger.info("Agent execution completed successfully")
|
||||
return final_response_text
|
||||
except AgentNotFoundError as e:
|
||||
logger.error(f"Error processing request: {str(e)}")
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
|
||||
raise InternalServerError(str(e))
|
||||
finally:
|
||||
# Clean up MCP connection - MUST be executed in the same task
|
||||
if exit_stack:
|
||||
logger.info("Closing MCP server connection...")
|
||||
try:
|
||||
await exit_stack.aclose()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing MCP connection: {e}")
|
||||
# Do not raise the exception to not obscure the original error
|
||||
except Exception as e:
|
||||
logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
|
||||
raise InternalServerError(str(e))
|
||||
finally:
|
||||
# Clean up MCP connection - MUST be executed in the same task
|
||||
if exit_stack:
|
||||
logger.info("Closing MCP server connection...")
|
||||
try:
|
||||
await exit_stack.aclose()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing MCP connection: {e}")
|
||||
# Do not raise the exception to not obscure the original error
|
||||
|
||||
|
||||
def convert_sets(obj):
|
||||
@@ -202,89 +218,105 @@ async def run_agent_stream(
|
||||
db: Session,
|
||||
session_id: Optional[str] = None,
|
||||
) -> AsyncGenerator[str, None]:
|
||||
tracer = get_tracer()
|
||||
span = tracer.start_span(
|
||||
"run_agent_stream",
|
||||
attributes={
|
||||
"agent_id": agent_id,
|
||||
"external_id": external_id,
|
||||
"session_id": session_id or f"{external_id}_{agent_id}",
|
||||
"message": message,
|
||||
},
|
||||
)
|
||||
try:
|
||||
logger.info(
|
||||
f"Starting streaming execution of agent {agent_id} for external_id {external_id}"
|
||||
)
|
||||
logger.info(f"Received message: {message}")
|
||||
with trace.use_span(span, end_on_exit=True):
|
||||
try:
|
||||
logger.info(
|
||||
f"Starting streaming execution of agent {agent_id} for external_id {external_id}"
|
||||
)
|
||||
logger.info(f"Received message: {message}")
|
||||
|
||||
get_root_agent = get_agent(db, agent_id)
|
||||
logger.info(
|
||||
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
|
||||
)
|
||||
get_root_agent = get_agent(db, agent_id)
|
||||
logger.info(
|
||||
f"Root agent found: {get_root_agent.name} (type: {get_root_agent.type})"
|
||||
)
|
||||
|
||||
if get_root_agent is None:
|
||||
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
|
||||
if get_root_agent is None:
|
||||
raise AgentNotFoundError(f"Agent with ID {agent_id} not found")
|
||||
|
||||
# Using the AgentBuilder to create the agent
|
||||
agent_builder = AgentBuilder(db)
|
||||
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
|
||||
# Using the AgentBuilder to create the agent
|
||||
agent_builder = AgentBuilder(db)
|
||||
root_agent, exit_stack = await agent_builder.build_agent(get_root_agent)
|
||||
|
||||
logger.info("Configuring Runner")
|
||||
agent_runner = Runner(
|
||||
agent=root_agent,
|
||||
app_name=agent_id,
|
||||
session_service=session_service,
|
||||
artifact_service=artifacts_service,
|
||||
memory_service=memory_service,
|
||||
)
|
||||
adk_session_id = external_id + "_" + agent_id
|
||||
if session_id is None:
|
||||
session_id = adk_session_id
|
||||
logger.info("Configuring Runner")
|
||||
agent_runner = Runner(
|
||||
agent=root_agent,
|
||||
app_name=agent_id,
|
||||
session_service=session_service,
|
||||
artifact_service=artifacts_service,
|
||||
memory_service=memory_service,
|
||||
)
|
||||
adk_session_id = external_id + "_" + agent_id
|
||||
if session_id is None:
|
||||
session_id = adk_session_id
|
||||
|
||||
logger.info(f"Searching session for external_id {external_id}")
|
||||
session = session_service.get_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
logger.info(f"Searching session for external_id {external_id}")
|
||||
session = session_service.get_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
if session is None:
|
||||
logger.info(f"Creating new session for external_id {external_id}")
|
||||
session = session_service.create_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
if session is None:
|
||||
logger.info(f"Creating new session for external_id {external_id}")
|
||||
session = session_service.create_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
content = Content(role="user", parts=[Part(text=message)])
|
||||
logger.info("Starting agent streaming execution")
|
||||
content = Content(role="user", parts=[Part(text=message)])
|
||||
logger.info("Starting agent streaming execution")
|
||||
|
||||
try:
|
||||
events_async = agent_runner.run_async(
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
new_message=content,
|
||||
)
|
||||
|
||||
async for event in events_async:
|
||||
event_dict = event.dict()
|
||||
event_dict = convert_sets(event_dict)
|
||||
yield json.dumps(event_dict)
|
||||
|
||||
completed_session = session_service.get_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
memory_service.add_session_to_memory(completed_session)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing request: {str(e)}")
|
||||
raise e
|
||||
finally:
|
||||
# Clean up MCP connection
|
||||
if exit_stack:
|
||||
logger.info("Closing MCP server connection...")
|
||||
try:
|
||||
await exit_stack.aclose()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing MCP connection: {e}")
|
||||
events_async = agent_runner.run_async(
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
new_message=content,
|
||||
)
|
||||
|
||||
logger.info("Agent streaming execution completed successfully")
|
||||
except AgentNotFoundError as e:
|
||||
logger.error(f"Error processing request: {str(e)}")
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.error(f"Internal error processing request: {str(e)}", exc_info=True)
|
||||
raise InternalServerError(str(e))
|
||||
async for event in events_async:
|
||||
event_dict = event.dict()
|
||||
event_dict = convert_sets(event_dict)
|
||||
yield json.dumps(event_dict)
|
||||
|
||||
completed_session = session_service.get_session(
|
||||
app_name=agent_id,
|
||||
user_id=external_id,
|
||||
session_id=adk_session_id,
|
||||
)
|
||||
|
||||
memory_service.add_session_to_memory(completed_session)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing request: {str(e)}")
|
||||
raise e
|
||||
finally:
|
||||
# Clean up MCP connection
|
||||
if exit_stack:
|
||||
logger.info("Closing MCP server connection...")
|
||||
try:
|
||||
await exit_stack.aclose()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing MCP connection: {e}")
|
||||
|
||||
logger.info("Agent streaming execution completed successfully")
|
||||
except AgentNotFoundError as e:
|
||||
logger.error(f"Error processing request: {str(e)}")
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Internal error processing request: {str(e)}", exc_info=True
|
||||
)
|
||||
raise InternalServerError(str(e))
|
||||
finally:
|
||||
span.end()
|
||||
|
||||
Reference in New Issue
Block a user