diff --git a/src/api/chat_routes.py b/src/api/chat_routes.py index f69b73ce..16bce304 100644 --- a/src/api/chat_routes.py +++ b/src/api/chat_routes.py @@ -102,9 +102,8 @@ async def websocket_chat( memory_service=memory_service, db=db, ): - # Send each chunk as a JSON message await websocket.send_json( - {"message": chunk, "turn_complete": False} + {"message": json.loads(chunk), "turn_complete": False} ) # Send signal of complete turn diff --git a/src/services/agent_runner.py b/src/services/agent_runner.py index e2e93020..727fb4d1 100644 --- a/src/services/agent_runner.py +++ b/src/services/agent_runner.py @@ -10,6 +10,7 @@ from src.services.agent_builder import AgentBuilder from sqlalchemy.orm import Session from typing import Optional, AsyncGenerator import asyncio +import json logger = setup_logger(__name__) @@ -180,6 +181,17 @@ async def run_agent( # Do not raise the exception to not obscure the original error +def convert_sets(obj): + if isinstance(obj, set): + return list(obj) + elif isinstance(obj, dict): + return {k: convert_sets(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [convert_sets(i) for i in obj] + else: + return obj + + async def run_agent_stream( agent_id: str, external_id: str, @@ -246,11 +258,9 @@ async def run_agent_stream( ) async for event in events_async: - if event.content and event.content.parts: - text = event.content.parts[0].text - if text: - yield text - await asyncio.sleep(0) # Allow other tasks to run + event_dict = event.dict() + event_dict = convert_sets(event_dict) + yield json.dumps(event_dict) completed_session = session_service.get_session( app_name=agent_id, diff --git a/src/services/workflow_agent.py b/src/services/workflow_agent.py index a7217182..5571e6c6 100644 --- a/src/services/workflow_agent.py +++ b/src/services/workflow_agent.py @@ -304,7 +304,6 @@ class WorkflowAgent(BaseAgent): "session_id": session_id, } - # Função para message-node async def message_node_function( state: State, node_id: str, node_data: Dict[str, Any] ) -> AsyncGenerator[State, None]: @@ -318,7 +317,6 @@ class WorkflowAgent(BaseAgent): session_id = state.get("session_id", "") conversation_history = state.get("conversation_history", []) - # Adiciona a mensagem como um novo Event do tipo agent new_event = Event( author="agent", content=Content(parts=[Part(text=message_content)]), @@ -750,7 +748,7 @@ class WorkflowAgent(BaseAgent): content=Content(parts=[Part(text=user_message)]), ) - # Se o histórico estiver vazio, adiciona a mensagem do usuário + # If the conversation history is empty, add the user message conversation_history = ctx.session.events or [] if not conversation_history or (len(conversation_history) == 0): conversation_history = [user_event] @@ -768,16 +766,17 @@ class WorkflowAgent(BaseAgent): print("\n🚀 Starting workflow execution:") print(f"Initial content: {user_message[:100]}...") - # Execute the graph with a recursion limit to avoid infinite loops - result = await graph.ainvoke(initial_state, {"recursion_limit": 20}) + sent_events = 0 # Count of events already sent - # 6. Process and return the result - final_content = result.get("content", []) - print(f"\n✅ FINAL RESULT: {final_content[:100]}...") - - for content in final_content: - if content.author != "user": - yield content + async for state in graph.astream(initial_state, {"recursion_limit": 20}): + # The state can be a dict with the node name as a key + for node_state in state.values(): + content = node_state.get("content", []) + # Only send new events + for event in content[sent_events:]: + if event.author != "user": + yield event + sent_events = len(content) # Execute sub-agents for sub_agent in self.sub_agents: