refactor(a2a_task_manager): improve JSON handling and error logging for chunk processing

This commit is contained in:
Davidson Gomes 2025-05-12 17:51:27 -03:00
parent 4800807783
commit 0c69df107e

View File

@ -3,6 +3,7 @@ import asyncio
from collections.abc import AsyncIterable from collections.abc import AsyncIterable
from typing import Dict, Optional from typing import Dict, Optional
from uuid import UUID from uuid import UUID
import json
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@ -306,7 +307,7 @@ class A2ATaskManager:
external_id = task_params.sessionId external_id = task_params.sessionId
full_response = "" full_response = ""
# We use the same streaming function used in the WebSocket # Use the same pattern as chat_routes.py: deserialize each chunk
async for chunk in run_agent_stream( async for chunk in run_agent_stream(
agent_id=str(agent.id), agent_id=str(agent.id),
external_id=external_id, external_id=external_id,
@ -316,9 +317,14 @@ class A2ATaskManager:
memory_service=memory_service, memory_service=memory_service,
db=self.db, db=self.db,
): ):
# Send incremental progress updates try:
update_text_part = {"type": "text", "text": chunk} chunk_data = json.loads(chunk)
update_message = Message(role="agent", parts=[update_text_part]) except Exception as e:
logger.warning(f"Invalid chunk received: {chunk} - {e}")
continue
# The chunk_data must be a dict with 'type' and 'text' (or other expected format)
update_message = Message(role="agent", parts=[chunk_data])
# Update the task with each intermediate message # Update the task with each intermediate message
await self.update_store( await self.update_store(
@ -337,24 +343,24 @@ class A2ATaskManager:
final=False, final=False,
), ),
) )
full_response += chunk # If it's text, accumulate for the final response
if chunk_data.get("type") == "text":
full_response += chunk_data.get("text", "")
# Determine the task state # Determine the final state of the task
task_state = ( task_state = (
TaskState.INPUT_REQUIRED TaskState.INPUT_REQUIRED
if "MISSING_INFO:" in full_response if "MISSING_INFO:" in full_response
else TaskState.COMPLETED else TaskState.COMPLETED
) )
# Create the final response part # Create the final response
final_text_part = {"type": "text", "text": full_response} final_text_part = {"type": "text", "text": full_response}
parts = [final_text_part] parts = [final_text_part]
final_message = Message(role="agent", parts=parts) final_message = Message(role="agent", parts=parts)
# Create the final artifact from the final response
final_artifact = Artifact(parts=parts, index=0) final_artifact = Artifact(parts=parts, index=0)
# Update the task in the store with the final response # Update the task with the final response
await self.update_store( await self.update_store(
task_params.id, task_params.id,
TaskStatus(state=task_state, message=final_message), TaskStatus(state=task_state, message=final_message),