14
14
from logging import getLogger
15
15
from typing import Any , Callable , Collection , Dict , Optional
16
16
17
+ from pika .adapters import BlockingConnection
17
18
from pika .channel import Channel
18
19
19
20
from opentelemetry import trace
24
25
from opentelemetry .trace import Tracer , TracerProvider
25
26
26
27
_LOG = getLogger (__name__ )
27
- CTX_KEY = "__otel_task_span"
28
+ _CTX_KEY = "__otel_task_span"
28
29
29
- FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish" ]
30
+ _FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish" ]
30
31
31
32
32
33
class PikaInstrumentor (BaseInstrumentor ): # type: ignore
34
+ # pylint: disable=attribute-defined-outside-init
33
35
@staticmethod
34
36
def _instrument_consumers (
35
37
consumers_dict : Dict [str , Callable [..., Any ]], tracer : Tracer
36
38
) -> Any :
37
39
for key , callback in consumers_dict .items ():
38
- decorated_callback = utils .decorate_callback (callback , tracer , key )
40
+ decorated_callback = utils ._decorate_callback (
41
+ callback , tracer , key
42
+ )
39
43
setattr (decorated_callback , "_original_callback" , callback )
40
44
consumers_dict [key ] = decorated_callback
41
45
42
46
@staticmethod
43
47
def _instrument_basic_publish (channel : Channel , tracer : Tracer ) -> None :
44
48
original_function = getattr (channel , "basic_publish" )
45
- decorated_function = utils .decorate_basic_publish (
49
+ decorated_function = utils ._decorate_basic_publish (
46
50
original_function , channel , tracer
47
51
)
48
52
setattr (decorated_function , "_original_function" , original_function )
@@ -58,7 +62,7 @@ def _instrument_channel_functions(
58
62
59
63
@staticmethod
60
64
def _uninstrument_channel_functions (channel : Channel ) -> None :
61
- for function_name in FUNCTIONS_TO_UNINSTRUMENT :
65
+ for function_name in _FUNCTIONS_TO_UNINSTRUMENT :
62
66
if not hasattr (channel , function_name ):
63
67
continue
64
68
function = getattr (channel , function_name )
@@ -69,30 +73,19 @@ def _uninstrument_channel_functions(channel: Channel) -> None:
69
73
def instrument_channel (
70
74
channel : Channel , tracer_provider : Optional [TracerProvider ] = None ,
71
75
) -> None :
76
+ tracer = trace .get_tracer (__name__ , __version__ , tracer_provider )
77
+ channel .__setattr__ ("__opentelemetry_tracer" , tracer )
72
78
if not hasattr (channel , "_impl" ):
73
79
_LOG .error ("Could not find implementation for provided channel!" )
74
80
return
75
- tracer = trace .get_tracer (__name__ , __version__ , tracer_provider )
76
- channel .__setattr__ ("__opentelemetry_tracer" , tracer )
77
81
if channel ._impl ._consumers :
78
82
PikaInstrumentor ._instrument_consumers (
79
83
channel ._impl ._consumers , tracer
80
84
)
81
85
PikaInstrumentor ._instrument_channel_functions (channel , tracer )
82
86
83
- def _instrument (self , ** kwargs : Dict [str , Any ]) -> None :
84
- channel : Channel = kwargs .get ("channel" , None )
85
- if not channel or not isinstance (channel , Channel ):
86
- return
87
- tracer_provider : TracerProvider = kwargs .get ("tracer_provider" , None )
88
- PikaInstrumentor .instrument_channel (
89
- channel , tracer_provider = tracer_provider
90
- )
91
-
92
- def _uninstrument (self , ** kwargs : Dict [str , Any ]) -> None :
93
- channel : Channel = kwargs .get ("channel" , None )
94
- if not channel or not isinstance (channel , Channel ):
95
- return
87
+ @staticmethod
88
+ def uninstrument_channel (channel : Channel ) -> None :
96
89
if not hasattr (channel , "_impl" ):
97
90
_LOG .error ("Could not find implementation for provided channel!" )
98
91
return
@@ -101,5 +94,24 @@ def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
101
94
channel ._impl ._consumers [key ] = callback ._original_callback
102
95
PikaInstrumentor ._uninstrument_channel_functions (channel )
103
96
97
+ def _decorate_channel_function (
98
+ self , tracer_provider : Optional [TracerProvider ]
99
+ ) -> None :
100
+ self .original_channel_func = BlockingConnection .channel
101
+
102
+ def _wrapper (* args , ** kwargs ):
103
+ channel = self .original_channel_func (* args , ** kwargs )
104
+ self .instrument_channel (channel , tracer_provider = tracer_provider )
105
+ return channel
106
+
107
+ BlockingConnection .channel = _wrapper
108
+
109
+ def _instrument (self , ** kwargs : Dict [str , Any ]) -> None :
110
+ tracer_provider : TracerProvider = kwargs .get ("tracer_provider" , None )
111
+ self ._decorate_channel_function (tracer_provider )
112
+
113
+ def _uninstrument (self , ** kwargs : Dict [str , Any ]) -> None :
114
+ BlockingConnection .channel = self .original_channel_func
115
+
104
116
def instrumentation_dependencies (self ) -> Collection [str ]:
105
117
return _instruments
0 commit comments