14
14
from logging import getLogger
15
15
from typing import Any , Collection , Dict , Optional
16
16
17
- import pkg_resources
18
17
import wrapt
19
18
from packaging import version
19
+ import pika
20
20
from pika .adapters import BlockingConnection
21
21
from pika .adapters .blocking_connection import BlockingChannel
22
22
34
34
_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish" ]
35
35
36
36
37
+ def _consumer_callback_attribute_name () -> str :
38
+ pika_version = version .parse (
39
+ pika .__version__
40
+ )
41
+ return (
42
+ "on_message_callback"
43
+ if pika_version >= version .parse ("1.0.0" )
44
+ else "consumer_cb"
45
+ )
46
+
47
+
37
48
class PikaInstrumentor (BaseInstrumentor ): # type: ignore
49
+ CONSUMER_CALLBACK_ATTR = _consumer_callback_attribute_name ()
50
+
38
51
# pylint: disable=attribute-defined-outside-init
39
52
@staticmethod
40
53
def _instrument_blocking_channel_consumers (
@@ -44,7 +57,7 @@ def _instrument_blocking_channel_consumers(
44
57
) -> Any :
45
58
for consumer_tag , consumer_info in channel ._consumer_infos .items ():
46
59
callback_attr = (
47
- PikaInstrumentor ._consumer_callback_attribute_name ()
60
+ PikaInstrumentor .CONSUMER_CALLBACK_ATTR
48
61
)
49
62
consumer_callback = getattr (consumer_info , callback_attr )
50
63
decorated_callback = utils ._decorate_callback (
@@ -133,7 +146,7 @@ def uninstrument_channel(channel: BlockingChannel) -> None:
133
146
134
147
for consumers_tag , client_info in channel ._consumer_infos .items ():
135
148
callback_attr = (
136
- PikaInstrumentor ._consumer_callback_attribute_name ()
149
+ PikaInstrumentor .CONSUMER_CALLBACK_ATTR
137
150
)
138
151
consumer_callback = getattr (client_info , callback_attr )
139
152
if hasattr (consumer_callback , "_original_callback" ):
@@ -142,17 +155,6 @@ def uninstrument_channel(channel: BlockingChannel) -> None:
142
155
] = consumer_callback ._original_callback
143
156
PikaInstrumentor ._uninstrument_channel_functions (channel )
144
157
145
- @staticmethod
146
- def _consumer_callback_attribute_name () -> str :
147
- pika_version = version .parse (
148
- pkg_resources .get_distribution ("pika" ).version
149
- )
150
- return (
151
- "on_message_callback"
152
- if pika_version >= version .parse ("1.0.0" )
153
- else "consumer_cb"
154
- )
155
-
156
158
def _decorate_channel_function (
157
159
self ,
158
160
tracer_provider : Optional [TracerProvider ],
0 commit comments