Skip to content

Commit 71b571c

Browse files
coekiesobychacko
authored andcommitted
GH-3660: Fix EmbeddedKafkaBroker seekToEnd (#3661)
Fixes: #3660 Problem: consumeFromEmbeddedTopics calls Consumer.seekToEnd, which "evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called". This means that the consumer may or may not see future messages, depending on timing. This fix calls `position` so that the seek happens before consumeFromEmbeddedTopics returns. That was it is ensured that any message sent to the topic after the call to consumeFromEmbeddedTopics are seen by the consumer. Issue: #3660
1 parent ab17e3d commit 71b571c

File tree

4 files changed

+61
-0
lines changed

4 files changed

+61
-0
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

+5
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@
6969
* @author Nakul Mishra
7070
* @author Pawel Lozinski
7171
* @author Adrian Chlebosz
72+
* @author Soby Chacko
73+
* @author Sanghyeok An
74+
* @author Wouter Coekaerts
7275
*
7376
* @since 3.1
7477
*/
@@ -553,6 +556,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
553556
+ (seekToEnd ? "end; " : "beginning"));
554557
if (seekToEnd) {
555558
consumer.seekToEnd(assigned.get());
559+
// seekToEnd is asynchronous. query the position to force the seek to happen now.
560+
assigned.get().forEach(consumer::position);
556561
}
557562
else {
558563
consumer.seekToBeginning(assigned.get());

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java

+6
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@
8686
* @author Nakul Mishra
8787
* @author Pawel Lozinski
8888
* @author Adrian Chlebosz
89+
* @author Soby Chacko
90+
* @author Sanghyeok An
91+
* @author Borahm Lee
92+
* @author Wouter Coekaerts
8993
*
9094
* @since 2.2
9195
*/
@@ -755,6 +759,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
755759
+ (seekToEnd ? "end; " : "beginning"));
756760
if (seekToEnd) {
757761
consumer.seekToEnd(assigned.get());
762+
// seekToEnd is asynchronous. query the position to force the seek to happen now.
763+
assigned.get().forEach(consumer::position);
758764
}
759765
else {
760766
consumer.seekToBeginning(assigned.get());

spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java

+24
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,21 @@
1616

1717
package org.springframework.kafka.test;
1818

19+
import java.util.Map;
20+
21+
import org.apache.kafka.clients.consumer.KafkaConsumer;
22+
import org.apache.kafka.clients.producer.KafkaProducer;
23+
import org.apache.kafka.clients.producer.ProducerRecord;
1924
import org.junit.jupiter.api.Test;
2025

26+
import org.springframework.kafka.test.utils.KafkaTestUtils;
2127
import org.springframework.util.StringUtils;
2228

2329
import static org.assertj.core.api.Assertions.assertThat;
2430

2531
/**
2632
* @author Gary Russell
33+
* @author Wouter Coekaerts
2734
* @since 3.1
2835
*
2936
*/
@@ -37,4 +44,21 @@ void testUpDown() {
3744
kafka.destroy();
3845
}
3946

47+
@Test
48+
void testConsumeFromEmbeddedWithSeekToEnd() {
49+
EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "seekTestTopic");
50+
kafka.afterPropertiesSet();
51+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka);
52+
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
53+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd"));
54+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka);
55+
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
56+
kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic");
57+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd"));
58+
producer.close();
59+
assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value())
60+
.isEqualTo("afterSeekToEnd");
61+
consumer.close();
62+
}
63+
4064
}

spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java

+26
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,20 @@
1616

1717
package org.springframework.kafka.test;
1818

19+
import java.util.Map;
20+
21+
import org.apache.kafka.clients.consumer.KafkaConsumer;
22+
import org.apache.kafka.clients.producer.KafkaProducer;
23+
import org.apache.kafka.clients.producer.ProducerRecord;
1924
import org.junit.jupiter.api.Test;
2025

26+
import org.springframework.kafka.test.utils.KafkaTestUtils;
27+
2128
import static org.assertj.core.api.Assertions.assertThat;
2229

2330
/**
2431
* @author Gary Russell
32+
* @author Wouter Coekaerts
2533
* @since 2.3
2634
*
2735
*/
@@ -42,4 +50,22 @@ void testUpDown() {
4250
assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull();
4351
}
4452

53+
@Test
54+
void testConsumeFromEmbeddedWithSeekToEnd() {
55+
EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1);
56+
kafka.afterPropertiesSet();
57+
kafka.addTopics("seekTestTopic");
58+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka);
59+
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
60+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd"));
61+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka);
62+
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
63+
kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic");
64+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd"));
65+
producer.close();
66+
assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value())
67+
.isEqualTo("afterSeekToEnd");
68+
consumer.close();
69+
}
70+
4571
}

0 commit comments

Comments
 (0)