Skip to content

Commit 11ba09b

Browse files
committed
spring-projectsGH-2387: Fix FallbackBatchErrorHandlerEvents
Resolves spring-projects#2387 Events were not published for `ConcurrentMessageListenerContainer`s. Also resolves a class tangle in spring-projects#2417 between `ErrorHandlingUtils` and KMLC.
1 parent bb34b9c commit 11ba09b

7 files changed

+113
-12
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

+12
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,18 @@ public List<KafkaMessageListenerContainer<K, V>> getContainers() {
118118
}
119119
}
120120

121+
@Override
122+
public MessageListenerContainer getChildFor(String topic, int partition) {
123+
for (KafkaMessageListenerContainer<K, V> container : getContainers()) {
124+
for (TopicPartition part : container.getAssignedPartitions()) {
125+
if (part.topic().equals(topic) && part.partition() == partition) {
126+
return container;
127+
}
128+
}
129+
}
130+
return this;
131+
}
132+
121133
@Override
122134
public Collection<TopicPartition> getAssignedPartitions() {
123135
synchronized (this.lifecycleMonitor) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.Collection;
20+
21+
import org.apache.kafka.common.TopicPartition;
22+
23+
/**
24+
* Objects that can publish consumer pause/resume events.
25+
*
26+
* @author Gary Russell
27+
* @since 2.8.10
28+
*
29+
*/
30+
public interface ConsumerPauseResumeEventPublisher {
31+
32+
/**
33+
* Publish a consumer paused event.
34+
* @param partitions the paused partitions.
35+
* @param reason the reason.
36+
*/
37+
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason);
38+
39+
/**
40+
* Publish a consumer resumed event.
41+
* @param partitions the resumed partitions.
42+
*/
43+
void publishConsumerResumedEvent(Collection<TopicPartition> partitions);
44+
45+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.function.BiConsumer;
2323

2424
import org.apache.kafka.clients.consumer.Consumer;
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2526
import org.apache.kafka.clients.consumer.ConsumerRecords;
2627
import org.apache.kafka.common.TopicPartition;
2728

@@ -71,8 +72,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
7172
consumer.pause(assignment);
7273
int attempt = 1;
7374
listen(retryListeners, records, thrownException, attempt++);
74-
if (container instanceof KafkaMessageListenerContainer) {
75-
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
75+
ConsumerRecord<?, ?> first = records.iterator().next();
76+
MessageListenerContainer childOrSingle = container.getChildFor(first.topic(), first.partition());
77+
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
78+
((ConsumerPauseResumeEventPublisher) childOrSingle)
79+
.publishConsumerPausedEvent(assignment, "For batch retry");
7680
}
7781
try {
7882
while (nextBackOff != BackOffExecution.STOP) {
@@ -115,8 +119,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
115119
finally {
116120
Set<TopicPartition> assignment2 = consumer.assignment();
117121
consumer.resume(assignment2);
118-
if (container instanceof KafkaMessageListenerContainer) {
119-
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerResumedEvent(assignment2);
122+
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
123+
((ConsumerPauseResumeEventPublisher) childOrSingle).publishConsumerResumedEvent(assignment2);
120124
}
121125
}
122126
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@
159159
* @author Daniel Gentes
160160
*/
161161
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
162-
extends AbstractMessageListenerContainer<K, V> {
162+
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
163163

164164
private static final String UNUSED = "unused";
165165

@@ -466,15 +466,17 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
466466
}
467467
}
468468

469-
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
469+
@Override
470+
public void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
470471
ApplicationEventPublisher publisher = getApplicationEventPublisher();
471472
if (publisher != null) {
472473
publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer,
473474
Collections.unmodifiableCollection(partitions), reason));
474475
}
475476
}
476477

