Skip to content

Commit 16468af

Browse files
replace dummy hook with callable check, add hook documentation
1 parent 28150a5 commit 16468af

File tree

2 files changed

+40
-12
lines changed
  • instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka

2 files changed

+40
-12
lines changed

instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/__init__.py

+31-3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,35 @@
3838
for message in consumer:
3939
# process message
4040
41+
The `_instrument` method accepts the following keyword args:
42+
tracer_provider (TracerProvider) - an optional tracer provider
43+
produce_hook (Callable) - a function with extra user-defined logic to be performed before sending the message
44+
this function signature is:
45+
def produce_hook(span: Span, args, kwargs)
46+
consume_hook (Callable) - a function with extra user-defined logic to be performed after consuming a message
47+
this function signature is:
48+
def consume
49+
_hook(span: Span, record: , kafka.record.ABCRecord, args, kwargs)
50+
for example:
51+
.. code: python
52+
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
53+
from kafka import KafkaProducer, KafkaConsumer
54+
55+
def produce_hook(span, args, kwargs):
56+
if span and span.is_recording():
57+
span.set_attribute("custom_user_attribute_from_produce_hook", "some-value")
58+
def consume_hook(span, record, args, kwargs):
59+
if span and span.is_recording():
60+
span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")
61+
62+
# instrument kafka with produce and consume hooks
63+
KafkaInstrumentor().instrument(produce_hook=produce_hook, consume_hook=consume_hook)
64+
65+
# Using kafka as normal now will automatically generate spans,
66+
# including user custom attributes added from the hooks
67+
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
68+
producer.send('my-topic', b'raw_bytes')
69+
4170
API
4271
___
4372
"""
@@ -52,7 +81,6 @@
5281
from opentelemetry.instrumentation.kafka.utils import (
5382
_wrap_next,
5483
_wrap_send,
55-
dummy_callback,
5684
)
5785
from opentelemetry.instrumentation.kafka.version import __version__
5886
from opentelemetry.instrumentation.utils import unwrap
@@ -76,8 +104,8 @@ def _instrument(self, **kwargs):
76104
``consume_hook``: a callable to be executed just after consuming a message
77105
"""
78106
tracer_provider = kwargs.get("tracer_provider")
79-
produce_hook = kwargs.get("produce_hook", dummy_callback)
80-
consume_hook = kwargs.get("consume_hook", dummy_callback)
107+
produce_hook = kwargs.get("produce_hook")
108+
consume_hook = kwargs.get("consume_hook")
81109

82110
tracer = trace.get_tracer(
83111
__name__, __version__, tracer_provider=tracer_provider

instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
from logging import getLogger
33
from typing import Callable, Dict, List, Optional
4+
from kafka.record.abc import ABCRecord
45

56
from opentelemetry import context, propagate, trace
67
from opentelemetry.propagators import textmap
@@ -76,11 +77,8 @@ def extract_send_partition(instance, args, kwargs):
7677
)
7778

7879

79-
HookT = Callable[[Span, List, Dict], None]
80-
81-
82-
def dummy_callback(span, args, kwargs):
83-
...
80+
ProduceHookT = Optional[Callable[[Span, List, Dict], None]]
81+
ConsumeHookT = Optional[Callable[[Span, ABCRecord, List, Dict], None]]
8482

8583

8684
class KafkaContextGetter(textmap.Getter):
@@ -130,7 +128,7 @@ def _get_span_name(operation: str, topic: str):
130128
return f"{topic} {operation}"
131129

132130

133-
def _wrap_send(tracer: Tracer, produce_hook: HookT) -> Callable:
131+
def _wrap_send(tracer: Tracer, produce_hook: ProduceHookT) -> Callable:
134132
def _traced_send(func, instance, args, kwargs):
135133
headers = KafkaPropertiesExtractor.extract_send_headers(args, kwargs)
136134
if headers is None:
@@ -155,7 +153,8 @@ def _traced_send(func, instance, args, kwargs):
155153
setter=_kafka_setter,
156154
)
157155
try:
158-
produce_hook(span, args, kwargs)
156+
if callable(produce_hook):
157+
produce_hook(span, args, kwargs)
159158
except Exception as hook_exception: # pylint: disable=W0703
160159
_LOG.exception(hook_exception)
161160

@@ -166,7 +165,7 @@ def _traced_send(func, instance, args, kwargs):
166165

167166
def _wrap_next(
168167
tracer: Tracer,
169-
consume_hook: HookT,
168+
consume_hook: ConsumeHookT,
170169
) -> Callable:
171170
def _traced_next(func, instance, args, kwargs):
172171

@@ -194,7 +193,8 @@ def _traced_next(func, instance, args, kwargs):
194193
span, bootstrap_servers, record.topic, record.partition
195194
)
196195
try:
197-
consume_hook(span, args, kwargs)
196+
if callable(consume_hook):
197+
consume_hook(span, record, args, kwargs)
198198
except Exception as hook_exception: # pylint: disable=W0703
199199
_LOG.exception(hook_exception)
200200
context.detach(token)

0 commit comments

Comments
 (0)