Skip to content

Commit cfcc466

Browse files
committed
Fix aiokafka multiple values headers
1 parent af17965 commit cfcc466

File tree

2 files changed

+58
-7
lines changed
  • instrumentation/opentelemetry-instrumentation-aiokafka

2 files changed

+58
-7
lines changed

Diff for: instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py

+10
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]):
6868
return _extract_argument("headers", 5, None, args, kwargs)
6969

7070

71+
def _move_headers_to_kwargs(
72+
args: Tuple[Any], kwargs: Dict[str, Any]
73+
) -> Tuple[Tuple[Any], Dict[str, Any]]:
74+
"""Move headers from args to kwargs"""
75+
if len(args) > 5:
76+
kwargs["headers"] = args[5]
77+
return args[:5], kwargs
78+
79+
7180
async def _extract_send_partition(
7281
instance: aiokafka.AIOKafkaProducer,
7382
args: Tuple[Any],
@@ -260,6 +269,7 @@ async def _traced_send(
260269
args: Tuple[Any],
261270
kwargs: Dict[str, Any],
262271
) -> None:
272+
args, kwargs = _move_headers_to_kwargs(args, kwargs)
263273
headers = _extract_send_headers(args, kwargs)
264274
if headers is None:
265275
headers = []

Diff for: instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py

+48-7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,19 @@
2828
)
2929
from opentelemetry.trace import SpanKind
3030

31+
SEND_RETURN_VALUE = None
32+
33+
34+
async def original_send(
35+
topic,
36+
value=None,
37+
key=None,
38+
partition=None,
39+
timestamp_ms=None,
40+
headers=None,
41+
):
42+
return SEND_RETURN_VALUE
43+
3144

3245
class TestUtils(IsolatedAsyncioTestCase):
3346
def setUp(self) -> None:
@@ -109,6 +122,36 @@ async def test_wrap_send_with_topic_as_kwarg(
109122
extract_bootstrap_servers,
110123
)
111124

125+
@mock.patch(
126+
"opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
127+
)
128+
@mock.patch(
129+
"opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
130+
)
131+
@mock.patch(
132+
"opentelemetry.instrumentation.aiokafka.utils._enrich_send_span"
133+
)
134+
@mock.patch("opentelemetry.trace.set_span_in_context")
135+
@mock.patch("opentelemetry.propagate.inject")
136+
async def test_wrap_send_with_headers_as_args(
137+
self,
138+
inject: mock.MagicMock,
139+
set_span_in_context: mock.MagicMock,
140+
enrich_span: mock.MagicMock,
141+
extract_send_partition: mock.AsyncMock,
142+
extract_bootstrap_servers: mock.MagicMock,
143+
) -> None:
144+
# like send_and_wait
145+
self.args = [self.topic_name, None, None, None, None, None]
146+
self.kwargs = {}
147+
await self.wrap_send_helper(
148+
inject,
149+
set_span_in_context,
150+
enrich_span,
151+
extract_send_partition,
152+
extract_bootstrap_servers,
153+
)
154+
112155
async def wrap_send_helper(
113156
self,
114157
inject: mock.MagicMock,
@@ -120,6 +163,8 @@ async def wrap_send_helper(
120163
tracer = mock.MagicMock()
121164
produce_hook = mock.AsyncMock()
122165
original_send_callback = mock.AsyncMock()
166+
original_send_callback.side_effect = original_send
167+
original_send_callback.return_value = SEND_RETURN_VALUE
123168
kafka_producer = mock.MagicMock()
124169
expected_span_name = _get_span_name("send", self.topic_name)
125170

@@ -131,9 +176,7 @@ async def wrap_send_helper(
131176
extract_bootstrap_servers.assert_called_once_with(
132177
kafka_producer.client
133178
)
134-
extract_send_partition.assert_awaited_once_with(
135-
kafka_producer, self.args, self.kwargs
136-
)
179+
extract_send_partition.assert_awaited_once()
137180
tracer.start_as_current_span.assert_called_once_with(
138181
expected_span_name, kind=SpanKind.PRODUCER
139182
)
@@ -154,11 +197,9 @@ async def wrap_send_helper(
154197
self.headers, context=context, setter=_aiokafka_setter
155198
)
156199

157-
produce_hook.assert_awaited_once_with(span, self.args, self.kwargs)
200+
produce_hook.assert_awaited_once()
158201

159-
original_send_callback.assert_awaited_once_with(
160-
*self.args, **self.kwargs
161-
)
202+
original_send_callback.assert_awaited_once()
162203
self.assertEqual(retval, original_send_callback.return_value)
163204

164205
@mock.patch("opentelemetry.propagate.extract")

0 commit comments

Comments
 (0)