Skip to content

Commit 6fe759b

Browse files
committed
modify kafka context getter helper methods to work on dict and list
1 parent 05679f4 commit 6fe759b

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

CHANGELOG.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1616

1717
- Fix Flask instrumentation to only close the span if it was created by the same thread.
1818
([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654))
19+
- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list
20+
([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655))
1921

2022
## Version 1.16.0/0.37b0 (2023-02-17)
2123

@@ -61,8 +63,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6163
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
6264
- Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer
6365
([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656))
64-
- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list
65-
([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655))
6666

6767
## Version 1.15.0/0.36b0 (2022-12-10)
6868

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,26 @@ class KafkaContextGetter(textmap.Getter):
4141
def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]:
4242
if carrier is None:
4343
return None
44-
for item_key, value in carrier:
44+
45+
carrier_items = carrier
46+
if isinstance(carrier, dict):
47+
carrier_items = carrier.items()
48+
49+
for item_key, value in carrier_items:
4550
if item_key == key:
4651
if value is not None:
4752
return [value.decode()]
53+
4854
return None
4955

5056
def keys(self, carrier: textmap.CarrierT) -> List[str]:
5157
if carrier is None:
5258
return []
53-
return [key for (key, value) in carrier]
59+
60+
carrier_items = carrier
61+
if isinstance(carrier, dict):
62+
carrier_items = carrier.items()
63+
return [key for (key, value) in carrier_items]
5464

5565

5666
class KafkaContextSetter(textmap.Setter):

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

+15
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
ProxiedProducer,
2525
)
2626
from opentelemetry.instrumentation.confluent_kafka.utils import (
27+
KafkaContextGetter,
2728
KafkaContextSetter,
2829
)
2930

@@ -89,3 +90,17 @@ def test_context_setter(self) -> None:
8990
carrier_list = [("key1", "val1")]
9091
context_setter.set(carrier_list, "key2", "val2")
9192
self.assertTrue(("key2", "val2".encode()) in carrier_list)
93+
94+
def test_context_getter(self) -> None:
95+
context_setter = KafkaContextSetter()
96+
context_getter = KafkaContextGetter()
97+
98+
carrier_dict = {}
99+
context_setter.set(carrier_dict, "key1", "val1")
100+
self.assertEqual(context_getter.get(carrier_dict, "key1"), ["val1"])
101+
self.assertEqual(["key1"], context_getter.keys(carrier_dict))
102+
103+
carrier_list = []
104+
context_setter.set(carrier_list, "key1", "val1")
105+
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
106+
self.assertEqual(["key1"], context_getter.keys(carrier_list))

0 commit comments

Comments
 (0)