Commit 1677ee2

add kafka instrumentation
1 parent 100ecfe commit 1677ee2

6 files changed: +510 -0 lines changed
@@ -0,0 +1,315 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Instrument `confluent-kafka-python` to report traces for messages produced and consumed by confluent-kafka clients.

Usage
-----

.. code:: python

    import sys

    from confluent_kafka import Consumer, KafkaError, KafkaException, Producer
    from opentelemetry.instrumentation.confluentkafka import ConfluentKafkaInstrumentor

    # Instrument kafka
    ConfluentKafkaInstrumentor().instrument()

    # report a span of type producer with the default settings
    conf1 = {'bootstrap.servers': "localhost:9092"}
    producer = Producer(conf1)
    producer.produce('my-topic', b'raw_bytes')

    conf2 = {'bootstrap.servers': "localhost:9092",
             'group.id': "foo",
             'auto.offset.reset': 'smallest'}
    # report a span of type consumer with the default settings
    consumer = Consumer(conf2)

    def msg_process(msg):
        # user-defined message handling
        print(msg.value())

    def basic_consume_loop(consumer, topics):
        try:
            consumer.subscribe(topics)
            running = True
            while running:
                msg = consumer.poll(timeout=1.0)
                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                    else:
                        raise KafkaException(msg.error())
                else:
                    msg_process(msg)
        finally:
            # Close down consumer to commit final offsets.
            consumer.close()

    basic_consume_loop(consumer, ["my-topic"])


The `_instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider

Individual clients can also be instrumented, with optional extra user-defined logic, through the static methods; their signatures are:
def instrument_producer(producer: Producer, tracer_provider=None)
def instrument_consumer(consumer: Consumer, tracer_provider=None)

for example:

.. code:: python

    from confluent_kafka import Consumer, Producer
    from opentelemetry import trace
    from opentelemetry.instrumentation.confluentkafka import ConfluentKafkaInstrumentor

    inst = ConfluentKafkaInstrumentor()
    tracer_provider = trace.get_tracer_provider()

    p = Producer({'bootstrap.servers': 'localhost:29092'})
    c = Consumer({
        'bootstrap.servers': 'localhost:29092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    })

    # instrument confluent kafka with produce and consume hooks
    p = inst.instrument_producer(p, tracer_provider)
    c = inst.instrument_consumer(c, tracer_provider=tracer_provider)

    # Using kafka as normal now will automatically generate spans,
    # including user custom attributes added from the hooks
    p.produce('my-topic', b'raw_bytes')
    msg = c.poll()


API
___
"""
from typing import Collection

import confluent_kafka
import wrapt
from confluent_kafka import Consumer, Producer

from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.trace import Link, SpanKind, Tracer

from kafka_instrumentation.package import _instruments
from kafka_instrumentation.utils import (
    KafkaPropertiesExtractor,
    _enrich_span,
    _get_span_name,
    _kafka_getter,
    _kafka_setter,
)
from kafka_instrumentation.version import __version__


class AutoInstrumentedProducer(Producer):
    # Substituted for confluent_kafka.Producer while the instrumentor is
    # active; produce() is wrapped with tracing logic via wrapt below.
    def __init__(self, config):
        super().__init__(config)

    def produce(self, topic, value=None, *args, **kwargs):
        super().produce(topic, value, *args, **kwargs)


class AutoInstrumentedConsumer(Consumer):
    # Substituted for confluent_kafka.Consumer while the instrumentor is
    # active; poll() is wrapped with tracing logic via wrapt below.
    def __init__(self, config):
        super().__init__(config)
        self._current_consume_span = None

    def poll(self, timeout=-1):
        return super().poll(timeout)


class ProxiedProducer(Producer):
    # Wraps a single, already-constructed producer for manual instrumentation.

    def __init__(self, producer: Producer, tracer: Tracer):
        self._producer = producer
        self._tracer = tracer

    def flush(self, timeout=-1):
        return self._producer.flush(timeout)

    def poll(self, timeout=-1):
        return self._producer.poll(timeout)

    def produce(self, topic, value=None, *args, **kwargs):
        new_kwargs = kwargs.copy()
        new_kwargs['topic'] = topic
        new_kwargs['value'] = value

        return ConfluentKafkaInstrumentor.wrap_produce(
            self._producer.produce, self, self._tracer, args, new_kwargs)

    def original_producer(self):
        return self._producer


class ProxiedConsumer(Consumer):
    # Wraps a single, already-constructed consumer for manual instrumentation.

    def __init__(self, consumer: Consumer, tracer: Tracer):
        self._consumer = consumer
        self._tracer = tracer
        self._current_consume_span = None
        self._current_context_token = None

    def committed(self, partitions, timeout=-1):
        return self._consumer.committed(partitions, timeout)

    def consume(self, num_messages=1, *args, **kwargs):
        return self._consumer.consume(num_messages, *args, **kwargs)

    def get_watermark_offsets(self, partition, timeout=-1, *args, **kwargs):
        return self._consumer.get_watermark_offsets(partition, timeout, *args, **kwargs)

    def offsets_for_times(self, partitions, timeout=-1):
        return self._consumer.offsets_for_times(partitions, timeout)

    def poll(self, timeout=-1):
        return ConfluentKafkaInstrumentor.wrap_poll(
            self._consumer.poll, self, self._tracer, [timeout], {})

    def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs):
        self._consumer.subscribe(topics, on_assign, *args, **kwargs)

    def original_consumer(self):
        return self._consumer


class ConfluentKafkaInstrumentor(BaseInstrumentor):
    """An instrumentor for the confluent kafka module.
    See `BaseInstrumentor`.
    """

    @staticmethod
    def instrument_producer(producer: Producer, tracer_provider=None) -> ProxiedProducer:
        tracer = trace.get_tracer(
            __name__, __version__, tracer_provider=tracer_provider
        )

        manual_producer = ProxiedProducer(producer, tracer)

        return manual_producer

    @staticmethod
    def instrument_consumer(consumer: Consumer, tracer_provider=None) -> ProxiedConsumer:
        tracer = trace.get_tracer(
            __name__, __version__, tracer_provider=tracer_provider
        )

        manual_consumer = ProxiedConsumer(consumer, tracer)

        return manual_consumer

    @staticmethod
    def uninstrument_producer(producer) -> Producer:
        if isinstance(producer, ProxiedProducer):
            return producer.original_producer()

    @staticmethod
    def uninstrument_consumer(consumer) -> Consumer:
        if isinstance(consumer, ProxiedConsumer):
            return consumer.original_consumer()

    def instrumentation_dependencies(self) -> Collection[str]:
        return _instruments

    def _instrument(self, **kwargs):
        self._original_kafka_producer = confluent_kafka.Producer
        self._original_kafka_consumer = confluent_kafka.Consumer

        confluent_kafka.Producer = AutoInstrumentedProducer
        confluent_kafka.Consumer = AutoInstrumentedConsumer

        tracer_provider = kwargs.get("tracer_provider")
        tracer = trace.get_tracer(
            __name__, __version__, tracer_provider=tracer_provider
        )

        self._tracer = tracer

        def _inner_wrap_produce(func, instance, args, kwargs):
            return ConfluentKafkaInstrumentor.wrap_produce(
                func, instance, self._tracer, args, kwargs)

        def _inner_wrap_poll(func, instance, args, kwargs):
            return ConfluentKafkaInstrumentor.wrap_poll(
                func, instance, self._tracer, args, kwargs)

        wrapt.wrap_function_wrapper("kafka_instrumentation",
                                    "AutoInstrumentedProducer.produce", _inner_wrap_produce)

        wrapt.wrap_function_wrapper("kafka_instrumentation",
                                    "AutoInstrumentedConsumer.poll", _inner_wrap_poll)

    def _uninstrument(self, **kwargs):
        confluent_kafka.Producer = self._original_kafka_producer
        confluent_kafka.Consumer = self._original_kafka_consumer

        unwrap(AutoInstrumentedProducer, "produce")
        unwrap(AutoInstrumentedConsumer, "poll")

    @staticmethod
    def wrap_produce(func, instance, tracer, args, kwargs):
        topic = kwargs.get("topic")
        if not topic:
            topic = args[0]

        span_name = _get_span_name("send", topic)
        with tracer.start_as_current_span(name=span_name, kind=trace.SpanKind.PRODUCER) as span:
            headers = KafkaPropertiesExtractor.extract_produce_headers(args, kwargs)
            if headers is None:
                headers = []
                kwargs["headers"] = headers

            topic = KafkaPropertiesExtractor.extract_produce_topic(args)
            bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
            _enrich_span(span, topic, bootstrap_servers,
                         operation=MessagingOperationValues.RECEIVE)  # Replace: RECEIVE is a consumer-side value
            # Inject the current span context into the outgoing message headers
            # so the consuming side can link back to this producer span.
            propagate.inject(
                headers,
                setter=_kafka_setter,
            )
            return func(*args, **kwargs)

    @staticmethod
    def wrap_poll(func, instance, tracer, args, kwargs):
        # End the "process" span left open for the previously polled message
        # before handling the next one.
        if instance._current_consume_span:
            context.detach(instance._current_context_token)
            instance._current_context_token = None
            instance._current_consume_span.end()
            instance._current_consume_span = None

        with tracer.start_as_current_span("recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER) as span:
            record = func(*args, **kwargs)
            if record:
                # Extract the producer's context from the message headers and
                # record it as a link on a new "process" span, which stays open
                # until the next poll() call.
                links = []
                ctx = propagate.extract(record.headers(), getter=_kafka_getter)
                if ctx:
                    for item in ctx.values():
                        if hasattr(item, "get_span_context"):
                            links.append(Link(context=item.get_span_context()))

                instance._current_consume_span = tracer.start_span(
                    name=f"{record.topic()} process", links=links, kind=SpanKind.CONSUMER
                )

                bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
                _enrich_span(
                    instance._current_consume_span,
                    record.topic(),
                    bootstrap_servers,
                    record.partition(),
                    record.offset(),
                    operation=MessagingOperationValues.PROCESS,
                )
                instance._current_context_token = context.attach(
                    trace.set_span_in_context(instance._current_consume_span))

        return record
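
Note: the module above also imports `KafkaPropertiesExtractor`, `_get_span_name`, `_enrich_span`, `_kafka_setter`, and `_kafka_getter` from `kafka_instrumentation.utils`, one of the six changed files not shown in this view. Below is a rough sketch of the propagation helpers only, assuming confluent-kafka carries headers as a list of (key, bytes) tuples; all names here are inferred, not copied from the diff.

# Hypothetical sketch of the kafka_instrumentation.utils propagation helpers;
# that file is not part of this view, so names and behavior are assumptions.
from opentelemetry.propagators import textmap


def _get_span_name(operation: str, topic: str):
    # Messaging semantic conventions name spans "<destination> <operation>".
    return f"{topic} {operation}"


class KafkaContextGetter(textmap.Getter):
    def get(self, carrier, key):
        # Kafka headers arrive as a list of (key, value) pairs, value in bytes.
        if carrier is None:
            return None
        for item_key, value in carrier:
            if item_key == key and value is not None:
                return [value.decode()]
        return None

    def keys(self, carrier):
        if carrier is None:
            return []
        return [key for key, _ in carrier]


class KafkaContextSetter(textmap.Setter):
    def set(self, carrier, key, value):
        # Kafka header values are bytes, so encode before appending.
        if carrier is None or key is None:
            return
        if value:
            value = value.encode()
        carrier.append((key, value))


_kafka_getter = KafkaContextGetter()
_kafka_setter = KafkaContextSetter()

With the default W3C propagator, `propagate.inject(headers, setter=_kafka_setter)` in `wrap_produce` appends a `traceparent` header to the outgoing message, and `propagate.extract(record.headers(), getter=_kafka_getter)` in `wrap_poll` reads it back to build the span link.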
@@ -0,0 +1,16 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


_instruments = ("confluent-kafka ~= 1.8.2",)
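
The `_instruments` tuple feeds `instrumentation_dependencies()` in the instrumentor above; `BaseInstrumentor.instrument()` checks it against the installed packages and refuses to instrument on a conflict. A minimal stdlib-only sketch of checking that constraint by hand follows; the helper name is ours, and it assumes a plain X.Y.Z version string.

# Check "confluent-kafka ~= 1.8.2" (i.e. >= 1.8.2, < 1.9.0) by hand.
from importlib.metadata import PackageNotFoundError, version


def confluent_kafka_compatible() -> bool:
    try:
        installed = version("confluent-kafka")
    except PackageNotFoundError:
        return False
    major, minor, patch = (int(part) for part in installed.split(".")[:3])
    return (major, minor) == (1, 8) and patch >= 2


print(confluent_kafka_compatible())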
