Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

botocore: add bedrock genai user events and lazy initialize tracer #3258

Merged
merged 8 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- `opentelemetry-instrumentation-botocore` Add support for GenAI user events
([#3258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3258))

### Fixed

- `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument`
([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
boto3~=1.35.99

opentelemetry-sdk~=1.29.0
opentelemetry-exporter-otlp-proto-grpc~=1.29.0
opentelemetry-distro~=0.50b0
opentelemetry-instrumentation-botocore~=0.50b0
opentelemetry-sdk~=1.30.0
opentelemetry-exporter-otlp-proto-grpc~=1.30.0
opentelemetry-distro~=0.51b0
opentelemetry-instrumentation-botocore~=0.51b0
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,15 @@ def response_hook(span, service_name, operation_name, result):
from botocore.exceptions import ClientError
from wrapt import wrap_function_wrapper

from opentelemetry.instrumentation.botocore.extensions import _find_extension
from opentelemetry._events import get_event_logger
from opentelemetry.instrumentation.botocore.extensions import (
_find_extension,
_has_extension,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
_AwsSdkExtension,
_BotocoreInstrumentorContext,
)
from opentelemetry.instrumentation.botocore.package import _instruments
from opentelemetry.instrumentation.botocore.version import __version__
Expand Down Expand Up @@ -123,12 +129,11 @@ def instrumentation_dependencies(self) -> Collection[str]:

def _instrument(self, **kwargs):
# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__,
__version__,
kwargs.get("tracer_provider"),
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

# tracers are lazy initialized per-extension in _get_tracer
self._tracers = {}
# event_loggers are lazy initialized per-extension in _get_event_logger
self._event_loggers = {}

self.request_hook = kwargs.get("request_hook")
self.response_hook = kwargs.get("response_hook")
Expand All @@ -137,6 +142,9 @@ def _instrument(self, **kwargs):
if propagator is not None:
self.propagator = propagator

self.tracer_provider = kwargs.get("tracer_provider")
self.event_logger_provider = kwargs.get("event_logger_provider")

wrap_function_wrapper(
"botocore.client",
"BaseClient._make_api_call",
Expand All @@ -149,6 +157,50 @@ def _instrument(self, **kwargs):
self._patched_endpoint_prepare_request,
)

@staticmethod
def _get_instrumentation_name(extension: _AwsSdkExtension) -> str:
    """Return the instrumentation scope name to use for ``extension``.

    When ``_has_extension`` reports a match for the extension's call
    context, the name is this module's ``__name__`` suffixed with the
    call context's service; otherwise it is the bare module ``__name__``.
    """
    call_context = extension._call_context
    if _has_extension(call_context):
        return f"{__name__}.{extension._call_context.service}"
    return __name__

def _get_tracer(self, extension: _AwsSdkExtension):
    """Return the tracer for ``extension``, creating it on first use.

    Tracers are cached in ``self._tracers`` keyed by the instrumentation
    name computed for the extension, so each distinct name gets one tracer.
    """
    name = self._get_instrumentation_name(extension)

    cached = self._tracers.get(name)
    if cached:
        return cached

    # The schema URL is derived from the version the extension reports.
    schema_version = extension.tracer_schema_version()
    created = get_tracer(
        name,
        __version__,
        self.tracer_provider,
        schema_url=f"https://opentelemetry.io/schemas/{schema_version}",
    )
    self._tracers[name] = created
    return created

def _get_event_logger(self, extension: _AwsSdkExtension):
    """Return the event logger for ``extension``, creating it on first use.

    Event loggers are cached in ``self._event_loggers`` keyed by the
    instrumentation name computed for the extension.
    """
    name = self._get_instrumentation_name(extension)

    cached = self._event_loggers.get(name)
    if cached:
        return cached

    # The schema URL is derived from the version the extension reports.
    schema_version = extension.event_logger_schema_version()
    # NOTE(review): the version argument is an empty string here, whereas
    # _get_tracer passes __version__ -- confirm this is intentional.
    created = get_event_logger(
        name,
        "",
        schema_url=f"https://opentelemetry.io/schemas/{schema_version}",
        event_logger_provider=self.event_logger_provider,
    )
    self._event_loggers[name] = created
    return created

def _uninstrument(self, **kwargs):
unwrap(BaseClient, "_make_api_call")
unwrap(Endpoint, "prepare_request")
Expand Down Expand Up @@ -190,15 +242,20 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
_safe_invoke(extension.extract_attributes, attributes)
end_span_on_exit = extension.should_end_span_on_exit()

with self._tracer.start_as_current_span(
tracer = self._get_tracer(extension)
event_logger = self._get_event_logger(extension)
instrumentor_ctx = _BotocoreInstrumentorContext(
event_logger=event_logger
)
with tracer.start_as_current_span(
call_context.span_name,
kind=call_context.span_kind,
attributes=attributes,
# tracing streaming services require to close the span manually
# at a later time after the stream has been consumed
end_on_exit=end_span_on_exit,
) as span:
_safe_invoke(extension.before_service_call, span)
_safe_invoke(extension.before_service_call, span, instrumentor_ctx)
self._call_request_hook(span, call_context)

try:
Expand All @@ -209,12 +266,16 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_error, span, error)
_safe_invoke(
extension.on_error, span, error, instrumentor_ctx
)
raise
_apply_response_attributes(span, result)
_safe_invoke(extension.on_success, span, result)
_safe_invoke(
extension.on_success, span, result, instrumentor_ctx
)
finally:
_safe_invoke(extension.after_service_call)
_safe_invoke(extension.after_service_call, instrumentor_ctx)
self._call_response_hook(span, call_context, result)

return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def loader():
}


def _has_extension(call_context: _AwsSdkCallContext) -> bool:
    """Tell whether the call context's service is a key of ``_KNOWN_EXTENSIONS``."""
    service = call_context.service
    return service in _KNOWN_EXTENSIONS


def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension:
try:
loader = _KNOWN_EXTENSIONS.get(call_context.service)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
ConverseStreamWrapper,
InvokeModelWithResponseStreamWrapper,
genai_capture_message_content,
message_to_event,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
_BotoClientErrorT,
_BotocoreInstrumentorContext,
)
from opentelemetry.semconv._incubating.attributes.error_attributes import (
ERROR_TYPE,
Expand Down Expand Up @@ -205,10 +208,34 @@ def _set_if_not_none(attributes, key, value):
if value is not None:
attributes[key] = value

def before_service_call(self, span: Span):
def _get_request_messages(self):
input_text = None
if not (messages := self._call_context.params.get("messages", [])):
if body := self._call_context.params.get("body"):
decoded_body = json.loads(body)
messages = decoded_body.get("messages", [])
if not messages:
# transform old school amazon titan invokeModel api to messages
if input_text := decoded_body.get("inputText"):
messages = [
{"role": "user", "content": [{"text": input_text}]}
]

return messages

def before_service_call(
self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
):
if self._call_context.operation not in self._HANDLED_OPERATIONS:
return

_capture_content = genai_capture_message_content()

messages = self._get_request_messages()
for message in messages:
event_logger = instrumentor_context.event_logger
event_logger.emit(message_to_event(message, _capture_content))

if not span.is_recording():
return

Expand Down Expand Up @@ -272,7 +299,12 @@ def _on_stream_error_callback(self, span: Span, exception):
span.set_attribute(ERROR_TYPE, type(exception).__qualname__)
span.end()

def on_success(self, span: Span, result: dict[str, Any]):
def on_success(
self,
span: Span,
result: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
):
if self._call_context.operation not in self._HANDLED_OPERATIONS:
return

Expand Down Expand Up @@ -384,7 +416,12 @@ def _handle_anthropic_claude_response(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
)

def on_error(self, span: Span, exception: _BotoClientErrorT):
def on_error(
self,
span: Span,
exception: _BotoClientErrorT,
instrumentor_context: _BotocoreInstrumentorContext,
):
if self._call_context.operation not in self._HANDLED_OPERATIONS:
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,21 @@
from __future__ import annotations

import json
from os import environ
from typing import Callable, Dict, Union

from botocore.eventstream import EventStream, EventStreamError
from wrapt import ObjectProxy

from opentelemetry._events import Event
from opentelemetry.instrumentation.botocore.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT,
)
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import (
GEN_AI_SYSTEM,
GenAiSystemValues,
)

_StreamDoneCallableT = Callable[[Dict[str, Union[int, str]]], None]
_StreamErrorCallableT = Callable[[Exception], None]

Expand Down Expand Up @@ -220,3 +230,26 @@ def _process_anthropic_claude_chunk(self, chunk):
self._process_invocation_metrics(invocation_metrics)
self._stream_done_callback(self._response)
return


def genai_capture_message_content() -> bool:
    """Report whether message content capture is enabled via the environment.

    Reads the variable named by
    ``OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT`` (treated as
    ``"false"`` when unset) and returns ``True`` only when its lower-cased
    value equals ``"true"``.
    """
    raw_value = environ.get(
        OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "false"
    )
    enabled = raw_value.lower() == "true"
    return enabled


def message_to_event(message, capture_content):
    """Build an ``Event`` describing a single chat ``message``.

    The event name is ``gen_ai.<role>.message`` using the message's
    ``role`` entry, and its attributes carry ``GEN_AI_SYSTEM`` set to the
    AWS Bedrock value. The body holds the message ``content`` only when
    ``capture_content`` is truthy and the content is non-empty; otherwise
    the body is ``None``.
    """
    role = message.get("role")
    content = message.get("content")

    if capture_content and content:
        body = {"content": content}
    else:
        body = None

    return Event(
        name=f"gen_ai.{role}.message",
        attributes={GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value},
        body=body,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
_AttributeMapT,
_AwsSdkCallContext,
_AwsSdkExtension,
_BotocoreInstrumentorContext,
_BotoResultT,
)
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
Expand Down Expand Up @@ -370,7 +371,9 @@ def attr_setter(key: str, value: AttributeValue):
def _get_peer_name(self) -> str:
return urlparse(self._call_context.endpoint_url).netloc

def before_service_call(self, span: Span):
def before_service_call(
self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
):
if not span.is_recording() or self._op is None:
return

Expand All @@ -380,7 +383,12 @@ def before_service_call(self, span: Span):
span.set_attribute,
)

def on_success(self, span: Span, result: _BotoResultT):
def on_success(
self,
span: Span,
result: _BotoResultT,
instrumentor_context: _BotocoreInstrumentorContext,
):
if not span.is_recording():
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
_AttributeMapT,
_AwsSdkCallContext,
_AwsSdkExtension,
_BotocoreInstrumentorContext,
)
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -119,7 +120,9 @@ def extract_attributes(self, attributes: _AttributeMapT):

self._op.extract_attributes(self._call_context, attributes)

def before_service_call(self, span: Span):
def before_service_call(
self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
):
if self._op is None:
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_AttributeMapT,
_AwsSdkCallContext,
_AwsSdkExtension,
_BotocoreInstrumentorContext,
)
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
Expand Down Expand Up @@ -165,6 +166,8 @@ def extract_attributes(self, attributes: _AttributeMapT):
if self._op:
self._op.extract_attributes(self._call_context, attributes)

def before_service_call(self, span: Span):
def before_service_call(
self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
):
if self._op:
self._op.before_service_call(self._call_context, span)
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
_BotocoreInstrumentorContext,
_BotoResultT,
)
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -44,7 +45,12 @@ def extract_attributes(self, attributes: _AttributeMapT):
queue_url,
)

def on_success(self, span: Span, result: _BotoResultT):
def on_success(
self,
span: Span,
result: _BotoResultT,
instrumentor_context: _BotocoreInstrumentorContext,
):
operation = self._call_context.operation
if operation in _SUPPORTED_OPERATIONS:
try:
Expand Down
Loading