Skip to content

Commit 8e0302b

Browse files
committed
spring-projectsGH-3660: Fix EmbeddedKafkaBrokers.seekToEnd
Problem: consumeFromEmbeddedTopics calls Consumer.seekToEnd, which "evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called". This means that the consumer may or may not see future messages, depending on timing. This fix calls `position` so that the seek happens before consumeFromEmbeddedTopics returns. That was it is ensured that any message sent to the topic after the call to consumeFromEmbeddedTopics are seen by the consumer. Issue: spring-projects#3660
1 parent eca3750 commit 8e0302b

File tree

4 files changed

+52
-0
lines changed

4 files changed

+52
-0
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

+2
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
560560
+ (seekToEnd ? "end; " : "beginning"));
561561
if (seekToEnd) {
562562
consumer.seekToEnd(assigned.get());
563+
// seekToEnd is asynchronous. query the position to force the seek to happen now.
564+
assigned.get().forEach(consumer::position);
563565
}
564566
else {
565567
consumer.seekToBeginning(assigned.get());

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java

+2
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
763763
+ (seekToEnd ? "end; " : "beginning"));
764764
if (seekToEnd) {
765765
consumer.seekToEnd(assigned.get());
766+
// seekToEnd is asynchronous. query the position to force the seek to happen now.
767+
assigned.get().forEach(consumer::position);
766768
}
767769
else {
768770
consumer.seekToBeginning(assigned.get());

spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java

+23
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,14 @@
1616

1717
package org.springframework.kafka.test;
1818

19+
import java.util.Map;
20+
21+
import org.apache.kafka.clients.consumer.KafkaConsumer;
22+
import org.apache.kafka.clients.producer.KafkaProducer;
23+
import org.apache.kafka.clients.producer.ProducerRecord;
1924
import org.junit.jupiter.api.Test;
2025

26+
import org.springframework.kafka.test.utils.KafkaTestUtils;
2127
import org.springframework.util.StringUtils;
2228

2329
import static org.assertj.core.api.Assertions.assertThat;
@@ -37,4 +43,21 @@ void testUpDown() {
3743
kafka.destroy();
3844
}
3945

46+
@Test
47+
void testConsumeFromEmbeddedWithSeekToEnd() {
48+
EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "seekTestTopic");
49+
kafka.afterPropertiesSet();
50+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka);
51+
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
52+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd"));
53+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka);
54+
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
55+
kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic");
56+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd"));
57+
producer.close();
58+
assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value())
59+
.isEqualTo("afterSeekToEnd");
60+
consumer.close();
61+
}
62+
4063
}

spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java

+25
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,15 @@
1616

1717
package org.springframework.kafka.test;
1818

19+
import java.util.Map;
20+
21+
import org.apache.kafka.clients.consumer.KafkaConsumer;
22+
import org.apache.kafka.clients.producer.KafkaProducer;
23+
import org.apache.kafka.clients.producer.ProducerRecord;
1924
import org.junit.jupiter.api.Test;
2025

26+
import org.springframework.kafka.test.utils.KafkaTestUtils;
27+
2128
import static org.assertj.core.api.Assertions.assertThat;
2229

2330
/**
@@ -42,4 +49,22 @@ void testUpDown() {
4249
assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull();
4350
}
4451

52+
@Test
53+
void testConsumeFromEmbeddedWithSeekToEnd() {
54+
EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1);
55+
kafka.afterPropertiesSet();
56+
kafka.addTopics("seekTestTopic");
57+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka);
58+
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
59+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd"));
60+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka);
61+
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
62+
kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic");
63+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd"));
64+
producer.close();
65+
assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value())
66+
.isEqualTo("afterSeekToEnd");
67+
consumer.close();
68+
}
69+
4570
}

0 commit comments

Comments
 (0)