
Support InvokeModel GenAI instrumentation for additional Bedrock models #3419


Merged · 13 commits · Apr 18, 2025
CHANGELOG.md: 3 additions & 0 deletions
@@ -22,6 +22,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
and log as debug instead of exception
([#3423](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3423))

- `opentelemetry-instrumentation-botocore` Add GenAI instrumentation for additional Bedrock models for the InvokeModel API
([#3419](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3419))
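
For context, a minimal sketch of how the new instrumentation is exercised (the model ID and request body are illustrative, and AWS credentials are assumed to be configured):

```python
import json

import boto3
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor

# Instrument botocore before creating clients so InvokeModel calls emit GenAI telemetry.
BotocoreInstrumentor().instrument()

client = boto3.client("bedrock-runtime", region_name="us-east-1")
response = client.invoke_model(
    modelId="cohere.command-r-v1:0",  # one of the newly covered model families
    body=json.dumps({"message": "Hello!", "max_tokens": 100}),
)
```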

## Version 1.32.0/0.53b0 (2025-04-10)

### Added
@@ -33,6 +33,7 @@
_Choice,
estimate_token_count,
genai_capture_message_content,
message_to_event,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
@@ -105,7 +106,6 @@

_MODEL_ID_KEY: str = "modelId"


class _BedrockRuntimeExtension(_AwsSdkExtension):
"""
This class is an extension for <a
@@ -223,6 +223,23 @@ def extract_attributes(self, attributes: _AttributeMapT):
self._extract_claude_attributes(
attributes, request_body
)
elif "cohere.command-r" in model_id:
self._extract_command_r_attributes(
attributes, request_body
)
elif "cohere.command" in model_id:
self._extract_command_attributes(
attributes, request_body
)
elif "meta.llama" in model_id:
self._extract_llama_attributes(
attributes, request_body
)
elif "mistral" in model_id:
self._extract_mistral_attributes(
attributes, request_body
)

except json.JSONDecodeError:
_logger.debug("Error: Unable to parse the body as JSON")

@@ -238,9 +255,7 @@ def _extract_titan_attributes(self, attributes, request_body):
attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokenCount")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, config.get("stopSequences")
)

def _extract_nova_attributes(self, attributes, request_body):
@@ -255,29 +270,88 @@ def _extract_nova_attributes(self, attributes, request_body):
attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get("max_new_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, config.get("stopSequences")
)

def _extract_claude_attributes(self, attributes, request_body):
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, request_body.get("stop_sequences")
)

def _extract_command_r_attributes(self, attributes, request_body):
prompt = request_body.get("message")
self._set_if_not_none(
attributes, GEN_AI_USAGE_INPUT_TOKENS, estimate_token_count(prompt)
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("p")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, request_body.get("stop_sequences")
)

def _extract_command_attributes(self, attributes, request_body):
prompt = request_body.get("prompt")
self._set_if_not_none(
attributes, GEN_AI_USAGE_INPUT_TOKENS, estimate_token_count(prompt)
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("p")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, request_body.get("stop_sequences")
)

def _extract_llama_attributes(self, attributes, request_body):
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_gen_len")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
)
# the request body for Meta Llama models does not contain a stop_sequences field

def _extract_mistral_attributes(self, attributes, request_body):
prompt = request_body.get("prompt")
if prompt:
self._set_if_not_none(
attributes, GEN_AI_USAGE_INPUT_TOKENS, estimate_token_count(prompt)
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, request_body.get("stop")
)
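
For reference, a sketch of the request-body shapes the four new extractors expect (values are illustrative; the key names follow each provider's InvokeModel schema, which is why the extractors read different fields):

```python
# Cohere Command R: chat-style "message"; top_p is spelled "p"
command_r_body = {"message": "Hi", "max_tokens": 128, "temperature": 0.7, "p": 0.9, "stop_sequences": ["\n"]}

# Cohere Command (legacy): completion-style "prompt"
command_body = {"prompt": "Hi", "max_tokens": 128, "temperature": 0.7, "p": 0.9, "stop_sequences": ["\n"]}

# Meta Llama: "max_gen_len" instead of "max_tokens"; no stop-sequence field
llama_body = {"prompt": "Hi", "max_gen_len": 128, "temperature": 0.7, "top_p": 0.9}

# Mistral AI: stop sequences are passed as "stop"
mistral_body = {"prompt": "Hi", "max_tokens": 128, "temperature": 0.7, "top_p": 0.9, "stop": ["\n"]}
```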

@staticmethod
@@ -304,12 +378,25 @@ def _get_request_messages(self):
system_messages = [{"role": "system", "content": content}]

messages = decoded_body.get("messages", [])
# if no messages interface, convert to messages format from generic API
if not messages:
model_id = self._call_context.params.get(_MODEL_ID_KEY)
if "amazon.titan" in model_id:
if input_text := decoded_body.get("inputText"):
messages = [
{"role": "user", "content": [{"text": input_text}]}
]
elif "cohere.command-r" in model_id:
# chat_history can be converted to messages; for now, just use message
if input_text := decoded_body.get("message"):
messages = [
{"role": "user", "content": [{"text": input_text}]}
]
elif "cohere.command" in model_id or "meta.llama" in model_id or "mistral.mistral" in model_id:
if input_text := decoded_body.get("prompt"):
messages = [
{"role": "user", "content": [{"text": input_text}]}
]

return system_messages + messages
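
As a rough illustration of the normalization above, using a hypothetical Cohere Command R body (Titan, legacy Cohere, Llama, and Mistral prompts are converted to the same shape from their "inputText"/"prompt" fields):

```python
body = {"message": "What is OTel?"}  # hypothetical cohere.command-r request
messages = [{"role": "user", "content": [{"text": body["message"]}]}]
assert messages == [{"role": "user", "content": [{"text": "What is OTel?"}]}]
```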

@@ -439,6 +526,22 @@ def _invoke_model_on_success(
self._handle_anthropic_claude_response(
span, response_body, instrumentor_context, capture_content
)
elif "cohere.command-r" in model_id:
self._handle_cohere_command_r_response(
span, response_body, instrumentor_context, capture_content
)
elif "cohere.command" in model_id:
self._handle_cohere_command_response(
span, response_body, instrumentor_context, capture_content
)
elif "meta.llama" in model_id:
self._handle_meta_llama_response(
span, response_body, instrumentor_context, capture_content
)
elif "mistral" in model_id:
self._handle_mistral_ai_response(
span, response_body, instrumentor_context, capture_content
)
except json.JSONDecodeError:
_logger.debug("Error: Unable to parse the response body as JSON")
except Exception as exc: # pylint: disable=broad-exception-caught
@@ -724,6 +827,98 @@ def _handle_anthropic_claude_response(
token_usage_histogram.record(
output_tokens, output_attributes
)

def _handle_cohere_command_r_response(
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if "text" in response_body:
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS, estimate_token_count(response_body["text"])
)
if "finish_reason" in response_body:
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["finish_reason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_cohere_command_r(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

def _handle_cohere_command_response(
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if "generations" in response_body and response_body["generations"]:
generations = response_body["generations"][0]
if "text" in generations:
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS, estimate_token_count(generations["text"])
)
if "finish_reason" in generations:
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS, [generations["finish_reason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_cohere_command(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

def _handle_meta_llama_response(
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if "prompt_token_count" in response_body:
span.set_attribute(
GEN_AI_USAGE_INPUT_TOKENS, response_body["prompt_token_count"]
)
if "generation_token_count" in response_body:
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS, response_body["generation_token_count"]
)
if "stop_reason" in response_body:
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_meta_llama(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

def _handle_mistral_ai_response(
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if "outputs" in response_body:
outputs = response_body["outputs"][0]
if "text" in outputs:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, estimate_token_count(outputs["text"]))
if "stop_reason" in outputs:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [outputs["stop_reason"]])

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_mistral_mistral(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())
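
For reference, sketches of the response bodies each of these handlers reads (values are illustrative; the key names follow the providers' InvokeModel responses):

```python
command_r_response = {"text": "Hello!", "finish_reason": "COMPLETE"}
command_response = {"generations": [{"text": "Hello!", "finish_reason": "COMPLETE"}]}
llama_response = {
    "generation": "Hello!",
    "prompt_token_count": 5,  # reported by the model, so no estimation is needed
    "generation_token_count": 2,
    "stop_reason": "stop",
}
mistral_response = {"outputs": [{"text": "Hello!", "stop_reason": "stop"}]}
```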

def on_error(
self,
@@ -15,6 +15,7 @@
from __future__ import annotations

import json
import math
from os import environ
from typing import Any, Callable, Dict, Iterator, Sequence, Union

@@ -357,6 +358,13 @@ def _process_anthropic_claude_chunk(self, chunk):
self._stream_done_callback(self._response)
return

def estimate_token_count(message: str) -> int:
# https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
# use 6 chars per token to approximate token count when not provided in response body
return math.ceil(len(message) / 6)
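
A quick sanity check of the heuristic (expected values follow directly from the six-characters-per-token rule):

```python
assert estimate_token_count("Hello, world!") == 3  # ceil(13 / 6)
assert estimate_token_count("Hi") == 1             # ceil(2 / 6)
```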


def genai_capture_message_content() -> bool:
capture_content = environ.get(
@@ -519,6 +527,48 @@ def from_invoke_anthropic_claude(
message["content"] = response["content"]
return cls(message, response["stop_reason"], index=0)

@classmethod
def from_invoke_cohere_command_r(
cls, response: dict[str, Any], capture_content: bool
) -> _Choice:
if capture_content:
message = {"content": response["text"]}
else:
message = {}
return cls(message, response["finish_reason"], index=0)

@classmethod
def from_invoke_cohere_command(
cls, response: dict[str, Any], capture_content: bool
) -> _Choice:
result = response["generations"][0]
if capture_content:
message = {"content": result["text"]}
else:
message = {}
return cls(message, result["finish_reason"], index=0)

@classmethod
def from_invoke_meta_llama(
cls, response: dict[str, Any], capture_content: bool
) -> _Choice:
if capture_content:
message = {"content": response["generation"]}
else:
message = {}
return cls(message, response["stop_reason"], index=0)

@classmethod
def from_invoke_mistral_mistral(
cls, response: dict[str, Any], capture_content: bool
) -> _Choice:
result = response["outputs"][0]
if capture_content:
message = {"content": result["text"]}
else:
message = {}
return cls(message, result["stop_reason"], index=0)
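
A usage sketch of the new constructors (the response dict is illustrative; emission mirrors what the response handlers above do):

```python
response = {"text": "Hi there", "finish_reason": "COMPLETE"}
choice = _Choice.from_invoke_cohere_command_r(response, capture_content=True)
event = choice.to_choice_event()  # the event body carries the content and finish reason
```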

def _to_body_dict(self) -> dict[str, Any]:
return {
"finish_reason": self.finish_reason,
@@ -1,7 +1,8 @@
## Recording calls

If you need to record calls, you may need to export authentication variables and the default region as environment
variables for the code to work properly. The recorded tests assume the region `us-east-1`, so ensure that
`AWS_DEFAULT_REGION` is set accordingly when recording new calls.
Since tox blocks environment variables by default, you need to override its configuration to let them pass:

```