Skip to content

Commit ad06e70

Browse files
authored
feat(pika): adding support for channel.consume instrumentation (#2397)
* feat(pika): adding support for channel.consume instrumentation * updated changelog * wip tests * updating docs * more tests * removing span member on object proxy * adding test for ReadyMessagesDequeProxy * adding tests * better comment on span.end() * fixing docs * ending span even on exceptions
1 parent 2317adc commit ad06e70

File tree

6 files changed

+274
-6
lines changed

6 files changed

+274
-6
lines changed

Diff for: CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
### Added
11+
12+
- `opentelemetry-instrumentation-pika` Instrumentation for `channel.consume()` (supported
13+
only for global, non channel specific instrumentation)
14+
([#2397](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2397)))
15+
16+
1017
### Breaking changes
1118

1219
- Rename `type` attribute to `asgi.event.type` in `opentelemetry-instrumentation-asgi`

Diff for: instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py

+9
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,15 @@ def consume_hook(span: Span, body: bytes, properties: BasicProperties):
7777
7878
PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook)
7979
80+
Consumer Instrumentation
81+
------------------------
82+
For consumer instrumentation, pika supports two consuming modes:
83+
84+
* Consumers using the `basic_consume` method which accepts a callback. This is supported for global instrumentation
85+
(`PikaInstrumentor().instrument()`) as well channel specific instrumentation (`PikaInstrumentor().instrument_channel(channel)`)
86+
* Consumers using the `consume` method which returns a generator over messages. This is supported for global
87+
instrumentations only (`PikaInstrumentor().instrument()`)
88+
8089
API
8190
---
8291
"""

Diff for: instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import wrapt
2121
from packaging import version
2222
from pika.adapters import BlockingConnection
23-
from pika.adapters.blocking_connection import BlockingChannel
23+
from pika.adapters.blocking_connection import (
24+
BlockingChannel,
25+
_QueueConsumerGeneratorInfo,
26+
)
2427

2528
from opentelemetry import trace
2629
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
@@ -191,6 +194,24 @@ def wrapper(wrapped, instance, args, kwargs):
191194

192195
wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper)
193196

197+
@staticmethod
198+
def _decorate_queue_consumer_generator(
199+
tracer_provider: Optional[TracerProvider],
200+
consume_hook: utils.HookT = utils.dummy_callback,
201+
) -> None:
202+
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
203+
204+
def wrapper(wrapped, instance, args, kwargs):
205+
res = wrapped(*args, **kwargs)
206+
instance.pending_events = utils.ReadyMessagesDequeProxy(
207+
instance.pending_events, instance, tracer, consume_hook
208+
)
209+
return res
210+
211+
wrapt.wrap_function_wrapper(
212+
_QueueConsumerGeneratorInfo, "__init__", wrapper
213+
)
214+
194215
def _instrument(self, **kwargs: Dict[str, Any]) -> None:
195216
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
196217
publish_hook: utils.HookT = kwargs.get(
@@ -207,10 +228,15 @@ def _instrument(self, **kwargs: Dict[str, Any]) -> None:
207228
consume_hook=consume_hook,
208229
)
209230

231+
self._decorate_queue_consumer_generator(
232+
tracer_provider, consume_hook=consume_hook
233+
)
234+
210235
def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
211236
if hasattr(self, "__opentelemetry_tracer_provider"):
212237
delattr(self, "__opentelemetry_tracer_provider")
213238
unwrap(BlockingConnection, "channel")
239+
unwrap(_QueueConsumerGeneratorInfo, "__init__")
214240

215241
def instrumentation_dependencies(self) -> Collection[str]:
216242
return _instruments

Diff for: instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py

+81-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
from logging import getLogger
22
from typing import Any, Callable, List, Optional
33

4+
from pika.adapters.blocking_connection import (
5+
_ConsumerDeliveryEvt,
6+
_QueueConsumerGeneratorInfo,
7+
)
48
from pika.channel import Channel
59
from pika.spec import Basic, BasicProperties
10+
from wrapt import ObjectProxy
611

712
from opentelemetry import context, propagate, trace
813
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
@@ -128,7 +133,7 @@ def decorated_function(
128133

129134
def _get_span(
130135
tracer: Tracer,
131-
channel: Channel,
136+
channel: Optional[Channel],
132137
properties: BasicProperties,
133138
task_name: str,
134139
destination: str,
@@ -157,7 +162,7 @@ def _generate_span_name(
157162

158163
def _enrich_span(
159164
span: Span,
160-
channel: Channel,
165+
channel: Optional[Channel],
161166
properties: BasicProperties,
162167
task_destination: str,
163168
operation: Optional[MessagingOperationValues] = None,
@@ -176,6 +181,8 @@ def _enrich_span(
176181
span.set_attribute(
177182
SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id
178183
)
184+
if not channel:
185+
return
179186
if not hasattr(channel.connection, "params"):
180187
span.set_attribute(
181188
SpanAttributes.NET_PEER_NAME, channel.connection._impl.params.host
@@ -190,3 +197,75 @@ def _enrich_span(
190197
span.set_attribute(
191198
SpanAttributes.NET_PEER_PORT, channel.connection.params.port
192199
)
200+
201+
202+
# pylint:disable=abstract-method
203+
class ReadyMessagesDequeProxy(ObjectProxy):
204+
def __init__(
205+
self,
206+
wrapped,
207+
queue_consumer_generator: _QueueConsumerGeneratorInfo,
208+
tracer: Optional[Tracer],
209+
consume_hook: HookT = dummy_callback,
210+
):
211+
super().__init__(wrapped)
212+
self._self_active_token = None
213+
self._self_tracer = tracer
214+
self._self_consume_hook = consume_hook
215+
self._self_queue_consumer_generator = queue_consumer_generator
216+
217+
def popleft(self, *args, **kwargs):
218+
try:
219+
# end active context if exists
220+
if self._self_active_token:
221+
context.detach(self._self_active_token)
222+
except Exception as inst_exception: # pylint: disable=W0703
223+
_LOG.exception(inst_exception)
224+
225+
evt = self.__wrapped__.popleft(*args, **kwargs)
226+
227+
try:
228+
# If a new message was received, create a span and set as active context
229+
if isinstance(evt, _ConsumerDeliveryEvt):
230+
method = evt.method
231+
properties = evt.properties
232+
if not properties:
233+
properties = BasicProperties(headers={})
234+
if properties.headers is None:
235+
properties.headers = {}
236+
ctx = propagate.extract(
237+
properties.headers, getter=_pika_getter
238+
)
239+
if not ctx:
240+
ctx = context.get_current()
241+
message_ctx_token = context.attach(ctx)
242+
span = _get_span(
243+
self._self_tracer,
244+
None,
245+
properties,
246+
destination=method.exchange
247+
if method.exchange
248+
else method.routing_key,
249+
span_kind=SpanKind.CONSUMER,
250+
task_name=self._self_queue_consumer_generator.consumer_tag,
251+
operation=MessagingOperationValues.RECEIVE,
252+
)
253+
try:
254+
context.detach(message_ctx_token)
255+
self._self_active_token = context.attach(
256+
trace.set_span_in_context(span)
257+
)
258+
self._self_consume_hook(span, evt.body, properties)
259+
except Exception as hook_exception: # pylint: disable=W0703
260+
_LOG.exception(hook_exception)
261+
finally:
262+
# We must end the span here, because the next place we can hook
263+
# is not the end of the user code, but only when the next message
264+
# arrives. we still set this span's context as the active context
265+
# so spans created by user code that handles this message will be
266+
# children of this one.
267+
span.end()
268+
except Exception as inst_exception: # pylint: disable=W0703
269+
_LOG.exception(inst_exception)
270+
271+
return evt

Diff for: instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py

+34-3
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,18 @@
1414
from unittest import TestCase, mock
1515

1616
from pika.adapters import BlockingConnection
17+
from pika.adapters.blocking_connection import _QueueConsumerGeneratorInfo
1718
from pika.channel import Channel
1819
from wrapt import BoundFunctionWrapper
1920

2021
from opentelemetry.instrumentation.pika import PikaInstrumentor
2122
from opentelemetry.instrumentation.pika.pika_instrumentor import (
2223
_consumer_callback_attribute_name,
2324
)
24-
from opentelemetry.instrumentation.pika.utils import dummy_callback
25+
from opentelemetry.instrumentation.pika.utils import (
26+
ReadyMessagesDequeProxy,
27+
dummy_callback,
28+
)
2529
from opentelemetry.trace import Tracer
2630

2731

@@ -40,13 +44,23 @@ def test_instrument_api(self) -> None:
4044
self.assertTrue(
4145
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
4246
)
47+
self.assertTrue(
48+
isinstance(
49+
_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper
50+
)
51+
)
4352
assert hasattr(
4453
instrumentation, "__opentelemetry_tracer_provider"
4554
), "Tracer not stored for the object!"
46-
instrumentation.uninstrument(channel=self.channel)
55+
instrumentation.uninstrument()
4756
self.assertFalse(
4857
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
4958
)
59+
self.assertFalse(
60+
isinstance(
61+
_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper
62+
)
63+
)
5064

5165
@mock.patch(
5266
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
@@ -57,7 +71,7 @@ def test_instrument_api(self) -> None:
5771
@mock.patch(
5872
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_blocking_channel_consumers"
5973
)
60-
def test_instrument(
74+
def test_instrument_channel(
6175
self,
6276
instrument_blocking_channel_consumers: mock.MagicMock,
6377
instrument_basic_consume: mock.MagicMock,
@@ -110,6 +124,23 @@ def test_instrument_basic_publish(
110124
self.channel.basic_publish, decorate_basic_publish.return_value
111125
)
112126

127+
def test_instrument_queue_consumer_generator(self) -> None:
128+
instrumentation = PikaInstrumentor()
129+
instrumentation.instrument()
130+
generator_info = _QueueConsumerGeneratorInfo(
131+
params=("queue", False, False), consumer_tag="tag"
132+
)
133+
self.assertTrue(
134+
isinstance(generator_info.pending_events, ReadyMessagesDequeProxy)
135+
)
136+
instrumentation.uninstrument()
137+
generator_info = _QueueConsumerGeneratorInfo(
138+
params=("queue", False, False), consumer_tag="tag"
139+
)
140+
self.assertFalse(
141+
isinstance(generator_info.pending_events, ReadyMessagesDequeProxy)
142+
)
143+
113144
def test_uninstrument_channel_functions(self) -> None:
114145
original_function = self.channel.basic_publish
115146
self.channel.basic_publish = mock.MagicMock()

0 commit comments

Comments
 (0)