Skip to content

Bugfix/instrument basic publish in pika #759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.6.1-0.25b1...HEAD)
- `opentelemetry-sdk-extension-aws` & `opentelemetry-propagator-aws` Release AWS Python SDK Extension as 2.0.1 and AWS Propagator as 1.0.1
([#753](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/753))
- `opentelemetry-instrumentation-pika` Add `_decorate_basic_consume` to ensure post instrumentation `basic_consume` calls are also instrumented.
([#759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/759))

## [1.6.1-0.25b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.1-0.25b1) - 2021-10-18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def _uninstrument_channel_functions(channel: Channel) -> None:
function = getattr(channel, function_name)
if hasattr(function, "_original_function"):
channel.__setattr__(function_name, function._original_function)
unwrap(channel, "basic_consume")

@staticmethod
def instrument_channel(
Expand All @@ -90,6 +91,7 @@ def instrument_channel(
PikaInstrumentor._instrument_consumers(
channel._impl._consumers, tracer
)
PikaInstrumentor._decorate_basic_consume(channel, tracer)
PikaInstrumentor._instrument_channel_functions(channel, tracer)

@staticmethod
Expand Down Expand Up @@ -120,6 +122,33 @@ def wrapper(wrapped, instance, args, kwargs):

wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)

@staticmethod
def _decorate_basic_consume(channel, tracer: Optional[Tracer]) -> None:
def wrapper(wrapped, instance, args, kwargs):
if not hasattr(channel, "_impl"):
_LOG.error(
"Could not find implementation for provided channel!"
)
return wrapped(*args, **kwargs)
current_keys = set(channel._impl._consumers.keys())
return_value = wrapped(*args, **kwargs)
new_key_list = list(
set(channel._impl._consumers.keys()) - current_keys
)
if not new_key_list:
_LOG.error("Could not find added callback")
return return_value
new_key = new_key_list[0]
callback = channel._impl._consumers[new_key]
decorated_callback = utils._decorate_callback(
callback, tracer, new_key
)
setattr(decorated_callback, "_original_callback", callback)
channel._impl._consumers[new_key] = decorated_callback
return return_value

wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper)

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
self.__setattr__("__opentelemetry_tracer_provider", tracer_provider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def decorated_callback(
) -> Any:
if not properties:
properties = BasicProperties(headers={})
if properties.headers is None:
properties.headers = {}
ctx = propagate.extract(properties.headers, getter=_pika_getter)
if not ctx:
ctx = context.get_current()
Expand Down Expand Up @@ -74,6 +76,8 @@ def decorated_function(
) -> Any:
if not properties:
properties = BasicProperties(headers={})
if properties.headers is None:
properties.headers = {}
ctx = context.get_current()
span = _get_span(
tracer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,24 @@ def test_instrument_api(self) -> None:
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_consumers"
)
def test_instrument(
self,
instrument_consumers: mock.MagicMock,
instrument_basic_consume: mock.MagicMock,
instrument_channel_functions: mock.MagicMock,
):
PikaInstrumentor.instrument_channel(channel=self.channel)
assert hasattr(
self.channel, "_is_instrumented_by_opentelemetry"
), "channel is not marked as instrumented!"
instrument_consumers.assert_called_once()
instrument_basic_consume.assert_called_once()
instrument_channel_functions.assert_called_once()

@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from wrapt import ObjectProxy

# pylint: disable=unused-import
# pylint: disable=E0611
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY # noqa: F401
from opentelemetry.trace import StatusCode

Expand Down