refactor(a2a_task_manager): simplify chunk processing and improve error handling

This commit is contained in:
Davidson Gomes 2025-05-13 18:47:15 -03:00
parent 72e4b7865a
commit b29d8d108e

View File

@ -411,37 +411,14 @@ class A2ATaskManager:
):
try:
chunk_data = json.loads(chunk)
except Exception as e:
logger.warning(f"Invalid chunk received: {chunk} - {e}")
continue
if (
isinstance(chunk_data, dict)
and "type" in chunk_data
and chunk_data["type"]
in [
"history",
"history_update",
"history_complete",
]
):
continue
if isinstance(chunk_data, dict) and "content" in chunk_data:
content = chunk_data.get("content", {})
role = content.get("role", "agent")
parts = content.get("parts", [])
if isinstance(chunk_data, dict):
if "type" not in chunk_data and "text" in chunk_data:
chunk_data["type"] = "text"
if "type" in chunk_data:
try:
update_message = Message(role="agent", parts=[chunk_data])
await self.update_store(
request.params.id,
TaskStatus(
state=TaskState.WORKING, message=update_message
),
update_history=False,
)
if parts:
update_message = Message(role=role, parts=parts)
yield SendTaskStreamingResponse(
id=request.id,
@ -455,14 +432,13 @@ class A2ATaskManager:
),
)
if chunk_data.get("type") == "text":
full_response += chunk_data.get("text", "")
final_message = update_message
except Exception as e:
logger.error(
f"Error processing chunk: {e}, chunk: {chunk_data}"
)
for part in parts:
if part.get("type") == "text":
full_response += part.get("text", "")
final_message = update_message
except Exception as e:
logger.error(f"Error processing chunk: {e}, chunk: {chunk}")
continue
# Determine the final state of the task
task_state = (