Skip to content

Commit 1e89854

Browse files
Add confluent kafka docs (#1668)
* add elasticsearch to docs * add confluent kafka to docs * tox generate fix * tox docs fix --------- Co-authored-by: Srikanth Chekuri <[email protected]>
1 parent 5e4766e commit 1e89854

File tree

5 files changed

+59
-42
lines changed

5 files changed

+59
-42
lines changed

docs-requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ asyncpg>=0.12.0
2525
boto~=2.0
2626
botocore~=1.0
2727
celery>=4.0
28+
confluent-kafka>=1.8.2,<2.0.0
2829
elasticsearch>=2.0,<9.0
2930
flask~=2.0
3031
falcon~=2.0

docs/conf.py

+8-19
Original file line numberDiff line numberDiff line change
@@ -126,25 +126,14 @@ def getlistcfg(strval):
126126
]
127127

128128

129-
if "class_references" in mcfg:
130-
class_references = getlistcfg(mcfg["class_references"])
131-
for class_reference in class_references:
132-
nitpick_ignore.append(
133-
(
134-
"py:class",
135-
class_reference,
136-
)
137-
)
138-
139-
if "anys" in mcfg:
140-
anys = getlistcfg(mcfg["anys"])
141-
for _any in anys:
142-
nitpick_ignore.append(
143-
(
144-
"any",
145-
_any,
146-
)
147-
)
129+
ignore_categories = ["py-class", "py-func", "py-exc", "any"]
130+
131+
for category in ignore_categories:
132+
if category in mcfg:
133+
items = getlistcfg(mcfg[category])
134+
for item in items:
135+
nitpick_ignore.append((category.replace("-", ":"), item))
136+
148137

149138
# Add any paths that contain templates here, relative to this directory.
150139
templates_path = ["_templates"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
.. include:: ../../../instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst
2+
3+
.. automodule:: opentelemetry.instrumentation.confluent_kafka
4+
:members:
5+
:undoc-members:
6+
:show-inheritance:
7+
:noindex:

docs/nitpick-exceptions.ini

+21-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[default]
2-
class_references=
2+
py-class=
33
; TODO: Understand why sphinx is not able to find this local class
44
opentelemetry.propagators.textmap.CarrierT
55
opentelemetry.propagators.textmap.Setter
@@ -11,6 +11,8 @@ class_references=
1111
opentelemetry.propagators.textmap.Getter
1212
; - AWSXRayPropagator
1313
opentelemetry.sdk.trace.id_generator.IdGenerator
14+
opentelemetry.instrumentation.confluent_kafka.ProxiedProducer
15+
opentelemetry.instrumentation.confluent_kafka.ProxiedConsumer
1416
; - AwsXRayIdGenerator
1517
TextMapPropagator
1618
CarrierT
@@ -26,8 +28,16 @@ class_references=
2628
httpx.AsyncByteStream
2729
httpx.Response
2830
yarl.URL
31+
cimpl.Producer
32+
cimpl.Consumer
33+
func
34+
Message
35+
TopicPartition
36+
callable
37+
Consumer
38+
confluent_kafka.Message
2939

30-
anys=
40+
any=
3141
; API
3242
opentelemetry.propagators.textmap.TextMapPropagator.fields
3343
; - AWSXRayPropagator
@@ -44,3 +54,12 @@ anys=
4454
; - instrumentation.*
4555
Setter
4656
httpx
57+
;
58+
py-func=
59+
poll
60+
flush
61+
Message.error
62+
63+
py-exc=
64+
KafkaException
65+
KafkaError

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

+22-21
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
# limitations under the License.
1414

1515
"""
16-
Instrument `confluent-kafka-python` to report instrumentation-confluent-kafka produced and consumed messages
16+
Instrument confluent-kafka-python to report instrumentation-confluent-kafka produced and consumed messages
1717
1818
Usage
1919
-----
2020
21-
..code:: python
21+
.. code-block:: python
2222
2323
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
2424
from confluent_kafka import Producer, Consumer
@@ -30,24 +30,21 @@
3030
conf1 = {'bootstrap.servers': "localhost:9092"}
3131
producer = Producer(conf1)
3232
producer.produce('my-topic',b'raw_bytes')
33-
34-
conf2 = {'bootstrap.servers': "localhost:9092",
35-
'group.id': "foo",
36-
'auto.offset.reset': 'smallest'}
33+
conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'}
3734
# report a span of type consumer with the default settings
3835
consumer = Consumer(conf2)
36+
3937
def basic_consume_loop(consumer, topics):
4038
try:
4139
consumer.subscribe(topics)
4240
running = True
4341
while running:
4442
msg = consumer.poll(timeout=1.0)
4543
if msg is None: continue
46-
4744
if msg.error():
4845
if msg.error().code() == KafkaError._PARTITION_EOF:
4946
# End of partition event
50-
sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}}\n")
47+
sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n")
5148
elif msg.error():
5249
raise KafkaException(msg.error())
5350
else:
@@ -57,19 +54,26 @@ def basic_consume_loop(consumer, topics):
5754
consumer.close()
5855
5956
basic_consume_loop(consumer, "my-topic")
57+
---
58+
59+
The _instrument method accepts the following keyword args:
60+
tracer_provider (TracerProvider) - an optional tracer provider
61+
62+
instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message
63+
this function signature is:
64+
65+
def instrument_producer(producer: Producer, tracer_provider=None)
6066
67+
instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message
68+
this function signature is:
69+
70+
def instrument_consumer(consumer: Consumer, tracer_provider=None)
71+
for example:
72+
73+
.. code:: python
6174
62-
The `_instrument` method accepts the following keyword args:
63-
tracer_provider (TracerProvider) - an optional tracer provider
64-
instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message
65-
this function signature is:
66-
def instrument_producer(producer: Producer, tracer_provider=None)
67-
instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message
68-
this function signature is:
69-
def instrument_consumer(consumer: Consumer, tracer_provider=None)
70-
for example:
71-
.. code: python
7275
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
76+
7377
from confluent_kafka import Producer, Consumer
7478
7579
inst = ConfluentKafkaInstrumentor()
@@ -85,15 +89,12 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
8589
p = inst.instrument_producer(p, tracer_provider)
8690
c = inst.instrument_consumer(c, tracer_provider=tracer_provider)
8791
88-
8992
# Using kafka as normal now will automatically generate spans,
9093
# including user custom attributes added from the hooks
9194
conf = {'bootstrap.servers': "localhost:9092"}
9295
p.produce('my-topic',b'raw_bytes')
9396
msg = c.poll()
9497
95-
96-
API
9798
___
9899
"""
99100
from typing import Collection

0 commit comments

Comments
 (0)