Skip to content

Commit 9587360

Browse files
botocore: add bedrock genai user events and lazy initialize tracers and event loggers (#3258)
* Create per-extension tracers if there's an extension available * botocore: add user events for bedrock * Remove pass of AWS env vars from tox.ini * Remove handling for other types of messages * Please pylint * Add changelog * Update CHANGELOG.md Co-authored-by: Adrian Cole <[email protected]> --------- Co-authored-by: Adrian Cole <[email protected]>
1 parent 3436861 commit 9587360

21 files changed

+869
-44
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
1212
## Unreleased
1313

14+
### Added
15+
16+
- `opentelemetry-instrumentation-botocore` Add support for GenAI user events and lazy initialize tracer
17+
([#3258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3258))
18+
1419
### Fixed
20+
1521
- `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument`
1622
([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247))
1723

Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
boto3~=1.35.99
22

3-
opentelemetry-sdk~=1.29.0
4-
opentelemetry-exporter-otlp-proto-grpc~=1.29.0
5-
opentelemetry-distro~=0.50b0
6-
opentelemetry-instrumentation-botocore~=0.50b0
3+
opentelemetry-sdk~=1.30.0
4+
opentelemetry-exporter-otlp-proto-grpc~=1.30.0
5+
opentelemetry-distro~=0.51b0
6+
opentelemetry-instrumentation-botocore~=0.51b0

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py

+73-12
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,15 @@ def response_hook(span, service_name, operation_name, result):
8686
from botocore.exceptions import ClientError
8787
from wrapt import wrap_function_wrapper
8888

89-
from opentelemetry.instrumentation.botocore.extensions import _find_extension
89+
from opentelemetry._events import get_event_logger
90+
from opentelemetry.instrumentation.botocore.extensions import (
91+
_find_extension,
92+
_has_extension,
93+
)
9094
from opentelemetry.instrumentation.botocore.extensions.types import (
9195
_AwsSdkCallContext,
96+
_AwsSdkExtension,
97+
_BotocoreInstrumentorContext,
9298
)
9399
from opentelemetry.instrumentation.botocore.package import _instruments
94100
from opentelemetry.instrumentation.botocore.version import __version__
@@ -123,12 +129,11 @@ def instrumentation_dependencies(self) -> Collection[str]:
123129

124130
def _instrument(self, **kwargs):
125131
# pylint: disable=attribute-defined-outside-init
126-
self._tracer = get_tracer(
127-
__name__,
128-
__version__,
129-
kwargs.get("tracer_provider"),
130-
schema_url="https://opentelemetry.io/schemas/1.11.0",
131-
)
132+
133+
# tracers are lazy initialized per-extension in _get_tracer
134+
self._tracers = {}
135+
# event_loggers are lazy initialized per-extension in _get_event_logger
136+
self._event_loggers = {}
132137

133138
self.request_hook = kwargs.get("request_hook")
134139
self.response_hook = kwargs.get("response_hook")
@@ -137,6 +142,9 @@ def _instrument(self, **kwargs):
137142
if propagator is not None:
138143
self.propagator = propagator
139144

145+
self.tracer_provider = kwargs.get("tracer_provider")
146+
self.event_logger_provider = kwargs.get("event_logger_provider")
147+
140148
wrap_function_wrapper(
141149
"botocore.client",
142150
"BaseClient._make_api_call",
@@ -149,6 +157,50 @@ def _instrument(self, **kwargs):
149157
self._patched_endpoint_prepare_request,
150158
)
151159

160+
@staticmethod
161+
def _get_instrumentation_name(extension: _AwsSdkExtension) -> str:
162+
has_extension = _has_extension(extension._call_context)
163+
return (
164+
f"{__name__}.{extension._call_context.service}"
165+
if has_extension
166+
else __name__
167+
)
168+
169+
def _get_tracer(self, extension: _AwsSdkExtension):
170+
"""This is a multiplexer in order to have a tracer per extension"""
171+
172+
instrumentation_name = self._get_instrumentation_name(extension)
173+
tracer = self._tracers.get(instrumentation_name)
174+
if tracer:
175+
return tracer
176+
177+
schema_version = extension.tracer_schema_version()
178+
self._tracers[instrumentation_name] = get_tracer(
179+
instrumentation_name,
180+
__version__,
181+
self.tracer_provider,
182+
schema_url=f"https://opentelemetry.io/schemas/{schema_version}",
183+
)
184+
return self._tracers[instrumentation_name]
185+
186+
def _get_event_logger(self, extension: _AwsSdkExtension):
187+
"""This is a multiplexer in order to have an event logger per extension"""
188+
189+
instrumentation_name = self._get_instrumentation_name(extension)
190+
event_logger = self._event_loggers.get(instrumentation_name)
191+
if event_logger:
192+
return event_logger
193+
194+
schema_version = extension.event_logger_schema_version()
195+
self._event_loggers[instrumentation_name] = get_event_logger(
196+
instrumentation_name,
197+
"",
198+
schema_url=f"https://opentelemetry.io/schemas/{schema_version}",
199+
event_logger_provider=self.event_logger_provider,
200+
)
201+
202+
return self._event_loggers[instrumentation_name]
203+
152204
def _uninstrument(self, **kwargs):
153205
unwrap(BaseClient, "_make_api_call")
154206
unwrap(Endpoint, "prepare_request")
@@ -190,15 +242,20 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
190242
_safe_invoke(extension.extract_attributes, attributes)
191243
end_span_on_exit = extension.should_end_span_on_exit()
192244

193-
with self._tracer.start_as_current_span(
245+
tracer = self._get_tracer(extension)
246+
event_logger = self._get_event_logger(extension)
247+
instrumentor_ctx = _BotocoreInstrumentorContext(
248+
event_logger=event_logger
249+
)
250+
with tracer.start_as_current_span(
194251
call_context.span_name,
195252
kind=call_context.span_kind,
196253
attributes=attributes,
197254
# tracing streaming services require to close the span manually
198255
# at a later time after the stream has been consumed
199256
end_on_exit=end_span_on_exit,
200257
) as span:
201-
_safe_invoke(extension.before_service_call, span)
258+
_safe_invoke(extension.before_service_call, span, instrumentor_ctx)
202259
self._call_request_hook(span, call_context)
203260

204261
try:
@@ -209,12 +266,16 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
209266
except ClientError as error:
210267
result = getattr(error, "response", None)
211268
_apply_response_attributes(span, result)
212-
_safe_invoke(extension.on_error, span, error)
269+
_safe_invoke(
270+
extension.on_error, span, error, instrumentor_ctx
271+
)
213272
raise
214273
_apply_response_attributes(span, result)
215-
_safe_invoke(extension.on_success, span, result)
274+
_safe_invoke(
275+
extension.on_success, span, result, instrumentor_ctx
276+
)
216277
finally:
217-
_safe_invoke(extension.after_service_call)
278+
_safe_invoke(extension.after_service_call, instrumentor_ctx)
218279
self._call_response_hook(span, call_context, result)
219280

220281
return result

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ def loader():
4040
}
4141

4242

43+
def _has_extension(call_context: _AwsSdkCallContext) -> bool:
44+
return call_context.service in _KNOWN_EXTENSIONS
45+
46+
4347
def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension:
4448
try:
4549
loader = _KNOWN_EXTENSIONS.get(call_context.service)

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py

+40-3
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@
2929
from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
3030
ConverseStreamWrapper,
3131
InvokeModelWithResponseStreamWrapper,
32+
genai_capture_message_content,
33+
message_to_event,
3234
)
3335
from opentelemetry.instrumentation.botocore.extensions.types import (
3436
_AttributeMapT,
3537
_AwsSdkExtension,
3638
_BotoClientErrorT,
39+
_BotocoreInstrumentorContext,
3740
)
3841
from opentelemetry.semconv._incubating.attributes.error_attributes import (
3942
ERROR_TYPE,
@@ -205,10 +208,34 @@ def _set_if_not_none(attributes, key, value):
205208
if value is not None:
206209
attributes[key] = value
207210

208-
def before_service_call(self, span: Span):
211+
def _get_request_messages(self):
212+
input_text = None
213+
if not (messages := self._call_context.params.get("messages", [])):
214+
if body := self._call_context.params.get("body"):
215+
decoded_body = json.loads(body)
216+
messages = decoded_body.get("messages", [])
217+
if not messages:
218+
# transform old school amazon titan invokeModel api to messages
219+
if input_text := decoded_body.get("inputText"):
220+
messages = [
221+
{"role": "user", "content": [{"text": input_text}]}
222+
]
223+
224+
return messages
225+
226+
def before_service_call(
227+
self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
228+
):
209229
if self._call_context.operation not in self._HANDLED_OPERATIONS:
210230
return
211231

232+
_capture_content = genai_capture_message_content()
233+
234+
messages = self._get_request_messages()
235+
for message in messages:
236+
event_logger = instrumentor_context.event_logger
237+
event_logger.emit(message_to_event(message, _capture_content))
238+
212239
if not span.is_recording():
213240
return
214241

@@ -272,7 +299,12 @@ def _on_stream_error_callback(self, span: Span, exception):
272299
span.set_attribute(ERROR_TYPE, type(exception).__qualname__)
273300
span.end()
274301

275-
def on_success(self, span: Span, result: dict[str, Any]):
302+
def on_success(
303+
self,
304+
span: Span,
305+
result: dict[str, Any],
306+
instrumentor_context: _BotocoreInstrumentorContext,
307+
):
276308
if self._call_context.operation not in self._HANDLED_OPERATIONS:
277309
return
278310

@@ -384,7 +416,12 @@ def _handle_anthropic_claude_response(
384416
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
385417
)
386418

387-
def on_error(self, span: Span, exception: _BotoClientErrorT):
419+
def on_error(
420+
self,
421+
span: Span,
422+
exception: _BotoClientErrorT,
423+
instrumentor_context: _BotocoreInstrumentorContext,
424+
):
388425
if self._call_context.operation not in self._HANDLED_OPERATIONS:
389426
return
390427

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py

+33
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,21 @@
1515
from __future__ import annotations
1616

1717
import json
18+
from os import environ
1819
from typing import Callable, Dict, Union
1920

2021
from botocore.eventstream import EventStream, EventStreamError
2122
from wrapt import ObjectProxy
2223

24+
from opentelemetry._events import Event
25+
from opentelemetry.instrumentation.botocore.environment_variables import (
26+
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT,
27+
)
28+
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import (
29+
GEN_AI_SYSTEM,
30+
GenAiSystemValues,
31+
)
32+
2333
_StreamDoneCallableT = Callable[[Dict[str, Union[int, str]]], None]
2434
_StreamErrorCallableT = Callable[[Exception], None]
2535

@@ -220,3 +230,26 @@ def _process_anthropic_claude_chunk(self, chunk):
220230
self._process_invocation_metrics(invocation_metrics)
221231
self._stream_done_callback(self._response)
222232
return
233+
234+
235+
def genai_capture_message_content() -> bool:
236+
capture_content = environ.get(
237+
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "false"
238+
)
239+
return capture_content.lower() == "true"
240+
241+
242+
def message_to_event(message, capture_content):
243+
attributes = {GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value}
244+
role = message.get("role")
245+
content = message.get("content")
246+
247+
body = {}
248+
if capture_content and content:
249+
body["content"] = content
250+
251+
return Event(
252+
name=f"gen_ai.{role}.message",
253+
attributes=attributes,
254+
body=body if body else None,
255+
)

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/dynamodb.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
_AttributeMapT,
2323
_AwsSdkCallContext,
2424
_AwsSdkExtension,
25+
_BotocoreInstrumentorContext,
2526
_BotoResultT,
2627
)
2728
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
@@ -370,7 +371,9 @@ def attr_setter(key: str, value: AttributeValue):
370371
def _get_peer_name(self) -> str:
371372
return urlparse(self._call_context.endpoint_url).netloc
372373

373-
def before_service_call(self, span: Span):
374+
def before_service_call(
375+
self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
376+
):
374377
if not span.is_recording() or self._op is None:
375378
return
376379

@@ -380,7 +383,12 @@ def before_service_call(self, span: Span):
380383
span.set_attribute,
381384
)
382385

383-
def on_success(self, span: Span, result: _BotoResultT):
386+
def on_success(
387+
self,
388+
span: Span,
389+
result: _BotoResultT,
390+
instrumentor_context: _BotocoreInstrumentorContext,
391+
):
384392
if not span.is_recording():
385393
return
386394

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/lmbd.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
_AttributeMapT,
2323
_AwsSdkCallContext,
2424
_AwsSdkExtension,
25+
_BotocoreInstrumentorContext,
2526
)
2627
from opentelemetry.propagate import inject
2728
from opentelemetry.semconv.trace import SpanAttributes
@@ -119,7 +120,9 @@ def extract_attributes(self, attributes: _AttributeMapT):
119120

