From b29d8d108e85622d8c1b063dce6b7521d7cd1c4b Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Tue, 13 May 2025 18:47:15 -0300 Subject: [PATCH] refactor(a2a_task_manager): simplify chunk processing and improve error handling --- src/services/a2a_task_manager.py | 50 +++++++++----------------------- 1 file changed, 13 insertions(+), 37 deletions(-) diff --git a/src/services/a2a_task_manager.py b/src/services/a2a_task_manager.py index 581a2a26..ce86676d 100644 --- a/src/services/a2a_task_manager.py +++ b/src/services/a2a_task_manager.py @@ -411,37 +411,14 @@ class A2ATaskManager: ): try: chunk_data = json.loads(chunk) - except Exception as e: - logger.warning(f"Invalid chunk received: {chunk} - {e}") - continue - if ( - isinstance(chunk_data, dict) - and "type" in chunk_data - and chunk_data["type"] - in [ - "history", - "history_update", - "history_complete", - ] - ): - continue + if isinstance(chunk_data, dict) and "content" in chunk_data: + content = chunk_data.get("content", {}) + role = content.get("role", "agent") + parts = content.get("parts", []) - if isinstance(chunk_data, dict): - if "type" not in chunk_data and "text" in chunk_data: - chunk_data["type"] = "text" - - if "type" in chunk_data: - try: - update_message = Message(role="agent", parts=[chunk_data]) - - await self.update_store( - request.params.id, - TaskStatus( - state=TaskState.WORKING, message=update_message - ), - update_history=False, - ) + if parts: + update_message = Message(role=role, parts=parts) yield SendTaskStreamingResponse( id=request.id, @@ -455,14 +432,13 @@ class A2ATaskManager: ), ) - if chunk_data.get("type") == "text": - full_response += chunk_data.get("text", "") - final_message = update_message - - except Exception as e: - logger.error( - f"Error processing chunk: {e}, chunk: {chunk_data}" - ) + for part in parts: + if part.get("type") == "text": + full_response += part.get("text", "") + final_message = update_message + except Exception as e: + logger.error(f"Error processing chunk: {e}, chunk: {chunk}") + continue # Determine the final state of the task task_state = (