Skip to content

Commit ea6fa8f

Browse files
garyrussellartembilan
authored andcommitted
Add Missing Pause/Resume Events
Resolves #2387 - `FallbackBatchErrorHandler` - `Acknowledgment.nack()` **2.9.x only - I will back port/forward port**
1 parent 781e726 commit ea6fa8f

File tree

9 files changed

+103
-20
lines changed

9 files changed

+103
-20
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerPausedEvent.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,8 @@ public class ConsumerPausedEvent extends KafkaEvent {
3333

3434
private final Collection<TopicPartition> partitions;
3535

36+
private final String reason;
37+
3638
/**
3739
* Construct an instance with the provided source and partitions.
3840
* @param source the container instance that generated the event.
@@ -43,6 +45,23 @@ public class ConsumerPausedEvent extends KafkaEvent {
4345
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions) {
4446
super(source, container);
4547
this.partitions = partitions;
48+
this.reason = null;
49+
}
50+
51+
/**
52+
* Construct an instance with the provided source and partitions.
53+
* @param source the container instance that generated the event.
54+
* @param container the container or the parent container if the container is a child.
55+
* @param partitions the partitions.
56+
* @param reason the reason for the pause.
57+
* @since 2.8.9
58+
*/
59+
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions,
60+
String reason) {
61+
62+
super(source, container);
63+
this.partitions = partitions;
64+
this.reason = reason;
4665
}
4766

4867
/**
@@ -55,7 +74,7 @@ public Collection<TopicPartition> getPartitions() {
5574

5675
@Override
5776
public String toString() {
58-
return "ConsumerPausedEvent [partitions=" + this.partitions + "]";
77+
return "ConsumerPausedEvent [reason=" + this.reason + ", partitions=" + this.partitions + "]";
5978
}
6079

6180
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

+16
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,24 @@ default void setAckAfterHandle(boolean ack) {
219219
* @param consumer the consumer.
220220
* @param partitions the newly assigned partitions.
221221
* @since 2.8.8
222+
* @deprecated in favor of {@link #onPartitionsAssigned(Consumer, Collection, Runnable)}.
222223
*/
224+
@Deprecated
223225
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
224226
}
225227

228+
/**
229+
* Called when partitions are assigned.
230+
* @param consumer the consumer.
231+
* @param partitions the newly assigned partitions.
232+
* @param publishPause called to publish a consumer paused event.
233+
* @since 2.8.9
234+
*/
235+
@SuppressWarnings("deprecation")
236+
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
237+
Runnable publishPause) {
238+
239+
onPartitionsAssigned(consumer, partitions);
240+
}
241+
226242
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,10 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
202202
}
203203

204204
@Override
205-
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
206-
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
205+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
206+
Runnable publishPause) {
207+
208+
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions, publishPause);
207209
}
208210

209211
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
162162
}
163163

164164
@Override
165-
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
165+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
166+
Runnable publishPause) {
167+
166168
if (this.batchErrorHandler instanceof FallbackBatchErrorHandler) {
167-
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
169+
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions,
170+
publishPause);
168171
}
169172
}
170173

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.Duration;
20+
import java.util.Set;
2021
import java.util.function.BiConsumer;
2122

2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426

2527
import org.springframework.core.log.LogAccessor;
2628
import org.springframework.kafka.KafkaException;
@@ -62,7 +64,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
6264
BackOffExecution execution = backOff.start();
6365
long nextBackOff = execution.nextBackOff();
6466
String failed = null;
65-
consumer.pause(consumer.assignment());
67+
Set<TopicPartition> assignment = consumer.assignment();
68+
consumer.pause(assignment);
69+
if (container instanceof KafkaMessageListenerContainer) {
70+
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
71+
}
6672
try {
6773
while (nextBackOff != BackOffExecution.STOP) {
6874
consumer.poll(Duration.ZERO);
@@ -99,7 +105,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
99105
}
100106
}
101107
finally {
102-
consumer.resume(consumer.assignment());
108+
Set<TopicPartition> assignment2 = consumer.assignment();
109+
consumer.resume(assignment2);
110+
if (container instanceof KafkaMessageListenerContainer) {
111+
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerResumedEvent(assignment2);
112+
}
103113
}
104114
}
105115

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,12 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
112112
}
113113
}
114114

115-
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
115+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
116+
Runnable publishPause) {
117+
116118
if (this.retrying.get()) {
117119
consumer.pause(consumer.assignment());
120+
publishPause.run();
118121
}
119122
}
120123

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -453,15 +453,15 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
453453
}
454454
}
455455

456-
private void publishConsumerPausedEvent(Collection<TopicPartition> partitions) {
456+
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
457457
ApplicationEventPublisher publisher = getApplicationEventPublisher();
458458
if (publisher != null) {
459459
publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer,
460-
Collections.unmodifiableCollection(partitions)));
460+
Collections.unmodifiableCollection(partitions), reason));
461461
}
462462
}
463463

