Skip to content

Commit c24c77d

Browse files
Bugfix/instrument basic publish in pika (#759)
1 parent 433b856 commit c24c77d

File tree

5 files changed

+41
-0
lines changed

5 files changed

+41
-0
lines changed

Diff for: CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313

1414
- `opentelemetry-sdk-extension-aws` & `opentelemetry-propagator-aws` Release AWS Python SDK Extension as 2.0.1 and AWS Propagator as 1.0.1
1515
([#753](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/753))
16+
- `opentelemetry-instrumentation-pika` Add `_decorate_basic_consume` to ensure post instrumentation `basic_consume` calls are also instrumented.
17+
([#759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/759))
1618

1719
### Fixed
1820

Diff for: instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py

+29
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def _uninstrument_channel_functions(channel: Channel) -> None:
7070
function = getattr(channel, function_name)
7171
if hasattr(function, "_original_function"):
7272
channel.__setattr__(function_name, function._original_function)
73+
unwrap(channel, "basic_consume")
7374

7475
@staticmethod
7576
def instrument_channel(
@@ -90,6 +91,7 @@ def instrument_channel(
9091
PikaInstrumentor._instrument_consumers(
9192
channel._impl._consumers, tracer
9293
)
94+
PikaInstrumentor._decorate_basic_consume(channel, tracer)
9395
PikaInstrumentor._instrument_channel_functions(channel, tracer)
9496

9597
@staticmethod
@@ -120,6 +122,33 @@ def wrapper(wrapped, instance, args, kwargs):
120122

121123
wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)
122124

125+
@staticmethod
126+
def _decorate_basic_consume(channel, tracer: Optional[Tracer]) -> None:
127+
def wrapper(wrapped, instance, args, kwargs):
128+
if not hasattr(channel, "_impl"):
129+
_LOG.error(
130+
"Could not find implementation for provided channel!"
131+
)
132+
return wrapped(*args, **kwargs)
133+
current_keys = set(channel._impl._consumers.keys())
134+
return_value = wrapped(*args, **kwargs)
135+
new_key_list = list(
136+
set(channel._impl._consumers.keys()) - current_keys
137+
)
138+
if not new_key_list:
139+
_LOG.error("Could not find added callback")
140+
return return_value
141+
new_key = new_key_list[0]
142+
callback = channel._impl._consumers[new_key]
143+
decorated_callback = utils._decorate_callback(
144+
callback, tracer, new_key
145+
)
146+
setattr(decorated_callback, "_original_callback", callback)
147+
channel._impl._consumers[new_key] = decorated_callback
148+
return return_value
149+
150+
wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper)
151+
123152
def _instrument(self, **kwargs: Dict[str, Any]) -> None:
124153
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
125154
self.__setattr__("__opentelemetry_tracer_provider", tracer_provider)

Diff for: instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py

+4
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ def decorated_callback(
4141
) -> Any:
4242
if not properties:
4343
properties = BasicProperties(headers={})
44+
if properties.headers is None:
45+
properties.headers = {}
4446
ctx = propagate.extract(properties.headers, getter=_pika_getter)
4547
if not ctx:
4648
ctx = context.get_current()
@@ -74,6 +76,8 @@ def decorated_function(
7476
) -> Any:
7577
if not properties:
7678
properties = BasicProperties(headers={})
79+
if properties.headers is None:
80+
properties.headers = {}
7781
ctx = context.get_current()
7882
span = _get_span(
7983
tracer,

Diff for: instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py

+5
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,24 @@ def test_instrument_api(self) -> None:
4545
@mock.patch(
4646
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
4747
)
48+
@mock.patch(
49+
"opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume"
50+
)
4851
@mock.patch(
4952
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_consumers"
5053
)
5154
def test_instrument(
5255
self,
5356
instrument_consumers: mock.MagicMock,
57+
instrument_basic_consume: mock.MagicMock,
5458
instrument_channel_functions: mock.MagicMock,
5559
):
5660
PikaInstrumentor.instrument_channel(channel=self.channel)
5761
assert hasattr(
5862
self.channel, "_is_instrumented_by_opentelemetry"
5963
), "channel is not marked as instrumented!"
6064
instrument_consumers.assert_called_once()
65+
instrument_basic_consume.assert_called_once()
6166
instrument_channel_functions.assert_called_once()
6267

6368
@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")

Diff for: opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from wrapt import ObjectProxy
1818

1919
# pylint: disable=unused-import
20+
# pylint: disable=E0611
2021
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY # noqa: F401
2122
from opentelemetry.trace import StatusCode
2223

0 commit comments

Comments
 (0)