Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VertexAI handle streaming requests #3331

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ def _methods_to_wrap(
client_class.generate_content.__name__, # pyright: ignore[reportUnknownMemberType]
method_wrappers.generate_content,
)
yield (
client_class,
client_class.stream_generate_content.__name__, # pyright: ignore[reportUnknownMemberType]
method_wrappers.stream_generate_content,
)

for client_class in (
async_client.PredictionServiceAsyncClient,
Expand All @@ -92,6 +97,11 @@ def _methods_to_wrap(
client_class.generate_content.__name__, # pyright: ignore[reportUnknownMemberType]
method_wrappers.agenerate_content,
)
yield (
client_class,
client_class.stream_generate_content.__name__, # pyright: ignore[reportUnknownMemberType]
method_wrappers.astream_generate_content,
)


class VertexAIInstrumentor(BaseInstrumentor):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ def choice_event(
https://github.com/open-telemetry/semantic-conventions/blob/v1.28.0/docs/gen-ai/gen-ai-events.md#event-gen_aichoice
"""
body: dict[str, AnyValue] = {
"finish_reason": finish_reason,
"index": index,
"message": _asdict_filter_nulls(message),
}
if finish_reason:
body["finish_reason"] = finish_reason

tool_calls_list = [
_asdict_filter_nulls(tool_call) for tool_call in tool_calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
from typing import (
TYPE_CHECKING,
Any,
AsyncIterable,
Awaitable,
Callable,
Iterable,
MutableSequence,
)

Expand All @@ -36,11 +38,17 @@
from opentelemetry.trace import SpanKind, Tracer

if TYPE_CHECKING:
from google.cloud.aiplatform_v1.services.prediction_service import client
from google.cloud.aiplatform_v1.services.prediction_service import (
async_client,
client,
)
from google.cloud.aiplatform_v1.types import (
content,
prediction_service,
)
from google.cloud.aiplatform_v1beta1.services.prediction_service import (
async_client as async_client_v1beta1,
)
from google.cloud.aiplatform_v1beta1.services.prediction_service import (
client as client_v1beta1,
)
Expand Down Expand Up @@ -101,7 +109,9 @@ def __init__(
def _with_instrumentation(
self,
instance: client.PredictionServiceClient
| client_v1beta1.PredictionServiceClient,
| client_v1beta1.PredictionServiceClient
| async_client.PredictionServiceAsyncClient
| async_client_v1beta1.PredictionServiceAsyncClient,
args: Any,
kwargs: Any,
):
Expand Down Expand Up @@ -178,8 +188,8 @@ async def agenerate_content(
| prediction_service_v1beta1.GenerateContentResponse
],
],
instance: client.PredictionServiceClient
| client_v1beta1.PredictionServiceClient,
instance: async_client.PredictionServiceAsyncClient
| async_client_v1beta1.PredictionServiceAsyncClient,
args: Any,
kwargs: Any,
) -> (
Expand All @@ -192,3 +202,53 @@ async def agenerate_content(
response = await wrapped(*args, **kwargs)
handle_response(response)
return response

def stream_generate_content(
    self,
    wrapped: Callable[
        ...,
        Iterable[prediction_service.GenerateContentResponse]
        | Iterable[prediction_service_v1beta1.GenerateContentResponse],
    ],
    instance: client.PredictionServiceClient
    | client_v1beta1.PredictionServiceClient,
    args: Any,
    kwargs: Any,
) -> Iterable[
    prediction_service.GenerateContentResponse
    | prediction_service_v1beta1.GenerateContentResponse,
]:
    """Wrap the synchronous streaming call and re-yield its responses.

    This is a generator function: neither the instrumentation context nor
    ``wrapped`` is touched until the caller starts iterating the result.

    Args:
        wrapped: The original client method; called as
            ``wrapped(*args, **kwargs)`` and expected to return an iterable
            of responses.
        instance: The client the method is bound to; forwarded to
            ``_with_instrumentation``.
        args: Positional arguments of the intercepted call.
        kwargs: Keyword arguments of the intercepted call.

    Yields:
        Every response produced by ``wrapped``, unchanged and in order.
    """
    instrumentation = self._with_instrumentation(instance, args, kwargs)
    with instrumentation as record_response:
        stream = wrapped(*args, **kwargs)
        for chunk in stream:
            # Report the chunk to the instrumentation before the caller sees it.
            record_response(chunk)
            yield chunk

async def astream_generate_content(
    self,
    wrapped: Callable[
        ...,
        Awaitable[
            AsyncIterable[prediction_service.GenerateContentResponse]
        ]
        | Awaitable[
            AsyncIterable[
                prediction_service_v1beta1.GenerateContentResponse
            ]
        ],
    ],
    instance: async_client.PredictionServiceAsyncClient
    | async_client_v1beta1.PredictionServiceAsyncClient,
    args: Any,
    kwargs: Any,
) -> AsyncIterable[
    prediction_service.GenerateContentResponse
    | prediction_service_v1beta1.GenerateContentResponse,
]:
    """Wrap the asynchronous streaming call and re-yield its responses.

    ``wrapped`` returns an *awaitable* that resolves to an async iterable,
    so callers write ``stream = await client.stream_generate_content(...)``
    followed by ``async for``. This wrapper must therefore be a coroutine
    that resolves to an async iterable. Making it an async generator itself
    would hand callers an object that cannot be awaited (``TypeError``).

    The instrumentation context is entered, and ``wrapped`` is called and
    awaited, only once the returned stream is first iterated.

    Args:
        wrapped: The original async client method; called as
            ``await wrapped(*args, **kwargs)``.
        instance: The async client the method is bound to; forwarded to
            ``_with_instrumentation``.
        args: Positional arguments of the intercepted call.
        kwargs: Keyword arguments of the intercepted call.

    Returns:
        An async iterable yielding every response from the wrapped stream,
        unchanged and in order.
    """

    async def _instrumented_stream():
        # Keep the instrumentation context open for the whole life of the
        # stream so every chunk is reported through handle_response.
        with self._with_instrumentation(
            instance, args, kwargs
        ) as handle_response:
            async for response in await wrapped(*args, **kwargs):
                handle_response(response)
                yield response

    return _instrumented_stream()
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,9 @@ def _map_finish_reason(
| content_v1beta1.Candidate.FinishReason,
) -> FinishReason | str:
EnumType = type(finish_reason) # pylint: disable=invalid-name
if (
finish_reason is EnumType.FINISH_REASON_UNSPECIFIED
or finish_reason is EnumType.OTHER
):
if finish_reason is EnumType.FINISH_REASON_UNSPECIFIED:
return ""
if finish_reason is EnumType.OTHER:
return "error"
if finish_reason is EnumType.STOP:
return "stop"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
interactions:
- request:
body: |-
{
"contents": [
{
"role": "user",
"parts": [
{
"text": "Get weather details in New Delhi and San Francisco?"
}
]
}
],
"tools": [
{
"functionDeclarations": [
{
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": 6,
"properties": {
"location": {
"type": 1,
"description": "The location for which to get the weather. It can be a city name, a city name and state, or a zip code. Examples: 'San Francisco', 'San Francisco, CA', '95616', etc."
}
},
"propertyOrdering": [
"location"
]
}
}
]
}
]
}
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '824'
Content-Type:
- application/json
User-Agent:
- python-requests/2.32.3
method: POST
uri: https://us-central1-aiplatform.googleapis.com/v1/projects/fake-project/locations/us-central1/publishers/google/models/gemini-1.5-flash-002:streamGenerateContent?%24alt=json%3Benum-encoding%3Dint
response:
body:
string: |-
[
{
"candidates": [
{
"content": {
"role": "model",
"parts": [
{
"functionCall": {
"name": "get_current_weather",
"args": {
"location": "New Delhi"
}
}
},
{
"functionCall": {
"name": "get_current_weather",
"args": {
"location": "San Francisco"
}
}
}
]
},
"finishReason": 1
}
],
"usageMetadata": {
"promptTokenCount": 72,
"candidatesTokenCount": 16,
"totalTokenCount": 88,
"promptTokensDetails": [
{
"modality": 1,
"tokenCount": 72
}
],
"candidatesTokensDetails": [
{
"modality": 1,
"tokenCount": 16
}
]
},
"modelVersion": "gemini-1.5-flash-002",
"createTime": "2025-03-05T04:44:12.226326Z",
"responseId": "nNbHZ5boDZeTmecP49qwuQU"
}
]
headers:
Content-Type:
- application/json; charset=UTF-8
Transfer-Encoding:
- chunked
Vary:
- Origin
- X-Origin
- Referer
content-length:
- '985'
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
interactions:
- request:
body: |-
{
"contents": [
{
"role": "user",
"parts": [
{
"text": "Get weather details in New Delhi and San Francisco?"
}
]
}
],
"tools": [
{
"functionDeclarations": [
{
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": 6,
"properties": {
"location": {
"type": 1,
"description": "The location for which to get the weather. It can be a city name, a city name and state, or a zip code. Examples: 'San Francisco', 'San Francisco, CA', '95616', etc."
}
},
"propertyOrdering": [
"location"
]
}
}
]
}
]
}
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '824'
Content-Type:
- application/json
User-Agent:
- python-requests/2.32.3
method: POST
uri: https://us-central1-aiplatform.googleapis.com/v1/projects/fake-project/locations/us-central1/publishers/google/models/gemini-1.5-flash-002:streamGenerateContent?%24alt=json%3Benum-encoding%3Dint
response:
body:
string: |-
[
{
"candidates": [
{
"content": {
"role": "model",
"parts": [
{
"functionCall": {
"name": "get_current_weather",
"args": {
"location": "New Delhi"
}
}
},
{
"functionCall": {
"name": "get_current_weather",
"args": {
"location": "San Francisco"
}
}
}
]
},
"finishReason": 1
}
],
"usageMetadata": {
"promptTokenCount": 72,
"candidatesTokenCount": 16,
"totalTokenCount": 88,
"promptTokensDetails": [
{
"modality": 1,
"tokenCount": 72
}
],
"candidatesTokensDetails": [
{
"modality": 1,
"tokenCount": 16
}
]
},
"modelVersion": "gemini-1.5-flash-002",
"createTime": "2025-03-05T04:46:18.094334Z",
"responseId": "GtfHZ_7gBe2Om9IPrJa3MQ"
}
]
headers:
Content-Type:
- application/json; charset=UTF-8
Transfer-Encoding:
- chunked
Vary:
- Origin
- X-Origin
- Referer
content-length:
- '984'
status:
code: 200
message: OK
version: 1
Loading
Loading