Skip to content

Commit f3215c8

Browse files
garyrussellartembilan
authored andcommitted
GH-2318: ReplyingKT - Wait for Assignment
Resolves #2318 Add an option to wait for assignment to the `ReplyingKafkaTemplate`; useful when using `auto.offset.reset=latest` to avoid sending a request with the reply sent before the container is initialized. **cherry-pick to 2.9.x, 2.8.x**
1 parent 85061a4 commit f3215c8

File tree

6 files changed

+66
-9
lines changed

6 files changed

+66
-9
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+8
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,11 @@ You can use this future to determine the result of the send operation.
555555

556556
If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).
557557

558+
Starting with version 2.8.8, the template has a new method `waitForAssignment`.
559+
This is useful if the reply container is configured with `auto.offset.reset=latest` to avoid sending a request and a reply sent before the container is initialized.
560+
561+
IMPORTANT: When using manual partition assignment (no group management), the duration for the wait must be greater than the container's `pollTimeout` property because the notification will not be sent until after the first poll is completed.
562+
558563
The following Spring Boot application shows an example of how to use the feature:
559564

560565
====
@@ -570,6 +575,9 @@ public class KRequestingApplication {
570575
@Bean
571576
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
572577
return args -> {
578+
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
579+
throw new IllegalStateException("Reply container did not initialize");
580+
}
573581
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
574582
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
575583
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,7 +39,6 @@ public interface ConsumerSeekAware {
3939
* @param callback the callback.
4040
*/
4141
default void registerSeekCallback(ConsumerSeekCallback callback) {
42-
// do nothing
4342
}
4443

4544
/**
@@ -48,7 +47,6 @@ default void registerSeekCallback(ConsumerSeekCallback callback) {
4847
* @param callback the callback to perform an initial seek after assignment.
4948
*/
5049
default void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
51-
// do nothing
5250
}
5351

5452
/**
@@ -59,7 +57,6 @@ default void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consume
5957
* @since 2.3
6058
*/
6159
default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
62-
// do nothing
6360
}
6461

6562
/**
@@ -69,7 +66,15 @@ default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
6966
* @param callback the callback to perform a seek.
7067
*/
7168
default void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
72-
// do nothing
69+
}
70+
71+
/**
72+
* When using manual partition assignment, called when the first poll has completed;
73+
* useful when using {@code auto.offset.reset=latest} and you need to wait until the
74+
* initial position has been established.
75+
* @since 2.8.8
76+
*/
77+
default void onFirstPoll() {
7378
}
7479

7580
/**
@@ -78,7 +83,6 @@ default void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeek
7883
* @since 2.4
7984
*/
8085
default void unregisterSeekCallback() {
81-
// do nothing
8286
}
8387

