Skip to content

GH-2318: ReplyingKT - Wait for Assignment #2319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,11 @@ You can use this future to determine the result of the send operation.

If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).

Starting with version 2.8.8, the template has a new method `waitForAssignment`.
This is useful if the reply container is configured with `auto.offset.reset=latest` to avoid sending a request and a reply sent before the container is initialized.

IMPORTANT: When using manual partition assignment (no group management), the duration for the wait must be greater than the container's `pollTimeout` property because the notification will not be sent until after the first poll is completed.

The following Spring Boot application shows an example of how to use the feature:

====
Expand All @@ -570,6 +575,9 @@ public class KRequestingApplication {
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,6 @@ public interface ConsumerSeekAware {
* @param callback the callback.
*/
default void registerSeekCallback(ConsumerSeekCallback callback) {
// do nothing
}

/**
Expand All @@ -48,7 +47,6 @@ default void registerSeekCallback(ConsumerSeekCallback callback) {
* @param callback the callback to perform an initial seek after assignment.
*/
default void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
// do nothing
}

/**
Expand All @@ -59,7 +57,6 @@ default void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consume
* @since 2.3
*/
default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// do nothing
}

/**
Expand All @@ -69,7 +66,15 @@ default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
* @param callback the callback to perform a seek.
*/
default void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
// do nothing
}

/**
* When using manual partition assignment, called when the first poll has completed;
* useful when using {@code auto.offset.reset=latest} and you need to wait until the
* initial position has been established.
* @since 2.8.8
*/
default void onFirstPoll() {
}

/**
Expand All @@ -78,7 +83,6 @@ default void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeek
* @since 2.4
*/
default void unregisterSeekCallback() {
// do nothing
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private volatile long lastPoll = System.currentTimeMillis();

private boolean firstPoll;

@SuppressWarnings(UNCHECKED)
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Properties consumerProperties = propertiesFromProperties();
Expand Down Expand Up @@ -1334,6 +1336,10 @@ protected void pollAndInvoke() {
}
return;
}
if (!this.firstPoll && this.definedPartitions != null && this.consumerSeekAwareListener != null) {
this.firstPoll = true;
this.consumerSeekAwareListener.onFirstPoll();
}
debugRecords(records);

invokeIfHaveRecords(records);
Expand Down Expand Up @@ -3369,6 +3375,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
else {
this.userListener.onPartitionsAssigned(partitions);
}
if (!ListenerConsumer.this.firstPoll && ListenerConsumer.this.definedPartitions == null
&& ListenerConsumer.this.consumerSeekAwareListener != null) {

ListenerConsumer.this.firstPoll = true;
ListenerConsumer.this.consumerSeekAwareListener.onFirstPoll();
}
}

private void repauseIfNeeded(Collection<TopicPartition> partitions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,19 @@
*/
public interface ReplyingKafkaOperations<K, V, R> {

/**
* Wait until partitions are assigned, e.g. when {@code auto.offset.reset=latest}.
* When using manual assignment, the duration must be greater than the container's
* {@code pollTimeout} property.
* @param duration how long to wait.
* @return true if the partitions have been assigned.
* @throws InterruptedException if the thread is interrupted while waiting.
* @since 2.8.8
*/
default boolean waitForAssignment(Duration duration) throws InterruptedException {
throw new UnsupportedOperationException();
}

/**
* Send a request message and receive a reply message with the default timeout.
* @param message the message to send.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -43,6 +45,7 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.ListenerUtils;
Expand Down Expand Up @@ -71,7 +74,7 @@
*
*/
public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implements BatchMessageListener<K, R>,
InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K, V, R> {
InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K, V, R>, ConsumerSeekAware {

private static final String WITH_CORRELATION_ID = " with correlationId: ";

Expand Down Expand Up @@ -109,6 +112,8 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen

private Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker = rec -> null;

private CountDownLatch assignLatch = new CountDownLatch(1);

private volatile boolean running;

private volatile boolean schedulerInitialized;
Expand Down Expand Up @@ -295,6 +300,7 @@ public synchronized void start() {
catch (Exception e) {
throw new KafkaException("Failed to initialize", e);
}
this.assignLatch = new CountDownLatch(1);
this.replyContainer.start();
this.running = true;
}
Expand All @@ -315,6 +321,16 @@ public void stop(Runnable callback) {
callback.run();
}

@Override
public void onFirstPoll() {
this.assignLatch.countDown();
}

@Override
public boolean waitForAssignment(Duration duration) throws InterruptedException {
return this.assignLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message) {
return sendAndReceive(message, this.defaultReplyTimeout, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,16 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
new ReplyingKafkaTemplate<>(this.config.pf(), container);
template.setSharedReplyTopic(true);
template.start();
assertThat(template.waitForAssignment(Duration.ofSeconds(10))).isTrue();
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(5);
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic);
return template;
}

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionOffset topic) {
public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionOffset topic)
throws InterruptedException {

ContainerProperties containerProperties = new ContainerProperties(topic);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand All @@ -628,6 +631,7 @@ public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartit
container);
template.setSharedReplyTopic(true);
template.start();
assertThat(template.waitForAssignment(Duration.ofSeconds(10))).isTrue();
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(1);
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic.getTopic());
return template;
Expand Down