Skip to content

Commit 653fdeb

Browse files
committed
spring-projectsGH-3808: ReplyingKafkaTemplate observation on reply
1 parent 35542b0 commit 653fdeb

File tree

3 files changed

+98
-32
lines changed

3 files changed

+98
-32
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,15 @@ public void setObservationRegistry(ObservationRegistry observationRegistry) {
465465
this.observationRegistry = observationRegistry;
466466
}
467467

468+
/**
469+
* Return the {@link ObservationRegistry} used by the template.
470+
* @return the observation registry
471+
* @since 3.3.5
472+
*/
473+
public ObservationRegistry getObservationRegistry() {
474+
return this.observationRegistry;
475+
}
476+
468477
/**
469478
* Return the {@link KafkaAdmin}, used to find the cluster id for observation, if
470479
* present.
@@ -533,8 +542,13 @@ private String getAdminBootstrapAddress() {
533542
return removeLeadingAndTrailingBrackets(adminServers == null ? "" : adminServers);
534543
}
535544

545+
/**
546+
* Return the cluster id, if available.
547+
* @return the cluster id.
548+
* @since 3.3.5
549+
*/
536550
@Nullable
537-
private String clusterId() {
551+
protected String clusterId() {
538552
if (this.kafkaAdmin != null && this.clusterId == null) {
539553
this.clusterIdLock.lock();
540554
try {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+43-28
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.TimeUnit;
3030
import java.util.function.Function;
3131

32+
import io.micrometer.observation.Observation;
3233
import org.apache.kafka.clients.consumer.ConsumerRecord;
3334
import org.apache.kafka.clients.producer.ProducerRecord;
3435
import org.apache.kafka.common.TopicPartition;
@@ -52,6 +53,8 @@
5253
import org.springframework.kafka.support.KafkaHeaders;
5354
import org.springframework.kafka.support.KafkaUtils;
5455
import org.springframework.kafka.support.TopicPartitionOffset;
56+
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
57+
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
5558
import org.springframework.kafka.support.serializer.DeserializationException;
5659
import org.springframework.kafka.support.serializer.SerializationUtils;
5760
import org.springframework.messaging.Message;
@@ -69,6 +72,7 @@
6972
* @author Gary Russell
7073
* @author Artem Bilan
7174
* @author Borahm Lee
75+
* @author Francois Rosiere
7276
*
7377
* @since 2.1.3
7478
*
@@ -502,39 +506,50 @@ private static <K, V> CorrelationKey defaultCorrelationIdStrategy(
502506
@Override
503507
public void onMessage(List<ConsumerRecord<K, R>> data) {
504508
data.forEach(record -> {
505-
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
506-
Object correlationId = null;
507-
if (correlationHeader != null) {
508-
correlationId = this.binaryCorrelation
509-
? new CorrelationKey(correlationHeader.value())
510-
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
511-
}
512-
if (correlationId == null) {
513-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
514-
+ " - to use request/reply semantics, the responding server must return the correlation id "
515-
+ " in the '" + this.correlationHeaderName + "' header");
509+
ContainerProperties containerProperties = this.replyContainer.getContainerProperties();
510+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
511+
containerProperties.getObservationConvention(),
512+
KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE,
513+
() -> new KafkaRecordReceiverContext(record, this.replyContainer.getListenerId(), containerProperties.getClientId(), this.replyContainer.getGroupId(),
514+
this::clusterId),
515+
getObservationRegistry());
516+
observation.observe(() -> handleReply(record));
517+
});
518+
}
519+
520+
private void handleReply(ConsumerRecord<K, R> record) {
521+
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
522+
Object correlationId = null;
523+
if (correlationHeader != null) {
524+
correlationId = this.binaryCorrelation
525+
? new CorrelationKey(correlationHeader.value())
526+
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
527+
}
528+
if (correlationId == null) {
529+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
530+
+ " - to use request/reply semantics, the responding server must return the correlation id "
531+
+ " in the '" + this.correlationHeaderName + "' header");
532+
}
533+
else {
534+
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
535+
Object correlationKey = correlationId;
536+
if (future == null) {
537+
logLateArrival(record, correlationId);
516538
}
517539
else {
518-
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
519-
Object correlationKey = correlationId;
520-
if (future == null) {
521-
logLateArrival(record, correlationId);
540+
boolean ok = true;
541+
Exception exception = checkForErrors(record);
542+
if (exception != null) {
543+
ok = false;
544+
future.completeExceptionally(exception);
522545
}
523-
else {
524-
boolean ok = true;
525-
Exception exception = checkForErrors(record);
526-
if (exception != null) {
527-
ok = false;
528-
future.completeExceptionally(exception);
529-
}
530-
if (ok) {
531-
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
532-
+ WITH_CORRELATION_ID + correlationKey);
533-
future.complete(record);
534-
}
546+
if (ok) {
547+
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
548+
+ WITH_CORRELATION_ID + correlationKey);
549+
future.complete(record);
535550
}
536551
}
537-
});
552+
}
538553
}
539554

540555
/**

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

+40-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.micrometer;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.time.Duration;
2021
import java.util.Arrays;
2122
import java.util.Deque;
2223
import java.util.List;
@@ -59,6 +60,7 @@
5960
import org.apache.kafka.common.header.Header;
6061
import org.apache.kafka.common.header.Headers;
6162
import org.apache.kafka.common.header.internals.RecordHeader;
63+
import org.awaitility.Awaitility;
6264
import org.jspecify.annotations.Nullable;
6365
import org.junit.jupiter.api.Test;
6466
import reactor.core.publisher.Mono;
@@ -81,12 +83,14 @@
8183
import org.springframework.kafka.core.ProducerFactory;
8284
import org.springframework.kafka.listener.MessageListenerContainer;
8385
import org.springframework.kafka.listener.RecordInterceptor;
86+
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
8487
import org.springframework.kafka.support.ProducerListener;
8588
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
8689
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
8790
import org.springframework.kafka.test.EmbeddedKafkaBroker;
8891
import org.springframework.kafka.test.context.EmbeddedKafka;
8992
import org.springframework.kafka.test.utils.KafkaTestUtils;
93+
import org.springframework.messaging.handler.annotation.SendTo;
9094
import org.springframework.test.annotation.DirtiesContext;
9195
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
9296
import org.springframework.util.StringUtils;
@@ -107,8 +111,9 @@
107111
*/
108112
@SpringJUnitConfig
109113
@EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
110-
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION,
111-
ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1)
114+
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
115+
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
116+
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1)
112117
@DirtiesContext
113118
public class ObservationTests {
114119

@@ -118,6 +123,10 @@ public class ObservationTests {
118123

119124
public final static String OBSERVATION_TEST_3 = "observation.testT3";
120125

126+
public final static String OBSERVATION_TEST_4 = "observation.testT4";
127+
128+
public final static String OBSERVATION_REPLY = "observation.reply";
129+
121130
public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
122131

123132
public final static String OBSERVATION_ERROR = "observation.error.sync";
@@ -511,6 +520,20 @@ public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMeta
511520
tracer.getSpans().clear();
512521
}
513522

523+
@Test
524+
void testReplyingKafkaTemplateObservation(
525+
@Autowired ReplyingKafkaTemplate<Integer, String, String> template,
526+
@Autowired ObservationRegistry observationRegistry) {
527+
AtomicReference<KafkaRecordReceiverContext> replyObservationContext = new AtomicReference<>();
528+
template.sendAndReceive(new ProducerRecord<>(OBSERVATION_TEST_4, "test")).thenAccept(replyRecord -> {
529+
Observation.Context observationContext = observationRegistry.getCurrentObservation().getContext();
530+
assertThat(observationContext).isInstanceOf(KafkaRecordReceiverContext.class);
531+
replyObservationContext.set((KafkaRecordReceiverContext) observationContext);
532+
});
533+
Awaitility.await().atMost(Duration.ofSeconds(60)).until(() ->
534+
replyObservationContext.get() != null && "spring.kafka.listener".equals(replyObservationContext.get().getName()));
535+
}
536+
514537
@Configuration
515538
@EnableKafka
516539
public static class Config {
@@ -584,13 +607,22 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
584607
return template;
585608
}
586609

610+
@Bean
611+
ReplyingKafkaTemplate<Integer, String, String> replyingKafkaTemplate(ProducerFactory<Integer, String> pf, ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
612+
ReplyingKafkaTemplate<Integer, String, String> kafkaTemplate = new ReplyingKafkaTemplate<>(pf, containerFactory.createContainer(OBSERVATION_REPLY));
613+
kafkaTemplate.setObservationEnabled(true);
614+
return kafkaTemplate;
615+
}
616+
587617
@Bean
588618
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
589-
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry) {
619+
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry,
620+
KafkaTemplate<Integer, String> kafkaTemplate) {
590621

591622
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
592623
new ConcurrentKafkaListenerContainerFactory<>();
593624
factory.setConsumerFactory(cf);
625+
factory.setReplyTemplate(kafkaTemplate);
594626
factory.getContainerProperties().setObservationEnabled(true);
595627
factory.setContainerCustomizer(container -> {
596628
if (container.getListenerId().equals("obs3")) {
@@ -721,6 +753,11 @@ void listen2(ConsumerRecord<?, ?> in) {
721753
void listen3(ConsumerRecord<Integer, String> in) {
722754
}
723755

756+
@KafkaListener(id = "obsReply", topics = OBSERVATION_TEST_4)
757+
@SendTo // default REPLY_TOPIC header
758+
public String replyListener(ConsumerRecord<Integer, String> in) {
759+
return in.value().toUpperCase();
760+
}
724761
}
725762

726763
public static class ExceptionListener {

0 commit comments

Comments
 (0)