mirror of
https://github.com/EvolutionAPI/adk-python.git
synced 2025-07-14 01:41:25 -06:00
524 lines
18 KiB
Python
524 lines
18 KiB
Python
# Copyright 2025 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import queue
|
|
import threading
|
|
from typing import AsyncGenerator
|
|
from typing import Generator
|
|
from typing import Optional
|
|
import warnings
|
|
|
|
from google.genai import types
|
|
|
|
from .agents.active_streaming_tool import ActiveStreamingTool
|
|
from .agents.base_agent import BaseAgent
|
|
from .agents.invocation_context import InvocationContext
|
|
from .agents.invocation_context import new_invocation_context_id
|
|
from .agents.live_request_queue import LiveRequestQueue
|
|
from .agents.llm_agent import LlmAgent
|
|
from .agents.run_config import RunConfig
|
|
from .artifacts.base_artifact_service import BaseArtifactService
|
|
from .artifacts.in_memory_artifact_service import InMemoryArtifactService
|
|
from .code_executors.built_in_code_executor import BuiltInCodeExecutor
|
|
from .events.event import Event
|
|
from .memory.base_memory_service import BaseMemoryService
|
|
from .memory.in_memory_memory_service import InMemoryMemoryService
|
|
from .sessions.base_session_service import BaseSessionService
|
|
from .sessions.in_memory_session_service import InMemorySessionService
|
|
from .sessions.session import Session
|
|
from .telemetry import tracer
|
|
from .tools.base_toolset import BaseToolset
|
|
|
|
logger = logging.getLogger('google_adk.' + __name__)
|
|
|
|
|
|
class Runner:
|
|
"""The Runner class is used to run agents.
|
|
|
|
It manages the execution of an agent within a session, handling message
|
|
processing, event generation, and interaction with various services like
|
|
artifact storage, session management, and memory.
|
|
|
|
Attributes:
|
|
app_name: The application name of the runner.
|
|
agent: The root agent to run.
|
|
artifact_service: The artifact service for the runner.
|
|
session_service: The session service for the runner.
|
|
memory_service: The memory service for the runner.
|
|
"""
|
|
|
|
app_name: str
|
|
"""The app name of the runner."""
|
|
agent: BaseAgent
|
|
"""The root agent to run."""
|
|
artifact_service: Optional[BaseArtifactService] = None
|
|
"""The artifact service for the runner."""
|
|
session_service: BaseSessionService
|
|
"""The session service for the runner."""
|
|
memory_service: Optional[BaseMemoryService] = None
|
|
"""The memory service for the runner."""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
app_name: str,
|
|
agent: BaseAgent,
|
|
artifact_service: Optional[BaseArtifactService] = None,
|
|
session_service: BaseSessionService,
|
|
memory_service: Optional[BaseMemoryService] = None,
|
|
):
|
|
"""Initializes the Runner.
|
|
|
|
Args:
|
|
app_name: The application name of the runner.
|
|
agent: The root agent to run.
|
|
artifact_service: The artifact service for the runner.
|
|
session_service: The session service for the runner.
|
|
memory_service: The memory service for the runner.
|
|
"""
|
|
self.app_name = app_name
|
|
self.agent = agent
|
|
self.artifact_service = artifact_service
|
|
self.session_service = session_service
|
|
self.memory_service = memory_service
|
|
|
|
def run(
|
|
self,
|
|
*,
|
|
user_id: str,
|
|
session_id: str,
|
|
new_message: types.Content,
|
|
run_config: RunConfig = RunConfig(),
|
|
) -> Generator[Event, None, None]:
|
|
"""Runs the agent.
|
|
|
|
NOTE: This sync interface is only for local testing and convenience purpose.
|
|
Consider using `run_async` for production usage.
|
|
|
|
Args:
|
|
user_id: The user ID of the session.
|
|
session_id: The session ID of the session.
|
|
new_message: A new message to append to the session.
|
|
run_config: The run config for the agent.
|
|
|
|
Yields:
|
|
The events generated by the agent.
|
|
"""
|
|
event_queue = queue.Queue()
|
|
|
|
async def _invoke_run_async():
|
|
try:
|
|
async for event in self.run_async(
|
|
user_id=user_id,
|
|
session_id=session_id,
|
|
new_message=new_message,
|
|
run_config=run_config,
|
|
):
|
|
event_queue.put(event)
|
|
finally:
|
|
event_queue.put(None)
|
|
|
|
def _asyncio_thread_main():
|
|
try:
|
|
asyncio.run(_invoke_run_async())
|
|
finally:
|
|
event_queue.put(None)
|
|
|
|
thread = threading.Thread(target=_asyncio_thread_main)
|
|
thread.start()
|
|
|
|
# consumes and re-yield the events from background thread.
|
|
while True:
|
|
event = event_queue.get()
|
|
if event is None:
|
|
break
|
|
else:
|
|
yield event
|
|
|
|
thread.join()
|
|
|
|
async def run_async(
|
|
self,
|
|
*,
|
|
user_id: str,
|
|
session_id: str,
|
|
new_message: types.Content,
|
|
run_config: RunConfig = RunConfig(),
|
|
) -> AsyncGenerator[Event, None]:
|
|
"""Main entry method to run the agent in this runner.
|
|
|
|
Args:
|
|
user_id: The user ID of the session.
|
|
session_id: The session ID of the session.
|
|
new_message: A new message to append to the session.
|
|
run_config: The run config for the agent.
|
|
|
|
Yields:
|
|
The events generated by the agent.
|
|
"""
|
|
with tracer.start_as_current_span('invocation'):
|
|
session = await self.session_service.get_session(
|
|
app_name=self.app_name, user_id=user_id, session_id=session_id
|
|
)
|
|
if not session:
|
|
raise ValueError(f'Session not found: {session_id}')
|
|
|
|
invocation_context = self._new_invocation_context(
|
|
session,
|
|
new_message=new_message,
|
|
run_config=run_config,
|
|
)
|
|
root_agent = self.agent
|
|
|
|
if new_message:
|
|
await self._append_new_message_to_session(
|
|
session,
|
|
new_message,
|
|
invocation_context,
|
|
run_config.save_input_blobs_as_artifacts,
|
|
)
|
|
|
|
invocation_context.agent = self._find_agent_to_run(session, root_agent)
|
|
async for event in invocation_context.agent.run_async(invocation_context):
|
|
if not event.partial:
|
|
await self.session_service.append_event(session=session, event=event)
|
|
yield event
|
|
|
|
async def _append_new_message_to_session(
|
|
self,
|
|
session: Session,
|
|
new_message: types.Content,
|
|
invocation_context: InvocationContext,
|
|
save_input_blobs_as_artifacts: bool = False,
|
|
):
|
|
"""Appends a new message to the session.
|
|
|
|
Args:
|
|
session: The session to append the message to.
|
|
new_message: The new message to append.
|
|
invocation_context: The invocation context for the message.
|
|
save_input_blobs_as_artifacts: Whether to save input blobs as artifacts.
|
|
"""
|
|
if not new_message.parts:
|
|
raise ValueError('No parts in the new_message.')
|
|
|
|
if self.artifact_service and save_input_blobs_as_artifacts:
|
|
# The runner directly saves the artifacts (if applicable) in the
|
|
# user message and replaces the artifact data with a file name
|
|
# placeholder.
|
|
for i, part in enumerate(new_message.parts):
|
|
if part.inline_data is None:
|
|
continue
|
|
file_name = f'artifact_{invocation_context.invocation_id}_{i}'
|
|
await self.artifact_service.save_artifact(
|
|
app_name=self.app_name,
|
|
user_id=session.user_id,
|
|
session_id=session.id,
|
|
filename=file_name,
|
|
artifact=part,
|
|
)
|
|
new_message.parts[i] = types.Part(
|
|
text=f'Uploaded file: {file_name}. It is saved into artifacts'
|
|
)
|
|
# Appends only. We do not yield the event because it's not from the model.
|
|
event = Event(
|
|
invocation_id=invocation_context.invocation_id,
|
|
author='user',
|
|
content=new_message,
|
|
)
|
|
await self.session_service.append_event(session=session, event=event)
|
|
|
|
async def run_live(
|
|
self,
|
|
*,
|
|
user_id: Optional[str] = None,
|
|
session_id: Optional[str] = None,
|
|
live_request_queue: LiveRequestQueue,
|
|
run_config: RunConfig = RunConfig(),
|
|
session: Optional[Session] = None,
|
|
) -> AsyncGenerator[Event, None]:
|
|
"""Runs the agent in live mode (experimental feature).
|
|
|
|
Args:
|
|
user_id: The user ID for the session. Required if `session` is None.
|
|
session_id: The session ID for the session. Required if `session` is
|
|
None.
|
|
live_request_queue: The queue for live requests.
|
|
run_config: The run config for the agent.
|
|
session: The session to use. This parameter is deprecated, please use
|
|
`user_id` and `session_id` instead.
|
|
|
|
Yields:
|
|
AsyncGenerator[Event, None]: An asynchronous generator that yields
|
|
`Event`
|
|
objects as they are produced by the agent during its live execution.
|
|
|
|
.. warning::
|
|
This feature is **experimental** and its API or behavior may change
|
|
in future releases.
|
|
|
|
.. note::
|
|
Either `session` or both `user_id` and `session_id` must be provided.
|
|
"""
|
|
if session is None and (user_id is None or session_id is None):
|
|
raise ValueError(
|
|
'Either session or user_id and session_id must be provided.'
|
|
)
|
|
if session is not None:
|
|
warnings.warn(
|
|
'The `session` parameter is deprecated. Please use `user_id` and'
|
|
' `session_id` instead.',
|
|
DeprecationWarning,
|
|
stacklevel=2,
|
|
)
|
|
if not session:
|
|
session = await self.session_service.get_session(
|
|
app_name=self.app_name, user_id=user_id, session_id=session_id
|
|
)
|
|
if not session:
|
|
raise ValueError(f'Session not found: {session_id}')
|
|
invocation_context = self._new_invocation_context_for_live(
|
|
session,
|
|
live_request_queue=live_request_queue,
|
|
run_config=run_config,
|
|
)
|
|
|
|
root_agent = self.agent
|
|
invocation_context.agent = self._find_agent_to_run(session, root_agent)
|
|
|
|
invocation_context.active_streaming_tools = {}
|
|
# TODO(hangfei): switch to use canonical_tools.
|
|
# for shell agents, there is no tools associated with it so we should skip.
|
|
if hasattr(invocation_context.agent, 'tools'):
|
|
for tool in invocation_context.agent.tools:
|
|
# replicate a LiveRequestQueue for streaming tools that relis on
|
|
# LiveRequestQueue
|
|
from typing import get_type_hints
|
|
|
|
type_hints = get_type_hints(tool)
|
|
for arg_type in type_hints.values():
|
|
if arg_type is LiveRequestQueue:
|
|
if not invocation_context.active_streaming_tools:
|
|
invocation_context.active_streaming_tools = {}
|
|
active_streaming_tools = ActiveStreamingTool(
|
|
stream=LiveRequestQueue()
|
|
)
|
|
invocation_context.active_streaming_tools[tool.__name__] = (
|
|
active_streaming_tools
|
|
)
|
|
|
|
async for event in invocation_context.agent.run_live(invocation_context):
|
|
await self.session_service.append_event(session=session, event=event)
|
|
yield event
|
|
|
|
def _find_agent_to_run(
|
|
self, session: Session, root_agent: BaseAgent
|
|
) -> BaseAgent:
|
|
"""Finds the agent to run to continue the session.
|
|
|
|
A qualified agent must be either of:
|
|
- The root agent;
|
|
- An LlmAgent who replied last and is capable to transfer to any other agent
|
|
in the agent hierarchy.
|
|
|
|
Args:
|
|
session: The session to find the agent for.
|
|
root_agent: The root agent of the runner.
|
|
|
|
Returns:
|
|
The agent of the last message in the session or the root agent.
|
|
"""
|
|
for event in filter(lambda e: e.author != 'user', reversed(session.events)):
|
|
if event.author == root_agent.name:
|
|
# Found root agent.
|
|
return root_agent
|
|
if not (agent := root_agent.find_sub_agent(event.author)):
|
|
# Agent not found, continue looking.
|
|
logger.warning(
|
|
'Event from an unknown agent: %s, event id: %s',
|
|
event.author,
|
|
event.id,
|
|
)
|
|
continue
|
|
if self._is_transferable_across_agent_tree(agent):
|
|
return agent
|
|
# Falls back to root agent if no suitable agents are found in the session.
|
|
return root_agent
|
|
|
|
def _is_transferable_across_agent_tree(self, agent_to_run: BaseAgent) -> bool:
|
|
"""Whether the agent to run can transfer to any other agent in the agent tree.
|
|
|
|
This typically means all agent_to_run's parent through root agent can
|
|
transfer to their parent_agent.
|
|
|
|
Args:
|
|
agent_to_run: The agent to check for transferability.
|
|
|
|
Returns:
|
|
True if the agent can transfer, False otherwise.
|
|
"""
|
|
agent = agent_to_run
|
|
while agent:
|
|
if not isinstance(agent, LlmAgent):
|
|
# Only LLM-based Agent can provider agent transfer capability.
|
|
return False
|
|
if agent.disallow_transfer_to_parent:
|
|
return False
|
|
agent = agent.parent_agent
|
|
return True
|
|
|
|
def _new_invocation_context(
|
|
self,
|
|
session: Session,
|
|
*,
|
|
new_message: Optional[types.Content] = None,
|
|
live_request_queue: Optional[LiveRequestQueue] = None,
|
|
run_config: RunConfig = RunConfig(),
|
|
) -> InvocationContext:
|
|
"""Creates a new invocation context.
|
|
|
|
Args:
|
|
session: The session for the context.
|
|
new_message: The new message for the context.
|
|
live_request_queue: The live request queue for the context.
|
|
run_config: The run config for the context.
|
|
|
|
Returns:
|
|
The new invocation context.
|
|
"""
|
|
invocation_id = new_invocation_context_id()
|
|
|
|
if run_config.support_cfc and isinstance(self.agent, LlmAgent):
|
|
model_name = self.agent.canonical_model.model
|
|
if not model_name.startswith('gemini-2'):
|
|
raise ValueError(
|
|
f'CFC is not supported for model: {model_name} in agent:'
|
|
f' {self.agent.name}'
|
|
)
|
|
if not isinstance(self.agent.code_executor, BuiltInCodeExecutor):
|
|
self.agent.code_executor = BuiltInCodeExecutor()
|
|
|
|
return InvocationContext(
|
|
artifact_service=self.artifact_service,
|
|
session_service=self.session_service,
|
|
memory_service=self.memory_service,
|
|
invocation_id=invocation_id,
|
|
agent=self.agent,
|
|
session=session,
|
|
user_content=new_message,
|
|
live_request_queue=live_request_queue,
|
|
run_config=run_config,
|
|
)
|
|
|
|
def _new_invocation_context_for_live(
|
|
self,
|
|
session: Session,
|
|
*,
|
|
live_request_queue: Optional[LiveRequestQueue] = None,
|
|
run_config: RunConfig = RunConfig(),
|
|
) -> InvocationContext:
|
|
"""Creates a new invocation context for live multi-agent."""
|
|
|
|
# For live multi-agent, we need model's text transcription as context for
|
|
# next agent.
|
|
if self.agent.sub_agents and live_request_queue:
|
|
if not run_config.response_modalities:
|
|
# default
|
|
run_config.response_modalities = ['AUDIO']
|
|
if not run_config.output_audio_transcription:
|
|
run_config.output_audio_transcription = (
|
|
types.AudioTranscriptionConfig()
|
|
)
|
|
elif 'TEXT' not in run_config.response_modalities:
|
|
if not run_config.output_audio_transcription:
|
|
run_config.output_audio_transcription = (
|
|
types.AudioTranscriptionConfig()
|
|
)
|
|
if not run_config.input_audio_transcription:
|
|
# need this input transcription for agent transferring in live mode.
|
|
run_config.input_audio_transcription = types.AudioTranscriptionConfig()
|
|
return self._new_invocation_context(
|
|
session,
|
|
live_request_queue=live_request_queue,
|
|
run_config=run_config,
|
|
)
|
|
|
|
def _collect_toolset(self, agent: BaseAgent) -> set[BaseToolset]:
|
|
toolsets = set()
|
|
if isinstance(agent, LlmAgent):
|
|
for tool_union in agent.tools:
|
|
if isinstance(tool_union, BaseToolset):
|
|
toolsets.add(tool_union)
|
|
for sub_agent in agent.sub_agents:
|
|
toolsets.update(self._collect_toolset(sub_agent))
|
|
return toolsets
|
|
|
|
async def _cleanup_toolsets(self, toolsets_to_close: set[BaseToolset]):
|
|
"""Clean up toolsets with proper task context management."""
|
|
if not toolsets_to_close:
|
|
return
|
|
|
|
# This maintains the same task context throughout cleanup
|
|
for toolset in toolsets_to_close:
|
|
try:
|
|
logger.info('Closing toolset: %s', type(toolset).__name__)
|
|
# Use asyncio.wait_for to add timeout protection
|
|
await asyncio.wait_for(toolset.close(), timeout=10.0)
|
|
logger.info('Successfully closed toolset: %s', type(toolset).__name__)
|
|
except asyncio.TimeoutError:
|
|
logger.warning('Toolset %s cleanup timed out', type(toolset).__name__)
|
|
except Exception as e:
|
|
logger.error('Error closing toolset %s: %s', type(toolset).__name__, e)
|
|
|
|
async def close(self):
|
|
"""Closes the runner."""
|
|
await self._cleanup_toolsets(self._collect_toolset(self.agent))
|
|
|
|
|
|
class InMemoryRunner(Runner):
|
|
"""An in-memory Runner for testing and development.
|
|
|
|
This runner uses in-memory implementations for artifact, session, and memory
|
|
services, providing a lightweight and self-contained environment for agent
|
|
execution.
|
|
|
|
Attributes:
|
|
agent: The root agent to run.
|
|
app_name: The application name of the runner. Defaults to
|
|
'InMemoryRunner'.
|
|
_in_memory_session_service: Deprecated. Please don't use. The in-memory
|
|
session service for the runner.
|
|
"""
|
|
|
|
def __init__(self, agent: BaseAgent, *, app_name: str = 'InMemoryRunner'):
|
|
"""Initializes the InMemoryRunner.
|
|
|
|
Args:
|
|
agent: The root agent to run.
|
|
app_name: The application name of the runner. Defaults to
|
|
'InMemoryRunner'.
|
|
"""
|
|
self._in_memory_session_service = InMemorySessionService()
|
|
super().__init__(
|
|
app_name=app_name,
|
|
agent=agent,
|
|
artifact_service=InMemoryArtifactService(),
|
|
session_service=self._in_memory_session_service,
|
|
memory_service=InMemoryMemoryService(),
|
|
)
|