Skip to content

Commit 24b7234

Browse files
committed
Allow Kafka producer headers to be dict or list
1 parent 3bcc043 commit 24b7234

File tree

3 files changed

+24
-1
lines changed

3 files changed

+24
-1
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5454
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
5555
- Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer
5656
([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656))
57+
- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list
58+
([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655))
5759

5860
## Version 1.15.0/0.36b0 (2022-12-10)
5961

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
6060

6161
if value:
6262
value = value.encode()
63-
carrier.append((key, value))
63+
64+
if isinstance(carrier, list):
65+
carrier.append((key, value))
66+
67+
if isinstance(carrier, dict):
68+
carrier[key] = value
6469

6570

6671
_kafka_getter = KafkaContextGetter()

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

+16
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
ProxiedConsumer,
2424
ProxiedProducer,
2525
)
26+
from opentelemetry.instrumentation.confluent_kafka.utils import (
27+
KafkaContextSetter,
28+
)
2629

2730

2831
class TestConfluentKafka(TestCase):
@@ -73,3 +76,16 @@ def test_consumer_commit_method_exists(self) -> None:
7376
consumer = instrumentation.instrument_consumer(consumer)
7477
self.assertEqual(consumer.__class__, ProxiedConsumer)
7578
self.assertTrue(hasattr(consumer, "commit"))
79+
80+
def test_context_setter(self) -> None:
81+
context_setter = KafkaContextSetter()
82+
83+
carrier_dict = {"key1": "val1"}
84+
context_setter.set(carrier_dict, "key2", "val2")
85+
self.assertGreaterEqual(
86+
carrier_dict.items(), {"key2": "val2".encode()}.items()
87+
)
88+
89+
carrier_list = [("key1", "val1")]
90+
context_setter.set(carrier_list, "key2", "val2")
91+
self.assertTrue(("key2", "val2".encode()) in carrier_list)

0 commit comments

Comments
 (0)