diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index b9f6ac71ef..261c374df9 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -555,6 +555,11 @@ You can use this future to determine the result of the send operation. If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default). +Starting with version 2.8.8, the template has a new method `waitForAssignment`. +This is useful if the reply container is configured with `auto.offset.reset=latest` to avoid sending a request and a reply sent before the container is initialized. + +IMPORTANT: When using manual partition assignment (no group management), the duration for the wait must be greater than the container's `pollTimeout` property because the notification will not be sent until after the first poll is completed. + The following Spring Boot application shows an example of how to use the feature: ==== @@ -570,6 +575,9 @@ public class KRequestingApplication { @Bean public ApplicationRunner runner(ReplyingKafkaTemplate template) { return args -> { + if (!template.waitForAssignment(Duration.ofSeconds(10))) { + throw new IllegalStateException("Reply container did not initialize"); + } ProducerRecord record = new ProducerRecord<>("kRequests", "foo"); RequestReplyFuture replyFuture = template.sendAndReceive(record); SendResult sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index 200a4fac1c..44747ecf28 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,6 @@ public interface ConsumerSeekAware { * @param callback the callback. */ default void registerSeekCallback(ConsumerSeekCallback callback) { - // do nothing } /** @@ -48,7 +47,6 @@ default void registerSeekCallback(ConsumerSeekCallback callback) { * @param callback the callback to perform an initial seek after assignment. */ default void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { - // do nothing } /** @@ -59,7 +57,6 @@ default void onPartitionsAssigned(Map assignments, Consume * @since 2.3 */ default void onPartitionsRevoked(Collection partitions) { - // do nothing } /** @@ -69,7 +66,15 @@ default void onPartitionsRevoked(Collection partitions) { * @param callback the callback to perform a seek. */ default void onIdleContainer(Map assignments, ConsumerSeekCallback callback) { - // do nothing + } + + /** + * When using manual partition assignment, called when the first poll has completed; + * useful when using {@code auto.offset.reset=latest} and you need to wait until the + * initial position has been established. + * @since 2.8.8 + */ + default void onFirstPoll() { } /** @@ -78,7 +83,6 @@ default void onIdleContainer(Map assignments, ConsumerSeek * @since 2.4 */ default void unregisterSeekCallback() { - // do nothing } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 1848c93810..e77632748b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -761,6 +761,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private volatile long lastPoll = System.currentTimeMillis(); + private boolean firstPoll; + @SuppressWarnings(UNCHECKED) ListenerConsumer(GenericMessageListener listener, ListenerType listenerType) { Properties consumerProperties = propertiesFromProperties(); @@ -1334,6 +1336,10 @@ protected void pollAndInvoke() { } return; } + if (!this.firstPoll && this.definedPartitions != null && this.consumerSeekAwareListener != null) { + this.firstPoll = true; + this.consumerSeekAwareListener.onFirstPoll(); + } debugRecords(records); invokeIfHaveRecords(records); @@ -3369,6 +3375,12 @@ public void onPartitionsAssigned(Collection partitions) { else { this.userListener.onPartitionsAssigned(partitions); } + if (!ListenerConsumer.this.firstPoll && ListenerConsumer.this.definedPartitions == null + && ListenerConsumer.this.consumerSeekAwareListener != null) { + + ListenerConsumer.this.firstPoll = true; + ListenerConsumer.this.consumerSeekAwareListener.onFirstPoll(); + } } private void repauseIfNeeded(Collection partitions) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaOperations.java index 60113226a0..347111ac54 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaOperations.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,19 @@ */ public interface ReplyingKafkaOperations { + /** + * Wait until partitions are assigned, e.g. when {@code auto.offset.reset=latest}. + * When using manual assignment, the duration must be greater than the container's + * {@code pollTimeout} property. + * @param duration how long to wait. + * @return true if the partitions have been assigned. + * @throws InterruptedException if the thread is interrupted while waiting. + * @since 2.8.8 + */ + default boolean waitForAssignment(Duration duration) throws InterruptedException { + throw new UnsupportedOperationException(); + } + /** * Send a request message and receive a reply message with the default timeout. * @param message the message to send. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java index db43e5a664..7b5b5b40a1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java @@ -25,6 +25,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -43,6 +45,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.BatchMessageListener; +import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.GenericMessageListenerContainer; import org.springframework.kafka.listener.ListenerUtils; @@ -71,7 +74,7 @@ * */ public class ReplyingKafkaTemplate extends KafkaTemplate implements BatchMessageListener, - InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations { + InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations, ConsumerSeekAware { private static final String WITH_CORRELATION_ID = " with correlationId: "; @@ -109,6 +112,8 @@ public class ReplyingKafkaTemplate extends KafkaTemplate implemen private Function, Exception> replyErrorChecker = rec -> null; + private CountDownLatch assignLatch = new CountDownLatch(1); + private volatile boolean running; private volatile boolean schedulerInitialized; @@ -295,6 +300,7 @@ public synchronized void start() { catch (Exception e) { throw new KafkaException("Failed to initialize", e); } + this.assignLatch = new CountDownLatch(1); this.replyContainer.start(); this.running = true; } @@ -315,6 +321,16 @@ public void stop(Runnable callback) { callback.run(); } + @Override + public void onFirstPoll() { + this.assignLatch.countDown(); + } + + @Override + public boolean waitForAssignment(Duration duration) throws InterruptedException { + return this.assignLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS); + } + @Override public RequestReplyMessageFuture sendAndReceive(Message message) { return sendAndReceive(message, this.defaultReplyTimeout, null); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index f8d166bbf2..267e17062f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -611,13 +611,16 @@ public void onPartitionsAssigned(Collection partitions) { new ReplyingKafkaTemplate<>(this.config.pf(), container); template.setSharedReplyTopic(true); template.start(); + assertThat(template.waitForAssignment(Duration.ofSeconds(10))).isTrue(); assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(template.getAssignedReplyTopicPartitions()).hasSize(5); assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic); return template; } - public ReplyingKafkaTemplate createTemplate(TopicPartitionOffset topic) { + public ReplyingKafkaTemplate createTemplate(TopicPartitionOffset topic) + throws InterruptedException { + ContainerProperties containerProperties = new ContainerProperties(topic); Map consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -628,6 +631,7 @@ public ReplyingKafkaTemplate createTemplate(TopicPartit container); template.setSharedReplyTopic(true); template.start(); + assertThat(template.waitForAssignment(Duration.ofSeconds(10))).isTrue(); assertThat(template.getAssignedReplyTopicPartitions()).hasSize(1); assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic.getTopic()); return template;