feat:Make VertexAiSessionService true async.

PiperOrigin-RevId: 762547133
This commit is contained in:
Shangjie Chen
2025-05-23 13:30:57 -07:00
committed by Copybara-Service
parent 79681e3513
commit d212e50c10
2 changed files with 47 additions and 17 deletions

View File

@@ -11,9 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import re
import time
from typing import Any
from typing import Optional
@@ -69,7 +69,8 @@ class VertexAiSessionService(BaseSessionService):
if state:
session_json_dict['session_state'] = state
api_response = self.api_client.request(
api_client = _get_api_client(self.project, self.location)
api_response = await api_client.async_request(
http_method='POST',
path=f'reasoningEngines/{reasoning_engine_id}/sessions',
request_dict=session_json_dict,
@@ -81,7 +82,7 @@ class VertexAiSessionService(BaseSessionService):
max_retry_attempt = 5
while max_retry_attempt >= 0:
lro_response = self.api_client.request(
lro_response = await api_client.async_request(
http_method='GET',
path=f'operations/{operation_id}',
request_dict={},
@@ -90,11 +91,11 @@ class VertexAiSessionService(BaseSessionService):
if lro_response.get('done', None):
break
time.sleep(1)
await asyncio.sleep(1)
max_retry_attempt -= 1
# Get session resource
get_session_api_response = self.api_client.request(
get_session_api_response = await api_client.async_request(
http_method='GET',
path=f'reasoningEngines/{reasoning_engine_id}/sessions/{session_id}',
request_dict={},
@@ -124,7 +125,8 @@ class VertexAiSessionService(BaseSessionService):
reasoning_engine_id = _parse_reasoning_engine_id(app_name)
# Get session resource
get_session_api_response = self.api_client.request(
api_client = _get_api_client(self.project, self.location)
get_session_api_response = await api_client.async_request(
http_method='GET',
path=f'reasoningEngines/{reasoning_engine_id}/sessions/{session_id}',
request_dict={},
@@ -142,7 +144,7 @@ class VertexAiSessionService(BaseSessionService):
last_update_time=update_timestamp,
)
list_events_api_response = self.api_client.request(
list_events_api_response = await api_client.async_request(
http_method='GET',
path=f'reasoningEngines/{reasoning_engine_id}/sessions/{session_id}/events',
request_dict={},
@@ -181,7 +183,8 @@ class VertexAiSessionService(BaseSessionService):
) -> ListSessionsResponse:
reasoning_engine_id = _parse_reasoning_engine_id(app_name)
api_response = self.api_client.request(
api_client = _get_api_client(self.project, self.location)
api_response = await api_client.async_request(
http_method='GET',
path=f'reasoningEngines/{reasoning_engine_id}/sessions?filter=user_id={user_id}',
request_dict={},
@@ -207,7 +210,8 @@ class VertexAiSessionService(BaseSessionService):
self, *, app_name: str, user_id: str, session_id: str
) -> None:
reasoning_engine_id = _parse_reasoning_engine_id(app_name)
self.api_client.request(
api_client = _get_api_client(self.project, self.location)
await api_client.async_request(
http_method='DELETE',
path=f'reasoningEngines/{reasoning_engine_id}/sessions/{session_id}',
request_dict={},
@@ -219,15 +223,25 @@ class VertexAiSessionService(BaseSessionService):
await super().append_event(session=session, event=event)
reasoning_engine_id = _parse_reasoning_engine_id(session.app_name)
self.api_client.request(
api_client = _get_api_client(self.project, self.location)
await api_client.async_request(
http_method='POST',
path=f'reasoningEngines/{reasoning_engine_id}/sessions/{session.id}:appendEvent',
request_dict=_convert_event_to_json(event),
)
return event
def _get_api_client(project: str, location: str):
"""Instantiates an API client for the given project and location.
It needs to be instantiated inside each request so that the event loop
management.
"""
client = genai.Client(vertexai=True, project=project, location=location)
return client._api_client
def _convert_event_to_json(event: Event):
metadata_json = {
'partial': event.partial,