120121
self._op.extract_attributes(self._call_context, attributes)
121122

122-
def before_service_call(self, span: Span):
123+
def before_service_call(
124+
self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
125+
):
123126
if self._op is None:
124127
return
125128

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sns.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
_AttributeMapT,
2424
_AwsSdkCallContext,
2525
_AwsSdkExtension,
26+
_BotocoreInstrumentorContext,
2627
)
2728
from opentelemetry.semconv.trace import (
2829
MessagingDestinationKindValues,
@@ -165,6 +166,8 @@ def extract_attributes(self, attributes: _AttributeMapT):
165166
if self._op:
166167
self._op.extract_attributes(self._call_context, attributes)
167168

168-
def before_service_call(self, span: Span):
169+
def before_service_call(
170+
self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
171+
):
169172
if self._op:
170173
self._op.before_service_call(self._call_context, span)

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from opentelemetry.instrumentation.botocore.extensions.types import (
1717
_AttributeMapT,
1818
_AwsSdkExtension,
19+
_BotocoreInstrumentorContext,
1920
_BotoResultT,
2021
)
2122
from opentelemetry.semconv.trace import SpanAttributes
@@ -44,7 +45,12 @@ def extract_attributes(self, attributes: _AttributeMapT):
4445
queue_url,
4546
)
4647

47-
def on_success(self, span: Span, result: _BotoResultT):
48+
def on_success(
49+
self,
50+
span: Span,
51+
result: _BotoResultT,
52+
instrumentor_context: _BotocoreInstrumentorContext,
53+
):
4854
operation = self._call_context.operation
4955
if operation in _SUPPORTED_OPERATIONS:
5056
try:

0 commit comments

Comments
 (0)