Skip to content

Commit fe71001

Browse files
authored
GH-3660: Fix EmbeddedKafkaBroker seekToEnd (#3661)
Fixes: #3660 Problem: consumeFromEmbeddedTopics calls Consumer.seekToEnd, which "evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called". This means that the consumer may or may not see future messages, depending on timing. This fix calls `position` so that the seek happens before consumeFromEmbeddedTopics returns. That was it is ensured that any message sent to the topic after the call to consumeFromEmbeddedTopics are seen by the consumer. Issue: #3660 **Auto-cherry-pick to `3.2.x`**
1 parent c639cac commit fe71001

File tree

4 files changed

+56
-0
lines changed

4 files changed

+56
-0
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

+3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
* @author Adrian Chlebosz
7272
* @author Soby Chacko
7373
* @author Sanghyeok An
74+
* @author Wouter Coekaerts
7475
*
7576
* @since 3.1
7677
*/
@@ -560,6 +561,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
560561
+ (seekToEnd ? "end; " : "beginning"));
561562
if (seekToEnd) {
562563
consumer.seekToEnd(assigned.get());
564+
// seekToEnd is asynchronous. query the position to force the seek to happen now.
565+
assigned.get().forEach(consumer::position);
563566
}
564567
else {
565568
consumer.seekToBeginning(assigned.get());

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java

+3
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
* @author Soby Chacko
9090
* @author Sanghyeok An
9191
* @author Borahm Lee
92+
* @author Wouter Coekaerts
9293
*
9394
* @since 2.2
9495
*/
@@ -763,6 +764,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
763764
+ (seekToEnd ? "end; " : "beginning"));
764765
if (seekToEnd) {
765766
consumer.seekToEnd(assigned.get());
767+
// seekToEnd is asynchronous. query the position to force the seek to happen now.
768+
assigned.get().forEach(consumer::position);
766769
}
767770
else {
768771
consumer.seekToBeginning(assigned.get());

spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java

+24
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,21 @@
1616

1717
package org.springframework.kafka.test;
1818

19+
import java.util.Map;
20+
21+
import org.apache.kafka.clients.consumer.KafkaConsumer;
22+
import org.apache.kafka.clients.producer.KafkaProducer;
23+
import org.apache.kafka.clients.producer.ProducerRecord;
1924
import org.junit.jupiter.api.Test;
2025

26+
import org.springframework.kafka.test.utils.KafkaTestUtils;
2127
import org.springframework.util.StringUtils;
2228

2329
import static org.assertj.core.api.Assertions.assertThat;
2430

2531
/**
2632
* @author Gary Russell
33+
* @author Wouter Coekaerts
2734
* @since 3.1
2835
*
2936
*/
@@ -37,4 +44,21 @@ void testUpDown() {
3744
kafka.destroy();
3845
}
3946

47+
@Test
48+
void testConsumeFromEmbeddedWithSeekToEnd() {
49+
EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "seekTestTopic");
50+
kafka.afterPropertiesSet();
51+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka);
52+
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
53+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd"));
54+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka);
55+
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
56+
kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic");
57+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd"));
58+
producer.close();
59+
assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value())
60+
.isEqualTo("afterSeekToEnd");
61+
consumer.close();
62+
}
63+
4064
}

spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java

+26
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,20 @@
1616

1717
package org.springframework.kafka.test;
1818

19+
import java.util.Map;
20+
21+
import org.apache.kafka.clients.consumer.KafkaConsumer;
22+
import org.apache.kafka.clients.producer.KafkaProducer;
23+
import org.apache.kafka.clients.producer.ProducerRecord;
1924
import org.junit.jupiter.api.Test;
2025

26+
import org.springframework.kafka.test.utils.KafkaTestUtils;
27+
2128
import static org.assertj.core.api.Assertions.assertThat;
2229

2330
/**
2431
* @author Gary Russell
32+
* @author Wouter Coekaerts
2533
* @since 2.3
2634
*
2735
*/
@@ -42,4 +50,22 @@ void testUpDown() {
4250
assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull();
4351
}
4452

53+
@Test
54+
void testConsumeFromEmbeddedWithSeekToEnd() {
55+
EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1);
56+
kafka.afterPropertiesSet();
57+
kafka.addTopics("seekTestTopic");
58+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka);
59+
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
60+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd"));
61+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka);
62+
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
63+
kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic");
64+
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd"));
65+
producer.close();
66+
assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value())
67+
.isEqualTo("afterSeekToEnd");
68+
consumer.close();
69+
}
70+
4571
}

0 commit comments

Comments
 (0)