mirror of
https://github.com/EvolutionAPI/adk-python.git
synced 2025-07-14 01:41:25 -06:00
Provide sample LongRunningFunctionTool runner script and documentation
PiperOrigin-RevId: 764475345
This commit is contained in:
parent
2a8ca06c3e
commit
609b3a572c
43
contributing/samples/human_in_loop/README.md
Normal file
43
contributing/samples/human_in_loop/README.md
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
# Agent with Long-Running Tools
|
||||||
|
|
||||||
|
This example demonstrates an agent using a long-running tool (`ask_for_approval`).
|
||||||
|
|
||||||
|
## Key Flow for Long-Running Tools
|
||||||
|
|
||||||
|
1. **Initial Call**: The agent calls the long-running tool (e.g., `ask_for_approval`).
|
||||||
|
2. **Initial Tool Response**: The tool immediately returns an initial response, typically indicating a "pending" status and a way to track the request (e.g., a `ticket-id`). This is sent back to the agent as a `types.FunctionResponse` (usually processed internally by the runner and then influencing the agent's next turn).
|
||||||
|
3. **Agent Acknowledges**: The agent processes this initial response and usually informs the user about the pending status.
|
||||||
|
4. **External Process/Update**: The long-running task progresses externally (e.g., a human approves the request).
|
||||||
|
5. **❗️Crucial Step: Provide Updated Tool Response❗️**:
|
||||||
|
* Once the external process completes or updates, your application **must** construct a new `types.FunctionResponse`.
|
||||||
|
* This response should use the **same `id` and `name`** as the original `FunctionCall` to the long-running tool.
|
||||||
|
* The `response` field within this `types.FunctionResponse` should contain the *updated data* (e.g., `{'status': 'approved', ...}`).
|
||||||
|
* Send this `types.FunctionResponse` back to the agent as a part within a new message using `role="user"`.
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Example: After external approval
|
||||||
|
updated_tool_output_data = {
|
||||||
|
"status": "approved",
|
||||||
|
"ticket-id": ticket_id, # from original call
|
||||||
|
# ... other relevant updated data
|
||||||
|
}
|
||||||
|
|
||||||
|
updated_function_response_part = types.Part(
|
||||||
|
function_response=types.FunctionResponse(
|
||||||
|
id=long_running_function_call.id, # Original call ID
|
||||||
|
name=long_running_function_call.name, # Original call name
|
||||||
|
response=updated_tool_output_data,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Send this back to the agent
|
||||||
|
await runner.run_async(
|
||||||
|
# ... session_id, user_id ...
|
||||||
|
new_message=types.Content(
|
||||||
|
parts=[updated_function_response_part], role="user"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
```
|
||||||
|
6. **Agent Acts on Update**: The agent receives this message containing the `types.FunctionResponse` and, based on its instructions, proceeds with the next steps (e.g., calling another tool like `reimburse`).
|
||||||
|
|
||||||
|
**Why is this important?** The agent relies on receiving this subsequent `types.FunctionResponse` (provided in a message with `role="user"` containing the specific `Part`) to understand that the long-running task has concluded or its state has changed. Without it, the agent will remain unaware of the outcome of the pending task.
|
@ -22,14 +22,20 @@ from google.genai import types
|
|||||||
|
|
||||||
def reimburse(purpose: str, amount: float) -> str:
|
def reimburse(purpose: str, amount: float) -> str:
|
||||||
"""Reimburse the amount of money to the employee."""
|
"""Reimburse the amount of money to the employee."""
|
||||||
return {'status': 'ok'}
|
return {
|
||||||
|
'status': 'ok',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def ask_for_approval(
|
def ask_for_approval(
|
||||||
purpose: str, amount: float, tool_context: ToolContext
|
purpose: str, amount: float, tool_context: ToolContext
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Ask for approval for the reimbursement."""
|
"""Ask for approval for the reimbursement."""
|
||||||
return {'status': 'pending'}
|
return {
|
||||||
|
'status': 'pending',
|
||||||
|
'amount': amount,
|
||||||
|
'ticketId': 'reimbursement-ticket-001',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
root_agent = Agent(
|
root_agent = Agent(
|
||||||
|
194
contributing/samples/human_in_loop/main.py
Normal file
194
contributing/samples/human_in_loop/main.py
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
# Copyright 2025 Google LLC
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import agent
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from typing import Any
|
||||||
|
from typing import Union
|
||||||
|
from google.adk.agents import Agent
|
||||||
|
from google.adk.events import Event
|
||||||
|
from google.adk.runners import Runner
|
||||||
|
from google.adk.tools import LongRunningFunctionTool
|
||||||
|
from google.adk.sessions import InMemorySessionService
|
||||||
|
from google.genai import types
|
||||||
|
|
||||||
|
import os
|
||||||
|
from opentelemetry import trace
|
||||||
|
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
|
||||||
|
from opentelemetry.sdk.trace import export
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
|
|
||||||
|
|
||||||
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
APP_NAME = "human_in_the_loop"
|
||||||
|
USER_ID = "1234"
|
||||||
|
SESSION_ID = "session1234"
|
||||||
|
|
||||||
|
session_service = InMemorySessionService()
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
session = await session_service.create_session(
|
||||||
|
app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
|
||||||
|
)
|
||||||
|
runner = Runner(
|
||||||
|
agent=agent.root_agent,
|
||||||
|
app_name=APP_NAME,
|
||||||
|
session_service=session_service,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def call_agent(query: str):
|
||||||
|
content = types.Content(role="user", parts=[types.Part(text=query)])
|
||||||
|
|
||||||
|
print(f'>>> User Query: "{query}"')
|
||||||
|
print("--- Running agent's initial turn ---")
|
||||||
|
|
||||||
|
events_async = runner.run_async(
|
||||||
|
session_id=session.id, user_id=USER_ID, new_message=content
|
||||||
|
)
|
||||||
|
|
||||||
|
long_running_function_call: Union[types.FunctionCall, None] = None
|
||||||
|
initial_tool_response: Union[types.FunctionResponse, None] = None
|
||||||
|
ticket_id: Union[str, None] = None
|
||||||
|
|
||||||
|
async for event in events_async:
|
||||||
|
if event.content and event.content.parts:
|
||||||
|
for i, part in enumerate(event.content.parts):
|
||||||
|
if part.text:
|
||||||
|
print(f" Part {i} [Text]: {part.text.strip()}")
|
||||||
|
if part.function_call:
|
||||||
|
print(
|
||||||
|
f" Part {i} [FunctionCall]:"
|
||||||
|
f" {part.function_call.name}({part.function_call.args}) ID:"
|
||||||
|
f" {part.function_call.id}"
|
||||||
|
)
|
||||||
|
if not long_running_function_call and part.function_call.id in (
|
||||||
|
event.long_running_tool_ids or []
|
||||||
|
):
|
||||||
|
long_running_function_call = part.function_call
|
||||||
|
print(
|
||||||
|
" (Captured as long_running_function_call for"
|
||||||
|
f" '{part.function_call.name}')"
|
||||||
|
)
|
||||||
|
if part.function_response:
|
||||||
|
print(
|
||||||
|
f" Part {i} [FunctionResponse]: For"
|
||||||
|
f" '{part.function_response.name}', ID:"
|
||||||
|
f" {part.function_response.id}, Response:"
|
||||||
|
f" {part.function_response.response}"
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
long_running_function_call
|
||||||
|
and part.function_response.id == long_running_function_call.id
|
||||||
|
):
|
||||||
|
initial_tool_response = part.function_response
|
||||||
|
if initial_tool_response.response:
|
||||||
|
ticket_id = initial_tool_response.response.get("ticketId")
|
||||||
|
print(
|
||||||
|
" (Captured as initial_tool_response for"
|
||||||
|
f" '{part.function_response.name}', Ticket ID: {ticket_id})"
|
||||||
|
)
|
||||||
|
|
||||||
|
print("--- End of agent's initial turn ---\n")
|
||||||
|
|
||||||
|
if (
|
||||||
|
long_running_function_call
|
||||||
|
and initial_tool_response
|
||||||
|
and initial_tool_response.response.get("status") == "pending"
|
||||||
|
):
|
||||||
|
print(f"--- Simulating external approval for ticket: {ticket_id} ---\n")
|
||||||
|
|
||||||
|
updated_tool_output_data = {
|
||||||
|
"status": "approved",
|
||||||
|
"ticketId": ticket_id,
|
||||||
|
"approver_feedback": "Approved by manager at " + str(
|
||||||
|
asyncio.get_event_loop().time()
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
updated_function_response_part = types.Part(
|
||||||
|
function_response=types.FunctionResponse(
|
||||||
|
id=long_running_function_call.id,
|
||||||
|
name=long_running_function_call.name,
|
||||||
|
response=updated_tool_output_data,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
print(
|
||||||
|
"--- Sending updated tool result to agent for call ID"
|
||||||
|
f" {long_running_function_call.id}: {updated_tool_output_data} ---"
|
||||||
|
)
|
||||||
|
print("--- Running agent's turn AFTER receiving updated tool result ---")
|
||||||
|
|
||||||
|
async for event in runner.run_async(
|
||||||
|
session_id=session.id,
|
||||||
|
user_id=USER_ID,
|
||||||
|
new_message=types.Content(
|
||||||
|
parts=[updated_function_response_part], role="user"
|
||||||
|
),
|
||||||
|
):
|
||||||
|
if event.content and event.content.parts:
|
||||||
|
for i, part in enumerate(event.content.parts):
|
||||||
|
if part.text:
|
||||||
|
print(f" Part {i} [Text]: {part.text.strip()}")
|
||||||
|
if part.function_call:
|
||||||
|
print(
|
||||||
|
f" Part {i} [FunctionCall]:"
|
||||||
|
f" {part.function_call.name}({part.function_call.args}) ID:"
|
||||||
|
f" {part.function_call.id}"
|
||||||
|
)
|
||||||
|
if part.function_response:
|
||||||
|
print(
|
||||||
|
f" Part {i} [FunctionResponse]: For"
|
||||||
|
f" '{part.function_response.name}', ID:"
|
||||||
|
f" {part.function_response.id}, Response:"
|
||||||
|
f" {part.function_response.response}"
|
||||||
|
)
|
||||||
|
print("--- End of agent's turn AFTER receiving updated tool result ---")
|
||||||
|
|
||||||
|
elif long_running_function_call and not initial_tool_response:
|
||||||
|
print(
|
||||||
|
f"--- Long running function '{long_running_function_call.name}' was"
|
||||||
|
" called, but its initial response was not captured. ---"
|
||||||
|
)
|
||||||
|
elif not long_running_function_call:
|
||||||
|
print(
|
||||||
|
"--- No long running function call was detected in the initial"
|
||||||
|
" turn. ---"
|
||||||
|
)
|
||||||
|
|
||||||
|
await call_agent("Please reimburse $50 for meals")
|
||||||
|
print("=" * 70)
|
||||||
|
await call_agent("Please reimburse $200 for conference travel")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
provider = TracerProvider()
|
||||||
|
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
|
||||||
|
if not project_id:
|
||||||
|
raise ValueError("GOOGLE_CLOUD_PROJECT environment variable is not set.")
|
||||||
|
print("Tracing to project", project_id)
|
||||||
|
processor = export.BatchSpanProcessor(
|
||||||
|
CloudTraceSpanExporter(project_id=project_id)
|
||||||
|
)
|
||||||
|
provider.add_span_processor(processor)
|
||||||
|
trace.set_tracer_provider(provider)
|
||||||
|
|
||||||
|
asyncio.run(main())
|
||||||
|
|
||||||
|
provider.force_flush()
|
||||||
|
print("Done tracing to project", project_id)
|
Loading…
Reference in New Issue
Block a user