Skip to content

Commit 61892c1

Browse files
committed
Allow Kafka producer headers to be dict or list
1 parent df32e8c commit 61892c1

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
6060

6161
if value:
6262
value = value.encode()
63-
carrier.append((key, value))
63+
64+
if isinstance(carrier, list):
65+
carrier.append((key, value))
66+
67+
if isinstance(carrier, dict):
68+
carrier[key] = value
6469

6570

6671
_kafka_getter = KafkaContextGetter()

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

+13
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
ProxiedProducer,
2525
)
2626

27+
from opentelemetry.instrumentation.confluent_kafka.utils import KafkaContextSetter
28+
2729

2830
class TestConfluentKafka(TestCase):
2931
def test_instrument_api(self) -> None:
@@ -58,3 +60,14 @@ def test_instrument_api(self) -> None:
5860

5961
consumer = instrumentation.uninstrument_consumer(consumer)
6062
self.assertEqual(consumer.__class__, Consumer)
63+
64+
def test_context_setter(self) -> None:
65+
context_setter = KafkaContextSetter()
66+
67+
carrier_dict = {"key1": "val1"}
68+
context_setter.set(carrier_dict, "key2", "val2")
69+
self.assertGreaterEqual(carrier_dict.items(), {"key2": "val2".encode()}.items())
70+
71+
carrier_list = [("key1", "val1")]
72+
context_setter.set(carrier_list, "key2", "val2")
73+
self.assertTrue(("key2", "val2".encode()) in carrier_list)

0 commit comments

Comments
 (0)