From 609b3a572c949460b5a6dce8d60d4aa75d28e512 Mon Sep 17 00:00:00 2001 From: Selcuk Gun Date: Wed, 28 May 2025 16:59:16 -0700 Subject: [PATCH] Provide sample LongRunningFunctionTool runner script and documentation PiperOrigin-RevId: 764475345 --- contributing/samples/human_in_loop/README.md | 43 ++++ contributing/samples/human_in_loop/agent.py | 10 +- contributing/samples/human_in_loop/main.py | 194 +++++++++++++++++++ 3 files changed, 245 insertions(+), 2 deletions(-) create mode 100644 contributing/samples/human_in_loop/README.md create mode 100644 contributing/samples/human_in_loop/main.py diff --git a/contributing/samples/human_in_loop/README.md b/contributing/samples/human_in_loop/README.md new file mode 100644 index 0000000..141851f --- /dev/null +++ b/contributing/samples/human_in_loop/README.md @@ -0,0 +1,43 @@ +# Agent with Long-Running Tools + +This example demonstrates an agent using a long-running tool (`ask_for_approval`). + +## Key Flow for Long-Running Tools + +1. **Initial Call**: The agent calls the long-running tool (e.g., `ask_for_approval`). +2. **Initial Tool Response**: The tool immediately returns an initial response, typically indicating a "pending" status and a way to track the request (e.g., a `ticket-id`). This is sent back to the agent as a `types.FunctionResponse` (usually processed internally by the runner and then influencing the agent's next turn). +3. **Agent Acknowledges**: The agent processes this initial response and usually informs the user about the pending status. +4. **External Process/Update**: The long-running task progresses externally (e.g., a human approves the request). +5. **❗️Crucial Step: Provide Updated Tool Response❗️**: + * Once the external process completes or updates, your application **must** construct a new `types.FunctionResponse`. + * This response should use the **same `id` and `name`** as the original `FunctionCall` to the long-running tool. + * The `response` field within this `types.FunctionResponse` should contain the *updated data* (e.g., `{'status': 'approved', ...}`). + * Send this `types.FunctionResponse` back to the agent as a part within a new message using `role="user"`. + + ```python + # Example: After external approval + updated_tool_output_data = { + "status": "approved", + "ticket-id": ticket_id, # from original call + # ... other relevant updated data + } + + updated_function_response_part = types.Part( + function_response=types.FunctionResponse( + id=long_running_function_call.id, # Original call ID + name=long_running_function_call.name, # Original call name + response=updated_tool_output_data, + ) + ) + + # Send this back to the agent + await runner.run_async( + # ... session_id, user_id ... + new_message=types.Content( + parts=[updated_function_response_part], role="user" + ), + ) + ``` +6. **Agent Acts on Update**: The agent receives this message containing the `types.FunctionResponse` and, based on its instructions, proceeds with the next steps (e.g., calling another tool like `reimburse`). + +**Why is this important?** The agent relies on receiving this subsequent `types.FunctionResponse` (provided in a message with `role="user"` containing the specific `Part`) to understand that the long-running task has concluded or its state has changed. Without it, the agent will remain unaware of the outcome of the pending task. diff --git a/contributing/samples/human_in_loop/agent.py b/contributing/samples/human_in_loop/agent.py index 8e8ccc3..acf7e45 100644 --- a/contributing/samples/human_in_loop/agent.py +++ b/contributing/samples/human_in_loop/agent.py @@ -22,14 +22,20 @@ from google.genai import types def reimburse(purpose: str, amount: float) -> str: """Reimburse the amount of money to the employee.""" - return {'status': 'ok'} + return { + 'status': 'ok', + } def ask_for_approval( purpose: str, amount: float, tool_context: ToolContext ) -> dict[str, Any]: """Ask for approval for the reimbursement.""" - return {'status': 'pending'} + return { + 'status': 'pending', + 'amount': amount, + 'ticketId': 'reimbursement-ticket-001', + } root_agent = Agent( diff --git a/contributing/samples/human_in_loop/main.py b/contributing/samples/human_in_loop/main.py new file mode 100644 index 0000000..6beb9c8 --- /dev/null +++ b/contributing/samples/human_in_loop/main.py @@ -0,0 +1,194 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio + +import agent +from dotenv import load_dotenv +from typing import Any +from typing import Union +from google.adk.agents import Agent +from google.adk.events import Event +from google.adk.runners import Runner +from google.adk.tools import LongRunningFunctionTool +from google.adk.sessions import InMemorySessionService +from google.genai import types + +import os +from opentelemetry import trace +from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter +from opentelemetry.sdk.trace import export +from opentelemetry.sdk.trace import TracerProvider + + +load_dotenv(override=True) + +APP_NAME = "human_in_the_loop" +USER_ID = "1234" +SESSION_ID = "session1234" + +session_service = InMemorySessionService() + + +async def main(): + session = await session_service.create_session( + app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID + ) + runner = Runner( + agent=agent.root_agent, + app_name=APP_NAME, + session_service=session_service, + ) + + async def call_agent(query: str): + content = types.Content(role="user", parts=[types.Part(text=query)]) + + print(f'>>> User Query: "{query}"') + print("--- Running agent's initial turn ---") + + events_async = runner.run_async( + session_id=session.id, user_id=USER_ID, new_message=content + ) + + long_running_function_call: Union[types.FunctionCall, None] = None + initial_tool_response: Union[types.FunctionResponse, None] = None + ticket_id: Union[str, None] = None + + async for event in events_async: + if event.content and event.content.parts: + for i, part in enumerate(event.content.parts): + if part.text: + print(f" Part {i} [Text]: {part.text.strip()}") + if part.function_call: + print( + f" Part {i} [FunctionCall]:" + f" {part.function_call.name}({part.function_call.args}) ID:" + f" {part.function_call.id}" + ) + if not long_running_function_call and part.function_call.id in ( + event.long_running_tool_ids or [] + ): + long_running_function_call = part.function_call + print( + " (Captured as long_running_function_call for" + f" '{part.function_call.name}')" + ) + if part.function_response: + print( + f" Part {i} [FunctionResponse]: For" + f" '{part.function_response.name}', ID:" + f" {part.function_response.id}, Response:" + f" {part.function_response.response}" + ) + if ( + long_running_function_call + and part.function_response.id == long_running_function_call.id + ): + initial_tool_response = part.function_response + if initial_tool_response.response: + ticket_id = initial_tool_response.response.get("ticketId") + print( + " (Captured as initial_tool_response for" + f" '{part.function_response.name}', Ticket ID: {ticket_id})" + ) + + print("--- End of agent's initial turn ---\n") + + if ( + long_running_function_call + and initial_tool_response + and initial_tool_response.response.get("status") == "pending" + ): + print(f"--- Simulating external approval for ticket: {ticket_id} ---\n") + + updated_tool_output_data = { + "status": "approved", + "ticketId": ticket_id, + "approver_feedback": "Approved by manager at " + str( + asyncio.get_event_loop().time() + ), + } + + updated_function_response_part = types.Part( + function_response=types.FunctionResponse( + id=long_running_function_call.id, + name=long_running_function_call.name, + response=updated_tool_output_data, + ) + ) + + print( + "--- Sending updated tool result to agent for call ID" + f" {long_running_function_call.id}: {updated_tool_output_data} ---" + ) + print("--- Running agent's turn AFTER receiving updated tool result ---") + + async for event in runner.run_async( + session_id=session.id, + user_id=USER_ID, + new_message=types.Content( + parts=[updated_function_response_part], role="user" + ), + ): + if event.content and event.content.parts: + for i, part in enumerate(event.content.parts): + if part.text: + print(f" Part {i} [Text]: {part.text.strip()}") + if part.function_call: + print( + f" Part {i} [FunctionCall]:" + f" {part.function_call.name}({part.function_call.args}) ID:" + f" {part.function_call.id}" + ) + if part.function_response: + print( + f" Part {i} [FunctionResponse]: For" + f" '{part.function_response.name}', ID:" + f" {part.function_response.id}, Response:" + f" {part.function_response.response}" + ) + print("--- End of agent's turn AFTER receiving updated tool result ---") + + elif long_running_function_call and not initial_tool_response: + print( + f"--- Long running function '{long_running_function_call.name}' was" + " called, but its initial response was not captured. ---" + ) + elif not long_running_function_call: + print( + "--- No long running function call was detected in the initial" + " turn. ---" + ) + + await call_agent("Please reimburse $50 for meals") + print("=" * 70) + await call_agent("Please reimburse $200 for conference travel") + + +if __name__ == "__main__": + provider = TracerProvider() + project_id = os.environ.get("GOOGLE_CLOUD_PROJECT") + if not project_id: + raise ValueError("GOOGLE_CLOUD_PROJECT environment variable is not set.") + print("Tracing to project", project_id) + processor = export.BatchSpanProcessor( + CloudTraceSpanExporter(project_id=project_id) + ) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + asyncio.run(main()) + + provider.force_flush() + print("Done tracing to project", project_id)