diff --git a/src/services/custom_agents/workflow_agent.py b/src/services/custom_agents/workflow_agent.py index 97c99ba4..bf7e7bc5 100644 --- a/src/services/custom_agents/workflow_agent.py +++ b/src/services/custom_agents/workflow_agent.py @@ -6,6 +6,9 @@ │ Creation date: May 13, 2025 │ │ Contact: contato@evolution-api.com │ ├──────────────────────────────────────────────────────────────────────────────┤ +│ @contributors: │ +│ Victor Calazans - delay node implementation (May 17, 2025) │ +├──────────────────────────────────────────────────────────────────────────────┤ │ @copyright © Evolution API 2025. All rights reserved. │ │ Licensed under the Apache License, Version 2.0 │ │ │ @@ -320,10 +323,9 @@ class WorkflowAgent(BaseAgent): ) ] ), - ) - ] + ) ] content = content + condition_content - + yield { "content": content, "status": "condition_evaluated", @@ -332,7 +334,7 @@ class WorkflowAgent(BaseAgent): "conversation_history": conversation_history, "session_id": session_id, } - + async def message_node_function( state: State, node_id: str, node_data: Dict[str, Any] ) -> AsyncGenerator[State, None]: @@ -365,15 +367,63 @@ class WorkflowAgent(BaseAgent): "status": "message_added", "node_outputs": node_outputs, "cycle_count": state.get("cycle_count", 0), + "conversation_history": conversation_history, "session_id": session_id, + } + + async def delay_node_function( + state: State, node_id: str, node_data: Dict[str, Any] + ) -> AsyncGenerator[State, None]: + delay_data = node_data.get("delay", {}) + delay_value = delay_data.get("value", 0) + delay_unit = delay_data.get("unit", "seconds") + delay_description = delay_data.get("description", "") + + # Convert to seconds based on unit + delay_seconds = delay_value + if delay_unit == "minutes": + delay_seconds = delay_value * 60 + elif delay_unit == "hours": + delay_seconds = delay_value * 3600 + + label = node_data.get("label", "delay_node") + print(f"\n⏱️ DELAY-NODE: {delay_value} {delay_unit} - {delay_description}") + + content = state.get("content", []) + session_id = state.get("session_id", "") + conversation_history = state.get("conversation_history", []) + + # Store node output information + node_outputs = state.get("node_outputs", {}) + node_outputs[node_id] = { + "delay_value": delay_value, + "delay_unit": delay_unit, + "delay_seconds": delay_seconds, + "delay_start_time": datetime.now().isoformat(), + } + + # Actually perform the delay + import asyncio + await asyncio.sleep(delay_seconds) + + + # Update node outputs with completion information + node_outputs[node_id]["delay_end_time"] = datetime.now().isoformat() + node_outputs[node_id]["delay_completed"] = True + + yield { + "content": content, + "status": "delay_completed", + "node_outputs": node_outputs, "cycle_count": state.get("cycle_count", 0), "conversation_history": conversation_history, "session_id": session_id, } - + return { "start-node": start_node_function, "agent-node": agent_node_function, "condition-node": condition_node_function, "message-node": message_node_function, + "delay-node": delay_node_function, } def _evaluate_condition(self, condition: Dict[str, Any], state: State) -> bool: