Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

botocore: add basic tracing for bedrock ConverseStream #3204

Merged
merged 7 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3161](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3161))
- `opentelemetry-opentelemetry-botocore` Add basic support for GenAI attributes for AWS Bedrock InvokeModel API
([#3200](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3200))
- `opentelemetry-opentelemetry-botocore` Add basic support for GenAI attributes for AWS Bedrock ConverseStream API
([#3204](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3204))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Available examples
------------------

- `converse.py` uses `bedrock-runtime` `Converse API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html>_`.
- `converse_stream.py` uses `bedrock-runtime` `ConverseStream API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html>_`.
- `invoke_model.py` uses `bedrock-runtime` `InvokeModel API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html>_`.

Setup
-----
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os

import boto3


def main():
client = boto3.client("bedrock-runtime")
stream = client.converse_stream(
modelId=os.getenv("CHAT_MODEL", "amazon.titan-text-lite-v1"),
messages=[
{
"role": "user",
"content": [{"text": "Write a short poem on OpenTelemetry."}],
},
],
)

response = ""
for event in stream["stream"]:
if "contentBlockDelta" in event:
response += event["contentBlockDelta"]["delta"]["text"]
print(response)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,15 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
}

_safe_invoke(extension.extract_attributes, attributes)
end_span_on_exit = extension.should_end_span_on_exit()

with self._tracer.start_as_current_span(
call_context.span_name,
kind=call_context.span_kind,
attributes=attributes,
# tracing streaming services require to close the span manually
# at a later time after the stream has been consumed
end_on_exit=end_span_on_exit,
) as span:
_safe_invoke(extension.before_service_call, span)
self._call_request_hook(span, call_context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import logging
from typing import Any

from botocore.eventstream import EventStream
from botocore.response import StreamingBody

from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
ConverseStreamWrapper,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
Expand Down Expand Up @@ -62,7 +66,14 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
Amazon Bedrock Runtime</a>.
"""

_HANDLED_OPERATIONS = {"Converse", "InvokeModel"}
_HANDLED_OPERATIONS = {"Converse", "ConverseStream", "InvokeModel"}
_DONT_CLOSE_SPAN_ON_END_OPERATIONS = {"ConverseStream"}

def should_end_span_on_exit(self):
return (
self._call_context.operation
not in self._DONT_CLOSE_SPAN_ON_END_OPERATIONS
)

def extract_attributes(self, attributes: _AttributeMapT):
if self._call_context.operation not in self._HANDLED_OPERATIONS:
Expand All @@ -77,7 +88,7 @@ def extract_attributes(self, attributes: _AttributeMapT):
GenAiOperationNameValues.CHAT.value
)

# Converse
# Converse / ConverseStream
if inference_config := self._call_context.params.get(
"inferenceConfig"
):
Expand Down Expand Up @@ -251,6 +262,20 @@ def on_success(self, span: Span, result: dict[str, Any]):
return

if not span.is_recording():
if not self.should_end_span_on_exit():
span.end()
return

# ConverseStream
if "stream" in result and isinstance(result["stream"], EventStream):

def stream_done_callback(response):
self._converse_on_success(span, response)
span.end()

result["stream"] = ConverseStreamWrapper(
result["stream"], stream_done_callback
)
return

# Converse
Expand Down Expand Up @@ -328,3 +353,6 @@ def on_error(self, span: Span, exception: _BotoClientErrorT):
span.set_status(Status(StatusCode.ERROR, str(exception)))
if span.is_recording():
span.set_attribute(ERROR_TYPE, type(exception).__qualname__)

if not self.should_end_span_on_exit():
span.end()
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from botocore.eventstream import EventStream
from wrapt import ObjectProxy


# pylint: disable=abstract-method
class ConverseStreamWrapper(ObjectProxy):
"""Wrapper for botocore.eventstream.EventStream"""

def __init__(
self,
stream: EventStream,
stream_done_callback,
):
super().__init__(stream)

self._stream_done_callback = stream_done_callback
# accumulating things in the same shape of non-streaming version
# {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
self._response = {}

def __iter__(self):
for event in self.__wrapped__:
self._process_event(event)
yield event

def _process_event(self, event):
if "messageStart" in event:
# {'messageStart': {'role': 'assistant'}}
pass

if "contentBlockDelta" in event:
# {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}}
pass

if "contentBlockStop" in event:
# {'contentBlockStop': {'contentBlockIndex': 0}}
pass

if "messageStop" in event:
# {'messageStop': {'stopReason': 'end_turn'}}
if stop_reason := event["messageStop"].get("stopReason"):
self._response["stopReason"] = stop_reason

if "metadata" in event:
# {'metadata': {'usage': {'inputTokens': 12, 'outputTokens': 15, 'totalTokens': 27}, 'metrics': {'latencyMs': 2980}}}
if usage := event["metadata"].get("usage"):
self._response["usage"] = {}
if input_tokens := usage.get("inputTokens"):
self._response["usage"]["inputTokens"] = input_tokens

if output_tokens := usage.get("outputTokens"):
self._response["usage"]["outputTokens"] = output_tokens

self._stream_done_callback(self._response)
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ def should_trace_service_call(self) -> bool: # pylint:disable=no-self-use
"""
return True

def should_end_span_on_exit(self) -> bool: # pylint:disable=no-self-use
"""Returns if the span should be closed automatically on exit

Extensions might override this function to disable automatic closing
of the span if they need to close it at a later time themselves.
"""
return True

def extract_attributes(self, attributes: _AttributeMapT):
"""Callback which gets invoked before the span is created.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def assert_completion_attributes_from_streaming_body(
)


def assert_completion_attributes(
def assert_converse_completion_attributes(
span: ReadableSpan,
request_model: str,
response: dict[str, Any] | None,
Expand Down Expand Up @@ -128,6 +128,34 @@ def assert_completion_attributes(
)


def assert_converse_stream_completion_attributes(
span: ReadableSpan,
request_model: str,
input_tokens: int | None = None,
output_tokens: int | None = None,
finish_reason: tuple[str] | None = None,
operation_name: str = "chat",
request_top_p: int | None = None,
request_temperature: int | None = None,
request_max_tokens: int | None = None,
request_stop_sequences: list[str] | None = None,
):
return assert_all_attributes(
span,
request_model,
input_tokens,
output_tokens,
finish_reason,
operation_name,
request_top_p,
request_temperature,
request_max_tokens,
tuple(request_stop_sequences)
if request_stop_sequences is not None
else request_stop_sequences,
)


def assert_equal_or_not_present(value, attribute_name, span):
if value is not None:
assert value == span.attributes[attribute_name]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}],
"inferenceConfig": {"maxTokens": 10, "temperature": 0.8, "topP": 1, "stopSequences":
["|"]}}'
headers:
Content-Length:
- '170'
Content-Type:
- !!binary |
YXBwbGljYXRpb24vanNvbg==
User-Agent:
- !!binary |
Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
X-Amz-Date:
- !!binary |
MjAyNTAxMjNUMDk1MTU2Wg==
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- !!binary |
Um9vdD0xLTA0YmY4MjVjLTAxMTY5NjdhYWM1NmIxM2RlMDI1N2QwMjtQYXJlbnQ9MDdkM2U3N2Rl
OGFjMzJhNDtTYW1wbGVkPTE=
amz-sdk-invocation-id:
- !!binary |
ZGQ1MTZiNTEtOGU1Yi00NGYyLTk5MzMtZjAwYzBiOGFkYWYw
amz-sdk-request:
- !!binary |
YXR0ZW1wdD0x
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream
response:
body:
string: !!binary |
AAAAlAAAAFLEwW5hCzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh
cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u
b3BxcnN0dXZ3Iiwicm9sZSI6ImFzc2lzdGFudCJ9P+wfRAAAAMQAAABXjLhVJQs6ZXZlbnQtdHlw
ZQcAEWNvbnRlbnRCbG9ja0RlbHRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTpt
ZXNzYWdlLXR5cGUHAAVldmVudHsiY29udGVudEJsb2NrSW5kZXgiOjAsImRlbHRhIjp7InRleHQi
OiJIaSEgSG93IGNhbiBJIGhlbHAgeW91In0sInAiOiJhYmNkZWZnaGlqa2xtbm9wcXJzdHUifeBJ
9mIAAACJAAAAVlvc+UsLOmV2ZW50LXR5cGUHABBjb250ZW50QmxvY2tTdG9wDTpjb250ZW50LXR5
cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsiY29udGVudEJsb2Nr
SW5kZXgiOjAsInAiOiJhYmNkZSJ95xzwrwAAAKcAAABRu0n9jQs6ZXZlbnQtdHlwZQcAC21lc3Nh
Z2VTdG9wDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl
dmVudHsicCI6ImFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6QUJDREVGR0hJSiIsInN0b3BSZWFz
b24iOiJtYXhfdG9rZW5zIn1LR3pNAAAAygAAAE5X40OECzpldmVudC10eXBlBwAIbWV0YWRhdGEN
OmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJt
ZXRyaWNzIjp7ImxhdGVuY3lNcyI6NjA4fSwicCI6ImFiY2RlZmdoaWprIiwidXNhZ2UiOnsiaW5w
dXRUb2tlbnMiOjgsIm91dHB1dFRva2VucyI6MTAsInRvdGFsVG9rZW5zIjoxOH19iiQr+w==
headers:
Connection:
- keep-alive
Content-Type:
- application/vnd.amazon.eventstream
Date:
- Thu, 23 Jan 2025 09:51:56 GMT
Set-Cookie: test_set_cookie
Transfer-Encoding:
- chunked
x-amzn-RequestId:
- 2b74a5d3-615a-4f81-b00f-f0b10a618e23
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}]}'
headers:
Content-Length:
- '77'
Content-Type:
- !!binary |
YXBwbGljYXRpb24vanNvbg==
User-Agent:
- !!binary |
Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
X-Amz-Date:
- !!binary |
MjAyNTAxMjNUMDk1MTU3Wg==
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- !!binary |
Um9vdD0xLTI5NzA1OTZhLTEyZWI5NDk2ODA1ZjZhYzE5YmU3ODM2NztQYXJlbnQ9Y2M0OTA0YWE2
ZjQ2NmYxYTtTYW1wbGVkPTE=
amz-sdk-invocation-id:
- !!binary |
MjQzZWY2ZDgtNGJhNy00YTVlLWI0MGEtYThiNDE2ZDIzYjhk
amz-sdk-request:
- !!binary |
YXR0ZW1wdD0x
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/does-not-exist/converse-stream
response:
body:
string: '{"message":"The provided model identifier is invalid."}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json
Date:
- Thu, 23 Jan 2025 09:51:57 GMT
Set-Cookie: test_set_cookie
x-amzn-ErrorType:
- ValidationException:http://internal.amazon.com/coral/com.amazon.bedrock/
x-amzn-RequestId:
- 358b122c-d045-4d8f-a5bb-b0bd8cf6ee59
status:
code: 400
message: Bad Request
version: 1
Loading
Loading