refactor(chat_routes, agent_runner, workflow_agent): improve JSON handling and clean up code

This commit is contained in:
Davidson Gomes 2025-05-12 16:26:06 -03:00
parent a1f6b828d5
commit f319b89806
3 changed files with 27 additions and 19 deletions

View File

@ -102,9 +102,8 @@ async def websocket_chat(
memory_service=memory_service,
db=db,
):
# Send each chunk as a JSON message
await websocket.send_json(
{"message": chunk, "turn_complete": False}
{"message": json.loads(chunk), "turn_complete": False}
)
# Send signal of complete turn

View File

@ -10,6 +10,7 @@ from src.services.agent_builder import AgentBuilder
from sqlalchemy.orm import Session
from typing import Optional, AsyncGenerator
import asyncio
import json
logger = setup_logger(__name__)
@ -180,6 +181,17 @@ async def run_agent(
# Do not raise the exception to not obscure the original error
def convert_sets(obj):
if isinstance(obj, set):
return list(obj)
elif isinstance(obj, dict):
return {k: convert_sets(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_sets(i) for i in obj]
else:
return obj
async def run_agent_stream(
agent_id: str,
external_id: str,
@ -246,11 +258,9 @@ async def run_agent_stream(
)
async for event in events_async:
if event.content and event.content.parts:
text = event.content.parts[0].text
if text:
yield text
await asyncio.sleep(0) # Allow other tasks to run
event_dict = event.dict()
event_dict = convert_sets(event_dict)
yield json.dumps(event_dict)
completed_session = session_service.get_session(
app_name=agent_id,

View File

@ -304,7 +304,6 @@ class WorkflowAgent(BaseAgent):
"session_id": session_id,
}
# Função para message-node
async def message_node_function(
state: State, node_id: str, node_data: Dict[str, Any]
) -> AsyncGenerator[State, None]:
@ -318,7 +317,6 @@ class WorkflowAgent(BaseAgent):
session_id = state.get("session_id", "")
conversation_history = state.get("conversation_history", [])
# Adiciona a mensagem como um novo Event do tipo agent
new_event = Event(
author="agent",
content=Content(parts=[Part(text=message_content)]),
@ -750,7 +748,7 @@ class WorkflowAgent(BaseAgent):
content=Content(parts=[Part(text=user_message)]),
)
# Se o histórico estiver vazio, adiciona a mensagem do usuário
# If the conversation history is empty, add the user message
conversation_history = ctx.session.events or []
if not conversation_history or (len(conversation_history) == 0):
conversation_history = [user_event]
@ -768,16 +766,17 @@ class WorkflowAgent(BaseAgent):
print("\n🚀 Starting workflow execution:")
print(f"Initial content: {user_message[:100]}...")
# Execute the graph with a recursion limit to avoid infinite loops
result = await graph.ainvoke(initial_state, {"recursion_limit": 20})
sent_events = 0 # Count of events already sent
# 6. Process and return the result
final_content = result.get("content", [])
print(f"\n✅ FINAL RESULT: {final_content[:100]}...")
for content in final_content:
if content.author != "user":
yield content
async for state in graph.astream(initial_state, {"recursion_limit": 20}):
# The state can be a dict with the node name as a key
for node_state in state.values():
content = node_state.get("content", [])
# Only send new events
for event in content[sent_events:]:
if event.author != "user":
yield event
sent_events = len(content)
# Execute sub-agents
for sub_agent in self.sub_agents: