From 07deb8069c132f909704d0c18e8042ac2afa9038 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Thu, 13 Feb 2025 15:48:07 +0100 Subject: [PATCH 1/5] botocore: send choice events for bedrock chat completion --- .../botocore/extensions/bedrock.py | 218 ++++++++++++------ .../botocore/extensions/bedrock_utils.py | 120 +++++++++- .../tests/bedrock_utils.py | 3 +- .../tests/test_botocore_bedrock.py | 198 ++++++++++++++-- 4 files changed, 444 insertions(+), 95 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py index 5a05df4c5e..52827f0920 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py @@ -29,8 +29,10 @@ from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import ( ConverseStreamWrapper, InvokeModelWithResponseStreamWrapper, + _Choice, genai_capture_message_content, message_to_event, + to_choice_event, ) from opentelemetry.instrumentation.botocore.extensions.types import ( _AttributeMapT, @@ -242,12 +244,12 @@ def before_service_call( if self._call_context.operation not in self._HANDLED_OPERATIONS: return - _capture_content = genai_capture_message_content() + capture_content = genai_capture_message_content() messages = self._get_request_messages() for message in messages: event_logger = instrumentor_context.event_logger - event_logger.emit(message_to_event(message, _capture_content)) + event_logger.emit(message_to_event(message, capture_content)) if not span.is_recording(): return @@ -259,27 +261,53 @@ def before_service_call( span.update_name(f"{operation_name} {request_model}") # pylint: disable=no-self-use - def _converse_on_success(self, span: Span, result: dict[str, Any]): - if usage := result.get("usage"): - if input_tokens := usage.get("inputTokens"): - span.set_attribute( - GEN_AI_USAGE_INPUT_TOKENS, - input_tokens, - ) - if output_tokens := usage.get("outputTokens"): + def _converse_on_success( + self, + span: Span, + result: dict[str, Any], + instrumentor_context: _BotocoreInstrumentorContext, + capture_content, + ): + if span.is_recording(): + if usage := result.get("usage"): + if input_tokens := usage.get("inputTokens"): + span.set_attribute( + GEN_AI_USAGE_INPUT_TOKENS, + input_tokens, + ) + if output_tokens := usage.get("outputTokens"): + span.set_attribute( + GEN_AI_USAGE_OUTPUT_TOKENS, + output_tokens, + ) + + if stop_reason := result.get("stopReason"): span.set_attribute( - GEN_AI_USAGE_OUTPUT_TOKENS, - output_tokens, + GEN_AI_RESPONSE_FINISH_REASONS, + [stop_reason], ) - if stop_reason := result.get("stopReason"): - span.set_attribute( - GEN_AI_RESPONSE_FINISH_REASONS, - [stop_reason], + event_logger = instrumentor_context.event_logger + choice = _Choice.from_converse(result, capture_content) + # this path is used by streaming apis, in that case we are already out of the span + # context so need to add the span context manually + span_ctx = span.get_span_context() + event_logger.emit( + to_choice_event( + choice, + trace_id=span_ctx.trace_id, + span_id=span_ctx.span_id, + trace_flags=span_ctx.trace_flags, ) + ) def _invoke_model_on_success( - self, span: Span, result: dict[str, Any], model_id: str + self, + span: Span, + result: dict[str, Any], + model_id: str, + instrumentor_context: _BotocoreInstrumentorContext, + capture_content, ): original_body = None try: @@ -292,12 +320,17 @@ def _invoke_model_on_success( response_body = json.loads(body_content.decode("utf-8")) if "amazon.titan" in model_id: - self._handle_amazon_titan_response(span, response_body) + self._handle_amazon_titan_response( + span, response_body, instrumentor_context, capture_content + ) elif "amazon.nova" in model_id: - self._handle_amazon_nova_response(span, response_body) + self._handle_amazon_nova_response( + span, response_body, instrumentor_context, capture_content + ) elif "anthropic.claude" in model_id: - self._handle_anthropic_claude_response(span, response_body) - + self._handle_anthropic_claude_response( + span, response_body, instrumentor_context, capture_content + ) except json.JSONDecodeError: _logger.debug("Error: Unable to parse the response body as JSON") except Exception as exc: # pylint: disable=broad-exception-caught @@ -321,80 +354,105 @@ def on_success( if self._call_context.operation not in self._HANDLED_OPERATIONS: return - if not span.is_recording(): - if not self.should_end_span_on_exit(): - span.end() - return + capture_content = genai_capture_message_content() - # ConverseStream - if "stream" in result and isinstance(result["stream"], EventStream): + if self._call_context.operation == "ConverseStream": + if "stream" in result and isinstance( + result["stream"], EventStream + ): - def stream_done_callback(response): - self._converse_on_success(span, response) - span.end() + def stream_done_callback(response): + self._converse_on_success( + span, response, instrumentor_context, capture_content + ) + span.end() - def stream_error_callback(exception): - self._on_stream_error_callback(span, exception) + def stream_error_callback(exception): + self._on_stream_error_callback(span, exception) - result["stream"] = ConverseStreamWrapper( - result["stream"], stream_done_callback, stream_error_callback + result["stream"] = ConverseStreamWrapper( + result["stream"], + stream_done_callback, + stream_error_callback, + ) + return + elif self._call_context.operation == "Converse": + self._converse_on_success( + span, result, instrumentor_context, capture_content ) - return - - # Converse - self._converse_on_success(span, result) model_id = self._call_context.params.get(_MODEL_ID_KEY) if not model_id: return - # InvokeModel - if "body" in result and isinstance(result["body"], StreamingBody): - self._invoke_model_on_success(span, result, model_id) - return - - # InvokeModelWithResponseStream - if "body" in result and isinstance(result["body"], EventStream): - - def invoke_model_stream_done_callback(response): - # the callback gets data formatted as the simpler converse API - self._converse_on_success(span, response) - span.end() + if self._call_context.operation == "InvokeModel": + if "body" in result and isinstance(result["body"], StreamingBody): + self._invoke_model_on_success( + span, + result, + model_id, + instrumentor_context, + capture_content, + ) + return + elif self._call_context.operation == "InvokeModelWithResponseStream": + if "body" in result and isinstance(result["body"], EventStream): + + def invoke_model_stream_done_callback(response): + # the callback gets data formatted as the simpler converse API + self._converse_on_success( + span, response, instrumentor_context, capture_content + ) + span.end() - def invoke_model_stream_error_callback(exception): - self._on_stream_error_callback(span, exception) + def invoke_model_stream_error_callback(exception): + self._on_stream_error_callback(span, exception) - result["body"] = InvokeModelWithResponseStreamWrapper( - result["body"], - invoke_model_stream_done_callback, - invoke_model_stream_error_callback, - model_id, - ) - return + result["body"] = InvokeModelWithResponseStreamWrapper( + result["body"], + invoke_model_stream_done_callback, + invoke_model_stream_error_callback, + model_id, + ) + return # pylint: disable=no-self-use def _handle_amazon_titan_response( - self, span: Span, response_body: dict[str, Any] + self, + span: Span, + response_body: dict[str, Any], + instrumentor_context: _BotocoreInstrumentorContext, + capture_content: bool, ): if "inputTextTokenCount" in response_body: span.set_attribute( GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"] ) - if "results" in response_body and response_body["results"]: - result = response_body["results"][0] - if "tokenCount" in result: - span.set_attribute( - GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"] - ) - if "completionReason" in result: - span.set_attribute( - GEN_AI_RESPONSE_FINISH_REASONS, - [result["completionReason"]], - ) + if "results" in response_body and response_body["results"]: + result = response_body["results"][0] + if "tokenCount" in result: + span.set_attribute( + GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"] + ) + if "completionReason" in result: + span.set_attribute( + GEN_AI_RESPONSE_FINISH_REASONS, + [result["completionReason"]], + ) + + event_logger = instrumentor_context.event_logger + choice = _Choice.from_invoke_amazon_titan( + response_body, capture_content + ) + event_logger.emit(to_choice_event(choice)) # pylint: disable=no-self-use def _handle_amazon_nova_response( - self, span: Span, response_body: dict[str, Any] + self, + span: Span, + response_body: dict[str, Any], + instrumentor_context: _BotocoreInstrumentorContext, + capture_content: bool, ): if "usage" in response_body: usage = response_body["usage"] @@ -411,9 +469,17 @@ def _handle_amazon_nova_response( GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stopReason"]] ) + event_logger = instrumentor_context.event_logger + choice = _Choice.from_converse(response_body, capture_content) + event_logger.emit(to_choice_event(choice)) + # pylint: disable=no-self-use def _handle_anthropic_claude_response( - self, span: Span, response_body: dict[str, Any] + self, + span: Span, + response_body: dict[str, Any], + instrumentor_context: _BotocoreInstrumentorContext, + capture_content: bool, ): if usage := response_body.get("usage"): if "input_tokens" in usage: @@ -429,6 +495,12 @@ def _handle_anthropic_claude_response( GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]] ) + event_logger = instrumentor_context.event_logger + choice = _Choice.from_invoke_anthropic_claude( + response_body, capture_content + ) + event_logger.emit(to_choice_event(choice)) + def on_error( self, span: Span, diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py index 28579c993a..29a0bc6d4b 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py @@ -49,8 +49,11 @@ def __init__( self._stream_done_callback = stream_done_callback self._stream_error_callback = stream_error_callback # accumulating things in the same shape of non-streaming version - # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"} + # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish", "output": {"message": {"role": "", "content": [{"text": ""}]} self._response = {} + self._message = None + self._content_buf = "" + self._record_message = False def __iter__(self): try: @@ -64,10 +67,17 @@ def __iter__(self): def _process_event(self, event): if "messageStart" in event: # {'messageStart': {'role': 'assistant'}} + if event["messageStart"].get("role") == "assistant": + self._record_message = True + self._message = {"role": "assistant", "content": []} return if "contentBlockDelta" in event: # {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}} + if self._record_message: + self._content_buf += ( + event["contentBlockDelta"].get("delta", {}).get("text", "") + ) return if "contentBlockStop" in event: @@ -78,6 +88,14 @@ def _process_event(self, event): # {'messageStop': {'stopReason': 'end_turn'}} if stop_reason := event["messageStop"].get("stopReason"): self._response["stopReason"] = stop_reason + + if self._record_message: + self._message["content"].append({"text": self._content_buf}) + self._content_buf = "" + self._response["output"] = {"message": self._message} + self._record_message = False + self._message = None + return if "metadata" in event: @@ -91,6 +109,7 @@ def _process_event(self, event): self._response["usage"]["outputTokens"] = output_tokens self._stream_done_callback(self._response) + return @@ -112,8 +131,11 @@ def __init__( self._model_id = model_id # accumulating things in the same shape of the Converse API - # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"} + # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish", "output": {"message": {"role": "", "content": [{"text": ""}]} self._response = {} + self._message = None + self._content_buf = "" + self._record_message = False def __iter__(self): try: @@ -159,15 +181,27 @@ def _process_amazon_titan_chunk(self, chunk): # "inputTokenCount":9,"outputTokenCount":128,"invocationLatency":3569,"firstByteLatency":2180 # } self._process_invocation_metrics(invocation_metrics) + + # transform the shape of the message to match other models + self._response["output"] = { + "message": {"content": [{"text": chunk["outputText"]}]} + } self._stream_done_callback(self._response) def _process_amazon_nova_chunk(self, chunk): if "messageStart" in chunk: # {'messageStart': {'role': 'assistant'}} + if chunk["messageStart"].get("role") == "assistant": + self._record_message = True + self._message = {"role": "assistant", "content": []} return if "contentBlockDelta" in chunk: # {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}} + if self._record_message: + self._content_buf += ( + chunk["contentBlockDelta"].get("delta", {}).get("text", "") + ) return if "contentBlockStop" in chunk: @@ -178,6 +212,13 @@ def _process_amazon_nova_chunk(self, chunk): # {'messageStop': {'stopReason': 'end_turn'}} if stop_reason := chunk["messageStop"].get("stopReason"): self._response["stopReason"] = stop_reason + + if self._record_message: + self._message["content"].append({"text": self._content_buf}) + self._content_buf = "" + self._response["output"] = {"message": self._message} + self._record_message = False + self._message = None return if "metadata" in chunk: @@ -200,6 +241,13 @@ def _process_anthropic_claude_chunk(self, chunk): if message_type == "message_start": # {'type': 'message_start', 'message': {'id': 'id', 'type': 'message', 'role': 'assistant', 'model': 'claude-2.0', 'content': [], 'stop_reason': None, 'stop_sequence': None, 'usage': {'input_tokens': 18, 'output_tokens': 1}}} + if chunk.get("message", {}).get("role") == "assistant": + self._record_message = True + message = chunk["message"] + self._message = { + "role": message["role"], + "content": message.get("content", []), + } return if message_type == "content_block_start": @@ -208,10 +256,14 @@ def _process_anthropic_claude_chunk(self, chunk): if message_type == "content_block_delta": # {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Here'}} + if self._record_message: + self._content_buf += chunk.get("delta", {}).get("text", "") return if message_type == "content_block_stop": # {'type': 'content_block_stop', 'index': 0} + self._message["content"].append({"text": self._content_buf}) + self._content_buf = "" return if message_type == "message_delta": @@ -228,7 +280,12 @@ def _process_anthropic_claude_chunk(self, chunk): "amazon-bedrock-invocationMetrics" ): self._process_invocation_metrics(invocation_metrics) - self._stream_done_callback(self._response) + + if self._record_message: + self._response["output"] = {"message": self._message} + self._record_message = False + self._message = None + self._stream_done_callback(self._response) return @@ -253,3 +310,60 @@ def message_to_event(message, capture_content): attributes=attributes, body=body if body else None, ) + + +class _Choice: + def __init__(self, message, finish_reason, index): + self.message = message + self.finish_reason = finish_reason + self.index = index + + @classmethod + def from_converse(cls, response, capture_content): + orig_message = response["output"]["message"] + if role := orig_message.get("role"): + message = {"role": role} + else: + # amazon.titan does not serialize the role + message = {} + if capture_content: + message["content"] = orig_message["content"] + return cls(message, response["stopReason"], index=0) + + @classmethod + def from_invoke_amazon_titan(cls, response, capture_content): + result = response["results"][0] + if capture_content: + message = {"content": result["outputText"]} + else: + message = {} + return cls(message, result["completionReason"], index=0) + + @classmethod + def from_invoke_anthropic_claude(cls, response, capture_content): + if capture_content: + message = { + "content": response["content"], + "role": response["role"], + } + else: + message = {"role": response["role"]} + + return cls(message, response["stop_reason"], index=0) + + def to_body_dict(self): + return { + "finish_reason": self.finish_reason, + "index": self.index, + "message": self.message, + } + + +def to_choice_event(choice: _Choice, **event_kwargs): + attributes = {GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value} + return Event( + name="gen_ai.choice", + attributes=attributes, + body=choice.to_body_dict(), + **event_kwargs, + ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py index b645571d35..7f52eedc62 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py @@ -161,6 +161,7 @@ def assert_stream_completion_attributes( def assert_equal_or_not_present(value, attribute_name, span): if value is not None: + assert attribute_name in span.attributes assert value == span.attributes[attribute_name], span.attributes[ attribute_name ] @@ -254,5 +255,5 @@ def assert_message_in_logs(log, event_name, expected_content, parent_span): assert log.log_record.body assert dict(log.log_record.body) == remove_none_values( expected_content - ) + ), dict(log.log_record.body) assert_log_parent(log, parent_span) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py index 57d3697254..e2bf697517 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py @@ -80,9 +80,18 @@ def test_converse_with_content( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 1 + assert len(logs) == 2 user_content = filter_message_keys(messages[0], ["content"]) assert_message_in_logs(logs[0], "gen_ai.user.message", user_content, span) + choice_body = { + "index": 0, + "finish_reason": "max_tokens", + "message": { + "content": [{"text": "Hi, how can I help you"}], + "role": "assistant", + }, + } + assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) @pytest.mark.skipif( @@ -114,7 +123,7 @@ def test_converse_with_content_different_events( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 4 + assert len(logs) == 5 assert_message_in_logs( logs[0], "gen_ai.system.message", {"content": system_content}, span ) @@ -129,6 +138,15 @@ def test_converse_with_content_different_events( assert_message_in_logs( logs[3], "gen_ai.user.message", last_user_content, span ) + choice_body = { + "index": 0, + "finish_reason": "end_turn", + "message": { + "content": [{"text": "This is a test"}], + "role": "assistant", + }, + } + assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) @pytest.mark.skipif( @@ -159,11 +177,17 @@ def test_converse_no_content_different_events( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 4 + assert len(logs) == 5 assert_message_in_logs(logs[0], "gen_ai.system.message", None, span) assert_message_in_logs(logs[1], "gen_ai.user.message", None, span) assert_message_in_logs(logs[2], "gen_ai.assistant.message", None, span) assert_message_in_logs(logs[3], "gen_ai.user.message", None, span) + choice_body = { + "index": 0, + "finish_reason": "end_turn", + "message": {"role": "assistant"}, + } + assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) @pytest.mark.skipif( @@ -204,8 +228,14 @@ def test_converse_no_content( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 1 + assert len(logs) == 2 assert_message_in_logs(logs[0], "gen_ai.user.message", None, span) + choice_body = { + "index": 0, + "finish_reason": "max_tokens", + "message": {"role": "assistant"}, + } + assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) @pytest.mark.skipif( @@ -303,9 +333,18 @@ def test_converse_stream_with_content( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 1 + assert len(logs) == 2 user_content = filter_message_keys(messages[0], ["content"]) assert_message_in_logs(logs[0], "gen_ai.user.message", user_content, span) + choice_body = { + "index": 0, + "finish_reason": "max_tokens", + "message": { + "content": [{"text": "I am here and ready to assist"}], + "role": "assistant", + }, + } + assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) @pytest.mark.skipif( @@ -343,7 +382,7 @@ def test_converse_stream_with_content_different_events( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 4 + assert len(logs) == 5 assert_message_in_logs( logs[0], "gen_ai.system.message", {"content": system_content}, span ) @@ -358,6 +397,15 @@ def test_converse_stream_with_content_different_events( assert_message_in_logs( logs[3], "gen_ai.user.message", last_user_content, span ) + choice_body = { + "index": 0, + "finish_reason": "end_turn", + "message": { + "role": "assistant", + "content": [{"text": "This is a test"}], + }, + } + assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) @pytest.mark.skipif( @@ -394,7 +442,7 @@ def test_converse_stream_no_content( if "contentBlockDelta" in event: text += event["contentBlockDelta"]["delta"]["text"] if "messageStop" in event: - finish_reason = (event["messageStop"]["stopReason"],) + finish_reason = event["messageStop"]["stopReason"] if "metadata" in event: usage = event["metadata"]["usage"] input_tokens = usage["inputTokens"] @@ -411,7 +459,7 @@ def test_converse_stream_no_content( llm_model_value, input_tokens, output_tokens, - finish_reason, + (finish_reason,), "chat", top_p, temperature, @@ -420,8 +468,14 @@ def test_converse_stream_no_content( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 1 + assert len(logs) == 2 assert_message_in_logs(logs[0], "gen_ai.user.message", None, span) + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": {"role": "assistant"}, + } + assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) @pytest.mark.skipif( @@ -458,11 +512,17 @@ def test_converse_stream_no_content_different_events( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 4 + assert len(logs) == 5 assert_message_in_logs(logs[0], "gen_ai.system.message", None, span) assert_message_in_logs(logs[1], "gen_ai.user.message", None, span) assert_message_in_logs(logs[2], "gen_ai.assistant.message", None, span) assert_message_in_logs(logs[3], "gen_ai.user.message", None, span) + choice_body = { + "index": 0, + "finish_reason": "end_turn", + "message": {"role": "assistant"}, + } + assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) @pytest.mark.skipif( @@ -662,14 +722,37 @@ def test_invoke_model_with_content( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 1 + assert len(logs) == 2 if model_family == "anthropic.claude": user_content = { "content": [{"text": "Say this is a test", "type": "text"}] } else: user_content = {"content": [{"text": "Say this is a test"}]} + if model_family == "amazon.titan": + message = {"content": " comment\nHello! I am writing this as a"} + finish_reason = "LENGTH" + elif model_family == "amazon.nova": + message = { + "role": "assistant", + "content": [{"text": "Certainly, here's a test:\n\n---\n\n**"}], + } + finish_reason = "max_tokens" + elif model_family == "anthropic.claude": + message = { + "role": "assistant", + "content": [ + {"type": "text", "text": 'Okay, I said "This is a test"'} + ], + } + finish_reason = "max_tokens" assert_message_in_logs(logs[0], "gen_ai.user.message", user_content, span) + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": message, + } + assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) @pytest.mark.parametrize( @@ -690,9 +773,13 @@ def test_invoke_model_with_content_different_events( if llm_model_value == "amazon.nova-micro-v1:0": messages = amazon_nova_messages() system = amazon_nova_system() + finish_reason = "max_tokens" + choice_content = [{"text": "Again, this is a test. If you need"}] elif llm_model_value == "anthropic.claude-v2": messages = anthropic_claude_messages() system = anthropic_claude_system() + finish_reason = "end_turn" + choice_content = [{"type": "text", "text": "This is a test"}] body = get_invoke_model_body( llm_model_value, @@ -715,7 +802,7 @@ def test_invoke_model_with_content_different_events( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 4 + assert len(logs) == 5 assert_message_in_logs( logs[0], "gen_ai.system.message", @@ -733,6 +820,12 @@ def test_invoke_model_with_content_different_events( assert_message_in_logs( logs[3], "gen_ai.user.message", last_user_content, span ) + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": {"role": "assistant", "content": choice_content}, + } + assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) @pytest.mark.parametrize( @@ -770,8 +863,23 @@ def test_invoke_model_no_content( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 1 + assert len(logs) == 2 assert_message_in_logs(logs[0], "gen_ai.user.message", None, span) + if model_family == "anthropic.claude": + choice_message = {"role": "assistant"} + finish_reason = "max_tokens" + elif model_family == "amazon.nova": + choice_message = {"role": "assistant"} + finish_reason = "max_tokens" + elif model_family == "amazon.titan": + choice_message = {} + finish_reason = "LENGTH" + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": choice_message, + } + assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) @pytest.mark.parametrize( @@ -791,9 +899,11 @@ def test_invoke_model_no_content_different_events( if llm_model_value == "amazon.nova-micro-v1:0": messages = amazon_nova_messages() system = amazon_nova_system() + finish_reason = "max_tokens" elif llm_model_value == "anthropic.claude-v2": messages = anthropic_claude_messages() system = anthropic_claude_system() + finish_reason = "end_turn" body = get_invoke_model_body( llm_model_value, @@ -816,11 +926,17 @@ def test_invoke_model_no_content_different_events( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 4 + assert len(logs) == 5 assert_message_in_logs(logs[0], "gen_ai.system.message", None, span) assert_message_in_logs(logs[1], "gen_ai.user.message", None, span) assert_message_in_logs(logs[2], "gen_ai.assistant.message", None, span) assert_message_in_logs(logs[3], "gen_ai.user.message", None, span) + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": {"role": "assistant"}, + } + assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) @pytest.mark.vcr() @@ -935,7 +1051,7 @@ def test_invoke_model_with_response_stream_with_content( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 1 + assert len(logs) == 2 if model_family == "anthropic.claude": user_content = { "content": [{"text": "Say this is a test", "type": "text"}] @@ -944,6 +1060,31 @@ def test_invoke_model_with_response_stream_with_content( user_content = {"content": [{"text": "Say this is a test"}]} assert_message_in_logs(logs[0], "gen_ai.user.message", user_content, span) + if model_family == "anthropic.claude": + choice_message = { + "content": [{"text": "Okay, I will repeat: This is a test"}], + "role": "assistant", + } + elif model_family == "amazon.nova": + choice_message = { + "content": [ + {"text": "It sounds like you're initiating a message or"} + ], + "role": "assistant", + } + elif model_family == "amazon.titan": + choice_message = { + "content": [ + {"text": "\nHello! I am a computer program designed to"} + ] + } + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": choice_message, + } + assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) + @pytest.mark.parametrize( "model_family", @@ -963,10 +1104,12 @@ def test_invoke_model_with_response_stream_with_content_different_events( messages = amazon_nova_messages() system = amazon_nova_system() finish_reason = "max_tokens" + choice_content = [{"text": "This is a test again. If you need any"}] elif llm_model_value == "anthropic.claude-v2": messages = anthropic_claude_messages() system = anthropic_claude_system() finish_reason = "end_turn" + choice_content = [{"text": "This is a test"}] max_tokens = 10 body = get_invoke_model_body( @@ -996,7 +1139,7 @@ def test_invoke_model_with_response_stream_with_content_different_events( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 4 + assert len(logs) == 5 assert_message_in_logs( logs[0], "gen_ai.system.message", @@ -1014,6 +1157,12 @@ def test_invoke_model_with_response_stream_with_content_different_events( assert_message_in_logs( logs[3], "gen_ai.user.message", last_user_content, span ) + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": {"content": choice_content, "role": "assistant"}, + } + assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) @pytest.mark.parametrize( @@ -1099,8 +1248,15 @@ def test_invoke_model_with_response_stream_no_content( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 1 + assert len(logs) == 2 assert_message_in_logs(logs[0], "gen_ai.user.message", None, span) + message = {} if model_family == "amazon.titan" else {"role": "assistant"} + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": message, + } + assert_message_in_logs(logs[1], "gen_ai.choice", choice_body, span) @pytest.mark.parametrize( @@ -1153,11 +1309,17 @@ def test_invoke_model_with_response_stream_no_content_different_events( ) logs = log_exporter.get_finished_logs() - assert len(logs) == 4 + assert len(logs) == 5 assert_message_in_logs(logs[0], "gen_ai.system.message", None, span) assert_message_in_logs(logs[1], "gen_ai.user.message", None, span) assert_message_in_logs(logs[2], "gen_ai.assistant.message", None, span) assert_message_in_logs(logs[3], "gen_ai.user.message", None, span) + choice_body = { + "index": 0, + "finish_reason": finish_reason, + "message": {"role": "assistant"}, + } + assert_message_in_logs(logs[4], "gen_ai.choice", choice_body, span) @pytest.mark.vcr() From d62551629656ae2e90b079958a81c01a5cfbc608 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Wed, 19 Feb 2025 15:57:22 +0100 Subject: [PATCH 2/5] Please pylint --- .../tests/test_botocore_bedrock.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py index e2bf697517..30d3b09ca4 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py @@ -699,6 +699,7 @@ def test_invoke_model_with_content( instrument_with_content, model_family, ): + # pylint:disable=too-many-locals llm_model_value = get_model_name_from_family(model_family) max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"] body = get_invoke_model_body( @@ -840,6 +841,7 @@ def test_invoke_model_no_content( instrument_no_content, model_family, ): + # pylint:disable=too-many-locals llm_model_value = get_model_name_from_family(model_family) max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"] body = get_invoke_model_body( @@ -980,7 +982,7 @@ def test_invoke_model_with_response_stream_with_content( instrument_with_content, model_family, ): - # pylint:disable=too-many-locals,too-many-branches + # pylint:disable=too-many-locals,too-many-branches,too-many-statements llm_model_value = get_model_name_from_family(model_family) max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"] body = get_invoke_model_body( From 5e313b187e637005988085747d6f383a807eae7f Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Wed, 19 Feb 2025 16:00:44 +0100 Subject: [PATCH 3/5] Add CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 101cafd361..7ebe3369f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3258)) - `opentelemetry-instrumentation-botocore` Add support for GenAI system events ([#3266](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3266)) +- `opentelemetry-instrumentation-botocore` Add support for GenAI choice events + ([#3275](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3275)) ### Fixed From c50ba8d23baae3a711c8853f254920b5bd8e92a5 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Thu, 20 Feb 2025 09:18:05 +0100 Subject: [PATCH 4/5] Always call done stream callback --- .../instrumentation/botocore/extensions/bedrock_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py index 29a0bc6d4b..1402d6a5bb 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py @@ -285,7 +285,8 @@ def _process_anthropic_claude_chunk(self, chunk): self._response["output"] = {"message": self._message} self._record_message = False self._message = None - self._stream_done_callback(self._response) + + self._stream_done_callback(self._response) return From 253e9b2f3aaaada4c5acfb1cf0b909263895760c Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Thu, 20 Feb 2025 09:23:16 +0100 Subject: [PATCH 5/5] Move choice event creation to _Choice and add types --- .../botocore/extensions/bedrock.py | 10 ++--- .../botocore/extensions/bedrock_utils.py | 39 +++++++++++-------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py index 52827f0920..3a6c536728 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py @@ -32,7 +32,6 @@ _Choice, genai_capture_message_content, message_to_event, - to_choice_event, ) from opentelemetry.instrumentation.botocore.extensions.types import ( _AttributeMapT, @@ -293,8 +292,7 @@ def _converse_on_success( # context so need to add the span context manually span_ctx = span.get_span_context() event_logger.emit( - to_choice_event( - choice, + choice.to_choice_event( trace_id=span_ctx.trace_id, span_id=span_ctx.span_id, trace_flags=span_ctx.trace_flags, @@ -444,7 +442,7 @@ def _handle_amazon_titan_response( choice = _Choice.from_invoke_amazon_titan( response_body, capture_content ) - event_logger.emit(to_choice_event(choice)) + event_logger.emit(choice.to_choice_event()) # pylint: disable=no-self-use def _handle_amazon_nova_response( @@ -471,7 +469,7 @@ def _handle_amazon_nova_response( event_logger = instrumentor_context.event_logger choice = _Choice.from_converse(response_body, capture_content) - event_logger.emit(to_choice_event(choice)) + event_logger.emit(choice.to_choice_event()) # pylint: disable=no-self-use def _handle_anthropic_claude_response( @@ -499,7 +497,7 @@ def _handle_anthropic_claude_response( choice = _Choice.from_invoke_anthropic_claude( response_body, capture_content ) - event_logger.emit(to_choice_event(choice)) + event_logger.emit(choice.to_choice_event()) def on_error( self, diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py index 1402d6a5bb..1bfdaf00b1 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py @@ -16,7 +16,7 @@ import json from os import environ -from typing import Callable, Dict, Union +from typing import Any, Callable, Dict, Union from botocore.eventstream import EventStream, EventStreamError from wrapt import ObjectProxy @@ -297,7 +297,7 @@ def genai_capture_message_content() -> bool: return capture_content.lower() == "true" -def message_to_event(message, capture_content): +def message_to_event(message: dict[str, Any], capture_content: bool) -> Event: attributes = {GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value} role = message.get("role") content = message.get("content") @@ -314,13 +314,17 @@ def message_to_event(message, capture_content): class _Choice: - def __init__(self, message, finish_reason, index): + def __init__( + self, message: dict[str, Any], finish_reason: str, index: int + ): self.message = message self.finish_reason = finish_reason self.index = index @classmethod - def from_converse(cls, response, capture_content): + def from_converse( + cls, response: dict[str, Any], capture_content: bool + ) -> _Choice: orig_message = response["output"]["message"] if role := orig_message.get("role"): message = {"role": role} @@ -332,7 +336,9 @@ def from_converse(cls, response, capture_content): return cls(message, response["stopReason"], index=0) @classmethod - def from_invoke_amazon_titan(cls, response, capture_content): + def from_invoke_amazon_titan( + cls, response: dict[str, Any], capture_content: bool + ) -> _Choice: result = response["results"][0] if capture_content: message = {"content": result["outputText"]} @@ -341,7 +347,9 @@ def from_invoke_amazon_titan(cls, response, capture_content): return cls(message, result["completionReason"], index=0) @classmethod - def from_invoke_anthropic_claude(cls, response, capture_content): + def from_invoke_anthropic_claude( + cls, response: dict[str, Any], capture_content: bool + ) -> _Choice: if capture_content: message = { "content": response["content"], @@ -352,19 +360,18 @@ def from_invoke_anthropic_claude(cls, response, capture_content): return cls(message, response["stop_reason"], index=0) - def to_body_dict(self): + def _to_body_dict(self) -> dict[str, Any]: return { "finish_reason": self.finish_reason, "index": self.index, "message": self.message, } - -def to_choice_event(choice: _Choice, **event_kwargs): - attributes = {GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value} - return Event( - name="gen_ai.choice", - attributes=attributes, - body=choice.to_body_dict(), - **event_kwargs, - ) + def to_choice_event(self, **event_kwargs) -> Event: + attributes = {GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value} + return Event( + name="gen_ai.choice", + attributes=attributes, + body=self._to_body_dict(), + **event_kwargs, + )