Skip to content

Add support for async and streaming responses in the Google GenAI instrumentation #3298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 87 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
b72b4cd
Begin instrumentation of GenAI SDK.
michaelsafyan Feb 7, 2025
74a648a
Merge branch 'open-telemetry:main' into genaisdk_instrumentation
michaelsafyan Feb 10, 2025
a850642
Snapshot current state.
michaelsafyan Feb 10, 2025
257da64
Created minimal tests and got first test to pass.
michaelsafyan Feb 10, 2025
3244258
Added test for span attributes.
michaelsafyan Feb 10, 2025
431fdf5
Ensure that token counts work.
michaelsafyan Feb 10, 2025
b6068e2
Add more tests.
michaelsafyan Feb 10, 2025
5ac2108
Make it easy to turn off instrumentation for streaming and async to a…
michaelsafyan Feb 10, 2025
f503787
Merge branch 'open-telemetry:main' into genaisdk_instrumentation
michaelsafyan Feb 11, 2025
7fcbd8c
Add licenses and fill out main README.rst.
michaelsafyan Feb 11, 2025
6ecd69c
Add a changelog file.
michaelsafyan Feb 11, 2025
5bcb73a
Fill out 'requirements.txt' and 'README.rst' for the manual instrumen…
michaelsafyan Feb 11, 2025
65241b0
Add missing exporter dependency for the manual instrumentation example.
michaelsafyan Feb 11, 2025
44f5081
Fill out rest of the zero-code example.
michaelsafyan Feb 11, 2025
5e5f226
Add minimal tests for async, streaming cases.
michaelsafyan Feb 11, 2025
7b3805c
Update sync test to use indirection on top of 'client.models.generate…
michaelsafyan Feb 11, 2025
3ee8dd1
Fix ruff check issues.
michaelsafyan Feb 11, 2025
6b66fe4
Add subproject to top-level project build mechanism.
michaelsafyan Feb 11, 2025
06ab153
Simplify invocation of pylint.
michaelsafyan Feb 12, 2025
9f73c64
Fix 'make test' command and lint issues.
michaelsafyan Feb 12, 2025
319f0a5
Add '.dev' suffix to version per feedback on pull request #3256
michaelsafyan Feb 12, 2025
493689e
Fix README.rst files for the examples.
michaelsafyan Feb 12, 2025
368a3cc
Add specific versions for the examples.
michaelsafyan Feb 12, 2025
4682521
Merge branch 'main' into genaisdk_instrumentation
michaelsafyan Feb 12, 2025
e2afc4e
Merge branch 'open-telemetry:main' into genaisdk_instrumentation
michaelsafyan Feb 12, 2025
a157e2e
Revamp 'make test' to not require local 'tox.ini' configuration.
michaelsafyan Feb 12, 2025
dd606c2
Extend separators per review comment.
michaelsafyan Feb 12, 2025
a2ec3fb
Fix version conflict caused by non-hermetic requirements.
michaelsafyan Feb 12, 2025
1304fed
Fix typo on the comment line.
michaelsafyan Feb 12, 2025
07cbf2f
Merge branch 'open-telemetry:main' into genaisdk_instrumentation
michaelsafyan Feb 13, 2025
159418e
Add test for the use of the 'vertex_ai' system, and improve how this …
michaelsafyan Feb 13, 2025
bf98bba
Factor out testing logic to enable sharing with the async code.
michaelsafyan Feb 13, 2025
e4386ad
Addressed minor lint issues.
michaelsafyan Feb 13, 2025
e6420b2
Make it clearer that nonstreaming_base is a helper module that is not…
michaelsafyan Feb 13, 2025
86a6f0e
Merge branch 'open-telemetry:main' into genaisdk_instrumentation
michaelsafyan Feb 14, 2025
23fe105
Merge branch 'open-telemetry:main' into genaisdk_instrumentation
michaelsafyan Feb 18, 2025
97494b5
Integrate feedback from related pull request #3268.
michaelsafyan Feb 18, 2025
c006346
Update workflows with 'tox -e generate-workflows'.
michaelsafyan Feb 18, 2025
038f90e
Improve data model and add some rudimentary type checking.
michaelsafyan Feb 18, 2025
7647992
Accept only 'true' for a true value to align with other code.
michaelsafyan Feb 18, 2025
e678135
Merge branch 'main' into genaisdk_instrumentation
michaelsafyan Feb 19, 2025
c834e97
Update the scope name used.
michaelsafyan Feb 19, 2025
6ae0525
Add **kwargs to patched methods to prevent future breakage due to the…
michaelsafyan Feb 19, 2025
1a8cad9
Remove redundant list conversion in call to "sorted".
michaelsafyan Feb 19, 2025
00d2d9f
Reformat with 'tox -e ruff'.
michaelsafyan Feb 19, 2025
f0ca9ae
Fix failing lint workflow.
michaelsafyan Feb 19, 2025
016596e
Fix failing lint workflow.
michaelsafyan Feb 19, 2025
19c2627
Exclude Google GenAI instrumentation from the bootstrap code for now.
michaelsafyan Feb 19, 2025
d2a42bd
Minor improvements to the tooling shell files.
michaelsafyan Feb 19, 2025
ab36068
Fix typo flagged by codespell spellchecker.
michaelsafyan Feb 19, 2025
117c811
Increase alignment with broader repo practices.
michaelsafyan Feb 20, 2025
a761858
Add more TODOs and documentation to clarify the intended work scope.
michaelsafyan Feb 20, 2025
94ee68b
Remove unneeded accessor from OTelWrapper.
michaelsafyan Feb 20, 2025
fe5a19c
Add more comments to the tests.
michaelsafyan Feb 20, 2025
3fc667e
Reformat with ruff.
michaelsafyan Feb 20, 2025
f240b85
Change 'desireable' to 'desirable' per codespell spellchecker.
michaelsafyan Feb 20, 2025
b263ae7
Make tests pass without pythonpath
aabmass Feb 20, 2025
567adc6
Fix new lint errors showing up after change
aabmass Feb 20, 2025
62c4963
Revert "Fix new lint errors showing up after change"
aabmass Feb 20, 2025
918341f
Merge pull request #1 from aabmass/genaisdk_instrumentation-pythonpat…
michaelsafyan Feb 20, 2025
82c832b
Add TODO item required/requested from code review.
michaelsafyan Feb 21, 2025
a8e8739
Resolve merge conflict.
michaelsafyan Feb 21, 2025
25aa401
Simplify changelog per PR feedback.
michaelsafyan Feb 21, 2025
4adef35
Remove square brackets from model name in span name per PR feedback.
michaelsafyan Feb 21, 2025
08c84bb
Merge branch 'main' into genaisdk_instrumentation
michaelsafyan Feb 21, 2025
654c0d9
Merge branch 'main' into genaisdk_instrumentation
michaelsafyan Feb 24, 2025
e654b89
Merge branch 'open-telemetry:main' into genaisdk_instrumentation
michaelsafyan Feb 24, 2025
1415f2c
Checkpoint current state.
michaelsafyan Feb 25, 2025
1a01b79
Misc test cleanup. Now that scripts are invoked solely through pytest…
michaelsafyan Feb 25, 2025
194f0b0
Improve quality of event logging.
michaelsafyan Feb 25, 2025
51bd151
Resolve merge conflict.
michaelsafyan Feb 25, 2025
5129b6f
Implement streaming support in RequestsMocker, get tests passing again.
michaelsafyan Feb 25, 2025
c492776
Add test with multiple responses.
michaelsafyan Feb 25, 2025
cce005d
Remove support for async and streaming from TODOs, since this is now …
michaelsafyan Feb 25, 2025
1403bdf
Increase testing coverage for streaming.
michaelsafyan Feb 25, 2025
da9a98d
Resolve merge conflicts.
michaelsafyan Feb 27, 2025
8c8b204
Reformat with ruff.
michaelsafyan Feb 27, 2025
980bf1a
Add minor version bump with changelog.
michaelsafyan Feb 27, 2025
119505f
Merge branch 'open-telemetry:main' into async_and_streaming
michaelsafyan Feb 28, 2025
19146a5
Change TODOs to bulleted list.
michaelsafyan Feb 28, 2025
ae289d5
Update per PR feedback
michaelsafyan Feb 28, 2025
94d1cd2
Restructure streaming async logic to begin execution earlier.
michaelsafyan Feb 28, 2025
4b1f244
Reformat with ruff.
michaelsafyan Feb 28, 2025
f0e4574
Merge branch 'main' into async_and_streaming
michaelsafyan Mar 3, 2025
3dc2734
Disable pylint check for catching broad exception. Should be allowed …
michaelsafyan Mar 3, 2025
0d64c2f
Simplify async streaming solution per PR comment.
michaelsafyan Mar 3, 2025
523000c
Merge branch 'main' into async_and_streaming
aabmass Mar 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add support for async and streaming.
([#3298](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3298))

Create an initial version of Open Telemetry instrumentation for github.com/googleapis/python-genai.
([#3256](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3256))
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@

Here are some TODO items required to achieve stability for this package:

1. Add support for streaming interfaces
2. Add support for async interfaces
3. Add more span-level attributes for request configuration
4. Add more span-level attributes for response information
5. Verify and correct formatting of events:
- Including the 'role' field for message events
- Including tool invocation information
6. Emit events for safety ratings when they block responses
7. Additional cleanup/improvement tasks such as:
- Adoption of 'wrapt' instead of 'functools.wraps'
- Bolstering test coverage
8. Migrate tests to use VCR.py
- Add more span-level attributes for request configuration
- Add more span-level attributes for response information
- Verify and correct formatting of events:
- Including the 'role' field for message events
- Including tool invocation information
- Emit events for safety ratings when they block responses
- Additional cleanup/improvement tasks such as:
- Adoption of 'wrapt' instead of 'functools.wraps'
- Bolstering test coverage
- Migrate tests to use VCR.py

## Future

Beyond the above TODOs, it would also be desirable to extend the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,11 @@
_logger = logging.getLogger(__name__)


# Constant used for the value of 'gen_ai.operation.name".
_GENERATE_CONTENT_OP_NAME = "generate_content"

# Constant used to make the absence of content more understandable.
_CONTENT_ELIDED = "<elided>"

# Enable these after these cases are fully vetted and tested
_INSTRUMENT_STREAMING = False
_INSTRUMENT_ASYNC = False
# Constant used for the value of 'gen_ai.operation.name".
_GENERATE_CONTENT_OP_NAME = "generate_content"


class _MethodsSnapshot:
Expand Down Expand Up @@ -220,7 +216,9 @@ def __init__(
self._response_index = 0
self._candidate_index = 0

def start_span_as_current_span(self, model_name, function_name):
def start_span_as_current_span(
self, model_name, function_name, end_on_exit=True
):
return self._otel_wrapper.start_as_current_span(
f"{_GENERATE_CONTENT_OP_NAME} {model_name}",
start_time=self._start_time,
Expand All @@ -230,6 +228,7 @@ def start_span_as_current_span(self, model_name, function_name):
gen_ai_attributes.GEN_AI_REQUEST_MODEL: self._genai_request_model,
gen_ai_attributes.GEN_AI_OPERATION_NAME: _GENERATE_CONTENT_OP_NAME,
},
end_on_exit=end_on_exit,
)

def process_request(
Expand Down Expand Up @@ -543,9 +542,6 @@ def _create_instrumented_generate_content_stream(
snapshot: _MethodsSnapshot, otel_wrapper: OTelWrapper
):
wrapped_func = snapshot.generate_content_stream
if not _INSTRUMENT_STREAMING:
# TODO: remove once this case has been fully tested
return wrapped_func

@functools.wraps(wrapped_func)
def instrumented_generate_content_stream(
Expand Down Expand Up @@ -586,9 +582,6 @@ def _create_instrumented_async_generate_content(
snapshot: _MethodsSnapshot, otel_wrapper: OTelWrapper
):
wrapped_func = snapshot.async_generate_content
if not _INSTRUMENT_ASYNC:
# TODO: remove once this case has been fully tested
return wrapped_func

@functools.wraps(wrapped_func)
async def instrumented_generate_content(
Expand Down Expand Up @@ -630,9 +623,6 @@ def _create_instrumented_async_generate_content_stream( # pyright: ignore
snapshot: _MethodsSnapshot, otel_wrapper: OTelWrapper
):
wrapped_func = snapshot.async_generate_content_stream
if not _INSTRUMENT_ASYNC or not _INSTRUMENT_STREAMING:
# TODO: remove once this case has been fully tested
return wrapped_func

@functools.wraps(wrapped_func)
async def instrumented_generate_content_stream(
Expand All @@ -647,24 +637,38 @@ async def instrumented_generate_content_stream(
self, otel_wrapper, model
)
with helper.start_span_as_current_span(
model, "google.genai.AsyncModels.generate_content_stream"
):
model,
"google.genai.AsyncModels.generate_content_stream",
end_on_exit=False,
) as span:
helper.process_request(contents, config)
try:
async for response in await wrapped_func(
self,
model=model,
contents=contents,
config=config,
**kwargs,
): # pyright: ignore
helper.process_response(response)
yield response # pyright: ignore
except Exception as error:
helper.process_error(error)
try:
response_async_generator = await wrapped_func(
self,
model=model,
contents=contents,
config=config,
**kwargs,
)
except Exception as error: # pylint: disable=broad-exception-caught
helper.process_error(error)
helper.finalize_processing()
with trace.use_span(span, end_on_exit=True):
raise
finally:
helper.finalize_processing()

async def _response_async_generator_wrapper():
with trace.use_span(span, end_on_exit=True):
try:
async for response in response_async_generator:
helper.process_response(response)
yield response
except Exception as error:
helper.process_error(error)
raise
finally:
helper.finalize_processing()

return _response_async_generator_wrapper()

return instrumented_generate_content_stream

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
# This version should stay below "1.0" until the fundamentals
# in "TODOS.md" have been addressed. Please revisit the TODOs
# listed there before bumping to a stable version.
__version__ = "0.0.1.dev"
__version__ = "0.0.2.dev"
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ def get_event_named(self, event_name):
return event
return None

def get_events_named(self, event_name):
result = []
for event in self.get_finished_logs():
event_name_attr = event.attributes.get("event.name")
if event_name_attr is None:
continue
if event_name_attr == event_name:
result.append(event)
return result

def assert_has_event_named(self, name):
event = self.get_event_named(name)
finished_logs = self.get_finished_logs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import http.client
import io
import json
from typing import Optional

import requests
import requests.sessions
Expand Down Expand Up @@ -81,7 +82,7 @@ def response(self):


def _return_error_status(
args: RequestsCallArgs, status_code: int, reason: str = None
args: RequestsCallArgs, status_code: int, reason: Optional[str] = None
):
result = requests.Response()
result.url = args.request.url
Expand Down Expand Up @@ -123,6 +124,35 @@ def generate_response_from_dict(args):
raise ValueError(f"Unsupported response type: {type(response)}")


def _to_stream_response_generator(response_generators):
if len(response_generators) == 1:
return response_generators[0]

def combined_generator(args):
first_response = response_generators[0](args)
if first_response.status_code != 200:
return first_response
result = requests.Response()
result.status_code = 200
result.headers["content-type"] = "application/json"
result.encoding = "utf-8"
result.headers["transfer-encoding"] = "chunked"
contents = []
for generator in response_generators:
response = generator(args)
if response.status_code != 200:
continue
response_json = response.json()
response_json_str = json.dumps(response_json)
contents.append(f"data: {response_json_str}")
contents_str = "\r\n".join(contents)
full_contents = f"{contents_str}\r\n\r\n"
result.raw = io.BytesIO(full_contents.encode())
return result

return combined_generator


class RequestsMocker:
def __init__(self):
self._original_send = requests.sessions.Session.send
Expand Down Expand Up @@ -159,6 +189,38 @@ def _do_send(
session: requests.sessions.Session,
request: requests.PreparedRequest,
**kwargs,
):
stream = kwargs.get("stream", False)
if not stream:
return self._do_send_non_streaming(session, request, **kwargs)
return self._do_send_streaming(session, request, **kwargs)

def _do_send_streaming(
self,
session: requests.sessions.Session,
request: requests.PreparedRequest,
**kwargs,
):
args = RequestsCallArgs(session, request, **kwargs)
response_generators = []
for matcher, response_generator in self._handlers:
if matcher is None:
response_generators.append(response_generator)
elif matcher(args):
response_generators.append(response_generator)
if not response_generators:
response_generators.append(_return_404)
response_generator = _to_stream_response_generator(response_generators)
call = RequestsCall(args, response_generator)
result = call.response
self._calls.append(call)
return result

def _do_send_non_streaming(
self,
session: requests.sessions.Session,
request: requests.PreparedRequest,
**kwargs,
):
args = RequestsCallArgs(session, request, **kwargs)
response_generator = self._lookup_response_generator(args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,7 @@
import unittest

from ..common.base import TestCase


def create_valid_response(
response_text="The model response", input_tokens=10, output_tokens=20
):
return {
"modelVersion": "gemini-2.0-flash-test123",
"usageMetadata": {
"promptTokenCount": input_tokens,
"candidatesTokenCount": output_tokens,
"totalTokenCount": input_tokens + output_tokens,
},
"candidates": [
{
"content": {
"role": "model",
"parts": [
{
"text": response_text,
}
],
}
}
],
}
from .util import create_valid_response


class NonStreamingTestCase(TestCase):
Expand All @@ -56,22 +32,12 @@ def setUp(self): # pylint: disable=invalid-name
def generate_content(self, *args, **kwargs):
raise NotImplementedError("Must implement 'generate_content'.")

@property
def expected_function_name(self):
raise NotImplementedError("Must implement 'expected_function_name'.")

def configure_valid_response(
self,
response_text="The model_response",
input_tokens=10,
output_tokens=20,
):
self.requests.add_response(
create_valid_response(
response_text=response_text,
input_tokens=input_tokens,
output_tokens=output_tokens,
)
)
def configure_valid_response(self, *args, **kwargs):
self.requests.add_response(create_valid_response(*args, **kwargs))

def test_instrumentation_does_not_break_core_functionality(self):
self.configure_valid_response(response_text="Yep, it works!")
Expand Down
Loading