Skip to content

Commit e4d9641

Browse files
garyrussellartembilan
authored andcommitted
GH-2179: Exit RetryingBatchErrorHandler on Stop
Resolves #2179
1 parent bf3dc8c commit e4d9641

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

+3
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
7575
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
7676
throw new KafkaException("Interrupted during retry", logLevel, e1);
7777
}
78+
if (!container.isRunning()) {
79+
throw new KafkaException("Container stopped during retries");
80+
}
7881
try {
7982
invokeListener.run();
8083
return;

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

+34
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.willAnswer;
2224
import static org.mockito.Mockito.mock;
2325
import static org.mockito.Mockito.times;
2426
import static org.mockito.Mockito.verify;
@@ -29,13 +31,15 @@
2931
import java.util.HashMap;
3032
import java.util.List;
3133
import java.util.Map;
34+
import java.util.concurrent.atomic.AtomicBoolean;
3235

3336
import org.apache.kafka.clients.consumer.Consumer;
3437
import org.apache.kafka.clients.consumer.ConsumerRecord;
3538
import org.apache.kafka.clients.consumer.ConsumerRecords;
3639
import org.apache.kafka.common.TopicPartition;
3740
import org.junit.jupiter.api.Test;
3841

42+
import org.springframework.kafka.KafkaException;
3943
import org.springframework.util.backoff.FixedBackOff;
4044

4145
/**
@@ -62,6 +66,7 @@ void recover() {
6266
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
6367
Consumer<?, ?> consumer = mock(Consumer.class);
6468
MessageListenerContainer container = mock(MessageListenerContainer.class);
69+
given(container.isRunning()).willReturn(true);
6570
eh.handle(new RuntimeException(), records, consumer, container, () -> {
6671
this.invoked++;
6772
throw new RuntimeException();
@@ -90,6 +95,7 @@ void successOnRetry() {
9095
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
9196
Consumer<?, ?> consumer = mock(Consumer.class);
9297
MessageListenerContainer container = mock(MessageListenerContainer.class);
98+
given(container.isRunning()).willReturn(true);
9399
eh.handle(new RuntimeException(), records, consumer, container, () -> this.invoked++);
94100
assertThat(this.invoked).isEqualTo(1);
95101
assertThat(recovered).hasSize(0);
@@ -116,6 +122,7 @@ void recoveryFails() {
116122
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
117123
Consumer<?, ?> consumer = mock(Consumer.class);
118124
MessageListenerContainer container = mock(MessageListenerContainer.class);
125+
given(container.isRunning()).willReturn(true);
119126
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() ->
120127
eh.handle(new RuntimeException(), records, consumer, container, () -> {
121128
this.invoked++;
@@ -131,4 +138,31 @@ void recoveryFails() {
131138
verify(consumer).seek(new TopicPartition("foo", 1), 0L);
132139
}
133140

141+
@Test
142+
void exitOnContainerStop() {
143+
this.invoked = 0;
144+
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
145+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0, 99999), (cr, ex) -> {
146+
recovered.add(cr);
147+
});
148+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
149+
map.put(new TopicPartition("foo", 0),
150+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
151+
map.put(new TopicPartition("foo", 1),
152+
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
153+
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
154+
Consumer<?, ?> consumer = mock(Consumer.class);
155+
MessageListenerContainer container = mock(MessageListenerContainer.class);
156+
AtomicBoolean stopped = new AtomicBoolean(true);
157+
willAnswer(inv -> stopped.get()).given(container).isRunning();
158+
assertThatExceptionOfType(KafkaException.class).isThrownBy(() ->
159+
eh.handle(new RuntimeException(), records, consumer, container, () -> {
160+
this.invoked++;
161+
stopped.set(false);
162+
throw new RuntimeException();
163+
})
164+
).withMessage("Container stopped during retries");
165+
assertThat(this.invoked).isEqualTo(1);
166+
}
167+
134168
}

0 commit comments

Comments
 (0)