@@ -101,24 +101,30 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
 
 import confluent_kafka
 import wrapt
-from confluent_kafka import Producer, Consumer
-from opentelemetry import trace, propagate, context
+from confluent_kafka import Consumer, Producer
+
+from opentelemetry import context, propagate, trace
+from opentelemetry.instrumentation.confluent_kafka.package import _instruments
+from opentelemetry.instrumentation.confluent_kafka.utils import (
+    KafkaPropertiesExtractor,
+    _enrich_span,
+    _get_span_name,
+    _kafka_getter,
+    _kafka_setter,
+)
+from opentelemetry.instrumentation.confluent_kafka.version import __version__
 from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
-from opentelemetry.semconv.trace import MessagingOperationValues
-from opentelemetry.trace import Tracer, Link, SpanKind
 from opentelemetry.instrumentation.utils import unwrap
-
-from kafka_instrumentation.package import _instruments
-from kafka_instrumentation.utils import KafkaPropertiesExtractor, _get_span_name, \
-    _kafka_setter, _enrich_span, _kafka_getter
-from kafka_instrumentation.version import __version__
+from opentelemetry.semconv.trace import MessagingOperationValues
+from opentelemetry.trace import Link, SpanKind, Tracer
 
 
 class AutoInstrumentedProducer(Producer):
-    def __init__(self, config):
-        super().__init__(config)
 
-    def produce(self, topic, value=None, *args, **kwargs):
+    # This method is deliberately implemented in order to allow wrapt to wrap this function
+    def produce(
+        self, topic, value=None, *args, **kwargs
+    ):  # pylint: disable=keyword-arg-before-vararg,useless-super-delegation
         super().produce(topic, value, *args, **kwargs)
 
 
@@ -127,12 +133,12 @@ def __init__(self, config):
         super().__init__(config)
         self._current_consume_span = None
 
-    def poll(self, timeout=-1):
+    # This method is deliberately implemented in order to allow wrapt to wrap this function
+    def poll(self, timeout=-1):  # pylint: disable=useless-super-delegation
         return super().poll(timeout)
 
 
 class ProxiedProducer(Producer):
-
     def __init__(self, producer: Producer, tracer: Tracer):
         self._producer = producer
         self._tracer = tracer
@@ -143,19 +149,22 @@ def flush(self, timeout=-1):
     def poll(self, timeout=-1):
         self._producer.poll(timeout)
 
-    def produce(self, topic, value=None, *args, **kwargs):
+    def produce(
+        self, topic, value=None, *args, **kwargs
+    ):  # pylint: disable=keyword-arg-before-vararg
         new_kwargs = kwargs.copy()
-        new_kwargs['topic'] = topic
-        new_kwargs['value'] = value
+        new_kwargs["topic"] = topic
+        new_kwargs["value"] = value
 
-        return ConfluentKafkaInstrumentor.wrap_produce(self._producer.produce, self, self._tracer, args, new_kwargs)
+        return ConfluentKafkaInstrumentor.wrap_produce(
+            self._producer.produce, self, self._tracer, args, new_kwargs
+        )
 
     def original_producer(self):
         return self._producer
 
 
 class ProxiedConsumer(Consumer):
-
     def __init__(self, consumer: Consumer, tracer: Tracer):
         self._consumer = consumer
         self._tracer = tracer
@@ -165,19 +174,29 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
     def committed(self, partitions, timeout=-1):
         return self._consumer.committed(partitions, timeout)
 
-    def consume(self, num_messages=1, *args, **kwargs):
+    def consume(
+        self, num_messages=1, *args, **kwargs
+    ):  # pylint: disable=keyword-arg-before-vararg
         return self._consumer.consume(num_messages, *args, **kwargs)
 
-    def get_watermark_offsets(self, partition, timeout=-1, *args, **kwargs):
-        return self._consumer.get_watermark_offsets(partition, timeout, *args, **kwargs)
+    def get_watermark_offsets(
+        self, partition, timeout=-1, *args, **kwargs
+    ):  # pylint: disable=keyword-arg-before-vararg
+        return self._consumer.get_watermark_offsets(
+            partition, timeout, *args, **kwargs
+        )
 
     def offsets_for_times(self, partitions, timeout=-1):
         return self._consumer.offsets_for_times(partitions, timeout)
 
     def poll(self, timeout=-1):
-        return ConfluentKafkaInstrumentor.wrap_poll(self._consumer.poll, self, self._tracer, [timeout], {})
+        return ConfluentKafkaInstrumentor.wrap_poll(
+            self._consumer.poll, self, self._tracer, [timeout], {}
+        )
 
-    def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs):
+    def subscribe(
+        self, topics, on_assign=lambda *args: None, *args, **kwargs
+    ):  # pylint: disable=keyword-arg-before-vararg
         self._consumer.subscribe(topics, on_assign, *args, **kwargs)
 
     def original_consumer(self):
@@ -189,8 +208,11 @@ class ConfluentKafkaInstrumentor(BaseInstrumentor):
     See `BaseInstrumentor`
     """
 
+    # pylint: disable=attribute-defined-outside-init
     @staticmethod
-    def instrument_producer(producer: Producer, tracer_provider=None) -> ProxiedProducer:
+    def instrument_producer(
+        producer: Producer, tracer_provider=None
+    ) -> ProxiedProducer:
         tracer = trace.get_tracer(
             __name__, __version__, tracer_provider=tracer_provider
         )
@@ -200,7 +222,9 @@ def instrument_producer(producer: Producer, tracer_provider=None) -> ProxiedProd
         return manual_producer
 
     @staticmethod
-    def instrument_consumer(consumer: Consumer, tracer_provider=None) -> ProxiedConsumer:
+    def instrument_consumer(
+        consumer: Consumer, tracer_provider=None
+    ) -> ProxiedConsumer:
         tracer = trace.get_tracer(
             __name__, __version__, tracer_provider=tracer_provider
         )
@@ -210,14 +234,16 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) -> ProxiedCons
         return manual_consumer
 
     @staticmethod
-    def uninstrument_producer(producer) -> Producer:
+    def uninstrument_producer(producer: Producer) -> Producer:
         if isinstance(producer, ProxiedProducer):
             return producer.original_producer()
+        return producer
 
     @staticmethod
-    def uninstrument_consumer(consumer) -> Consumer:
+    def uninstrument_consumer(consumer: Consumer) -> Consumer:
         if isinstance(consumer, ProxiedConsumer):
             return consumer.original_consumer()
+        return consumer
 
     def instrumentation_dependencies(self) -> Collection[str]:
         return _instruments
@@ -237,16 +263,26 @@ def _instrument(self, **kwargs):
         self._tracer = tracer
 
         def _inner_wrap_produce(func, instance, args, kwargs):
-            return ConfluentKafkaInstrumentor.wrap_produce(func, instance, self._tracer, args, kwargs)
+            return ConfluentKafkaInstrumentor.wrap_produce(
+                func, instance, self._tracer, args, kwargs
+            )
 
         def _inner_wrap_poll(func, instance, args, kwargs):
-            return ConfluentKafkaInstrumentor.wrap_poll(func, instance, self._tracer, args, kwargs)
+            return ConfluentKafkaInstrumentor.wrap_poll(
+                func, instance, self._tracer, args, kwargs
+            )
 
-        wrapt.wrap_function_wrapper("kafka_instrumentation",
-                                    "AutoInstrumentedProducer.produce", _inner_wrap_produce)
+        wrapt.wrap_function_wrapper(
+            AutoInstrumentedProducer,
+            "produce",
+            _inner_wrap_produce,
+        )
 
-        wrapt.wrap_function_wrapper("kafka_instrumentation",
-                                    "AutoInstrumentedConsumer.poll", _inner_wrap_poll)
+        wrapt.wrap_function_wrapper(
+            AutoInstrumentedConsumer,
+            "poll",
+            _inner_wrap_poll,
+        )
 
     def _uninstrument(self, **kwargs):
         confluent_kafka.Producer = self._original_kafka_producer
@@ -261,22 +297,29 @@ def wrap_produce(func, instance, tracer, args, kwargs):
         if not topic:
             topic = args[0]
 
-        span_name = _get_span_name("send", topic)
-        with tracer.start_as_current_span(name=span_name, kind=trace.SpanKind.PRODUCER) as span:
-            headers = KafkaPropertiesExtractor.extract_produce_headers(args, kwargs)
+        span_name = _get_span_name("send", topic)
+        with tracer.start_as_current_span(
+            name=span_name, kind=trace.SpanKind.PRODUCER
+        ) as span:
+            headers = KafkaPropertiesExtractor.extract_produce_headers(
+                args, kwargs
+            )
             if headers is None:
                 headers = []
                 kwargs["headers"] = headers
 
             topic = KafkaPropertiesExtractor.extract_produce_topic(args)
-            bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
-            _enrich_span(span, topic, bootstrap_servers, operation=MessagingOperationValues.RECEIVE)  # Replace
+            _enrich_span(
+                span,
+                topic,
+                operation=MessagingOperationValues.RECEIVE,
+            )  # Replace
             propagate.inject(
                 headers,
                 setter=_kafka_setter,
             )
             return func(*args, **kwargs)
-
+
     @staticmethod
     def wrap_poll(func, instance, tracer, args, kwargs):
         if instance._current_consume_span:
@@ -285,7 +328,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):
             instance._current_consume_span.end()
             instance._current_consume_span = None
 
-        with tracer.start_as_current_span("recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER) as span:
+        with tracer.start_as_current_span(
+            "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
+        ):
             record = func(*args, **kwargs)
             if record:
                 links = []
@@ -296,20 +341,20 @@ def wrap_poll(func, instance, tracer, args, kwargs):
                             links.append(Link(context=item.get_span_context()))
 
                 instance._current_consume_span = tracer.start_span(
-                    name=f"{record.topic()} process", links=links, kind=SpanKind.CONSUMER
+                    name=f"{record.topic()} process",
+                    links=links,
+                    kind=SpanKind.CONSUMER,
                 )
 
-                bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
                 _enrich_span(
                     instance._current_consume_span,
                     record.topic(),
-                    bootstrap_servers,
                     record.partition(),
                     record.offset(),
                     operation=MessagingOperationValues.PROCESS,
-
                 )
             instance._current_context_token = context.attach(
-                trace.set_span_in_context(instance._current_consume_span))
+                trace.set_span_in_context(instance._current_consume_span)
+            )
 
-            return record
+        return record
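For reference, a minimal usage sketch of the manual path this diff settles on: instrument_producer/instrument_consumer return the Proxied* wrappers, and the new return producer/return consumer fallthroughs make the uninstrument_* helpers safe to call on clients that were never wrapped. The broker address, group id, and topic below are placeholders, not taken from the change:

from confluent_kafka import Consumer, Producer

from opentelemetry.instrumentation.confluent_kafka import (
    ConfluentKafkaInstrumentor,
)

# Placeholder connection settings.
producer = Producer({"bootstrap.servers": "localhost:9092"})
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "demo-group",
        "auto.offset.reset": "earliest",
    }
)

# Returns ProxiedProducer / ProxiedConsumer; produce() and poll() are traced.
producer = ConfluentKafkaInstrumentor.instrument_producer(producer)
consumer = ConfluentKafkaInstrumentor.instrument_consumer(consumer)

producer.produce("demo-topic", b"payload")
producer.flush()

consumer.subscribe(["demo-topic"])
message = consumer.poll(timeout=1.0)

# With the fallthroughs added above, these return the original clients,
# or simply return the argument unchanged if it was never wrapped.
producer = ConfluentKafkaInstrumentor.uninstrument_producer(producer)
consumer = ConfluentKafkaInstrumentor.uninstrument_consumer(consumer)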
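The global path exercised by _instrument also changes here: wrapt.wrap_function_wrapper is now pointed at the AutoInstrumented* classes directly instead of the stale "kafka_instrumentation" module path, so the hooks attach once confluent_kafka.Producer/Consumer are swapped for those subclasses. A sketch of that flow, again with placeholder settings:

import confluent_kafka

from opentelemetry.instrumentation.confluent_kafka import (
    ConfluentKafkaInstrumentor,
)

instrumentor = ConfluentKafkaInstrumentor()
instrumentor.instrument()

# Clients created after instrument() are AutoInstrumented* instances,
# so wrapt's hooks around produce()/poll() fire transparently.
producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("demo-topic", b"payload")
producer.flush()

# _uninstrument restores the original confluent_kafka classes.
instrumentor.uninstrument()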