14
14
from logging import getLogger
15
15
from typing import Any , Collection , Dict , Optional
16
16
17
- import pkg_resources
17
+ import pika
18
18
import wrapt
19
19
from packaging import version
20
20
from pika .adapters import BlockingConnection
34
34
_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish" ]
35
35
36
36
37
+ def _consumer_callback_attribute_name () -> str :
38
+ pika_version = version .parse (pika .__version__ )
39
+ return (
40
+ "on_message_callback"
41
+ if pika_version >= version .parse ("1.0.0" )
42
+ else "consumer_cb"
43
+ )
44
+
45
+
37
46
class PikaInstrumentor (BaseInstrumentor ): # type: ignore
47
+ CONSUMER_CALLBACK_ATTR = _consumer_callback_attribute_name ()
48
+
38
49
# pylint: disable=attribute-defined-outside-init
39
50
@staticmethod
40
51
def _instrument_blocking_channel_consumers (
@@ -43,9 +54,7 @@ def _instrument_blocking_channel_consumers(
43
54
consume_hook : utils .HookT = utils .dummy_callback ,
44
55
) -> Any :
45
56
for consumer_tag , consumer_info in channel ._consumer_infos .items ():
46
- callback_attr = (
47
- PikaInstrumentor ._consumer_callback_attribute_name ()
48
- )
57
+ callback_attr = PikaInstrumentor .CONSUMER_CALLBACK_ATTR
49
58
consumer_callback = getattr (consumer_info , callback_attr )
50
59
decorated_callback = utils ._decorate_callback (
51
60
consumer_callback ,
@@ -132,27 +141,14 @@ def uninstrument_channel(channel: BlockingChannel) -> None:
132
141
return
133
142
134
143
for consumers_tag , client_info in channel ._consumer_infos .items ():
135
- callback_attr = (
136
- PikaInstrumentor ._consumer_callback_attribute_name ()
137
- )
144
+ callback_attr = PikaInstrumentor .CONSUMER_CALLBACK_ATTR
138
145
consumer_callback = getattr (client_info , callback_attr )
139
146
if hasattr (consumer_callback , "_original_callback" ):
140
147
channel ._consumer_infos [
141
148
consumers_tag
142
149
] = consumer_callback ._original_callback
143
150
PikaInstrumentor ._uninstrument_channel_functions (channel )
144
151
145
- @staticmethod
146
- def _consumer_callback_attribute_name () -> str :
147
- pika_version = version .parse (
148
- pkg_resources .get_distribution ("pika" ).version
149
- )
150
- return (
151
- "on_message_callback"
152
- if pika_version >= version .parse ("1.0.0" )
153
- else "consumer_cb"
154
- )
155
-
156
152
def _decorate_channel_function (
157
153
self ,
158
154
tracer_provider : Optional [TracerProvider ],
0 commit comments