Skip to content

Commit f6cbf11

Browse files
garyrussellartembilan
authored andcommitted
GH-2387: Fix FallbackBatchErrorHandler Events
Resolves #2387 Events were not published for `ConcurrentMessageListenerContainer`s. Also resolves a class tangle in #2417 between `ErrorHandlingUtils` and KMLC. **cherry-pick to 2.9.x, 2.8.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java # spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java
1 parent 2e1feb3 commit f6cbf11

7 files changed

+118
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

+17
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,23 @@ public List<KafkaMessageListenerContainer<K, V>> getContainers() {
118118
}
119119
}
120120

121+
@Override
122+
public MessageListenerContainer getContainerFor(String topic, int partition) {
123+
synchronized (this.lifecycleMonitor) {
124+
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
125+
Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
126+
if (assignedPartitions != null) {
127+
for (TopicPartition part : assignedPartitions) {
128+
if (part.topic().equals(topic) && part.partition() == partition) {
129+
return container;
130+
}
131+
}
132+
}
133+
}
134+
return this;
135+
}
136+
}
137+
121138
@Override
122139
public Collection<TopicPartition> getAssignedPartitions() {
123140
synchronized (this.lifecycleMonitor) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.Collection;
20+
21+
import org.apache.kafka.common.TopicPartition;
22+
23+
/**
24+
* Objects that can publish consumer pause/resume events.
25+
*
26+
* @author Gary Russell
27+
* @since 2.8.10
28+
*
29+
*/
30+
public interface ConsumerPauseResumeEventPublisher {
31+
32+
/**
33+
* Publish a consumer paused event.
34+
* @param partitions the paused partitions.
35+
* @param reason the reason.
36+
*/
37+
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason);
38+
39+
/**
40+
* Publish a consumer resumed event.
41+
* @param partitions the resumed partitions.
42+
*/
43+
void publishConsumerResumedEvent(Collection<TopicPartition> partitions);
44+
45+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.function.BiConsumer;
2323

2424
import org.apache.kafka.clients.consumer.Consumer;
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2526
import org.apache.kafka.clients.consumer.ConsumerRecords;
2627
import org.apache.kafka.common.TopicPartition;
2728

@@ -90,8 +91,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
9091
List<RetryListener> listeners = retryListeners.get();
9192
int attempt = 1;
9293
listen(listeners, records, thrownException, attempt++);
93-
if (container instanceof KafkaMessageListenerContainer) {
94-
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
94+
ConsumerRecord<?, ?> first = records.iterator().next();
95+
MessageListenerContainer childOrSingle = container.getContainerFor(first.topic(), first.partition());
96+
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
97+
((ConsumerPauseResumeEventPublisher) childOrSingle)
98+
.publishConsumerPausedEvent(assignment, "For batch retry");
9599
}
96100
try {
97101
while (nextBackOff != BackOffExecution.STOP) {
@@ -138,8 +142,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
138142
finally {
139143
Set<TopicPartition> assignment2 = consumer.assignment();
140144
consumer.resume(assignment2);
141-
if (container instanceof KafkaMessageListenerContainer) {
142-
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerResumedEvent(assignment2);
145+
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
146+
((ConsumerPauseResumeEventPublisher) childOrSingle).publishConsumerResumedEvent(assignment2);
143147
}
144148
}
145149
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@
150150
* @author Francois Rosiere
151151
*/
152152
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
153-
extends AbstractMessageListenerContainer<K, V> {
153+
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
154154

155155
private static final String UNUSED = "unused";
156156

@@ -442,15 +442,17 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
442442
}
443443
}
444444

445-
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
445+
@Override
446+
public void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
446447
ApplicationEventPublisher publisher = getApplicationEventPublisher();
447448
if (publisher != null) {
448449
publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer,
449450
Collections.unmodifiableCollection(partitions), reason));
450451
}
451452
}
452453

