Update AgentEvaluator to new EvalSchema

PiperOrigin-RevId: 759293759
Ankur Sharma 2025-05-15 14:08:39 -07:00 committed by Copybara-Service
parent bdd678db31
commit 4c6820e78c
9 changed files with 153 additions and 149 deletions

View File

@@ -17,10 +17,8 @@ import json
import logging
import os
import sys
import traceback
from typing import Any
from typing import AsyncGenerator
from typing import cast
from typing import Optional
import uuid
@@ -350,7 +348,7 @@ def _get_evaluator(eval_metric: EvalMetric) -> Evaluator:
return TrajectoryEvaluator(threshold=eval_metric.threshold)
elif (
eval_metric.metric_name == RESPONSE_MATCH_SCORE_KEY
or eval_metric == RESPONSE_EVALUATION_SCORE_KEY
or eval_metric.metric_name == RESPONSE_EVALUATION_SCORE_KEY
):
return ResponseEvaluator(
threshold=eval_metric.threshold, metric_name=eval_metric.metric_name

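The hunk above fixes a comparison bug: the dispatch now keys off eval_metric.metric_name rather than the EvalMetric object itself, so the response-evaluation branch is actually reachable. A minimal, self-contained sketch of that corrected dispatch, using a stand-in EvalMetric model and assumed string values for the metric-name constants (neither is taken verbatim from this commit):

# Illustrative sketch only; the real constants and Evaluator classes live in
# google.adk and are not redefined here.
from pydantic import BaseModel

TOOL_TRAJECTORY_SCORE_KEY = "tool_trajectory_avg_score"      # assumed value
RESPONSE_MATCH_SCORE_KEY = "response_match_score"            # assumed value
RESPONSE_EVALUATION_SCORE_KEY = "response_evaluation_score"  # assumed value


class EvalMetric(BaseModel):
    # Stand-in model mirroring the two fields used in the hunk above.
    metric_name: str
    threshold: float


def describe_evaluator(eval_metric: EvalMetric) -> str:
    # Compare eval_metric.metric_name (a str), not the EvalMetric object itself,
    # so the second branch can match.
    if eval_metric.metric_name == TOOL_TRAJECTORY_SCORE_KEY:
        return f"TrajectoryEvaluator(threshold={eval_metric.threshold})"
    if eval_metric.metric_name in (RESPONSE_MATCH_SCORE_KEY, RESPONSE_EVALUATION_SCORE_KEY):
        return f"ResponseEvaluator(threshold={eval_metric.threshold})"
    raise ValueError(f"Unsupported eval metric: {eval_metric.metric_name}")


print(describe_evaluator(EvalMetric(metric_name="response_match_score", threshold=0.8)))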
View File

@@ -18,8 +18,13 @@ from os import path
from typing import Dict
from typing import List
from typing import Union
import uuid
from .eval_set import EvalSet
from .evaluation_generator import EvaluationGenerator
from .evaluator import EvalStatus
from .evaluator import EvaluationResult
from .evaluator import Evaluator
from .local_eval_sets_manager import convert_eval_set_to_pydanctic_schema
from .response_evaluator import ResponseEvaluator
from .trajectory_evaluator import TrajectoryEvaluator
@@ -75,6 +80,62 @@ class AgentEvaluator:
)
return DEFAULT_CRITERIA
@staticmethod
async def evaluate_eval_set(
agent_module: str,
eval_set: EvalSet,
criteria: dict[str, float],
num_runs=NUM_RUNS,
agent_name=None,
):
"""Evaluates an agent using the given EvalSet.
Args:
agent_module: The path to the Python module that contains the definition of
the agent. By convention, the code looks for 'root_agent' in the loaded
module.
eval_set: The eval set.
criteria: Evaluation criteria, a dictionary of metric names to their
respective thresholds.
num_runs: Number of times all entries in the eval dataset should be
assessed.
agent_name: The name of the agent.
"""
eval_case_responses_list = await EvaluationGenerator.generate_responses(
eval_set=eval_set,
agent_module_path=agent_module,
repeat_num=num_runs,
agent_name=agent_name,
)
for eval_case_responses in eval_case_responses_list:
actual_invocations = [
invocation
for invocations in eval_case_responses.responses
for invocation in invocations
]
expected_invocations = (
eval_case_responses.eval_case.conversation * num_runs
)
for metric_name, threshold in criteria.items():
metric_evaluator = AgentEvaluator._get_metric_evaluator(
metric_name=metric_name, threshold=threshold
)
evaluation_result: EvaluationResult = (
metric_evaluator.evaluate_invocations(
actual_invocations=actual_invocations,
expected_invocations=expected_invocations,
)
)
assert evaluation_result.overall_eval_status == EvalStatus.PASSED, (
f"`{eval_case_responses.eval_case.eval_id}`: "
f"{metric_name} for {agent_module} Failed. Expected {threshold},"
f" but got {evaluation_result.overall_score}."
)
@staticmethod
async def evaluate(
agent_module,
@@ -109,33 +170,31 @@ class AgentEvaluator:
else:
test_files = [eval_dataset_file_path_or_dir]
initial_session_state = {}
initial_session = {}
if initial_session_file:
with open(initial_session_file, "r") as f:
initial_session_state = json.loads(f.read())["state"]
initial_session = json.loads(f.read())
for test_file in test_files:
dataset = AgentEvaluator._load_dataset(test_file)[0]
data = AgentEvaluator._load_dataset(test_file)[0]
criteria = AgentEvaluator.find_config_for_test_file(test_file)
AgentEvaluator._validate_input([data], criteria)
AgentEvaluator._validate_input([dataset], criteria)
eval_data = {
"name": test_file,
"data": data,
"initial_session": initial_session,
}
evaluation_response = await AgentEvaluator._generate_responses(
agent_module,
[dataset],
num_runs,
eval_set = convert_eval_set_to_pydanctic_schema(
eval_set_id=str(uuid.uuid4()), eval_set_in_json_format=[eval_data]
)
await AgentEvaluator.evaluate_eval_set(
agent_module=agent_module,
eval_set=eval_set,
criteria=criteria,
num_runs=num_runs,
agent_name=agent_name,
initial_session={"state": initial_session_state},
)
if AgentEvaluator._response_evaluation_required(criteria, [dataset]):
AgentEvaluator._evaluate_response_scores(
agent_module, evaluation_response, criteria
)
if AgentEvaluator._trajectory_evaluation_required(criteria, [dataset]):
AgentEvaluator._evaluate_tool_trajectory(
agent_module, evaluation_response, criteria
)
@staticmethod
@@ -221,102 +280,13 @@ class AgentEvaluator:
)
@staticmethod
def _get_infer_criteria(eval_dataset):
"""Infers evaluation criteria based on the provided dataset.
Args:
eval_dataset (list): A list of evaluation samples.
Returns:
dict: Inferred evaluation criteria based on dataset fields.
"""
inferred_criteria = {}
sample = eval_dataset[0][0]
if QUERY_COLUMN in sample and EXPECTED_TOOL_USE_COLUMN in sample:
inferred_criteria[TOOL_TRAJECTORY_SCORE_KEY] = DEFAULT_CRITERIA[
TOOL_TRAJECTORY_SCORE_KEY
]
if QUERY_COLUMN in sample and REFERENCE_COLUMN in sample:
inferred_criteria[RESPONSE_MATCH_SCORE_KEY] = DEFAULT_CRITERIA[
RESPONSE_MATCH_SCORE_KEY
]
return inferred_criteria
@staticmethod
async def _generate_responses(
agent_module, eval_dataset, num_runs, agent_name=None, initial_session={}
def _get_metric_evaluator(metric_name: str, threshold: float) -> Evaluator:
if metric_name == TOOL_TRAJECTORY_SCORE_KEY:
return TrajectoryEvaluator(threshold=threshold)
elif (
metric_name == RESPONSE_MATCH_SCORE_KEY
or metric_name == RESPONSE_EVALUATION_SCORE_KEY
):
"""Generates evaluation responses by running the agent module multiple times."""
return EvaluationGenerator.generate_responses(
eval_dataset,
agent_module,
repeat_num=num_runs,
agent_name=agent_name,
initial_session=initial_session,
)
return ResponseEvaluator(threshold=threshold, metric_name=metric_name)
@staticmethod
def _response_evaluation_required(criteria, eval_dataset):
"""Checks if response evaluation are needed."""
return REFERENCE_COLUMN in eval_dataset[0][0] and any(
key in criteria
for key in [RESPONSE_EVALUATION_SCORE_KEY, RESPONSE_MATCH_SCORE_KEY]
)
@staticmethod
def _trajectory_evaluation_required(evaluation_criteria, eval_dataset):
"""Checks if response evaluation are needed."""
return (
EXPECTED_TOOL_USE_COLUMN in eval_dataset[0][0]
and TOOL_TRAJECTORY_SCORE_KEY in evaluation_criteria
)
@staticmethod
def _evaluate_response_scores(agent_module, evaluation_response, criteria):
"""Evaluates response scores and raises an assertion error if they don't meet the criteria."""
metrics = ResponseEvaluator.evaluate(
evaluation_response, criteria, print_detailed_results=True
)
AgentEvaluator._assert_score(
metrics,
"coherence/mean",
criteria.get(RESPONSE_EVALUATION_SCORE_KEY),
"Average response evaluation score",
agent_module,
)
AgentEvaluator._assert_score(
metrics,
"rouge_1/mean",
criteria.get(RESPONSE_MATCH_SCORE_KEY),
"Average response match score",
agent_module,
)
@staticmethod
def _evaluate_tool_trajectory(agent_module, evaluation_response, criteria):
"""Evaluates tool trajectory scores and raises an assertion error if they don't meet the criteria."""
score = TrajectoryEvaluator.evaluate(
evaluation_response, print_detailed_results=True
)
AgentEvaluator._assert_score(
{TOOL_TRAJECTORY_SCORE_KEY: score},
TOOL_TRAJECTORY_SCORE_KEY,
criteria[TOOL_TRAJECTORY_SCORE_KEY],
"Average tool trajectory evaluation score",
agent_module,
)
@staticmethod
def _assert_score(metrics, metric_key, threshold, description, agent_module):
"""Asserts that a metric meets the specified threshold."""
if metric_key in metrics:
actual_score = metrics[metric_key]
assert actual_score >= threshold, (
f"{description} for {agent_module} is lower than expected. "
f"Expected >= {threshold}, but got {actual_score}."
)
raise ValueError(f"Unsupported eval metric: {metric_name}")

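For orientation, the new AgentEvaluator.evaluate_eval_set entry point runs the agent against an EvalSet and asserts every configured metric against its threshold. A hedged usage sketch, assuming an EvalSet obtained elsewhere (for example via convert_eval_set_to_pydanctic_schema, sketched after the local_eval_sets_manager hunks below); the agent module path, metric name, and threshold are placeholders, not values from this commit:

from google.adk.evaluation import AgentEvaluator
from google.adk.evaluation.eval_set import EvalSet


async def check_agent_against_eval_set(eval_set: EvalSet) -> None:
    # Fails with an AssertionError when a metric's overall score misses its threshold.
    await AgentEvaluator.evaluate_eval_set(
        agent_module="my_package.my_agent",  # placeholder; the module must expose `root_agent`
        eval_set=eval_set,
        criteria={"tool_trajectory_avg_score": 1.0},  # metric name and threshold are assumptions
        num_runs=2,
    )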
View File

@@ -13,9 +13,12 @@
# limitations under the License.
import importlib
from typing import Any, Optional
from typing import Any
from typing import Optional
import uuid
from pydantic import BaseModel
from ..agents.llm_agent import Agent
from ..artifacts.base_artifact_service import BaseArtifactService
from ..artifacts.in_memory_artifact_service import InMemoryArtifactService
@@ -23,9 +26,21 @@ from ..runners import Runner
from ..sessions.base_session_service import BaseSessionService
from ..sessions.in_memory_session_service import InMemorySessionService
from ..sessions.session import Session
from .eval_case import EvalCase
from .eval_case import IntermediateData
from .eval_case import Invocation
from .eval_case import SessionInput
from .eval_set import EvalSet
class EvalCaseResponses(BaseModel):
"""Contains multiple responses associated with an EvalCase.
Multiple responses are a result of repeated requests to generate inferences.
"""
eval_case: EvalCase
responses: list[list[Invocation]]
class EvaluationGenerator:
@@ -33,12 +48,11 @@ class EvaluationGenerator:
@staticmethod
async def generate_responses(
eval_dataset,
agent_module_path,
repeat_num=3,
agent_name=None,
initial_session={},
):
eval_set: EvalSet,
agent_module_path: str,
repeat_num: int = 3,
agent_name: Optional[str] = None,
) -> list[EvalCaseResponses]:
"""Returns evaluation responses for the given dataset and agent.
Args:
@@ -48,16 +62,22 @@ class EvaluationGenerator:
usually done to remove uncertainty that a single run may bring.
agent_name: The name of the agent that should be evaluated. This is
usually the sub-agent.
initial_session: Initial session for the eval data.
"""
results = []
for eval_case in eval_set.eval_cases:
responses = []
for _ in range(repeat_num):
for data in eval_dataset:
results.append(
EvaluationGenerator._process_query(
data, agent_module_path, agent_name, initial_session
response_invocations = await EvaluationGenerator._process_query(
eval_case.conversation,
agent_module_path,
agent_name,
eval_case.session_input,
)
responses.append(response_invocations)
results.append(
EvalCaseResponses(eval_case=eval_case, responses=responses)
)
return results
@@ -89,7 +109,12 @@ class EvaluationGenerator:
return results
@staticmethod
def _process_query(data, module_name, agent_name=None, initial_session={}):
async def _process_query(
invocations: list[Invocation],
module_name: str,
agent_name: Optional[str] = None,
initial_session: Optional[SessionInput] = None,
) -> list[Invocation]:
"""Process a query using the agent and evaluation dataset."""
module_path = f"{module_name}"
agent_module = importlib.import_module(module_path)
@@ -102,8 +127,8 @@ class EvaluationGenerator:
agent_to_evaluate = root_agent.find_agent(agent_name)
assert agent_to_evaluate, f"Sub-Agent `{agent_name}` not found."
return EvaluationGenerator._generate_inferences_from_root_agent(
data, agent_to_evaluate, reset_func, initial_session
return await EvaluationGenerator._generate_inferences_from_root_agent(
invocations, agent_to_evaluate, reset_func, initial_session
)
@staticmethod
@@ -216,3 +241,5 @@ class EvaluationGenerator:
responses[index]["actual_tool_use"] = actual_tool_uses
responses[index]["response"] = response
return responses

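The reworked generator returns one EvalCaseResponses per eval case, with one inner list of Invocations per run. A small, self-contained sketch of how a caller such as AgentEvaluator.evaluate_eval_set pairs those responses with the expected conversation; the Fake* models below only mirror the fields used in this diff and are not the real google.adk types:

# Stand-in types; the real EvalCase/Invocation/EvalCaseResponses carry more fields.
from pydantic import BaseModel


class FakeInvocation(BaseModel):
    user_text: str


class FakeEvalCase(BaseModel):
    eval_id: str
    conversation: list[FakeInvocation]


class FakeEvalCaseResponses(BaseModel):
    eval_case: FakeEvalCase
    responses: list[list[FakeInvocation]]  # one inner list per run


def flatten_for_metrics(case_responses: FakeEvalCaseResponses, num_runs: int):
    # Mirrors the pairing in evaluate_eval_set: every run's actual invocations are
    # compared against the expected conversation repeated num_runs times, so the
    # two lists line up one-to-one.
    actual = [inv for run in case_responses.responses for inv in run]
    expected = case_responses.eval_case.conversation * num_runs
    return actual, expected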
View File

@@ -43,16 +43,16 @@ def _convert_invocation_to_pydantic_schema(
expected_tool_use = []
expected_intermediate_agent_responses = []
for old_tool_use in invocation_in_json_format["expected_tool_use"]:
for old_tool_use in invocation_in_json_format.get("expected_tool_use", []):
expected_tool_use.append(
genai_types.FunctionCall(
name=old_tool_use["tool_name"], args=old_tool_use["tool_input"]
)
)
for old_intermediate_response in invocation_in_json_format[
"expected_intermediate_agent_responses"
]:
for old_intermediate_response in invocation_in_json_format.get(
"expected_intermediate_agent_responses", []
):
expected_intermediate_agent_responses.append((
old_intermediate_response["author"],
[genai_types.Part.from_text(text=old_intermediate_response["text"])],
@@ -134,14 +134,18 @@ def convert_eval_set_to_pydanctic_schema(
_convert_invocation_to_pydantic_schema(old_invocation)
)
session_input = None
if "initial_session" in old_eval_case:
session_input = SessionInput(
app_name=old_eval_case["initial_session"].get("app_name", ""),
user_id=old_eval_case["initial_session"].get("user_id", ""),
state=old_eval_case["initial_session"].get("state", {}),
)
new_eval_case = EvalCase(
eval_id=old_eval_case["name"],
conversation=new_invocations,
session_input=SessionInput(
app_name=old_eval_case["initial_session"]["app_name"],
user_id=old_eval_case["initial_session"]["user_id"],
state=old_eval_case["initial_session"]["state"],
),
session_input=session_input,
creation_timestamp=time.time(),
)
eval_cases.append(new_eval_case)

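The converter above now treats expected_tool_use, expected_intermediate_agent_responses, and initial_session as optional. A hedged sketch of a legacy-format payload it could accept; the query and reference keys follow the older .test.json convention rather than this hunk, and all concrete values (tool name, app name, texts) are made up for illustration:

# Illustrative legacy-format eval case; key names follow the code above, values are invented.
from google.adk.evaluation.local_eval_sets_manager import (
    convert_eval_set_to_pydanctic_schema,
)

legacy_eval_cases = [{
    "name": "turn_off_lights",                    # becomes EvalCase.eval_id
    "data": [{
        "query": "Turn off the bedroom lights.",  # legacy .test.json convention
        "expected_tool_use": [
            {"tool_name": "set_device_status", "tool_input": {"status": "OFF"}},
        ],
        # "expected_intermediate_agent_responses" may now be omitted entirely.
        "reference": "The bedroom lights are off.",
    }],
    # "initial_session" is optional after this change; when present it becomes SessionInput.
    "initial_session": {"app_name": "home_automation", "user_id": "user-1", "state": {}},
}]

eval_set = convert_eval_set_to_pydanctic_schema(
    eval_set_id="legacy-import-demo",
    eval_set_in_json_format=legacy_eval_cases,
)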
View File

@@ -13,6 +13,7 @@
# limitations under the License.
from google.adk.evaluation import AgentEvaluator
import pytest
@pytest.mark.asyncio

View File

@@ -13,6 +13,7 @@
# limitations under the License.
from google.adk.evaluation import AgentEvaluator
import pytest
@pytest.mark.asyncio

View File

@@ -13,11 +13,12 @@
# limitations under the License.
from google.adk.evaluation import AgentEvaluator
import pytest
@pytest.mark.asyncio
async def test_eval_agent():
AgentEvaluator.evaluate(
await AgentEvaluator.evaluate(
agent_module="tests.integration.fixture.home_automation_agent",
eval_dataset_file_path_or_dir="tests/integration/fixture/home_automation_agent/simple_test.test.json",
num_runs=4,

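The remaining integration-test fixtures in this commit follow the same migration pattern as the diff above: add the pytest.mark.asyncio marker, make the test async, and await AgentEvaluator.evaluate. A generic sketch with placeholder fixture paths (not the actual module or dataset names from this commit):

from google.adk.evaluation import AgentEvaluator
import pytest


@pytest.mark.asyncio
async def test_eval_agent():
    # evaluate() is now a coroutine, so the test is async and awaits it.
    await AgentEvaluator.evaluate(
        agent_module="tests.integration.fixture.some_agent",  # placeholder module
        eval_dataset_file_path_or_dir="tests/integration/fixture/some_agent/some_test.test.json",  # placeholder path
        num_runs=2,
    )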
View File

@@ -13,6 +13,7 @@
# limitations under the License.
from google.adk.evaluation import AgentEvaluator
import pytest
@pytest.mark.asyncio

View File

@@ -13,6 +13,7 @@
# limitations under the License.
from google.adk.evaluation import AgentEvaluator
import pytest
@pytest.mark.asyncio