Skip to content

Commit 20700ba

Browse files
Support aio_pika 8
- Fix tests for new shape of the AbstractConnection class - Run tests against aio_pika 7 and 8
1 parent c92ba14 commit 20700ba

File tree

10 files changed

+127
-15
lines changed

10 files changed

+127
-15
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553))
1414
- `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available
1515
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
16+
- `opentelemetry-instrumentation-aio-pika` Support `aio_pika` 8.x
17+
([#1481](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1481))
1618

1719
### Fixed
1820

instrumentation/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
| Instrumentation | Supported Packages | Metrics support |
33
| --------------- | ------------------ | --------------- |
4-
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No
4+
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0, < 9.0.0 | No
55
| [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No
66
| [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No
77
| [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No

instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ dependencies = [
3131

3232
[project.optional-dependencies]
3333
instruments = [
34-
"aio_pika ~= 7.2.0",
34+
"aio_pika >= 7.2.0, < 9.0.0",
3535
]
3636
test = [
3737
"opentelemetry-instrumentation-aio-pika[instruments]",

instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@
1313
# limitations under the License.
1414
from typing import Collection
1515

16-
_instruments: Collection[str] = ("aio_pika ~= 7.2.0",)
16+
_instruments: Collection[str] = ("aio_pika >= 7.2.0, < 9.0.0",)

instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,13 @@ def set_destination(self, destination: str):
4949
self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination
5050

5151
def set_channel(self, channel: AbstractChannel):
52-
url = channel.connection.connection.url
52+
connection = channel.connection
53+
if getattr(connection, "connection", None):
54+
# aio_rmq 7
55+
url = connection.connection.url
56+
else:
57+
# aio_rmq 8
58+
url = connection.url
5359
self._attributes.update({
5460
SpanAttributes.NET_PEER_NAME: url.host,
5561
SpanAttributes.NET_PEER_PORT: url.port

instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
SERVER_URL = URL(
1616
f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/"
1717
)
18-
CONNECTION = Namespace(connection=Namespace(url=SERVER_URL))
19-
CHANNEL = Namespace(connection=CONNECTION, loop=None)
18+
CONNECTION_7 = Namespace(connection=Namespace(url=SERVER_URL))
19+
CONNECTION_8 = Namespace(connection=Namespace(url=SERVER_URL))
20+
CHANNEL_7 = Namespace(connection=CONNECTION_7, loop=None)
21+
CHANNEL_8 = Namespace(connection=CONNECTION_8, loop=None)
2022
MESSAGE = Namespace(
2123
properties=Namespace(
2224
message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={}

instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py

+42-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
from opentelemetry.trace import SpanKind, get_tracer
2424

2525
from .consts import (
26-
CHANNEL,
26+
CHANNEL_7,
27+
CHANNEL_8,
2728
CORRELATION_ID,
2829
EXCHANGE_NAME,
2930
MESSAGE,
@@ -52,7 +53,7 @@ def setUp(self):
5253
asyncio.set_event_loop(self.loop)
5354

5455
def test_get_callback_span(self):
55-
queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None)
56+
queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None)
5657
tracer = mock.MagicMock()
5758
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
5859
tracer.start_span.assert_called_once_with(
@@ -62,7 +63,45 @@ def test_get_callback_span(self):
6263
)
6364

6465
def test_decorate_callback(self):
65-
queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None)
66+
queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None)
67+
callback = mock.MagicMock(return_value=asyncio.sleep(0))
68+
with mock.patch.object(
69+
CallbackDecorator, "_get_span"
70+
) as mocked_get_callback_span:
71+
callback_decorator = CallbackDecorator(self.tracer, queue)
72+
decorated_callback = callback_decorator.decorate(callback)
73+
self.loop.run_until_complete(decorated_callback(MESSAGE))
74+
mocked_get_callback_span.assert_called_once()
75+
callback.assert_called_once_with(MESSAGE)
76+
77+
class TestInstrumentedQueue(TestCase):
78+
EXPECTED_ATTRIBUTES = {
79+
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
80+
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
81+
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
82+
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
83+
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
84+
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
85+
SpanAttributes.MESSAGING_OPERATION: "receive",
86+
}
87+
88+
def setUp(self):
89+
self.tracer = get_tracer(__name__)
90+
self.loop = asyncio.new_event_loop()
91+
asyncio.set_event_loop(self.loop)
92+
93+
def test_get_callback_span(self):
94+
queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None)
95+
tracer = mock.MagicMock()
96+
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
97+
tracer.start_span.assert_called_once_with(
98+
f"{EXCHANGE_NAME} receive",
99+
kind=SpanKind.CONSUMER,
100+
attributes=self.EXPECTED_ATTRIBUTES,
101+
)
102+
103+
def test_decorate_callback(self):
104+
queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None)
66105
callback = mock.MagicMock(return_value=asyncio.sleep(0))
67106
with mock.patch.object(
68107
CallbackDecorator, "_get_span"

instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py

+59-5
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
from opentelemetry.trace import SpanKind, get_tracer
2525

2626
from .consts import (
27-
CHANNEL,
28-
CONNECTION,
27+
CHANNEL_7,
28+
CHANNEL_8,
29+
CONNECTION_7,
30+
CONNECTION_8,
2931
CORRELATION_ID,
3032
EXCHANGE_NAME,
3133
MESSAGE,
@@ -37,7 +39,7 @@
3739
)
3840

3941

40-
class TestInstrumentedExchange(TestCase):
42+
class TestInstrumentedExchangeAioRmq7(TestCase):
4143
EXPECTED_ATTRIBUTES = {
4244
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
4345
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
@@ -54,7 +56,7 @@ def setUp(self):
5456
asyncio.set_event_loop(self.loop)
5557

5658
def test_get_publish_span(self):
57-
exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME)
59+
exchange = Exchange(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
5860
tracer = mock.MagicMock()
5961
PublishDecorator(tracer, exchange)._get_publish_span(
6062
MESSAGE, ROUTING_KEY
@@ -66,7 +68,59 @@ def test_get_publish_span(self):
6668
)
6769

6870
def _test_publish(self, exchange_type: Type[Exchange]):
69-
exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME)
71+
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
72+
with mock.patch.object(
73+
PublishDecorator, "_get_publish_span"
74+
) as mock_get_publish_span:
75+
with mock.patch.object(
76+
Exchange, "publish", return_value=asyncio.sleep(0)
77+
) as mock_publish:
78+
decorated_publish = PublishDecorator(
79+
self.tracer, exchange
80+
).decorate(mock_publish)
81+
self.loop.run_until_complete(
82+
decorated_publish(MESSAGE, ROUTING_KEY)
83+
)
84+
mock_publish.assert_called_once()
85+
mock_get_publish_span.assert_called_once()
86+
87+
def test_publish(self):
88+
self._test_publish(Exchange)
89+
90+
def test_robust_publish(self):
91+
self._test_publish(RobustExchange)
92+
93+
94+
class TestInstrumentedExchangeAioRmq8(TestCase):
95+
EXPECTED_ATTRIBUTES = {
96+
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
97+
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
98+
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
99+
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
100+
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
101+
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
102+
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
103+
}
104+
105+
def setUp(self):
106+
self.tracer = get_tracer(__name__)
107+
self.loop = asyncio.new_event_loop()
108+
asyncio.set_event_loop(self.loop)
109+
110+
def test_get_publish_span(self):
111+
exchange = Exchange(CONNECTION_8, CHANNEL_8, EXCHANGE_NAME)
112+
tracer = mock.MagicMock()
113+
PublishDecorator(tracer, exchange)._get_publish_span(
114+
MESSAGE, ROUTING_KEY
115+
)
116+
tracer.start_span.assert_called_once_with(
117+
f"{EXCHANGE_NAME},{ROUTING_KEY} send",
118+
kind=SpanKind.PRODUCER,
119+
attributes=self.EXPECTED_ATTRIBUTES,
120+
)
121+
122+
def _test_publish(self, exchange_type: Type[Exchange]):
123+
exchange = exchange_type(CONNECTION_8, CHANNEL_8, EXCHANGE_NAME)
70124
with mock.patch.object(
71125
PublishDecorator, "_get_publish_span"
72126
) as mock_get_publish_span:

opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
libraries = {
1919
"aio_pika": {
20-
"library": "aio_pika ~= 7.2.0",
20+
"library": "aio_pika >= 7.2.0, < 9.0.0",
2121
"instrumentation": "opentelemetry-instrumentation-aio-pika==0.37b0.dev",
2222
},
2323
"aiohttp": {

tox.ini

+9
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ envlist =
203203
py3{7,8,9,10,11}-test-instrumentation-pika{0,1}
204204
pypy3-test-instrumentation-pika{0,1}
205205

206+
; opentelemetry-instrumentation-aio-pika
207+
py3{7,8,9,10,11}-test-instrumentation-aio-pika{7,8}
208+
pypy3-test-instrumentation-aio-pika{7,8}
209+
206210
; opentelemetry-instrumentation-kafka-python
207211
py3{7,8,9,10,11}-test-instrumentation-kafka-python
208212
pypy3-test-instrumentation-kafka-python
@@ -247,6 +251,8 @@ deps =
247251
sqlalchemy14: sqlalchemy~=1.4
248252
pika0: pika>=0.12.0,<1.0.0
249253
pika1: pika>=1.0.0
254+
aio-pika7: aio_pika~=7.2.0
255+
aio-pika8: aio_pika>=8.0.0,<9.0.0
250256
pymemcache135: pymemcache ==1.3.5
251257
pymemcache200: pymemcache >2.0.0,<3.0.0
252258
pymemcache300: pymemcache >3.0.0,<3.4.2
@@ -292,6 +298,7 @@ changedir =
292298
test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests
293299
test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests
294300
test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests
301+
test-instrumentation-aio-pika{7,8}: instrumentation/opentelemetry-instrumentation-aio-pika/tests
295302
test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests
296303
test-instrumentation-pymemcache{135,200,300,342}: instrumentation/opentelemetry-instrumentation-pymemcache/tests
297304
test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests
@@ -333,6 +340,8 @@ commands_pre =
333340

334341
pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]
335342

343+
aio-pika{7,8}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika[test]
344+
336345
kafka-python: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python[test]
337346

338347
confluent-kafka: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-confluent-kafka[test]

0 commit comments

Comments
 (0)