Skip to content

botocore: send choice events for bedrock chat completion #3275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3258))
- `opentelemetry-instrumentation-botocore` Add support for GenAI system events
([#3266](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3266))
- `opentelemetry-instrumentation-botocore` Add support for GenAI choice events
([#3275](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3275))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
ConverseStreamWrapper,
InvokeModelWithResponseStreamWrapper,
_Choice,
genai_capture_message_content,
message_to_event,
)
Expand Down Expand Up @@ -242,12 +243,12 @@ def before_service_call(
if self._call_context.operation not in self._HANDLED_OPERATIONS:
return

_capture_content = genai_capture_message_content()
capture_content = genai_capture_message_content()

messages = self._get_request_messages()
for message in messages:
event_logger = instrumentor_context.event_logger
event_logger.emit(message_to_event(message, _capture_content))
event_logger.emit(message_to_event(message, capture_content))

if not span.is_recording():
return
Expand All @@ -259,27 +260,52 @@ def before_service_call(
span.update_name(f"{operation_name} {request_model}")

# pylint: disable=no-self-use
def _converse_on_success(self, span: Span, result: dict[str, Any]):
if usage := result.get("usage"):
if input_tokens := usage.get("inputTokens"):
span.set_attribute(
GEN_AI_USAGE_INPUT_TOKENS,
input_tokens,
)
if output_tokens := usage.get("outputTokens"):
def _converse_on_success(
self,
span: Span,
result: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content,
):
if span.is_recording():
if usage := result.get("usage"):
if input_tokens := usage.get("inputTokens"):
span.set_attribute(
GEN_AI_USAGE_INPUT_TOKENS,
input_tokens,
)
if output_tokens := usage.get("outputTokens"):
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS,
output_tokens,
)

if stop_reason := result.get("stopReason"):
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS,
output_tokens,
GEN_AI_RESPONSE_FINISH_REASONS,
[stop_reason],
)

if stop_reason := result.get("stopReason"):
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS,
[stop_reason],
event_logger = instrumentor_context.event_logger
choice = _Choice.from_converse(result, capture_content)
# this path is used by streaming apis, in that case we are already out of the span
# context so need to add the span context manually
span_ctx = span.get_span_context()
event_logger.emit(
choice.to_choice_event(
trace_id=span_ctx.trace_id,
span_id=span_ctx.span_id,
trace_flags=span_ctx.trace_flags,
)
)

def _invoke_model_on_success(
self, span: Span, result: dict[str, Any], model_id: str
self,
span: Span,
result: dict[str, Any],
model_id: str,
instrumentor_context: _BotocoreInstrumentorContext,
capture_content,
):
original_body = None
try:
Expand All @@ -292,12 +318,17 @@ def _invoke_model_on_success(

response_body = json.loads(body_content.decode("utf-8"))
if "amazon.titan" in model_id:
self._handle_amazon_titan_response(span, response_body)
self._handle_amazon_titan_response(
span, response_body, instrumentor_context, capture_content
)
elif "amazon.nova" in model_id:
self._handle_amazon_nova_response(span, response_body)
self._handle_amazon_nova_response(
span, response_body, instrumentor_context, capture_content
)
elif "anthropic.claude" in model_id:
self._handle_anthropic_claude_response(span, response_body)

self._handle_anthropic_claude_response(
span, response_body, instrumentor_context, capture_content
)
except json.JSONDecodeError:
_logger.debug("Error: Unable to parse the response body as JSON")
except Exception as exc: # pylint: disable=broad-exception-caught
Expand All @@ -321,80 +352,105 @@ def on_success(
if self._call_context.operation not in self._HANDLED_OPERATIONS:
return

if not span.is_recording():
if not self.should_end_span_on_exit():
span.end()
return
capture_content = genai_capture_message_content()

# ConverseStream
if "stream" in result and isinstance(result["stream"], EventStream):
if self._call_context.operation == "ConverseStream":
if "stream" in result and isinstance(
result["stream"], EventStream
):

def stream_done_callback(response):
self._converse_on_success(span, response)
span.end()
def stream_done_callback(response):
self._converse_on_success(
span, response, instrumentor_context, capture_content
)
span.end()

def stream_error_callback(exception):
self._on_stream_error_callback(span, exception)
def stream_error_callback(exception):
self._on_stream_error_callback(span, exception)

result["stream"] = ConverseStreamWrapper(
result["stream"], stream_done_callback, stream_error_callback
result["stream"] = ConverseStreamWrapper(
result["stream"],
stream_done_callback,
stream_error_callback,
)
return
elif self._call_context.operation == "Converse":
self._converse_on_success(
span, result, instrumentor_context, capture_content
)
return

# Converse
self._converse_on_success(span, result)

model_id = self._call_context.params.get(_MODEL_ID_KEY)
if not model_id:
return

# InvokeModel
if "body" in result and isinstance(result["body"], StreamingBody):
self._invoke_model_on_success(span, result, model_id)
return

# InvokeModelWithResponseStream
if "body" in result and isinstance(result["body"], EventStream):

def invoke_model_stream_done_callback(response):
# the callback gets data formatted as the simpler converse API
self._converse_on_success(span, response)
span.end()
if self._call_context.operation == "InvokeModel":
if "body" in result and isinstance(result["body"], StreamingBody):
self._invoke_model_on_success(
span,
result,
model_id,
instrumentor_context,
capture_content,
)
return
elif self._call_context.operation == "InvokeModelWithResponseStream":
if "body" in result and isinstance(result["body"], EventStream):

def invoke_model_stream_done_callback(response):
# the callback gets data formatted as the simpler converse API
self._converse_on_success(
span, response, instrumentor_context, capture_content
)
span.end()

def invoke_model_stream_error_callback(exception):
self._on_stream_error_callback(span, exception)
def invoke_model_stream_error_callback(exception):
self._on_stream_error_callback(span, exception)

result["body"] = InvokeModelWithResponseStreamWrapper(
result["body"],
invoke_model_stream_done_callback,
invoke_model_stream_error_callback,
model_id,
)
return
result["body"] = InvokeModelWithResponseStreamWrapper(
result["body"],
invoke_model_stream_done_callback,
invoke_model_stream_error_callback,
model_id,
)
return

# pylint: disable=no-self-use
def _handle_amazon_titan_response(
self, span: Span, response_body: dict[str, Any]
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if "inputTextTokenCount" in response_body:
span.set_attribute(
GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"]
)
if "results" in response_body and response_body["results"]:
result = response_body["results"][0]
if "tokenCount" in result:
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"]
)
if "completionReason" in result:
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS,
[result["completionReason"]],
)
if "results" in response_body and response_body["results"]:
result = response_body["results"][0]
if "tokenCount" in result:
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"]
)
if "completionReason" in result:
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS,
[result["completionReason"]],
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_amazon_titan(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

# pylint: disable=no-self-use
def _handle_amazon_nova_response(
self, span: Span, response_body: dict[str, Any]
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if "usage" in response_body:
usage = response_body["usage"]
Expand All @@ -411,9 +467,17 @@ def _handle_amazon_nova_response(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stopReason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_converse(response_body, capture_content)
event_logger.emit(choice.to_choice_event())

# pylint: disable=no-self-use
def _handle_anthropic_claude_response(
self, span: Span, response_body: dict[str, Any]
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if usage := response_body.get("usage"):
if "input_tokens" in usage:
Expand All @@ -429,6 +493,12 @@ def _handle_anthropic_claude_response(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_anthropic_claude(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

def on_error(
self,
span: Span,
Expand Down
Loading