From e47d829fe6d64dfbc2bfeff1b3c691855aea2dc7 Mon Sep 17 00:00:00 2001 From: Wouter Coekaerts Date: Sat, 7 Dec 2024 16:50:56 +0100 Subject: [PATCH] GH-3660: Fix EmbeddedKafkaBroker seekToEnd Problem: consumeFromEmbeddedTopics calls Consumer.seekToEnd, which "evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called". This means that the consumer may or may not see future messages, depending on timing. This fix calls `position` so that the seek happens before consumeFromEmbeddedTopics returns. That was it is ensured that any message sent to the topic after the call to consumeFromEmbeddedTopics are seen by the consumer. Issue: https://github.com/spring-projects/spring-kafka/issues/3660 --- .../kafka/test/EmbeddedKafkaKraftBroker.java | 3 +++ .../kafka/test/EmbeddedKafkaZKBroker.java | 3 +++ .../test/EmbeddedKafkaKraftBrokerTests.java | 24 +++++++++++++++++ .../test/EmbeddedKafkaZKBrokerTests.java | 26 +++++++++++++++++++ 4 files changed, 56 insertions(+) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 41bd32b196..35f80813c6 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -71,6 +71,7 @@ * @author Adrian Chlebosz * @author Soby Chacko * @author Sanghyeok An + * @author Wouter Coekaerts * * @since 3.1 */ @@ -560,6 +561,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java index a4e3761acd..a9b070a7d2 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -89,6 +89,7 @@ * @author Soby Chacko * @author Sanghyeok An * @author Borahm Lee + * @author Wouter Coekaerts * * @since 2.2 */ @@ -763,6 +764,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java index 50fe009357..cc1b5e9412 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java @@ -16,14 +16,21 @@ package org.springframework.kafka.test; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.util.StringUtils; import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Wouter Coekaerts * @since 3.1 * */ @@ -37,4 +44,21 @@ void testUpDown() { kafka.destroy(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "seekTestTopic"); + kafka.afterPropertiesSet(); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java index 4688e38e2d..c01891556f 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java @@ -16,12 +16,20 @@ package org.springframework.kafka.test; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Wouter Coekaerts * @since 2.3 * */ @@ -42,4 +50,22 @@ void testUpDown() { assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1); + kafka.afterPropertiesSet(); + kafka.addTopics("seekTestTopic"); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + }