Skip to content

botocore: Introduce instrumentation extensions #718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664))
- `opentelemetry-instrumentation-botocore` Fix span injection for lambda invoke
([#663](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/663))
- `opentelemetry-instrumentation-botocore` Introduce instrumentation extensions
([#718](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/718))

### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ def response_hook(span, service_name, operation_name, result):

import json
import logging
from typing import Any, Collection, Dict, Optional, Tuple
from typing import Any, Callable, Collection, Dict, Optional, Tuple

from botocore.client import BaseClient
from botocore.endpoint import Endpoint
from botocore.exceptions import ClientError
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.instrumentation.botocore.extensions import _find_extension
from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
)
Expand Down Expand Up @@ -190,6 +191,10 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if call_context is None:
return original_func(*args, **kwargs)

extension = _find_extension(call_context)
if not extension.should_trace_service_call():
return original_func(*args, **kwargs)

attributes = {
SpanAttributes.RPC_SYSTEM: "aws-api",
SpanAttributes.RPC_SERVICE: call_context.service_id,
Expand All @@ -198,6 +203,8 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
"aws.region": call_context.region,
}

_safe_invoke(extension.extract_attributes, attributes)

with self._tracer.start_as_current_span(
call_context.span_name,
kind=call_context.span_kind,
Expand All @@ -208,6 +215,7 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
BotocoreInstrumentor._patch_lambda_invoke(call_context.params)

_set_api_call_attributes(span, call_context)
_safe_invoke(extension.before_service_call, span)
self._call_request_hook(span, call_context)

token = context_api.attach(
Expand All @@ -220,11 +228,14 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_error, span, error)
raise
else:
_apply_response_attributes(span, result)
_safe_invoke(extension.on_success, span, result)
finally:
context_api.detach(token)
_safe_invoke(extension.after_service_call)

self._call_response_hook(span, call_context, result)

Expand Down Expand Up @@ -254,8 +265,6 @@ def _set_api_call_attributes(span, call_context: _AwsSdkCallContext):
if not span.is_recording():
return

if "QueueUrl" in call_context.params:
span.set_attribute("aws.queue_url", call_context.params["QueueUrl"])
if "TableName" in call_context.params:
span.set_attribute("aws.table_name", call_context.params["TableName"])

Expand Down Expand Up @@ -309,3 +318,14 @@ def _determine_call_context(
# extracting essential attributes ('service' and 'operation') failed.
logger.error("Error when initializing call context", exc_info=ex)
return None


def _safe_invoke(function: Callable, *args):
function_name = "<unknown>"
try:
function_name = function.__name__
function(*args)
except Exception as ex: # pylint:disable=broad-except
logger.error(
"Error when invoking function '%s'", function_name, exc_info=ex
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import importlib
import logging

from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
_AwsSdkExtension,
)

_logger = logging.getLogger(__name__)


def _lazy_load(module, cls):
def loader():
imported_mod = importlib.import_module(module, __name__)
return getattr(imported_mod, cls, None)

return loader


_KNOWN_EXTENSIONS = {
"sqs": _lazy_load(".sqs", "_SqsExtension"),
}


def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension:
try:
loader = _KNOWN_EXTENSIONS.get(call_context.service)
if loader is None:
return _AwsSdkExtension(call_context)

extension_cls = loader()
return extension_cls(call_context)
except Exception as ex: # pylint: disable=broad-except
_logger.error("Error when loading extension: %s", ex)
return _AwsSdkExtension(call_context)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
)


class _SqsExtension(_AwsSdkExtension):
def extract_attributes(self, attributes: _AttributeMapT):
queue_url = self._call_context.params.get("QueueUrl")
if queue_url:
# TODO: update when semantic conventions exist
attributes["aws.queue_url"] = queue_url
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
from typing import Any, Dict, Optional, Tuple

from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import Span
from opentelemetry.util.types import AttributeValue

_logger = logging.getLogger(__name__)

_BotoClientT = "botocore.client.BaseClient"
_BotoResultT = Dict[str, Any]
_BotoClientErrorT = "botocore.exceptions.ClientError"

_OperationParamsT = Dict[str, Any]
_AttributeMapT = Dict[str, AttributeValue]


class _AwsSdkCallContext:
Expand Down Expand Up @@ -70,3 +75,49 @@ def _get_attr(obj, name: str, default=None):
except AttributeError:
_logger.warning("Could not get attribute '%s'", name)
return default


class _AwsSdkExtension:
def __init__(self, call_context: _AwsSdkCallContext):
self._call_context = call_context

def should_trace_service_call(self) -> bool: # pylint:disable=no-self-use
"""Returns if the AWS SDK service call should be traced or not

Extensions might override this function to disable tracing for certain
operations.
"""
return True

def extract_attributes(self, attributes: _AttributeMapT):
"""Callback which gets invoked before the span is created.

Extensions might override this function to extract additional attributes.
"""

def before_service_call(self, span: Span):
"""Callback which gets invoked after the span is created but before the
AWS SDK service is called.

Extensions might override this function e.g. for injecting the span into
a carrier.
"""

def on_success(self, span: Span, result: _BotoResultT):
"""Callback that gets invoked when the AWS SDK call returns
successfully.

Extensions might override this function e.g. to extract and set response
attributes on the span.
"""

def on_error(self, span: Span, exception: _BotoClientErrorT):
"""Callback that gets invoked when the AWS SDK service call raises a
ClientError.
"""

def after_service_call(self):
"""Callback that gets invoked after the AWS SDK service was called.

Extensions might override this function to do some cleanup tasks.
"""