adk-python/tests/unittests/tools/openapi_tool/auth/test_auth_helper.py
Jack Sun 05142a07cc
Moves unittests to root folder and adds github action to run unit tests. (#72)
* Move unit tests to root package.

* Adds deps to "test" extra, and mark two broken tests in tests/unittests/auth/test_auth_handler.py

* Adds github workflow

* minor fix in lite_llm.py for python 3.9.

* format pyproject.toml
2025-04-11 08:25:59 -07:00

574 lines
18 KiB
Python

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import patch
from fastapi.openapi.models import APIKey
from fastapi.openapi.models import APIKeyIn
from fastapi.openapi.models import HTTPBase
from fastapi.openapi.models import HTTPBearer
from fastapi.openapi.models import OAuth2
from fastapi.openapi.models import OpenIdConnect
from google.adk.auth.auth_credential import AuthCredential
from google.adk.auth.auth_credential import AuthCredentialTypes
from google.adk.auth.auth_credential import HttpAuth
from google.adk.auth.auth_credential import HttpCredentials
from google.adk.auth.auth_credential import ServiceAccount
from google.adk.auth.auth_credential import ServiceAccountCredential
from google.adk.auth.auth_schemes import AuthSchemeType
from google.adk.auth.auth_schemes import OpenIdConnectWithConfig
from google.adk.tools.openapi_tool.auth.auth_helpers import credential_to_param
from google.adk.tools.openapi_tool.auth.auth_helpers import dict_to_auth_scheme
from google.adk.tools.openapi_tool.auth.auth_helpers import INTERNAL_AUTH_PREFIX
from google.adk.tools.openapi_tool.auth.auth_helpers import openid_dict_to_scheme_credential
from google.adk.tools.openapi_tool.auth.auth_helpers import openid_url_to_scheme_credential
from google.adk.tools.openapi_tool.auth.auth_helpers import service_account_dict_to_scheme_credential
from google.adk.tools.openapi_tool.auth.auth_helpers import service_account_scheme_credential
from google.adk.tools.openapi_tool.auth.auth_helpers import token_to_scheme_credential
import pytest
import requests
def test_token_to_scheme_credential_api_key_header():
  """An "apikey" token located in a header yields an APIKey scheme/credential."""
  expected_credential = AuthCredential(
      auth_type=AuthCredentialTypes.API_KEY, api_key="test_key"
  )

  auth_scheme, auth_credential = token_to_scheme_credential(
      "apikey", "header", "X-API-Key", "test_key"
  )

  assert isinstance(auth_scheme, APIKey)
  assert auth_scheme.type_ == AuthSchemeType.apiKey
  assert auth_scheme.in_ == APIKeyIn.header
  assert auth_scheme.name == "X-API-Key"
  assert auth_credential == expected_credential
def test_token_to_scheme_credential_api_key_query():
  """An "apikey" token located in the query yields an APIKey scheme/credential."""
  expected_credential = AuthCredential(
      auth_type=AuthCredentialTypes.API_KEY, api_key="test_key"
  )

  auth_scheme, auth_credential = token_to_scheme_credential(
      "apikey", "query", "api_key", "test_key"
  )

  assert isinstance(auth_scheme, APIKey)
  assert auth_scheme.type_ == AuthSchemeType.apiKey
  assert auth_scheme.in_ == APIKeyIn.query
  assert auth_scheme.name == "api_key"
  assert auth_credential == expected_credential
def test_token_to_scheme_credential_api_key_cookie():
  """An "apikey" token located in a cookie yields an APIKey scheme/credential."""
  expected_credential = AuthCredential(
      auth_type=AuthCredentialTypes.API_KEY, api_key="test_key"
  )

  auth_scheme, auth_credential = token_to_scheme_credential(
      "apikey", "cookie", "session_id", "test_key"
  )

  assert isinstance(auth_scheme, APIKey)
  assert auth_scheme.type_ == AuthSchemeType.apiKey
  assert auth_scheme.in_ == APIKeyIn.cookie
  assert auth_scheme.name == "session_id"
  assert auth_credential == expected_credential
def test_token_to_scheme_credential_api_key_no_credential():
  """Omitting the credential value still yields an APIKey scheme, but no credential."""
  result = token_to_scheme_credential("apikey", "cookie", "session_id")
  auth_scheme, auth_credential = result

  assert isinstance(auth_scheme, APIKey)
  assert auth_credential is None
def test_token_to_scheme_credential_oauth2_token():
  """An "oauth2Token" token yields a JWT bearer scheme and an HTTP bearer credential."""
  expected_credential = AuthCredential(
      auth_type=AuthCredentialTypes.HTTP,
      http=HttpAuth(
          scheme="bearer", credentials=HttpCredentials(token="test_token")
      ),
  )

  auth_scheme, auth_credential = token_to_scheme_credential(
      "oauth2Token", "header", "Authorization", "test_token"
  )

  assert isinstance(auth_scheme, HTTPBearer)
  assert auth_scheme.bearerFormat == "JWT"
  assert auth_credential == expected_credential
def test_token_to_scheme_credential_oauth2_no_credential():
  """Omitting the token still yields an HTTPBearer scheme, but no credential."""
  result = token_to_scheme_credential("oauth2Token", "header", "Authorization")
  auth_scheme, auth_credential = result

  assert isinstance(auth_scheme, HTTPBearer)
  assert auth_credential is None
def test_service_account_dict_to_scheme_credential():
  """A service account dict plus scopes yields a JWT bearer scheme and SA credential."""
  requested_scopes = ["scope1", "scope2"]
  service_account_config = {
      "type": "service_account",
      "project_id": "project_id",
      "private_key_id": "private_key_id",
      "private_key": "private_key",
      "client_email": "client_email",
      "client_id": "client_id",
      "auth_uri": "auth_uri",
      "token_uri": "token_uri",
      "auth_provider_x509_cert_url": "auth_provider_x509_cert_url",
      "client_x509_cert_url": "client_x509_cert_url",
      "universe_domain": "universe_domain",
  }

  auth_scheme, auth_credential = service_account_dict_to_scheme_credential(
      service_account_config, requested_scopes
  )

  assert isinstance(auth_scheme, HTTPBearer)
  assert auth_scheme.bearerFormat == "JWT"
  assert auth_credential.auth_type == AuthCredentialTypes.SERVICE_ACCOUNT

  service_account = auth_credential.service_account
  assert service_account.scopes == requested_scopes
  assert service_account.service_account_credential.project_id == "project_id"
def test_service_account_scheme_credential():
  """A ServiceAccount config is carried unchanged into the resulting credential."""
  key_material = ServiceAccountCredential(
      type="service_account",
      project_id="project_id",
      private_key_id="private_key_id",
      private_key="private_key",
      client_email="client_email",
      client_id="client_id",
      auth_uri="auth_uri",
      token_uri="token_uri",
      auth_provider_x509_cert_url="auth_provider_x509_cert_url",
      client_x509_cert_url="client_x509_cert_url",
      universe_domain="universe_domain",
  )
  service_account = ServiceAccount(
      service_account_credential=key_material,
      scopes=["scope1", "scope2"],
  )

  auth_scheme, auth_credential = service_account_scheme_credential(
      service_account
  )

  assert isinstance(auth_scheme, HTTPBearer)
  assert auth_scheme.bearerFormat == "JWT"
  assert auth_credential.auth_type == AuthCredentialTypes.SERVICE_ACCOUNT
  assert auth_credential.service_account == service_account
def test_openid_dict_to_scheme_credential():
  """An OpenID config dict and client dict yield a matching scheme and credential."""
  requested_scopes = ["scope1", "scope2"]
  openid_config = {
      "authorization_endpoint": "auth_url",
      "token_endpoint": "token_url",
      "openIdConnectUrl": "openid_url",
  }
  client_info = {
      "client_id": "client_id",
      "client_secret": "client_secret",
      "redirect_uri": "redirect_uri",
  }

  auth_scheme, auth_credential = openid_dict_to_scheme_credential(
      openid_config, requested_scopes, client_info
  )

  # Scheme mirrors the endpoints and scopes that were passed in.
  assert isinstance(auth_scheme, OpenIdConnectWithConfig)
  assert auth_scheme.authorization_endpoint == "auth_url"
  assert auth_scheme.token_endpoint == "token_url"
  assert auth_scheme.scopes == requested_scopes

  # Credential carries the client details under `oauth2`.
  assert auth_credential.auth_type == AuthCredentialTypes.OPEN_ID_CONNECT
  oauth2 = auth_credential.oauth2
  assert oauth2.client_id == "client_id"
  assert oauth2.client_secret == "client_secret"
  assert oauth2.redirect_uri == "redirect_uri"
def test_openid_dict_to_scheme_credential_no_openid_url():
  """A config dict without "openIdConnectUrl" yields an empty URL on the scheme."""
  openid_config = {
      "authorization_endpoint": "auth_url",
      "token_endpoint": "token_url",
  }
  client_info = {
      "client_id": "client_id",
      "client_secret": "client_secret",
      "redirect_uri": "redirect_uri",
  }

  # Only the scheme is inspected here; the credential is ignored.
  auth_scheme, _ = openid_dict_to_scheme_credential(
      openid_config, ["scope1", "scope2"], client_info
  )

  assert auth_scheme.openIdConnectUrl == ""
def test_openid_dict_to_scheme_credential_google_oauth_credential():
  """Client details nested under a "web" key are still picked up."""
  openid_config = {
      "authorization_endpoint": "auth_url",
      "token_endpoint": "token_url",
      "openIdConnectUrl": "openid_url",
  }
  client_details = {
      "client_id": "client_id",
      "client_secret": "client_secret",
      "redirect_uri": "redirect_uri",
  }
  # The client details are wrapped in an outer "web" object.
  wrapped_client_info = {"web": client_details}

  auth_scheme, auth_credential = openid_dict_to_scheme_credential(
      openid_config, ["scope1", "scope2"], wrapped_client_info
  )

  assert isinstance(auth_scheme, OpenIdConnectWithConfig)
  assert auth_credential.auth_type == AuthCredentialTypes.OPEN_ID_CONNECT
  oauth2 = auth_credential.oauth2
  assert oauth2.client_id == "client_id"
  assert oauth2.client_secret == "client_secret"
  assert oauth2.redirect_uri == "redirect_uri"
def test_openid_dict_to_scheme_credential_invalid_config():
  """A config dict lacking the expected endpoint fields raises ValueError."""
  bad_config = {"invalid_field": "value"}
  client_info = {"client_id": "client_id", "client_secret": "client_secret"}
  requested_scopes = ["scope1", "scope2"]

  expected_error = pytest.raises(
      ValueError, match="Invalid OpenID Connect configuration"
  )
  with expected_error:
    openid_dict_to_scheme_credential(bad_config, requested_scopes, client_info)
def test_openid_dict_to_scheme_credential_missing_credential_fields():
  """A credential dict without "client_secret" raises a ValueError naming it."""
  openid_config = {
      "authorization_endpoint": "auth_url",
      "token_endpoint": "token_url",
  }
  incomplete_client_info = {"client_id": "client_id"}
  requested_scopes = ["scope1", "scope2"]

  expected_error = pytest.raises(
      ValueError,
      match="Missing required fields in credential_dict: client_secret",
  )
  with expected_error:
    openid_dict_to_scheme_credential(
        openid_config, requested_scopes, incomplete_client_info
    )
@patch("requests.get")
def test_openid_url_to_scheme_credential(mock_get):
  """The OpenID config fetched from a URL is turned into a scheme and credential."""
  # Stub the HTTP response: successful status and a discovery-style payload.
  fake_response = mock_get.return_value
  fake_response.json.return_value = {
      "authorization_endpoint": "auth_url",
      "token_endpoint": "token_url",
      "userinfo_endpoint": "userinfo_url",
  }
  fake_response.raise_for_status.return_value = None

  requested_scopes = ["scope1", "scope2"]
  client_info = {
      "client_id": "client_id",
      "client_secret": "client_secret",
      "redirect_uri": "redirect_uri",
  }

  auth_scheme, auth_credential = openid_url_to_scheme_credential(
      "openid_url", requested_scopes, client_info
  )

  assert isinstance(auth_scheme, OpenIdConnectWithConfig)
  assert auth_scheme.authorization_endpoint == "auth_url"
  assert auth_scheme.token_endpoint == "token_url"
  assert auth_scheme.scopes == requested_scopes

  assert auth_credential.auth_type == AuthCredentialTypes.OPEN_ID_CONNECT
  oauth2 = auth_credential.oauth2
  assert oauth2.client_id == "client_id"
  assert oauth2.client_secret == "client_secret"
  assert oauth2.redirect_uri == "redirect_uri"

  # Exactly one fetch, against the given URL, with a 10 second timeout.
  mock_get.assert_called_once_with("openid_url", timeout=10)
@patch("requests.get")
def test_openid_url_to_scheme_credential_no_openid_url(mock_get):
  """When the fetched config has no "openIdConnectUrl", the request URL is used."""
  # The stubbed payload deliberately contains no "openIdConnectUrl" entry.
  fake_response = mock_get.return_value
  fake_response.json.return_value = {
      "authorization_endpoint": "auth_url",
      "token_endpoint": "token_url",
      "userinfo_endpoint": "userinfo_url",
  }
  fake_response.raise_for_status.return_value = None

  client_info = {
      "client_id": "client_id",
      "client_secret": "client_secret",
      "redirect_uri": "redirect_uri",
  }

  # Only the scheme is inspected here; the credential is ignored.
  auth_scheme, _ = openid_url_to_scheme_credential(
      "openid_url", ["scope1", "scope2"], client_info
  )

  assert auth_scheme.openIdConnectUrl == "openid_url"
@patch("requests.get")
def test_openid_url_to_scheme_credential_request_exception(mock_get):
  """A RequestException raised by the HTTP call surfaces as a ValueError."""
  mock_get.side_effect = requests.exceptions.RequestException("Test Error")
  client_info = {"client_id": "client_id", "client_secret": "client_secret"}

  expected_error = pytest.raises(
      ValueError, match="Failed to fetch OpenID configuration from openid_url"
  )
  with expected_error:
    openid_url_to_scheme_credential("openid_url", [], client_info)
@patch("requests.get")
def test_openid_url_to_scheme_credential_invalid_json(mock_get):
  """A response whose JSON decoding fails surfaces as a descriptive ValueError."""
  # The request itself succeeds; only `.json()` raises.
  fake_response = mock_get.return_value
  fake_response.json.side_effect = ValueError("Invalid JSON")
  fake_response.raise_for_status.return_value = None
  client_info = {"client_id": "client_id", "client_secret": "client_secret"}

  expected_message = (
      "Invalid JSON response from OpenID configuration endpoint openid_url"
  )
  with pytest.raises(ValueError, match=expected_message):
    openid_url_to_scheme_credential("openid_url", [], client_info)
def test_credential_to_param_api_key_header():
  """A header API key becomes a header parameter with a prefixed kwarg."""
  scheme_fields = {"type": "apiKey", "in": "header", "name": "X-API-Key"}
  api_key_scheme = APIKey(**scheme_fields)
  api_key_credential = AuthCredential(
      auth_type=AuthCredentialTypes.API_KEY, api_key="test_key"
  )

  api_param, call_kwargs = credential_to_param(
      api_key_scheme, api_key_credential
  )

  assert api_param.original_name == "X-API-Key"
  assert api_param.param_location == "header"
  assert call_kwargs == {INTERNAL_AUTH_PREFIX + "X-API-Key": "test_key"}
def test_credential_to_param_api_key_query():
  """A query API key becomes a query parameter with a prefixed kwarg."""
  scheme_fields = {"type": "apiKey", "in": "query", "name": "api_key"}
  api_key_scheme = APIKey(**scheme_fields)
  api_key_credential = AuthCredential(
      auth_type=AuthCredentialTypes.API_KEY, api_key="test_key"
  )

  api_param, call_kwargs = credential_to_param(
      api_key_scheme, api_key_credential
  )

  assert api_param.original_name == "api_key"
  assert api_param.param_location == "query"
  assert call_kwargs == {INTERNAL_AUTH_PREFIX + "api_key": "test_key"}
def test_credential_to_param_api_key_cookie():
  """A cookie API key becomes a cookie parameter with a prefixed kwarg."""
  scheme_fields = {"type": "apiKey", "in": "cookie", "name": "session_id"}
  api_key_scheme = APIKey(**scheme_fields)
  api_key_credential = AuthCredential(
      auth_type=AuthCredentialTypes.API_KEY, api_key="test_key"
  )

  api_param, call_kwargs = credential_to_param(
      api_key_scheme, api_key_credential
  )

  assert api_param.original_name == "session_id"
  assert api_param.param_location == "cookie"
  assert call_kwargs == {INTERNAL_AUTH_PREFIX + "session_id": "test_key"}
def test_credential_to_param_http_bearer():
  """An HTTP bearer credential becomes an Authorization header with "Bearer "."""
  bearer_scheme = HTTPBearer(bearerFormat="JWT")
  bearer_auth = HttpAuth(
      scheme="bearer", credentials=HttpCredentials(token="test_token")
  )
  bearer_credential = AuthCredential(
      auth_type=AuthCredentialTypes.HTTP, http=bearer_auth
  )

  api_param, call_kwargs = credential_to_param(bearer_scheme, bearer_credential)

  assert api_param.original_name == "Authorization"
  assert api_param.param_location == "header"
  assert call_kwargs == {
      INTERNAL_AUTH_PREFIX + "Authorization": "Bearer test_token"
  }
def test_credential_to_param_http_basic_not_supported():
  """HTTP basic credentials are rejected with NotImplementedError."""
  basic_scheme = HTTPBase(scheme="basic")
  basic_auth = HttpAuth(
      scheme="basic",
      credentials=HttpCredentials(username="user", password="password"),
  )
  basic_credential = AuthCredential(
      auth_type=AuthCredentialTypes.HTTP, http=basic_auth
  )

  expected_error = pytest.raises(
      NotImplementedError, match="Basic Authentication is not supported."
  )
  with expected_error:
    credential_to_param(basic_scheme, basic_credential)
def test_credential_to_param_http_invalid_credentials_no_http():
  """An HTTP-typed credential with no `http` payload raises ValueError."""
  basic_scheme = HTTPBase(scheme="basic")
  # auth_type says HTTP, but no `http` details are supplied.
  empty_credential = AuthCredential(auth_type=AuthCredentialTypes.HTTP)

  expected_error = pytest.raises(
      ValueError, match="Invalid HTTP auth credentials"
  )
  with expected_error:
    credential_to_param(basic_scheme, empty_credential)
def test_credential_to_param_oauth2():
  """With an OAuth2 scheme, a bearer credential becomes an Authorization header."""
  oauth2_scheme = OAuth2(flows={})
  bearer_auth = HttpAuth(
      scheme="bearer", credentials=HttpCredentials(token="test_token")
  )
  bearer_credential = AuthCredential(
      auth_type=AuthCredentialTypes.HTTP, http=bearer_auth
  )

  api_param, call_kwargs = credential_to_param(oauth2_scheme, bearer_credential)

  assert api_param.original_name == "Authorization"
  assert api_param.param_location == "header"
  assert call_kwargs == {
      INTERNAL_AUTH_PREFIX + "Authorization": "Bearer test_token"
  }
def test_credential_to_param_openid_connect():
  """With an OpenIdConnect scheme, a bearer credential becomes an Authorization header."""
  openid_scheme = OpenIdConnect(openIdConnectUrl="openid_url")
  bearer_auth = HttpAuth(
      scheme="bearer", credentials=HttpCredentials(token="test_token")
  )
  bearer_credential = AuthCredential(
      auth_type=AuthCredentialTypes.HTTP, http=bearer_auth
  )

  api_param, call_kwargs = credential_to_param(openid_scheme, bearer_credential)

  assert api_param.original_name == "Authorization"
  assert api_param.param_location == "header"
  assert call_kwargs == {
      INTERNAL_AUTH_PREFIX + "Authorization": "Bearer test_token"
  }
def test_credential_to_param_openid_no_credential():
  """With an OpenIdConnect scheme and no credential, both results are None."""
  auth_scheme = OpenIdConnect(openIdConnectUrl="openid_url")
  param, kwargs = credential_to_param(auth_scheme, None)
  # Identity checks: `== None` can be satisfied by a custom `__eq__`.
  assert param is None
  assert kwargs is None
def test_credential_to_param_oauth2_no_credential():
  """With an OAuth2 scheme and no credential, both results are None."""
  auth_scheme = OAuth2(flows={})
  param, kwargs = credential_to_param(auth_scheme, None)
  # Identity checks: `== None` can be satisfied by a custom `__eq__`.
  assert param is None
  assert kwargs is None
def test_dict_to_auth_scheme_api_key():
  """An "apiKey" security scheme dict is converted into an APIKey scheme."""
  scheme_dict = {"type": "apiKey", "in": "header", "name": "X-API-Key"}

  auth_scheme = dict_to_auth_scheme(scheme_dict)

  assert isinstance(auth_scheme, APIKey)
  assert auth_scheme.type_ == AuthSchemeType.apiKey
  assert auth_scheme.in_ == APIKeyIn.header
  assert auth_scheme.name == "X-API-Key"
def test_dict_to_auth_scheme_http_bearer():
  """An "http" dict with the bearer scheme is converted into an HTTPBearer."""
  scheme_dict = {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}

  auth_scheme = dict_to_auth_scheme(scheme_dict)

  assert isinstance(auth_scheme, HTTPBearer)
  assert auth_scheme.scheme == "bearer"
  assert auth_scheme.bearerFormat == "JWT"
def test_dict_to_auth_scheme_http_base():
  """An "http" dict with the basic scheme is converted into an HTTPBase."""
  scheme_dict = {"type": "http", "scheme": "basic"}

  auth_scheme = dict_to_auth_scheme(scheme_dict)

  assert isinstance(auth_scheme, HTTPBase)
  assert auth_scheme.scheme == "basic"
def test_dict_to_auth_scheme_oauth2():
  """An "oauth2" dict is converted into an OAuth2 scheme with its flow populated."""
  data = {
      "type": "oauth2",
      "flows": {
          "authorizationCode": {
              "authorizationUrl": "https://example.com/auth",
              "tokenUrl": "https://example.com/token",
          }
      },
  }
  scheme = dict_to_auth_scheme(data)
  assert isinstance(scheme, OAuth2)
  # `hasattr` would also pass if the attribute exists but is unset, so check
  # that the authorization-code flow actually carries the input values.
  auth_code_flow = scheme.flows.authorizationCode
  assert auth_code_flow is not None
  assert auth_code_flow.authorizationUrl == "https://example.com/auth"
  assert auth_code_flow.tokenUrl == "https://example.com/token"
def test_dict_to_auth_scheme_openid_connect():
  """An "openIdConnect" dict is converted into an OpenIdConnect scheme."""
  scheme_dict = {
      "type": "openIdConnect",
      "openIdConnectUrl": (
          "https://example.com/.well-known/openid-configuration"
      ),
  }

  auth_scheme = dict_to_auth_scheme(scheme_dict)

  assert isinstance(auth_scheme, OpenIdConnect)
  expected_url = "https://example.com/.well-known/openid-configuration"
  assert auth_scheme.openIdConnectUrl == expected_url
def test_dict_to_auth_scheme_missing_type():
  """A security scheme dict without a "type" key raises ValueError."""
  scheme_dict = {"in": "header", "name": "X-API-Key"}

  expected_error = pytest.raises(
      ValueError, match="Missing 'type' field in security scheme dictionary."
  )
  with expected_error:
    dict_to_auth_scheme(scheme_dict)
def test_dict_to_auth_scheme_invalid_type():
  """An unrecognized "type" value raises a ValueError naming that type."""
  scheme_dict = {"type": "invalid", "in": "header", "name": "X-API-Key"}

  expected_error = pytest.raises(
      ValueError, match="Invalid security scheme type: invalid"
  )
  with expected_error:
    dict_to_auth_scheme(scheme_dict)
def test_dict_to_auth_scheme_invalid_data():
  """An "apiKey" dict that omits "name" raises ValueError."""
  # "name" is intentionally left out.
  incomplete_scheme_dict = {"type": "apiKey", "in": "header"}

  expected_error = pytest.raises(
      ValueError, match="Invalid security scheme data"
  )
  with expected_error:
    dict_to_auth_scheme(incomplete_scheme_dict)
if __name__ == "__main__":
  # Propagate pytest's exit code so that running this file directly reports
  # failures to the calling shell instead of always exiting with status 0.
  raise SystemExit(pytest.main([__file__]))