diff --git a/CHANGELOG.md b/CHANGELOG.md index 6db0e04e68..193779659c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + +- `opentelemetry-instrumentation-pika` Instrumentation for `channel.consume()` (supported + only for global, non channel specific instrumentation) + ([#2397](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2397))) + + ### Breaking changes - Rename `type` attribute to `asgi.event.type` in `opentelemetry-instrumentation-asgi` diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index c745462cf3..d9cec06525 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -77,6 +77,15 @@ def consume_hook(span: Span, body: bytes, properties: BasicProperties): PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook) +Consumer Instrumentation +------------------------ +For consumer instrumentation, pika supports two consuming modes: + +* Consumers using the `basic_consume` method which accepts a callback. This is supported for global instrumentation + (`PikaInstrumentor().instrument()`) as well channel specific instrumentation (`PikaInstrumentor().instrument_channel(channel)`) +* Consumers using the `consume` method which returns a generator over messages. This is supported for global + instrumentations only (`PikaInstrumentor().instrument()`) + API --- """ diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 56c78a85c3..76261c89ce 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -20,7 +20,10 @@ import wrapt from packaging import version from pika.adapters import BlockingConnection -from pika.adapters.blocking_connection import BlockingChannel +from pika.adapters.blocking_connection import ( + BlockingChannel, + _QueueConsumerGeneratorInfo, +) from opentelemetry import trace from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -191,6 +194,24 @@ def wrapper(wrapped, instance, args, kwargs): wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper) + @staticmethod + def _decorate_queue_consumer_generator( + tracer_provider: Optional[TracerProvider], + consume_hook: utils.HookT = utils.dummy_callback, + ) -> None: + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + def wrapper(wrapped, instance, args, kwargs): + res = wrapped(*args, **kwargs) + instance.pending_events = utils.ReadyMessagesDequeProxy( + instance.pending_events, instance, tracer, consume_hook + ) + return res + + wrapt.wrap_function_wrapper( + _QueueConsumerGeneratorInfo, "__init__", wrapper + ) + def _instrument(self, **kwargs: Dict[str, Any]) -> None: tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) publish_hook: utils.HookT = kwargs.get( @@ -207,10 +228,15 @@ def _instrument(self, **kwargs: Dict[str, Any]) -> None: consume_hook=consume_hook, ) + self._decorate_queue_consumer_generator( + tracer_provider, consume_hook=consume_hook + ) + def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: if hasattr(self, "__opentelemetry_tracer_provider"): delattr(self, "__opentelemetry_tracer_provider") unwrap(BlockingConnection, "channel") + unwrap(_QueueConsumerGeneratorInfo, "__init__") def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 6dab4fdfa9..5afa5d9ee6 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -1,8 +1,13 @@ from logging import getLogger from typing import Any, Callable, List, Optional +from pika.adapters.blocking_connection import ( + _ConsumerDeliveryEvt, + _QueueConsumerGeneratorInfo, +) from pika.channel import Channel from pika.spec import Basic, BasicProperties +from wrapt import ObjectProxy from opentelemetry import context, propagate, trace from opentelemetry.instrumentation.utils import is_instrumentation_enabled @@ -128,7 +133,7 @@ def decorated_function( def _get_span( tracer: Tracer, - channel: Channel, + channel: Optional[Channel], properties: BasicProperties, task_name: str, destination: str, @@ -157,7 +162,7 @@ def _generate_span_name( def _enrich_span( span: Span, - channel: Channel, + channel: Optional[Channel], properties: BasicProperties, task_destination: str, operation: Optional[MessagingOperationValues] = None, @@ -176,6 +181,8 @@ def _enrich_span( span.set_attribute( SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id ) + if not channel: + return if not hasattr(channel.connection, "params"): span.set_attribute( SpanAttributes.NET_PEER_NAME, channel.connection._impl.params.host @@ -190,3 +197,75 @@ def _enrich_span( span.set_attribute( SpanAttributes.NET_PEER_PORT, channel.connection.params.port ) + + +# pylint:disable=abstract-method +class ReadyMessagesDequeProxy(ObjectProxy): + def __init__( + self, + wrapped, + queue_consumer_generator: _QueueConsumerGeneratorInfo, + tracer: Optional[Tracer], + consume_hook: HookT = dummy_callback, + ): + super().__init__(wrapped) + self._self_active_token = None + self._self_tracer = tracer + self._self_consume_hook = consume_hook + self._self_queue_consumer_generator = queue_consumer_generator + + def popleft(self, *args, **kwargs): + try: + # end active context if exists + if self._self_active_token: + context.detach(self._self_active_token) + except Exception as inst_exception: # pylint: disable=W0703 + _LOG.exception(inst_exception) + + evt = self.__wrapped__.popleft(*args, **kwargs) + + try: + # If a new message was received, create a span and set as active context + if isinstance(evt, _ConsumerDeliveryEvt): + method = evt.method + properties = evt.properties + if not properties: + properties = BasicProperties(headers={}) + if properties.headers is None: + properties.headers = {} + ctx = propagate.extract( + properties.headers, getter=_pika_getter + ) + if not ctx: + ctx = context.get_current() + message_ctx_token = context.attach(ctx) + span = _get_span( + self._self_tracer, + None, + properties, + destination=method.exchange + if method.exchange + else method.routing_key, + span_kind=SpanKind.CONSUMER, + task_name=self._self_queue_consumer_generator.consumer_tag, + operation=MessagingOperationValues.RECEIVE, + ) + try: + context.detach(message_ctx_token) + self._self_active_token = context.attach( + trace.set_span_in_context(span) + ) + self._self_consume_hook(span, evt.body, properties) + except Exception as hook_exception: # pylint: disable=W0703 + _LOG.exception(hook_exception) + finally: + # We must end the span here, because the next place we can hook + # is not the end of the user code, but only when the next message + # arrives. we still set this span's context as the active context + # so spans created by user code that handles this message will be + # children of this one. + span.end() + except Exception as inst_exception: # pylint: disable=W0703 + _LOG.exception(inst_exception) + + return evt diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 6e154c04f9..ad519c4a35 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -14,6 +14,7 @@ from unittest import TestCase, mock from pika.adapters import BlockingConnection +from pika.adapters.blocking_connection import _QueueConsumerGeneratorInfo from pika.channel import Channel from wrapt import BoundFunctionWrapper @@ -21,7 +22,10 @@ from opentelemetry.instrumentation.pika.pika_instrumentor import ( _consumer_callback_attribute_name, ) -from opentelemetry.instrumentation.pika.utils import dummy_callback +from opentelemetry.instrumentation.pika.utils import ( + ReadyMessagesDequeProxy, + dummy_callback, +) from opentelemetry.trace import Tracer @@ -40,13 +44,23 @@ def test_instrument_api(self) -> None: self.assertTrue( isinstance(BlockingConnection.channel, BoundFunctionWrapper) ) + self.assertTrue( + isinstance( + _QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper + ) + ) assert hasattr( instrumentation, "__opentelemetry_tracer_provider" ), "Tracer not stored for the object!" - instrumentation.uninstrument(channel=self.channel) + instrumentation.uninstrument() self.assertFalse( isinstance(BlockingConnection.channel, BoundFunctionWrapper) ) + self.assertFalse( + isinstance( + _QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper + ) + ) @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions" @@ -57,7 +71,7 @@ def test_instrument_api(self) -> None: @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_blocking_channel_consumers" ) - def test_instrument( + def test_instrument_channel( self, instrument_blocking_channel_consumers: mock.MagicMock, instrument_basic_consume: mock.MagicMock, @@ -110,6 +124,23 @@ def test_instrument_basic_publish( self.channel.basic_publish, decorate_basic_publish.return_value ) + def test_instrument_queue_consumer_generator(self) -> None: + instrumentation = PikaInstrumentor() + instrumentation.instrument() + generator_info = _QueueConsumerGeneratorInfo( + params=("queue", False, False), consumer_tag="tag" + ) + self.assertTrue( + isinstance(generator_info.pending_events, ReadyMessagesDequeProxy) + ) + instrumentation.uninstrument() + generator_info = _QueueConsumerGeneratorInfo( + params=("queue", False, False), consumer_tag="tag" + ) + self.assertFalse( + isinstance(generator_info.pending_events, ReadyMessagesDequeProxy) + ) + def test_uninstrument_channel_functions(self) -> None: original_function = self.channel.basic_publish self.channel.basic_publish = mock.MagicMock() diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index ed33593389..d651ea64c9 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -11,8 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import collections from unittest import TestCase, mock +from pika.adapters.blocking_connection import ( + _ConsumerCancellationEvt, + _ConsumerDeliveryEvt, + _QueueConsumerGeneratorInfo, +) from pika.channel import Channel from pika.spec import Basic, BasicProperties @@ -448,3 +454,113 @@ def test_decorate_basic_publish_when_span_is_not_recording( exchange_name, routing_key, mock_body, properties, False ) self.assertEqual(retval, callback.return_value) + + # pylint: disable=too-many-statements + @mock.patch("opentelemetry.instrumentation.pika.utils._get_span") + @mock.patch("opentelemetry.propagate.extract") + @mock.patch("opentelemetry.context.detach") + @mock.patch("opentelemetry.context.attach") + @mock.patch("opentelemetry.context.get_current") + def test_decorate_deque_proxy( + self, + context_get_current: mock.MagicMock, + context_attach: mock.MagicMock, + context_detach: mock.MagicMock, + extract: mock.MagicMock, + get_span: mock.MagicMock, + ) -> None: + returned_span = mock.MagicMock() + get_span.return_value = returned_span + consume_hook = mock.MagicMock() + tracer = mock.MagicMock() + generator_info = mock.MagicMock( + spec=_QueueConsumerGeneratorInfo, + pending_events=mock.MagicMock(spec=collections.deque), + consumer_tag="mock_task_name", + ) + method = mock.MagicMock(spec=Basic.Deliver) + method.exchange = "test_exchange" + properties = mock.MagicMock() + evt = _ConsumerDeliveryEvt(method, properties, b"mock_body") + generator_info.pending_events.popleft.return_value = evt + proxy = utils.ReadyMessagesDequeProxy( + generator_info.pending_events, generator_info, tracer, consume_hook + ) + + # First call (no detach cleanup) + res = proxy.popleft() + self.assertEqual(res, evt) + generator_info.pending_events.popleft.assert_called_once() + extract.assert_called_once_with( + properties.headers, getter=utils._pika_getter + ) + context_get_current.assert_called_once() + self.assertEqual(context_attach.call_count, 2) + self.assertEqual(context_detach.call_count, 1) + get_span.assert_called_once_with( + tracer, + None, + properties, + destination=method.exchange, + span_kind=SpanKind.CONSUMER, + task_name=generator_info.consumer_tag, + operation=MessagingOperationValues.RECEIVE, + ) + consume_hook.assert_called_once() + returned_span.end.assert_called_once() + + generator_info.pending_events.reset_mock() + extract.reset_mock() + context_get_current.reset_mock() + get_span.reset_mock() + context_attach.reset_mock() + context_detach.reset_mock() + returned_span.end.reset_mock() + consume_hook.reset_mock() + + # Second call (has detach cleanup) + res = proxy.popleft() + self.assertEqual(res, evt) + generator_info.pending_events.popleft.assert_called_once() + extract.assert_called_once_with( + properties.headers, getter=utils._pika_getter + ) + context_get_current.assert_called_once() + self.assertEqual(context_attach.call_count, 2) + self.assertEqual(context_detach.call_count, 2) + get_span.assert_called_once_with( + tracer, + None, + properties, + destination=method.exchange, + span_kind=SpanKind.CONSUMER, + task_name=generator_info.consumer_tag, + operation=MessagingOperationValues.RECEIVE, + ) + consume_hook.assert_called_once() + returned_span.end.assert_called_once() + generator_info.pending_events.reset_mock() + + extract.reset_mock() + context_get_current.reset_mock() + get_span.reset_mock() + context_attach.reset_mock() + context_detach.reset_mock() + returned_span.end.reset_mock() + consume_hook.reset_mock() + + # Third call (cancellation event) + evt = _ConsumerCancellationEvt("") + generator_info.pending_events.popleft.return_value = evt + + res = proxy.popleft() + + self.assertEqual(res, evt) + generator_info.pending_events.popleft.assert_called_once() + extract.assert_not_called() + context_get_current.not_called() + context_detach.assert_called_once() + context_attach.assert_not_called() + get_span.assert_not_called() + consume_hook.assert_not_called() + returned_span.end.assert_not_called()