477-
void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
478+
@Override
479+
public void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
478480
ApplicationEventPublisher publisher = getApplicationEventPublisher();
479481
if (publisher != null) {
480482
publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer,

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

+11
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,17 @@ default void stopAbnormally(Runnable callback) {
239239
stop(callback);
240240
}
241241

242+
/**
243+
* If this container has child containers, return the child container that is assigned
244+
* the topic/partition/.
245+
* @param topic the topic.
246+
* @param partition the partition.
247+
* @return the container.
248+
*/
249+
default MessageListenerContainer getChildFor(String topic, int partition) {
250+
return this;
251+
}
252+
242253
@Override
243254
default void destroy() {
244255
stop();

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java

+30-5
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.ArrayList;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.concurrent.CountDownLatch;
2425
import java.util.concurrent.TimeUnit;
2526
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.stream.Collectors;
2729

2830
import org.apache.kafka.clients.consumer.Consumer;
2931
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -32,10 +34,14 @@
3234
import org.junit.jupiter.api.BeforeAll;
3335
import org.junit.jupiter.api.Test;
3436

37+
import org.springframework.context.ApplicationEvent;
38+
import org.springframework.context.ApplicationEventPublisher;
3539
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3640
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3741
import org.springframework.kafka.core.KafkaOperations;
3842
import org.springframework.kafka.core.KafkaTemplate;
43+
import org.springframework.kafka.event.ConsumerPausedEvent;
44+
import org.springframework.kafka.event.ConsumerResumedEvent;
3945
import org.springframework.kafka.event.ConsumerStoppedEvent;
4046
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4147
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
@@ -88,8 +94,8 @@ public void testRetriesAndDlt() throws InterruptedException {
8894
throw new ListenerExecutionFailedException("fail for retry batch");
8995
});
9096

91-
KafkaMessageListenerContainer<Integer, String> container =
92-
new KafkaMessageListenerContainer<>(cf, containerProps);
97+
ConcurrentMessageListenerContainer<Integer, String> container =
98+
new ConcurrentMessageListenerContainer<>(cf, containerProps);
9399
container.setBeanName("retryBatch");
94100
final CountDownLatch recoverLatch = new CountDownLatch(1);
95101
final AtomicReference<String> failedGroupId = new AtomicReference<>();
@@ -110,10 +116,21 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
110116
FallbackBatchErrorHandler errorHandler = new FallbackBatchErrorHandler(new FixedBackOff(0L, 3), recoverer);
111117
container.setCommonErrorHandler(errorHandler);
112118
final CountDownLatch stopLatch = new CountDownLatch(1);
113-
container.setApplicationEventPublisher(e -> {
114-
if (e instanceof ConsumerStoppedEvent) {
115-
stopLatch.countDown();
119+
List<ApplicationEvent> events = new ArrayList<>();
120+
container.setApplicationEventPublisher(new ApplicationEventPublisher() {
121+
122+
@Override
123+
public void publishEvent(ApplicationEvent e) {
124+
events.add(e);
125+
if (e instanceof ConsumerStoppedEvent) {
126+
stopLatch.countDown();
127+
}
128+
}
129+
130+
@Override
131+
public void publishEvent(Object event) {
116132
}
133+
117134
});
118135
container.start();
119136

@@ -134,6 +151,14 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
134151
pf.destroy();
135152
consumer.close();
136153
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
154+
assertThat(events.stream()
155+
.filter(ev -> ev instanceof ConsumerPausedEvent)
156+
.collect(Collectors.toList()))
157+
.hasSize(1);
158+
assertThat(events.stream()
159+
.filter(ev -> ev instanceof ConsumerResumedEvent)
160+
.collect(Collectors.toList()))
161+
.hasSize(1);
137162
}
138163

139164
@Test

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2222
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2323
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.ArgumentMatchers.anyInt;
2425
import static org.mockito.BDDMockito.given;
2526
import static org.mockito.BDDMockito.willAnswer;
2627
import static org.mockito.BDDMockito.willThrow;
@@ -194,6 +195,7 @@ void rePauseOnRebalance() {
194195
return records;
195196
}).given(consumer).poll(any());
196197
KafkaMessageListenerContainer<?, ?> container = mock(KafkaMessageListenerContainer.class);
198+
given(container.getChildFor(any(), anyInt())).willReturn(container);
197199
given(container.isRunning()).willReturn(true);
198200
eh.handleBatch(new RuntimeException(), records, consumer, container, () -> {
199201
this.invoked++;

0 commit comments

Comments
 (0)