Skip to content

Commit 8fca3da

Browse files
authoredMar 26, 2025··
GH-3808: ReplyingKafkaTemplate observation on reply
Fixes: #3808 Issue link: #3808 Since `ReplyingKafkaTemplate` is a `BatchMessageListener` for the provided listener container, we cannot rely on the observation from that container. * Implement consumer observation from the `ReplyingKafkaTemplate.BatchMessageListener`. Signed-off-by: Francois Rosiere <[email protected]> [[email protected] Improve commit message] **Auto-cherry-pick to `3.3.x` & `3.2.x`** Signed-off-by: Artem Bilan <[email protected]>
1 parent f92f766 commit 8fca3da

File tree

3 files changed

+113
-45
lines changed

3 files changed

+113
-45
lines changed
 

‎spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
* @author Gurps Bassi
105105
* @author Valentina Armenise
106106
* @author Christian Fredriksson
107+
* @author Francois Rosiere
107108
*/
108109
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
109110
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
@@ -465,6 +466,15 @@ public void setObservationRegistry(ObservationRegistry observationRegistry) {
465466
this.observationRegistry = observationRegistry;
466467
}
467468

469+
/**
470+
* Return the {@link ObservationRegistry} used by the template.
471+
* @return the observation registry
472+
* @since 3.2.9
473+
*/
474+
protected ObservationRegistry getObservationRegistry() {
475+
return this.observationRegistry;
476+
}
477+
468478
/**
469479
* Return the {@link KafkaAdmin}, used to find the cluster id for observation, if
470480
* present.
@@ -533,8 +543,13 @@ private String getAdminBootstrapAddress() {
533543
return removeLeadingAndTrailingBrackets(adminServers == null ? "" : adminServers);
534544
}
535545

546+
/**
547+
* Return the cluster id, if available.
548+
* @return the cluster id.
549+
* @since 3.2.9
550+
*/
536551
@Nullable
537-
private String clusterId() {
552+
protected String clusterId() {
538553
if (this.kafkaAdmin != null && this.clusterId == null) {
539554
this.clusterIdLock.lock();
540555
try {

‎spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+43-28
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.TimeUnit;
3030
import java.util.function.Function;
3131

32+
import io.micrometer.observation.Observation;
3233
import org.apache.kafka.clients.consumer.ConsumerRecord;
3334
import org.apache.kafka.clients.producer.ProducerRecord;
3435
import org.apache.kafka.common.TopicPartition;
@@ -52,6 +53,8 @@
5253
import org.springframework.kafka.support.KafkaHeaders;
5354
import org.springframework.kafka.support.KafkaUtils;
5455
import org.springframework.kafka.support.TopicPartitionOffset;
56+
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
57+
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
5558
import org.springframework.kafka.support.serializer.DeserializationException;
5659
import org.springframework.kafka.support.serializer.SerializationUtils;
5760
import org.springframework.messaging.Message;
@@ -69,6 +72,7 @@
6972
* @author Gary Russell
7073
* @author Artem Bilan
7174
* @author Borahm Lee
75+
* @author Francois Rosiere
7276
*
7377
* @since 2.1.3
7478
*
@@ -502,39 +506,50 @@ private static <K, V> CorrelationKey defaultCorrelationIdStrategy(
502506
@Override
503507
public void onMessage(List<ConsumerRecord<K, R>> data) {
504508
data.forEach(record -> {
505-
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
506-
Object correlationId = null;
507-
if (correlationHeader != null) {
508-
correlationId = this.binaryCorrelation
509-
? new CorrelationKey(correlationHeader.value())
510-
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
511-
}
512-
if (correlationId == null) {
513-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
514-
+ " - to use request/reply semantics, the responding server must return the correlation id "
515-
+ " in the '" + this.correlationHeaderName + "' header");
509+
ContainerProperties containerProperties = this.replyContainer.getContainerProperties();
510+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
511+
containerProperties.getObservationConvention(),
512+
KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE,
513+
() -> new KafkaRecordReceiverContext(record, this.replyContainer.getListenerId(), containerProperties.getClientId(), this.replyContainer.getGroupId(),
514+
this::clusterId),
515+
getObservationRegistry());
516+
observation.observe(() -> handleReply(record));
517+
});
518+
}
519+
520+
private void handleReply(ConsumerRecord<K, R> record) {
521+
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
522+
Object correlationId = null;
523+
if (correlationHeader != null) {
524+
correlationId = this.binaryCorrelation
525+
? new CorrelationKey(correlationHeader.value())
526+
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
527+
}
528+
if (correlationId == null) {
529+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
530+
+ " - to use request/reply semantics, the responding server must return the correlation id "
531+
+ " in the '" + this.correlationHeaderName + "' header");
532+
}
533+
else {
534+
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
535+
Object correlationKey = correlationId;
536+
if (future == null) {
537+
logLateArrival(record, correlationId);
516538
}
517539
else {
518-
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
519-
Object correlationKey = correlationId;
520-
if (future == null) {
521-
logLateArrival(record, correlationId);
540+
boolean ok = true;
541+
Exception exception = checkForErrors(record);
542+
if (exception != null) {
543+
ok = false;
544+
future.completeExceptionally(exception);
522545
}
523-
else {
524-
boolean ok = true;
525-
Exception exception = checkForErrors(record);
526-
if (exception != null) {
527-
ok = false;
528-
future.completeExceptionally(exception);
529-
}
530-
if (ok) {
531-
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
532-
+ WITH_CORRELATION_ID + correlationKey);
533-
future.complete(record);
534-
}
546+
if (ok) {
547+
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
548+
+ WITH_CORRELATION_ID + correlationKey);
549+
future.complete(record);
535550
}
536551
}
537-
});
552+
}
538553
}
539554

540555
/**

‎spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

+54-16
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.micrometer;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.time.Duration;
2021
import java.util.Arrays;
2122
import java.util.Deque;
2223
import java.util.List;
@@ -81,12 +82,14 @@
8182
import org.springframework.kafka.core.ProducerFactory;
8283
import org.springframework.kafka.listener.MessageListenerContainer;
8384
import org.springframework.kafka.listener.RecordInterceptor;
85+
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
8486
import org.springframework.kafka.support.ProducerListener;
8587
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
8688
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
8789
import org.springframework.kafka.test.EmbeddedKafkaBroker;
8890
import org.springframework.kafka.test.context.EmbeddedKafka;
8991
import org.springframework.kafka.test.utils.KafkaTestUtils;
92+
import org.springframework.messaging.handler.annotation.SendTo;
9093
import org.springframework.test.annotation.DirtiesContext;
9194
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
9295
import org.springframework.util.StringUtils;
@@ -102,13 +105,15 @@
102105
* @author Wang Zhiyang
103106
* @author Christian Mergenthaler
104107
* @author Soby Chacko
108+
* @author Francois Rosiere
105109
*
106110
* @since 3.0
107111
*/
108112
@SpringJUnitConfig
109-
@EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
110-
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION,
111-
ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1)
113+
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
114+
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
115+
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
116+
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
112117
@DirtiesContext
113118
public class ObservationTests {
114119

@@ -118,6 +123,10 @@ public class ObservationTests {
118123

119124
public final static String OBSERVATION_TEST_3 = "observation.testT3";
120125

126+
public final static String OBSERVATION_TEST_4 = "observation.testT4";
127+
128+
public final static String OBSERVATION_REPLY = "observation.reply";
129+
121130
public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
122131

123132
public final static String OBSERVATION_ERROR = "observation.error.sync";
@@ -135,11 +144,12 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
135144
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin,
136145
@Autowired @Qualifier("customTemplate") KafkaTemplate<Integer, String> customTemplate,
137146
@Autowired Config config)
138-
throws InterruptedException, ExecutionException, TimeoutException {
147+
throws InterruptedException, ExecutionException, TimeoutException {
139148

140149
AtomicReference<SimpleSpan> spanFromCallback = new AtomicReference<>();
141150

142151
template.setProducerInterceptor(new ProducerInterceptor<>() {
152+
143153
@Override
144154
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
145155
tracer.currentSpanCustomizer().tag("key", "value");
@@ -327,10 +337,10 @@ private void assertThatTemplateHasTimerWithNameAndTags(MeterRegistryAssert meter
327337

328338
meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.template",
329339
KeyValues.of("spring.kafka.template.name", "template",
330-
"messaging.operation", "publish",
331-
"messaging.system", "kafka",
332-
"messaging.destination.kind", "topic",
333-
"messaging.destination.name", destName)
340+
"messaging.operation", "publish",
341+
"messaging.system", "kafka",
342+
"messaging.destination.kind", "topic",
343+
"messaging.destination.name", destName)
334344
.and(keyValues));
335345
}
336346

@@ -339,12 +349,12 @@ private void assertThatListenerHasTimerWithNameAndTags(MeterRegistryAssert meter
339349

340350
meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.listener",
341351
KeyValues.of(
342-
"messaging.kafka.consumer.group", consumerGroup,
343-
"messaging.operation", "receive",
344-
"messaging.source.kind", "topic",
345-
"messaging.source.name", destName,
346-
"messaging.system", "kafka",
347-
"spring.kafka.listener.id", listenerId)
352+
"messaging.kafka.consumer.group", consumerGroup,
353+
"messaging.operation", "receive",
354+
"messaging.source.kind", "topic",
355+
"messaging.source.name", destName,
356+
"messaging.system", "kafka",
357+
"spring.kafka.listener.id", listenerId)
348358
.and(keyValues));
349359
}
350360

@@ -394,7 +404,7 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
394404
void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
395405
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
396406
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
397-
throws ExecutionException, InterruptedException, TimeoutException {
407+
throws ExecutionException, InterruptedException, TimeoutException {
398408

399409
errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
400410
assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
@@ -485,6 +495,7 @@ void verifyTraceParentHeader(@Autowired KafkaTemplate<Integer, String> template,
485495
@Autowired SimpleTracer tracer) throws Exception {
486496
CompletableFuture<ProducerRecord<Integer, String>> producerRecordFuture = new CompletableFuture<>();
487497
template.setProducerListener(new ProducerListener<>() {
498+
488499
@Override
489500
public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMetadata recordMetadata) {
490501
producerRecordFuture.complete(producerRecord);
@@ -511,6 +522,18 @@ public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMeta
511522
tracer.getSpans().clear();
512523
}
513524

525+
@Test
526+
void testReplyingKafkaTemplateObservation(
527+
@Autowired ReplyingKafkaTemplate<Integer, String, String> template,
528+
@Autowired ObservationRegistry observationRegistry) {
529+
assertThat(template.sendAndReceive(new ProducerRecord<>(OBSERVATION_TEST_4, "test"))
530+
// the current observation must be retrieved from the consumer thread of the reply
531+
.thenApply(replyRecord -> observationRegistry.getCurrentObservation().getContext()))
532+
.isCompletedWithValueMatchingWithin(observationContext ->
533+
observationContext instanceof KafkaRecordReceiverContext
534+
&& "spring.kafka.listener".equals(observationContext.getName()), Duration.ofSeconds(30));
535+
}
536+
514537
@Configuration
515538
@EnableKafka
516539
public static class Config {
@@ -584,13 +607,22 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
584607
return template;
585608
}
586609

610+
@Bean
611+
ReplyingKafkaTemplate<Integer, String, String> replyingKafkaTemplate(ProducerFactory<Integer, String> pf, ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
612+
ReplyingKafkaTemplate<Integer, String, String> kafkaTemplate = new ReplyingKafkaTemplate<>(pf, containerFactory.createContainer(OBSERVATION_REPLY));
613+
kafkaTemplate.setObservationEnabled(true);
614+
return kafkaTemplate;
615+
}
616+
587617
@Bean
588618
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
589-
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry) {
619+
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry,
620+
KafkaTemplate<Integer, String> kafkaTemplate) {
590621

591622
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
592623
new ConcurrentKafkaListenerContainerFactory<>();
593624
factory.setConsumerFactory(cf);
625+
factory.setReplyTemplate(kafkaTemplate);
594626
factory.getContainerProperties().setObservationEnabled(true);
595627
factory.setContainerCustomizer(container -> {
596628
if (container.getListenerId().equals("obs3")) {
@@ -721,6 +753,12 @@ void listen2(ConsumerRecord<?, ?> in) {
721753
void listen3(ConsumerRecord<Integer, String> in) {
722754
}
723755

756+
@KafkaListener(id = "obsReply", topics = OBSERVATION_TEST_4)
757+
@SendTo // default REPLY_TOPIC header
758+
public String replyListener(ConsumerRecord<Integer, String> in) {
759+
return in.value().toUpperCase();
760+
}
761+
724762
}
725763

726764
public static class ExceptionListener {

0 commit comments

Comments
 (0)
Please sign in to comment.