Skip to content

Commit f67a500

Browse files
sobychacko authored and artembilan committed
GH-3786: Remove duplicated trace header
Fixes: #3786 Issue link: #3786 When tracing is enabled, the KafkaRecordSenderContext was adding a new trace header without removing existing ones, resulting in multiple headers in the same record. This commit fixes the issue by updating KafkaRecordSenderContext to remove any existing trace header before adding a new one. **Auto-cherry-pick to `3.2.x`** # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
1 parent e248189 commit f67a500

File tree

2 files changed

+93
-18
lines changed

2 files changed

+93
-18
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2024 the original author or authors.
2+
* Copyright 2022-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,13 +21,15 @@
2121

2222
import io.micrometer.observation.transport.SenderContext;
2323
import org.apache.kafka.clients.producer.ProducerRecord;
24+
import org.apache.kafka.common.header.Headers;
2425

2526
/**
2627
* {@link SenderContext} for {@link ProducerRecord}s.
2728
*
2829
* @author Gary Russell
2930
* @author Christian Mergenthaler
3031
* @author Wang Zhiyang
32+
* @author Soby Chacko
3133
*
3234
* @since 3.0
3335
*
@@ -39,8 +41,12 @@ public class KafkaRecordSenderContext extends SenderContext<ProducerRecord<?, ?>
3941
private final ProducerRecord<?, ?> record;
4042

4143
public KafkaRecordSenderContext(ProducerRecord<?, ?> record, String beanName, Supplier<String> clusterId) {
42-
super((carrier, key, value) -> record.headers().add(key,
43-
value == null ? null : value.getBytes(StandardCharsets.UTF_8)));
44+
super((carrier, key, value) -> {
45+
Headers headers = record.headers();
46+
headers.remove(key);
47+
headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8));
48+
});
49+
4450
setCarrier(record);
4551
this.beanName = beanName;
4652
this.record = record;

Diff for: spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

+84-15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.support.micrometer;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.util.Arrays;
2021
import java.util.Deque;
2122
import java.util.List;
@@ -26,6 +27,7 @@
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.TimeoutException;
2829
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.stream.StreamSupport;
2931

3032
import io.micrometer.common.KeyValues;
3133
import io.micrometer.core.instrument.MeterRegistry;
@@ -56,6 +58,7 @@
5658
import org.apache.kafka.common.errors.InvalidTopicException;
5759
import org.apache.kafka.common.header.Header;
5860
import org.apache.kafka.common.header.Headers;
61+
import org.apache.kafka.common.header.internals.RecordHeader;
5962
import org.junit.jupiter.api.Test;
6063
import reactor.core.publisher.Mono;
6164

@@ -77,6 +80,7 @@
7780
import org.springframework.kafka.core.ProducerFactory;
7881
import org.springframework.kafka.listener.MessageListenerContainer;
7982
import org.springframework.kafka.listener.RecordInterceptor;
83+
import org.springframework.kafka.support.ProducerListener;
8084
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
8185
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
8286
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -102,9 +106,9 @@
102106
* @since 3.0
103107
*/
104108
@SpringJUnitConfig
105-
@EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
109+
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
106110
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION,
107-
ObservationTests.OBSERVATION_ERROR }, partitions = 1)
111+
ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
108112
@DirtiesContext
109113
public class ObservationTests {
110114

@@ -122,18 +126,21 @@ public class ObservationTests {
122126

123127
public final static String OBSERVATION_ERROR_MONO = "observation.error.mono";
124128

129+
public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";
130+
125131
@Test
126132
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
127133
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
128134
@Autowired MeterRegistry meterRegistry, @Autowired EmbeddedKafkaBroker broker,
129135
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin,
130136
@Autowired @Qualifier("customTemplate") KafkaTemplate<Integer, String> customTemplate,
131137
@Autowired Config config)
132-
throws InterruptedException, ExecutionException, TimeoutException {
138+
throws InterruptedException, ExecutionException, TimeoutException {
133139

134140
AtomicReference<SimpleSpan> spanFromCallback = new AtomicReference<>();
135141

136142
template.setProducerInterceptor(new ProducerInterceptor<>() {
143+
137144
@Override
138145
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
139146
tracer.currentSpanCustomizer().tag("key", "value");
@@ -321,10 +328,10 @@ private void assertThatTemplateHasTimerWithNameAndTags(MeterRegistryAssert meter
321328

322329
meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.template",
323330
KeyValues.of("spring.kafka.template.name", "template",
324-
"messaging.operation", "publish",
325-
"messaging.system", "kafka",
326-
"messaging.destination.kind", "topic",
327-
"messaging.destination.name", destName)
331+
"messaging.operation", "publish",
332+
"messaging.system", "kafka",
333+
"messaging.destination.kind", "topic",
334+
"messaging.destination.name", destName)
328335
.and(keyValues));
329336
}
330337

@@ -333,12 +340,12 @@ private void assertThatListenerHasTimerWithNameAndTags(MeterRegistryAssert meter
333340

334341
meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.listener",
335342
KeyValues.of(
336-
"messaging.kafka.consumer.group", consumerGroup,
337-
"messaging.operation", "receive",
338-
"messaging.source.kind", "topic",
339-
"messaging.source.name", destName,
340-
"messaging.system", "kafka",
341-
"spring.kafka.listener.id", listenerId)
343+
"messaging.kafka.consumer.group", consumerGroup,
344+
"messaging.operation", "receive",
345+
"messaging.source.kind", "topic",
346+
"messaging.source.name", destName,
347+
"messaging.system", "kafka",
348+
"spring.kafka.listener.id", listenerId)
342349
.and(keyValues));
343350
}
344351

@@ -388,7 +395,7 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
388395
void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
389396
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
390397
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
391-
throws ExecutionException, InterruptedException, TimeoutException {
398+
throws ExecutionException, InterruptedException, TimeoutException {
392399

393400
errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
394401
assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
@@ -449,6 +456,63 @@ void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
449456
assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin);
450457
}
451458

459+
@Test
460+
void verifyKafkaRecordSenderContextTraceParentHandling() {
461+
String initialTraceParent = "traceparent-from-previous";
462+
String updatedTraceParent = "traceparent-current";
463+
ProducerRecord<Integer, String> record = new ProducerRecord<>("test-topic", "test-value");
464+
record.headers().add("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8));
465+
466+
// Create the context and update the traceparent
467+
KafkaRecordSenderContext context = new KafkaRecordSenderContext(
468+
record,
469+
"test-bean",
470+
() -> "test-cluster"
471+
);
472+
context.getSetter().set(record, "traceparent", updatedTraceParent);
473+
474+
Iterable<Header> traceparentHeaders = record.headers().headers("traceparent");
475+
476+
List<String> headerValues = StreamSupport.stream(traceparentHeaders.spliterator(), false)
477+
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
478+
.toList();
479+
480+
// Verify there's only one traceparent header and it contains the updated value
481+
assertThat(headerValues).containsExactly(updatedTraceParent);
482+
}
483+
484+
@Test
485+
void verifyTraceParentHeader(@Autowired KafkaTemplate<Integer, String> template,
486+
@Autowired SimpleTracer tracer) throws Exception {
487+
CompletableFuture<ProducerRecord<Integer, String>> producerRecordFuture = new CompletableFuture<>();
488+
template.setProducerListener(new ProducerListener<>() {
489+
490+
@Override
491+
public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMetadata recordMetadata) {
492+
producerRecordFuture.complete(producerRecord);
493+
}
494+
});
495+
String initialTraceParent = "traceparent-from-previous";
496+
Header header = new RecordHeader("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8));
497+
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(
498+
OBSERVATION_TRACEPARENT_DUPLICATE,
499+
null, null, null,
500+
"test-value",
501+
List.of(header)
502+
);
503+
504+
template.send(producerRecord).get(10, TimeUnit.SECONDS);
505+
ProducerRecord<Integer, String> recordResult = producerRecordFuture.get(10, TimeUnit.SECONDS);
506+
507+
Iterable<Header> traceparentHeaders = recordResult.headers().headers("traceparent");
508+
assertThat(traceparentHeaders).hasSize(1);
509+
510+
String traceparentValue = new String(traceparentHeaders.iterator().next().value(), StandardCharsets.UTF_8);
511+
assertThat(traceparentValue).isEqualTo("traceparent-from-propagator");
512+
513+
tracer.getSpans().clear();
514+
}
515+
452516
@Configuration
453517
@EnableKafka
454518
public static class Config {
@@ -598,6 +662,9 @@ public List<String> fields() {
598662
public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
599663
setter.set(carrier, "foo", "some foo value");
600664
setter.set(carrier, "bar", "some bar value");
665+
666+
// Add a traceparent header to simulate W3C trace context
667+
setter.set(carrier, "traceparent", "traceparent-from-propagator");
601668
}
602669

603670
// This is called on the consumer side when the message is consumed
@@ -606,7 +673,9 @@ public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> sett
606673
public <C> Span.Builder extract(C carrier, Getter<C> getter) {
607674
String foo = getter.get(carrier, "foo");
608675
String bar = getter.get(carrier, "bar");
609-
return tracer.spanBuilder().tag("foo", foo).tag("bar", bar);
676+
return tracer.spanBuilder()
677+
.tag("foo", foo)
678+
.tag("bar", bar);
610679
}
611680
};
612681
}

0 commit comments

Comments
 (0)