8488
/**

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+12
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
761761

762762
private volatile long lastPoll = System.currentTimeMillis();
763763

764+
private boolean firstPoll;
765+
764766
@SuppressWarnings(UNCHECKED)
765767
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
766768
Properties consumerProperties = propertiesFromProperties();
@@ -1334,6 +1336,10 @@ protected void pollAndInvoke() {
13341336
}
13351337
return;
13361338
}
1339+
if (!this.firstPoll && this.definedPartitions != null && this.consumerSeekAwareListener != null) {
1340+
this.firstPoll = true;
1341+
this.consumerSeekAwareListener.onFirstPoll();
1342+
}
13371343
debugRecords(records);
13381344

13391345
invokeIfHaveRecords(records);
@@ -3369,6 +3375,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
33693375
else {
33703376
this.userListener.onPartitionsAssigned(partitions);
33713377
}
3378+
if (!ListenerConsumer.this.firstPoll && ListenerConsumer.this.definedPartitions == null
3379+
&& ListenerConsumer.this.consumerSeekAwareListener != null) {
3380+
3381+
ListenerConsumer.this.firstPoll = true;
3382+
ListenerConsumer.this.consumerSeekAwareListener.onFirstPoll();
3383+
}
33723384
}
33733385

33743386
private void repauseIfNeeded(Collection<TopicPartition> partitions) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaOperations.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,19 @@
3737
*/
3838
public interface ReplyingKafkaOperations<K, V, R> {
3939

40+
/**
41+
* Wait until partitions are assigned, e.g. when {@code auto.offset.reset=latest}.
42+
* When using manual assignment, the duration must be greater than the container's
43+
* {@code pollTimeout} property.
44+
* @param duration how long to wait.
45+
* @return true if the partitions have been assigned.
46+
* @throws InterruptedException if the thread is interrupted while waiting.
47+
* @since 2.8.8
48+
*/
49+
default boolean waitForAssignment(Duration duration) throws InterruptedException {
50+
throw new UnsupportedOperationException();
51+
}
52+
4053
/**
4154
* Send a request message and receive a reply message with the default timeout.
4255
* @param message the message to send.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.UUID;
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.ConcurrentMap;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.TimeUnit;
2830
import java.util.function.Function;
2931

3032
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -43,6 +45,7 @@
4345
import org.springframework.kafka.core.KafkaTemplate;
4446
import org.springframework.kafka.core.ProducerFactory;
4547
import org.springframework.kafka.listener.BatchMessageListener;
48+
import org.springframework.kafka.listener.ConsumerSeekAware;
4649
import org.springframework.kafka.listener.ContainerProperties;
4750
import org.springframework.kafka.listener.GenericMessageListenerContainer;
4851
import org.springframework.kafka.listener.ListenerUtils;
@@ -71,7 +74,7 @@
7174
*
7275
*/
7376
public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implements BatchMessageListener<K, R>,
74-
InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K, V, R> {
77+
InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K, V, R>, ConsumerSeekAware {
7578

7679
private static final String WITH_CORRELATION_ID = " with correlationId: ";
7780

@@ -109,6 +112,8 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
109112

110113
private Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker = rec -> null;
111114

115+
private CountDownLatch assignLatch = new CountDownLatch(1);
116+
112117
private volatile boolean running;
113118

114119
private volatile boolean schedulerInitialized;
@@ -295,6 +300,7 @@ public synchronized void start() {
295300
catch (Exception e) {
296301
throw new KafkaException("Failed to initialize", e);
297302
}
303+
this.assignLatch = new CountDownLatch(1);
298304
this.replyContainer.start();
299305
this.running = true;
300306
}
@@ -315,6 +321,16 @@ public void stop(Runnable callback) {
315321
callback.run();
316322
}
317323

324+
@Override
325+
public void onFirstPoll() {
326+
this.assignLatch.countDown();
327+
}
328+
329+
@Override
330+
public boolean waitForAssignment(Duration duration) throws InterruptedException {
331+
return this.assignLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
332+
}
333+
318334
@Override
319335
public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message) {
320336
return sendAndReceive(message, this.defaultReplyTimeout, null);

Diff for: spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -611,13 +611,16 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
611611
new ReplyingKafkaTemplate<>(this.config.pf(), container);
612612
template.setSharedReplyTopic(true);
613613
template.start();
614+
assertThat(template.waitForAssignment(Duration.ofSeconds(10))).isTrue();
614615
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
615616
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(5);
616617
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic);
617618
return template;
618619
}
619620

620-
public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionOffset topic) {
621+
public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionOffset topic)
622+
throws InterruptedException {
623+
621624
ContainerProperties containerProperties = new ContainerProperties(topic);
622625
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka);
623626
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -628,6 +631,7 @@ public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartit
628631
container);
629632
template.setSharedReplyTopic(true);
630633
template.start();
634+
assertThat(template.waitForAssignment(Duration.ofSeconds(10))).isTrue();
631635
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(1);
632636
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic.getTopic());
633637
return template;

0 commit comments

Comments
 (0)