453-
void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
454+
@Override
455+
public void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
454456
ApplicationEventPublisher publisher = getApplicationEventPublisher();
455457
if (publisher != null) {
456458
publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer,

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

+11
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,17 @@ default void stopAbnormally(Runnable callback) {
239239
stop(callback);
240240
}
241241

242+
/**
243+
* If this container has child containers, return the child container that is assigned
244+
* the topic/partition. Return this when there are no child containers.
245+
* @param topic the topic.
246+
* @param partition the partition.
247+
* @return the container.
248+
*/
249+
default MessageListenerContainer getContainerFor(String topic, int partition) {
250+
return this;
251+
}
252+
242253
@Override
243254
default void destroy() {
244255
stop();

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java

+30-6
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import static org.mockito.BDDMockito.given;
2323
import static org.mockito.Mockito.mock;
2424

25+
import java.util.ArrayList;
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.concurrent.CountDownLatch;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.atomic.AtomicBoolean;
3031
import java.util.concurrent.atomic.AtomicReference;
32+
import java.util.stream.Collectors;
3133

3234
import org.apache.kafka.clients.consumer.Consumer;
3335
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -37,11 +39,14 @@
3739
import org.junit.jupiter.api.BeforeAll;
3840
import org.junit.jupiter.api.Test;
3941

40-
import org.springframework.kafka.core.ConsumerFactory;
42+
import org.springframework.context.ApplicationEvent;
43+
import org.springframework.context.ApplicationEventPublisher;
4144
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
4245
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4346
import org.springframework.kafka.core.KafkaOperations;
4447
import org.springframework.kafka.core.KafkaTemplate;
48+
import org.springframework.kafka.event.ConsumerPausedEvent;
49+
import org.springframework.kafka.event.ConsumerResumedEvent;
4550
import org.springframework.kafka.event.ConsumerStoppedEvent;
4651
import org.springframework.kafka.support.TopicPartitionOffset;
4752
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -96,8 +101,8 @@ public void testRetriesAndDlt() throws InterruptedException {
96101
throw new ListenerExecutionFailedException("fail for retry batch");
97102
});
98103

99-
KafkaMessageListenerContainer<Integer, String> container =
100-
new KafkaMessageListenerContainer<>(cf, containerProps);
104+
ConcurrentMessageListenerContainer<Integer, String> container =
105+
new ConcurrentMessageListenerContainer<>(cf, containerProps);
101106
container.setBeanName("retryBatch");
102107
final CountDownLatch recoverLatch = new CountDownLatch(1);
103108
final AtomicReference<String> failedGroupId = new AtomicReference<>();
@@ -118,10 +123,21 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
118123
FallbackBatchErrorHandler errorHandler = new FallbackBatchErrorHandler(new FixedBackOff(0L, 3), recoverer);
119124
container.setBatchErrorHandler(errorHandler);
120125
final CountDownLatch stopLatch = new CountDownLatch(1);
121-
container.setApplicationEventPublisher(e -> {
122-
if (e instanceof ConsumerStoppedEvent) {
123-
stopLatch.countDown();
126+
List<ApplicationEvent> events = new ArrayList<>();
127+
container.setApplicationEventPublisher(new ApplicationEventPublisher() {
128+
129+
@Override
130+
public void publishEvent(ApplicationEvent e) {
131+
events.add(e);
132+
if (e instanceof ConsumerStoppedEvent) {
133+
stopLatch.countDown();
134+
}
135+
}
136+
137+
@Override
138+
public void publishEvent(Object event) {
124139
}
140+
125141
});
126142
container.start();
127143

@@ -142,6 +158,14 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
142158
pf.destroy();
143159
consumer.close();
144160
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
161+
assertThat(events.stream()
162+
.filter(ev -> ev instanceof ConsumerPausedEvent)
163+
.collect(Collectors.toList()))
164+
.hasSize(1);
165+
assertThat(events.stream()
166+
.filter(ev -> ev instanceof ConsumerResumedEvent)
167+
.collect(Collectors.toList()))
168+
.hasSize(1);
145169
}
146170

147171
@SuppressWarnings("deprecation")

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2222
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2323
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.ArgumentMatchers.anyInt;
2425
import static org.mockito.BDDMockito.given;
2526
import static org.mockito.BDDMockito.willAnswer;
2627
import static org.mockito.BDDMockito.willThrow;
@@ -197,6 +198,7 @@ void rePauseOnRebalance() {
197198
return records;
198199
}).given(consumer).poll(any());
199200
KafkaMessageListenerContainer<?, ?> container = mock(KafkaMessageListenerContainer.class);
201+
given(container.getContainerFor(any(), anyInt())).willReturn(container);
200202
given(container.isRunning()).willReturn(true);
201203
eh.handle(new RuntimeException(), records, consumer, container, () -> {
202204
this.invoked++;

0 commit comments

Comments
 (0)