464-
private void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
464+
void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
465465
ApplicationEventPublisher publisher = getApplicationEventPublisher();
466466
if (publisher != null) {
467467
publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer,
@@ -1691,7 +1691,9 @@ private void doPauseConsumerIfNecessary() {
16911691
this.consumerPaused = true;
16921692
this.pauseForPending = false;
16931693
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
1694-
publishConsumerPausedEvent(assigned);
1694+
publishConsumerPausedEvent(assigned, this.pausedForAsyncAcks
1695+
? "Incomplete out of order acks"
1696+
: "User requested");
16951697
}
16961698
}
16971699
}
@@ -1702,6 +1704,7 @@ private void resumeConsumerIfNeccessary() {
17021704
this.nackWakeTimeMillis = 0;
17031705
this.consumer.resume(this.pausedForNack);
17041706
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
1707+
publishConsumerResumedEvent(this.pausedForNack);
17051708
this.pausedForNack.clear();
17061709
}
17071710
}
@@ -2649,6 +2652,7 @@ private void pauseForNackSleep() {
26492652
this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
26502653
try {
26512654
this.consumer.pause(this.pausedForNack);
2655+
publishConsumerPausedEvent(this.pausedForNack, "Nack with sleep time received");
26522656
}
26532657
catch (IllegalStateException ex) {
26542658
// this should never happen; defensive, just in case...
@@ -3507,7 +3511,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
35073511
}
35083512
if (ListenerConsumer.this.commonErrorHandler != null) {
35093513
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
3510-
partitions);
3514+
partitions, () -> publishConsumerPausedEvent(partitions,
3515+
"Paused by error handler after rebalance"));
35113516
}
35123517
}
35133518

@@ -3518,7 +3523,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
35183523
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
35193524
+ "consumer paused again, so the initial poll() will never return any records");
35203525
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + partitions);
3521-
publishConsumerPausedEvent(partitions);
3526+
publishConsumerPausedEvent(partitions, "Re-paused after rebalance");
35223527
}
35233528
Collection<TopicPartition> toRepause = new LinkedList<>();
35243529
partitions.forEach(tp -> {
@@ -3529,7 +3534,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
35293534
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
35303535
ListenerConsumer.this.consumer.pause(toRepause);
35313536
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
3532-
publishConsumerPausedEvent(toRepause);
3537+
publishConsumerPausedEvent(toRepause, "Re-paused after rebalance");
35333538
}
35343539
this.revoked.removeAll(toRepause);
35353540
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -189,25 +189,31 @@ void rePauseOnRebalance() {
189189
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
190190
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
191191
Consumer<?, ?> consumer = mock(Consumer.class);
192+
given(consumer.assignment()).willReturn(map.keySet());
193+
AtomicBoolean pubPauseCalled = new AtomicBoolean();
192194
willAnswer(inv -> {
193-
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
195+
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)),
196+
() -> pubPauseCalled.set(true));
194197
return records;
195198
}).given(consumer).poll(any());
196-
MessageListenerContainer container = mock(MessageListenerContainer.class);
199+
KafkaMessageListenerContainer<?, ?> container = mock(KafkaMessageListenerContainer.class);
197200
given(container.isRunning()).willReturn(true);
198201
eh.handle(new RuntimeException(), records, consumer, container, () -> {
199202
this.invoked++;
200203
throw new RuntimeException();
201204
});
202205
assertThat(this.invoked).isEqualTo(1);
203206
assertThat(recovered).hasSize(2);
204-
InOrder inOrder = inOrder(consumer);
207+
InOrder inOrder = inOrder(consumer, container);
205208
inOrder.verify(consumer).pause(any());
209+
inOrder.verify(container).publishConsumerPausedEvent(map.keySet(), "For batch retry");
206210
inOrder.verify(consumer).poll(any());
207211
inOrder.verify(consumer).pause(any());
208212
inOrder.verify(consumer).resume(any());
213+
inOrder.verify(container).publishConsumerResumedEvent(map.keySet());
209214
verify(consumer, times(3)).assignment();
210215
verifyNoMoreInteractions(consumer);
216+
assertThat(pubPauseCalled.get()).isTrue();
211217
}
212218

213219
@Test

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,11 +51,14 @@
5151
import org.springframework.beans.factory.annotation.Autowired;
5252
import org.springframework.context.annotation.Bean;
5353
import org.springframework.context.annotation.Configuration;
54+
import org.springframework.context.event.EventListener;
5455
import org.springframework.kafka.annotation.EnableKafka;
5556
import org.springframework.kafka.annotation.KafkaListener;
5657
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
5758
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
5859
import org.springframework.kafka.core.ConsumerFactory;
60+
import org.springframework.kafka.event.ConsumerPausedEvent;
61+
import org.springframework.kafka.event.ConsumerResumedEvent;
5962
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6063
import org.springframework.kafka.support.Acknowledgment;
6164
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -93,6 +96,8 @@ public void dontResumeAlreadyPaused() throws Exception {
9396
assertThat(this.config.resumedForNack).hasSize(1);
9497
assertThat(this.config.pausedForNack).contains(new TopicPartition("foo", 1));
9598
assertThat(this.config.resumedForNack).contains(new TopicPartition("foo", 1));
99+
assertThat(this.config.pauseEvents).hasSize(1);
100+
assertThat(this.config.resumeEvents).hasSize(1);
96101
}
97102

98103
@Configuration
@@ -113,6 +118,10 @@ public static class Config {
113118

114119
final Set<TopicPartition> resumedForNack = new HashSet<>();
115120

121+
final List<ConsumerPausedEvent> pauseEvents = new ArrayList<>();
122+
123+
final List<ConsumerResumedEvent> resumeEvents = new ArrayList<>();
124+
116125
volatile int count;
117126

118127
volatile long replayTime;
@@ -232,6 +241,16 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
232241
return factory;
233242
}
234243

244+
@EventListener
245+
public void paused(ConsumerPausedEvent event) {
246+
this.pauseEvents.add(event);
247+
}
248+
249+
@EventListener
250+
public void resumed(ConsumerResumedEvent event) {
251+
this.resumeEvents.add(event);
252+
}
253+
235254
}
236255

237256
}

0 commit comments

Comments
 (0)