Skip to content

Commit f8f066a

Browse files
frosiereartembilan
authored andcommitted
GH-3808: ReplyingKafkaTemplate observation on reply
Fixes: #3808 Issue link: #3808 Since `ReplyingKafkaTemplate` is a `BatchMessageListener` for the provided listener container, we cannot rely on the observation from that container. * Implement consumer observation from the `ReplyingKafkaTemplate.BatchMessageListener`. Signed-off-by: Francois Rosiere <[email protected]> [[email protected] Improve commit message] **Auto-cherry-pick to `3.2.x`** Signed-off-by: Artem Bilan <[email protected]> # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
1 parent 6b847aa commit f8f066a

File tree

3 files changed

+100
-34
lines changed

3 files changed

+100
-34
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
* @author Gurps Bassi
105105
* @author Valentina Armenise
106106
* @author Christian Fredriksson
107+
* @author Francois Rosiere
107108
*/
108109
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
109110
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
@@ -467,6 +468,15 @@ public void setObservationRegistry(ObservationRegistry observationRegistry) {
467468
this.observationRegistry = observationRegistry;
468469
}
469470

471+
/**
472+
* Return the {@link ObservationRegistry} used by the template.
473+
* @return the observation registry
474+
* @since 3.2.9
475+
*/
476+
protected ObservationRegistry getObservationRegistry() {
477+
return this.observationRegistry;
478+
}
479+
470480
/**
471481
* Return the {@link KafkaAdmin}, used to find the cluster id for observation, if
472482
* present.
@@ -533,8 +543,13 @@ private String getAdminBootstrapAddress() {
533543
return removeLeadingAndTrailingBrackets(adminServers);
534544
}
535545

546+
/**
547+
* Return the cluster id, if available.
548+
* @return the cluster id.
549+
* @since 3.2.9
550+
*/
536551
@Nullable
537-
private String clusterId() {
552+
protected String clusterId() {
538553
if (this.kafkaAdmin != null && this.clusterId == null) {
539554
this.clusterIdLock.lock();
540555
try {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+43-28
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.TimeUnit;
3030
import java.util.function.Function;
3131

32+
import io.micrometer.observation.Observation;
3233
import org.apache.kafka.clients.consumer.ConsumerRecord;
3334
import org.apache.kafka.clients.producer.ProducerRecord;
3435
import org.apache.kafka.common.TopicPartition;
@@ -51,6 +52,8 @@
5152
import org.springframework.kafka.support.KafkaHeaders;
5253
import org.springframework.kafka.support.KafkaUtils;
5354
import org.springframework.kafka.support.TopicPartitionOffset;
55+
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
56+
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
5457
import org.springframework.kafka.support.serializer.DeserializationException;
5558
import org.springframework.kafka.support.serializer.SerializationUtils;
5659
import org.springframework.lang.Nullable;
@@ -69,6 +72,7 @@
6972
* @author Gary Russell
7073
* @author Artem Bilan
7174
* @author Borahm Lee
75+
* @author Francois Rosiere
7276
*
7377
* @since 2.1.3
7478
*
@@ -501,39 +505,50 @@ private static <K, V> CorrelationKey defaultCorrelationIdStrategy(
501505
@Override
502506
public void onMessage(List<ConsumerRecord<K, R>> data) {
503507
data.forEach(record -> {
504-
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
505-
Object correlationId = null;
506-
if (correlationHeader != null) {
507-
correlationId = this.binaryCorrelation
508-
? new CorrelationKey(correlationHeader.value())
509-
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
510-
}
511-
if (correlationId == null) {
512-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
513-
+ " - to use request/reply semantics, the responding server must return the correlation id "
514-
+ " in the '" + this.correlationHeaderName + "' header");
508+
ContainerProperties containerProperties = this.replyContainer.getContainerProperties();
509+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
510+
containerProperties.getObservationConvention(),
511+
KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE,
512+
() -> new KafkaRecordReceiverContext(record, this.replyContainer.getListenerId(), containerProperties.getClientId(), this.replyContainer.getGroupId(),
513+
this::clusterId),
514+
getObservationRegistry());
515+
observation.observe(() -> handleReply(record));
516+
});
517+
}
518+
519+
private void handleReply(ConsumerRecord<K, R> record) {
520+
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
521+
Object correlationId = null;
522+
if (correlationHeader != null) {
523+
correlationId = this.binaryCorrelation
524+
? new CorrelationKey(correlationHeader.value())
525+
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
526+
}
527+
if (correlationId == null) {
528+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
529+
+ " - to use request/reply semantics, the responding server must return the correlation id "
530+
+ " in the '" + this.correlationHeaderName + "' header");
531+
}
532+
else {
533+
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
534+
Object correlationKey = correlationId;
535+
if (future == null) {
536+
logLateArrival(record, correlationId);
515537
}
516538
else {
517-
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
518-
Object correlationKey = correlationId;
519-
if (future == null) {
520-
logLateArrival(record, correlationId);
539+
boolean ok = true;
540+
Exception exception = checkForErrors(record);
541+
if (exception != null) {
542+
ok = false;
543+
future.completeExceptionally(exception);
521544
}
522-
else {
523-
boolean ok = true;
524-
Exception exception = checkForErrors(record);
525-
if (exception != null) {
526-
ok = false;
527-
future.completeExceptionally(exception);
528-
}
529-
if (ok) {
530-
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
531-
+ WITH_CORRELATION_ID + correlationKey);
532-
future.complete(record);
533-
}
545+
if (ok) {
546+
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
547+
+ WITH_CORRELATION_ID + correlationKey);
548+
future.complete(record);
534549
}
535550
}
536-
});
551+
}
537552
}
538553

539554
/**

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

+41-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.micrometer;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.time.Duration;
2021
import java.util.Arrays;
2122
import java.util.Deque;
2223
import java.util.List;
@@ -80,13 +81,14 @@
8081
import org.springframework.kafka.core.ProducerFactory;
8182
import org.springframework.kafka.listener.MessageListenerContainer;
8283
import org.springframework.kafka.listener.RecordInterceptor;
84+
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
8385
import org.springframework.kafka.support.ProducerListener;
8486
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
8587
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
8688
import org.springframework.kafka.test.EmbeddedKafkaBroker;
8789
import org.springframework.kafka.test.context.EmbeddedKafka;
8890
import org.springframework.kafka.test.utils.KafkaTestUtils;
89-
import org.springframework.lang.Nullable;
91+
import org.springframework.messaging.handler.annotation.SendTo;
9092
import org.springframework.test.annotation.DirtiesContext;
9193
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
9294
import org.springframework.util.StringUtils;
@@ -102,13 +104,15 @@
102104
* @author Wang Zhiyang
103105
* @author Christian Mergenthaler
104106
* @author Soby Chacko
107+
* @author Francois Rosiere
105108
*
106109
* @since 3.0
107110
*/
108111
@SpringJUnitConfig
109112
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
110-
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION,
111-
ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
113+
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
114+
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
115+
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
112116
@DirtiesContext
113117
public class ObservationTests {
114118

@@ -118,6 +122,10 @@ public class ObservationTests {
118122

119123
public final static String OBSERVATION_TEST_3 = "observation.testT3";
120124

125+
public final static String OBSERVATION_TEST_4 = "observation.testT4";
126+
127+
public final static String OBSERVATION_REPLY = "observation.reply";
128+
121129
public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
122130

123131
public final static String OBSERVATION_ERROR = "observation.error.sync";
@@ -513,6 +521,19 @@ public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMeta
513521
tracer.getSpans().clear();
514522
}
515523

524+
@Test
525+
void testReplyingKafkaTemplateObservation(
526+
@Autowired ReplyingKafkaTemplate<Integer, String, String> template,
527+
@Autowired ObservationRegistry observationRegistry) {
528+
assertThat(template.sendAndReceive(new ProducerRecord<>(OBSERVATION_TEST_4, "test"))
529+
// the current observation must be retrieved from the consumer thread of the reply
530+
.thenApply(replyRecord -> observationRegistry.getCurrentObservation().getContext()))
531+
.succeedsWithin(Duration.ofSeconds(30))
532+
.isInstanceOf(KafkaRecordReceiverContext.class)
533+
.extracting("name")
534+
.isEqualTo("spring.kafka.listener");
535+
}
536+
516537
@Configuration
517538
@EnableKafka
518539
public static class Config {
@@ -586,13 +607,22 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
586607
return template;
587608
}
588609

610+
@Bean
611+
ReplyingKafkaTemplate<Integer, String, String> replyingKafkaTemplate(ProducerFactory<Integer, String> pf, ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
612+
ReplyingKafkaTemplate<Integer, String, String> kafkaTemplate = new ReplyingKafkaTemplate<>(pf, containerFactory.createContainer(OBSERVATION_REPLY));
613+
kafkaTemplate.setObservationEnabled(true);
614+
return kafkaTemplate;
615+
}
616+
589617
@Bean
590618
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
591-
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry) {
619+
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry,
620+
KafkaTemplate<Integer, String> kafkaTemplate) {
592621

593622
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
594623
new ConcurrentKafkaListenerContainerFactory<>();
595624
factory.setConsumerFactory(cf);
625+
factory.setReplyTemplate(kafkaTemplate);
596626
factory.getContainerProperties().setObservationEnabled(true);
597627
factory.setContainerCustomizer(container -> {
598628
if (container.getListenerId().equals("obs3")) {
@@ -659,7 +689,7 @@ public List<String> fields() {
659689
// This is called on the producer side when the message is being sent
660690
// Normally we would pass information from tracing context - for tests we don't need to
661691
@Override
662-
public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
692+
public <C> void inject(TraceContext context, C carrier, Setter<C> setter) {
663693
setter.set(carrier, "foo", "some foo value");
664694
setter.set(carrier, "bar", "some bar value");
665695

@@ -723,6 +753,12 @@ void listen2(ConsumerRecord<?, ?> in) {
723753
void listen3(ConsumerRecord<Integer, String> in) {
724754
}
725755

756+
@KafkaListener(id = "obsReply", topics = OBSERVATION_TEST_4)
757+
@SendTo // default REPLY_TOPIC header
758+
public String replyListener(ConsumerRecord<Integer, String> in) {
759+
return in.value().toUpperCase();
760+
}
761+
726762
}
727763

728764
public static class ExceptionListener {

0 commit comments

Comments
 (0)