@@ -69,14 +69,13 @@ def consume(consumer, message):
69
69
70
70
import pulsar
71
71
import wrapt
72
- from pulsar import Client , Consumer , Message , Producer
73
-
74
- from opentelemetry import propagate , trace
75
72
from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
76
73
from opentelemetry .instrumentation .utils import propagator , unwrap
77
74
from opentelemetry .semconv .trace import MessagingOperationValues
78
75
from opentelemetry .trace import Span , SpanKind , Tracer
76
+ from pulsar import Client , Consumer , Message , Producer
79
77
78
+ from opentelemetry import propagate , trace
80
79
from .package import _instruments
81
80
from .utils import (
82
81
_enrich_span ,
@@ -90,14 +89,14 @@ def consume(consumer, message):
90
89
PROPERTIES_KEY = "properties"
91
90
92
91
93
- class TracerMixin :
92
+ class _TracerMixin :
94
93
_tracer : Tracer
95
94
96
95
def _set_tracer (self , tracer ):
97
96
self ._tracer = tracer
98
97
99
98
100
- class SpanMixin :
99
+ class _SpanMixin :
101
100
_current_span : Optional [Span ] = None
102
101
103
102
def _set_current_span (self , current_span ):
@@ -110,7 +109,7 @@ def _end_span(self):
110
109
self ._current_span = None
111
110
112
111
113
- class InstrumentedClient ( TracerMixin , Client ):
112
+ class _InstrumentedClient ( _TracerMixin , Client ):
114
113
_subscribe_signature = inspect .signature (Client .subscribe )
115
114
116
115
def subscribe (self , topic , * args , ** kwargs ):
@@ -125,7 +124,7 @@ def subscribe(self, topic, *args, **kwargs):
125
124
126
125
@wraps (message_listener )
127
126
def wrapper ( # pylint: disable=function-redefined
128
- consumer , message : InstrumentedMessage , * args , ** kwargs
127
+ consumer , message : _InstrumentedMessage , * args , ** kwargs
129
128
):
130
129
ctx = propagator .extract (message .properties ())
131
130
span = self ._tracer .start_span (
@@ -148,7 +147,7 @@ def wrapper( # pylint: disable=function-redefined
148
147
return super ().subscribe (* args , ** bound_arguments .kwargs )
149
148
150
149
151
- class InstrumentedProducer ( TracerMixin , Producer ):
150
+ class _InstrumentedProducer ( _TracerMixin , Producer ):
152
151
_send_signature = inspect .signature (Producer .send )
153
152
154
153
def send (self , * args , ** kwargs ):
@@ -204,20 +203,20 @@ def _create_span(self):
204
203
)
205
204
206
205
207
- class InstrumentedMessage (Message , SpanMixin ):
206
+ class _InstrumentedMessage (Message , _SpanMixin ):
208
207
pass
209
208
210
209
211
- class InstrumentedConsumer ( TracerMixin , Consumer ):
212
- _last_message : InstrumentedMessage = None
210
+ class _InstrumentedConsumer ( _TracerMixin , Consumer ):
211
+ _last_message : _InstrumentedMessage = None
213
212
214
213
def receive (self , * args , ** kwargs ):
215
214
if self ._last_message :
216
215
self ._last_message ._end_span ()
217
216
with self ._tracer .start_as_current_span (
218
- "recv" , end_on_exit = True , kind = trace .SpanKind .CONSUMER
217
+ "recv" , end_on_exit = True , kind = trace .SpanKind .CONSUMER
219
218
):
220
- message : InstrumentedMessage = super ().receive (* args , ** kwargs )
219
+ message : _InstrumentedMessage = super ().receive (* args , ** kwargs )
221
220
context = propagate .extract (message .properties ())
222
221
span = self ._tracer .start_span (
223
222
f"{ self .topic ()} process" , context = context
@@ -229,19 +228,19 @@ def receive(self, *args, **kwargs):
229
228
230
229
return message
231
230
232
- def acknowledge (self , message : InstrumentedMessage ):
231
+ def acknowledge (self , message : _InstrumentedMessage ):
233
232
message ._end_span ()
234
233
super ().acknowledge (message )
235
234
236
235
acknowledge .__doc__ = Consumer .acknowledge .__doc__
237
236
238
- def negative_acknowledge (self , message : InstrumentedMessage ):
237
+ def negative_acknowledge (self , message : _InstrumentedMessage ):
239
238
message ._end_span ()
240
239
super ().negative_acknowledge (message )
241
240
242
241
negative_acknowledge .__doc__ = Consumer .negative_acknowledge .__doc__
243
242
244
- def acknowledge_cumulative (self , message : InstrumentedMessage ):
243
+ def acknowledge_cumulative (self , message : _InstrumentedMessage ):
245
244
message ._end_span ()
246
245
super ().acknowledge_cumulative (message )
247
246
@@ -268,10 +267,10 @@ def _instrument(self, **kwargs):
268
267
self ._original_pulsar_consumer = pulsar .Consumer
269
268
self ._original_pulsar_message = pulsar .Message
270
269
271
- pulsar .Client = InstrumentedClient
272
- pulsar .Producer = InstrumentedProducer
273
- pulsar .Consumer = InstrumentedConsumer
274
- pulsar .Message = InstrumentedMessage
270
+ pulsar .Client = _InstrumentedClient
271
+ pulsar .Producer = _InstrumentedProducer
272
+ pulsar .Consumer = _InstrumentedConsumer
273
+ pulsar .Message = _InstrumentedMessage
275
274
276
275
tracer_provider = kwargs .get ("tracer_provider" )
277
276
tracer = trace .get_tracer (
@@ -284,16 +283,19 @@ def init(wrapped, self, args, kwargs):
284
283
wrapped (* args , ** kwargs )
285
284
self ._set_tracer (tracer )
286
285
287
- wrapt .wrap_function_wrapper (InstrumentedClient , "__init__" , init )
288
- wrapt .wrap_function_wrapper (InstrumentedConsumer , "__init__" , init )
289
- wrapt .wrap_function_wrapper (InstrumentedProducer , "__init__" , init )
286
+ wrapt .wrap_function_wrapper (_InstrumentedClient , "__init__" , init )
287
+ wrapt .wrap_function_wrapper (_InstrumentedConsumer , "__init__" , init )
288
+ wrapt .wrap_function_wrapper (_InstrumentedProducer , "__init__" , init )
290
289
291
290
def _uninstrument (self , ** kwargs ):
292
291
pulsar .Client = self ._original_pulsar_client
293
292
pulsar .Producer = self ._original_pulsar_producer
294
293
pulsar .Consumer = self ._original_pulsar_consumer
295
294
pulsar .Message = self ._original_pulsar_message
296
295
297
- unwrap (InstrumentedClient , "__init__" )
298
- unwrap (InstrumentedConsumer , "__init__" )
299
- unwrap (InstrumentedProducer , "__init__" )
296
+ unwrap (_InstrumentedClient , "__init__" )
297
+ unwrap (_InstrumentedConsumer , "__init__" )
298
+ unwrap (_InstrumentedProducer , "__init__" )
299
+
300
+
301
+ __all__ = (PulsarInstrumentor ,)
0 commit comments