diff --git a/src/services/a2a_task_manager.py b/src/services/a2a_task_manager.py index 6f74aef7..617171e6 100644 --- a/src/services/a2a_task_manager.py +++ b/src/services/a2a_task_manager.py @@ -3,6 +3,7 @@ import asyncio from collections.abc import AsyncIterable from typing import Dict, Optional from uuid import UUID +import json from sqlalchemy.orm import Session @@ -306,7 +307,7 @@ class A2ATaskManager: external_id = task_params.sessionId full_response = "" - # We use the same streaming function used in the WebSocket + # Use the same pattern as chat_routes.py: deserialize each chunk async for chunk in run_agent_stream( agent_id=str(agent.id), external_id=external_id, @@ -316,9 +317,14 @@ class A2ATaskManager: memory_service=memory_service, db=self.db, ): - # Send incremental progress updates - update_text_part = {"type": "text", "text": chunk} - update_message = Message(role="agent", parts=[update_text_part]) + try: + chunk_data = json.loads(chunk) + except Exception as e: + logger.warning(f"Invalid chunk received: {chunk} - {e}") + continue + + # The chunk_data must be a dict with 'type' and 'text' (or other expected format) + update_message = Message(role="agent", parts=[chunk_data]) # Update the task with each intermediate message await self.update_store( @@ -337,24 +343,24 @@ class A2ATaskManager: final=False, ), ) - full_response += chunk + # If it's text, accumulate for the final response + if chunk_data.get("type") == "text": + full_response += chunk_data.get("text", "") - # Determine the task state + # Determine the final state of the task task_state = ( TaskState.INPUT_REQUIRED if "MISSING_INFO:" in full_response else TaskState.COMPLETED ) - # Create the final response part + # Create the final response final_text_part = {"type": "text", "text": full_response} parts = [final_text_part] final_message = Message(role="agent", parts=parts) - - # Create the final artifact from the final response final_artifact = Artifact(parts=parts, index=0) - # Update the task in the store with the final response + # Update the task with the final response await self.update_store( task_params.id, TaskStatus(state=task_state, message=final_message),