Skip to content

Bugfix/set default headers for properties in pika #740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-util-http` no longer contains an instrumentation entrypoint and will not be loaded
automatically by the auto instrumentor.
([#745](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/745))
- `opentelemetry-instrumentation-pika` Bugfix use properties.headers. It will prevent the header injection from raising.
([#740](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/740))

## [1.6.0-0.25b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.0-0.25b0) - 2021-10-13
### Added
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Tracer
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry.trace.span import Span


@@ -40,16 +40,15 @@ def decorated_callback(
body: bytes,
) -> Any:
if not properties:
properties = BasicProperties()
if properties.headers is None:
properties.headers = {}
properties = BasicProperties(headers={})
ctx = propagate.extract(properties.headers, getter=_pika_getter)
if not ctx:
ctx = context.get_current()
span = _get_span(
tracer,
channel,
properties,
span_kind=SpanKind.CONSUMER,
task_name=task_name,
ctx=ctx,
operation=MessagingOperationValues.RECEIVE,
@@ -74,12 +73,13 @@ def decorated_function(
mandatory: bool = False,
) -> Any:
if not properties:
properties = BasicProperties()
properties = BasicProperties(headers={})
ctx = context.get_current()
span = _get_span(
tracer,
channel,
properties,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
ctx=ctx,
operation=None,
@@ -104,6 +104,7 @@ def _get_span(
channel: Channel,
properties: BasicProperties,
task_name: str,
span_kind: SpanKind,
ctx: context.Context,
operation: Optional[MessagingOperationValues] = None,
) -> Optional[Span]:
@@ -113,7 +114,9 @@ def _get_span(
return None
task_name = properties.type if properties.type else task_name
span = tracer.start_span(
context=ctx, name=_generate_span_name(task_name, operation)
context=ctx,
name=_generate_span_name(task_name, operation),
kind=span_kind,
)
if span.is_recording():
_enrich_span(span, channel, properties, task_name, operation)
136 changes: 131 additions & 5 deletions instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -13,9 +13,15 @@
# limitations under the License.
from unittest import TestCase, mock

from pika.channel import Channel
from pika.spec import Basic, BasicProperties

from opentelemetry.instrumentation.pika import utils
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, Tracer
from opentelemetry.semconv.trace import (
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Span, SpanKind, Tracer


class TestUtils(TestCase):
@@ -32,12 +38,15 @@ def test_get_span(
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
span_kind = mock.MagicMock(spec=SpanKind)
get_value.return_value = None
ctx = mock.MagicMock()
_ = utils._get_span(tracer, channel, properties, task_name, ctx)
_ = utils._get_span(
tracer, channel, properties, task_name, span_kind, ctx
)
generate_span_name.assert_called_once()
tracer.start_span.assert_called_once_with(
context=ctx, name=generate_span_name.return_value
context=ctx, name=generate_span_name.return_value, kind=span_kind
)
enrich_span.assert_called_once()

@@ -54,9 +63,12 @@ def test_get_span_suppressed(
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
span_kind = mock.MagicMock(spec=SpanKind)
get_value.return_value = True
ctx = mock.MagicMock()
span = utils._get_span(tracer, channel, properties, task_name, ctx)
span = utils._get_span(
tracer, channel, properties, task_name, span_kind, ctx
)
self.assertEqual(span, None)
generate_span_name.assert_not_called()
enrich_span.assert_not_called()
@@ -158,3 +170,117 @@ def test_enrich_span_unique_connection() -> None:
),
],
)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.extract")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_callback(
self,
use_span: mock.MagicMock,
extract: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
mock_task_name = "mock_task_name"
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_callback = utils._decorate_callback(
callback, tracer, mock_task_name
)
retval = decorated_callback(channel, method, properties, mock_body)
extract.assert_called_once_with(
properties.headers, getter=utils._pika_getter
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
span_kind=SpanKind.CONSUMER,
task_name=mock_task_name,
ctx=extract.return_value,
operation=MessagingOperationValues.RECEIVE,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
callback.assert_called_once_with(
channel, method, properties, mock_body
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.context.get_current")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_basic_publish(
self,
use_span: mock.MagicMock,
get_current: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer
)
retval = decorated_basic_publish(
channel, method, mock_body, properties
)
get_current.assert_called_once()
get_span.assert_called_once_with(
tracer,
channel,
properties,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
ctx=get_current.return_value,
operation=None,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
callback.assert_called_once_with(
channel, method, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.context.get_current")
@mock.patch("opentelemetry.trace.use_span")
@mock.patch("pika.spec.BasicProperties.__new__")
def test_decorate_basic_publish_no_properties(
self,
basic_properties: mock.MagicMock,
use_span: mock.MagicMock,
get_current: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
mock_body = b"mock_body"
decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer
)
retval = decorated_basic_publish(channel, method, body=mock_body)
basic_properties.assert_called_once_with(BasicProperties, headers={})
get_current.assert_called_once()
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)