Skip to content

Add Missing Pause/Resume Events #2388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@ public class ConsumerPausedEvent extends KafkaEvent {

private final Collection<TopicPartition> partitions;

private final String reason;

/**
* Construct an instance with the provided source and partitions.
* @param source the container instance that generated the event.
Expand All @@ -43,6 +45,23 @@ public class ConsumerPausedEvent extends KafkaEvent {
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions) {
super(source, container);
this.partitions = partitions;
this.reason = null;
}

/**
* Construct an instance with the provided source and partitions.
* @param source the container instance that generated the event.
* @param container the container or the parent container if the container is a child.
* @param partitions the partitions.
* @param reason the reason for the pause.
* @since 2.8.9
*/
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions,
String reason) {

super(source, container);
this.partitions = partitions;
this.reason = reason;
}

/**
Expand All @@ -55,7 +74,7 @@ public Collection<TopicPartition> getPartitions() {

@Override
public String toString() {
return "ConsumerPausedEvent [partitions=" + this.partitions + "]";
return "ConsumerPausedEvent [reason=" + this.reason + ", partitions=" + this.partitions + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,24 @@ default void setAckAfterHandle(boolean ack) {
* @param consumer the consumer.
* @param partitions the newly assigned partitions.
* @since 2.8.8
* @deprecated in favor of {@link #onPartitionsAssigned(Consumer, Collection, Runnable)}.
*/
@Deprecated
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}

/**
* Called when partitions are assigned.
* @param consumer the consumer.
* @param partitions the newly assigned partitions.
* @param publishPause called to publish a consumer paused event.
* @since 2.8.9
*/
@SuppressWarnings("deprecation")
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {

onPartitionsAssigned(consumer, partitions);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
}

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {

getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions, publishPause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
}

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {

if (this.batchErrorHandler instanceof FallbackBatchErrorHandler) {
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions,
publishPause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.Set;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
Expand Down Expand Up @@ -62,7 +64,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
BackOffExecution execution = backOff.start();
long nextBackOff = execution.nextBackOff();
String failed = null;
consumer.pause(consumer.assignment());
Set<TopicPartition> assignment = consumer.assignment();
consumer.pause(assignment);
if (container instanceof KafkaMessageListenerContainer) {
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
}
try {
while (nextBackOff != BackOffExecution.STOP) {
consumer.poll(Duration.ZERO);
Expand Down Expand Up @@ -99,7 +105,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
}
}
finally {
consumer.resume(consumer.assignment());
Set<TopicPartition> assignment2 = consumer.assignment();
consumer.resume(assignment2);
if (container instanceof KafkaMessageListenerContainer) {
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerResumedEvent(assignment2);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
}
}

public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {

if (this.retrying.get()) {
consumer.pause(consumer.assignment());
publishPause.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,15 +453,15 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
}
}

private void publishConsumerPausedEvent(Collection<TopicPartition> partitions) {
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer,
Collections.unmodifiableCollection(partitions)));
Collections.unmodifiableCollection(partitions), reason));
}
}

private void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer,
Expand Down Expand Up @@ -1691,7 +1691,9 @@ private void doPauseConsumerIfNecessary() {
this.consumerPaused = true;
this.pauseForPending = false;
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
publishConsumerPausedEvent(assigned);
publishConsumerPausedEvent(assigned, this.pausedForAsyncAcks
? "Incomplete out of order acks"
: "User requested");
}
}
}
Expand All @@ -1702,6 +1704,7 @@ private void resumeConsumerIfNeccessary() {
this.nackWakeTimeMillis = 0;
this.consumer.resume(this.pausedForNack);
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
publishConsumerResumedEvent(this.pausedForNack);
this.pausedForNack.clear();
}
}
Expand Down Expand Up @@ -2649,6 +2652,7 @@ private void pauseForNackSleep() {
this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
try {
this.consumer.pause(this.pausedForNack);
publishConsumerPausedEvent(this.pausedForNack, "Nack with sleep time received");
}
catch (IllegalStateException ex) {
// this should never happen; defensive, just in case...
Expand Down Expand Up @@ -3507,7 +3511,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
if (ListenerConsumer.this.commonErrorHandler != null) {
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
partitions);
partitions, () -> publishConsumerPausedEvent(partitions,
"Paused by error handler after rebalance"));
}
}

Expand All @@ -3518,7 +3523,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
+ "consumer paused again, so the initial poll() will never return any records");
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + partitions);
publishConsumerPausedEvent(partitions);
publishConsumerPausedEvent(partitions, "Re-paused after rebalance");
}
Collection<TopicPartition> toRepause = new LinkedList<>();
partitions.forEach(tp -> {
Expand All @@ -3529,7 +3534,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
ListenerConsumer.this.consumer.pause(toRepause);
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
publishConsumerPausedEvent(toRepause);
publishConsumerPausedEvent(toRepause, "Re-paused after rebalance");
}
this.revoked.removeAll(toRepause);
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,25 +189,31 @@ void rePauseOnRebalance() {
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
Consumer<?, ?> consumer = mock(Consumer.class);
given(consumer.assignment()).willReturn(map.keySet());
AtomicBoolean pubPauseCalled = new AtomicBoolean();
willAnswer(inv -> {
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)),
() -> pubPauseCalled.set(true));
return records;
}).given(consumer).poll(any());
MessageListenerContainer container = mock(MessageListenerContainer.class);
KafkaMessageListenerContainer<?, ?> container = mock(KafkaMessageListenerContainer.class);
given(container.isRunning()).willReturn(true);
eh.handle(new RuntimeException(), records, consumer, container, () -> {
this.invoked++;
throw new RuntimeException();
});
assertThat(this.invoked).isEqualTo(1);
assertThat(recovered).hasSize(2);
InOrder inOrder = inOrder(consumer);
InOrder inOrder = inOrder(consumer, container);
inOrder.verify(consumer).pause(any());
inOrder.verify(container).publishConsumerPausedEvent(map.keySet(), "For batch retry");
inOrder.verify(consumer).poll(any());
inOrder.verify(consumer).pause(any());
inOrder.verify(consumer).resume(any());
inOrder.verify(container).publishConsumerResumedEvent(map.keySet());
verify(consumer, times(3)).assignment();
verifyNoMoreInteractions(consumer);
assertThat(pubPauseCalled.get()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,11 +51,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.utils.KafkaTestUtils;
Expand Down Expand Up @@ -93,6 +96,8 @@ public void dontResumeAlreadyPaused() throws Exception {
assertThat(this.config.resumedForNack).hasSize(1);
assertThat(this.config.pausedForNack).contains(new TopicPartition("foo", 1));
assertThat(this.config.resumedForNack).contains(new TopicPartition("foo", 1));
assertThat(this.config.pauseEvents).hasSize(1);
assertThat(this.config.resumeEvents).hasSize(1);
}

@Configuration
Expand All @@ -113,6 +118,10 @@ public static class Config {

final Set<TopicPartition> resumedForNack = new HashSet<>();

final List<ConsumerPausedEvent> pauseEvents = new ArrayList<>();

final List<ConsumerResumedEvent> resumeEvents = new ArrayList<>();

volatile int count;

volatile long replayTime;
Expand Down Expand Up @@ -232,6 +241,16 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
return factory;
}

@EventListener
public void paused(ConsumerPausedEvent event) {
this.pauseEvents.add(event);
}

@EventListener
public void resumed(ConsumerResumedEvent event) {
this.resumeEvents.add(event);
}

}

}