Skip to content

Commit 97ed09e

Browse files
garyrussellartembilan
authored andcommitted
GH-2427: Allow RuntimeException to be Classified
Resolves #2457 Previously, classifying `RuntimeException` either for no retries or for blocking retries would cause undesirable effects - its classification would be found first because the classifier first traverses up the class hierarchy to find a match before traversing down the cause links. When classifying exception for retry, unwrap the 'LEFE' cause from a `TimestampedException` and/or `ListenerExecutionFailedException` so that the classification is made on that cause. By default, the retry classifier has no classified exceptions when used in "classify for retry" mode (instead of the default "classify for no retry" mode). This means that, if `retryOn(RuntimeException.class)` is used, then all `RuntimeException`s will be retried (including those that are usually considered fatal). With mixed blocking/non-blocking retries, change the behavior to include the standard fatal exceptions in case the user configures all `RuntimeException`s to use blocking retries. Also tested with a Boot app to see that a conversion exception bypasses all retries and goes straight to the DLT.
1 parent 4fedd75 commit 97ed09e

File tree

7 files changed

+135
-14
lines changed

7 files changed

+135
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java

+22-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public abstract class ExceptionClassifier extends KafkaExceptionLogLevelAware {
4545
* Construct the instance.
4646
*/
4747
public ExceptionClassifier() {
48-
this.classifier = configureDefaultClassifier();
48+
this.classifier = configureDefaultClassifier(true);
4949
}
5050

5151
/**
@@ -64,9 +64,27 @@ public static List<Class<? extends Throwable>> defaultFatalExceptionsList() {
6464
ClassCastException.class);
6565
}
6666

67-
private static ExtendedBinaryExceptionClassifier configureDefaultClassifier() {
67+
private static ExtendedBinaryExceptionClassifier configureDefaultClassifier(boolean defaultClassification) {
6868
return new ExtendedBinaryExceptionClassifier(defaultFatalExceptionsList().stream()
69-
.collect(Collectors.toMap(ex -> ex, ex -> false)), true);
69+
.collect(Collectors.toMap(ex -> ex, ex -> false)), defaultClassification);
70+
}
71+
72+
/**
73+
* By default, unmatched types classify as true. Call this method to make the default
74+
* false, and optionally retain types implicitly classified as false. This should be
75+
* called before calling any of the classification modification methods. This can be
76+
* useful if you want to classify a super class of one or more of the standard fatal
77+
* exceptions as retryable.
78+
* @param retainStandardFatal true to retain.
79+
* @since 3.0
80+
*/
81+
public void defaultFalse(boolean retainStandardFatal) {
82+
if (retainStandardFatal) {
83+
this.classifier = configureDefaultClassifier(false);
84+
}
85+
else {
86+
defaultFalse();
87+
}
7088
}
7189

7290
/**
@@ -94,6 +112,7 @@ protected BinaryExceptionClassifier getClassifier() {
94112
* <ul>
95113
* <li>{@link DeserializationException}</li>
96114
* <li>{@link MessageConversionException}</li>
115+
* <li>{@link ConversionException}</li>
97116
* <li>{@link MethodArgumentResolutionException}</li>
98117
* <li>{@link NoSuchMethodException}</li>
99118
* <li>{@link ClassCastException}</li>

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,14 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
4545

4646
private final BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> noRetriesForClassified =
4747
(rec, ex) -> {
48-
if (!getClassifier().classify(ex)) {
48+
Exception theEx = ex;
49+
if (theEx instanceof TimestampedException && theEx.getCause() instanceof Exception cause) {
50+
theEx = cause;
51+
}
52+
if (theEx instanceof ListenerExecutionFailedException && theEx.getCause() instanceof Exception cause) {
53+
theEx = cause;
54+
}
55+
if (!getClassifier().classify(theEx)) {
4956
return NO_RETRIES_OR_DELAY_BACKOFF;
5057
}
5158
return this.userBackOffFunction.apply(rec, ex);

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class ListenerContainerFactoryConfigurer {
6262

6363
private Class<? extends Exception>[] blockingExceptionTypes = null;
6464

65+
private boolean retainStandardFatal;
66+
6567
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
6668
};
6769

@@ -141,6 +143,16 @@ public final void setBlockingRetryableExceptions(Class<? extends Exception>... e
141143
this.blockingExceptionTypes = Arrays.copyOf(exceptionTypes, exceptionTypes.length);
142144
}
143145

146+
/**
147+
* Set to true to retain standard fatal exceptions as not retryable when configuring
148+
* blocking retries.
149+
* @param retainStandardFatal true to retain standard fatal exceptions.
150+
* @since 3.0
151+
*/
152+
public void setRetainStandardFatal(boolean retainStandardFatal) {
153+
this.retainStandardFatal = retainStandardFatal;
154+
}
155+
144156
public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer) {
145157
Assert.notNull(containerCustomizer, "'containerCustomizer' cannot be null");
146158
this.containerCustomizer = containerCustomizer;
@@ -153,7 +165,7 @@ public void setErrorHandlerCustomizer(Consumer<DefaultErrorHandler> errorHandler
153165
protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
154166
Configuration configuration) {
155167
DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
156-
errorHandler.defaultFalse();
168+
errorHandler.defaultFalse(this.retainStandardFatal);
157169
errorHandler.setCommitRecovered(true);
158170
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
159171
if (this.blockingExceptionTypes != null) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java

+35-9
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private void processDeadLetterPublishingContainerFactory(
141141
CustomizersConfigurer customizersConfigurer = new CustomizersConfigurer();
142142
configureCustomizers(customizersConfigurer);
143143
JavaUtils.INSTANCE
144-
.acceptIfNotNull(customizersConfigurer.deadLetterPublishingRecovererCustomizer,
144+
.acceptIfNotNull(customizersConfigurer.getDeadLetterPublishingRecovererCustomizer(),
145145
deadLetterPublishingRecovererFactory::setDeadLetterPublishingRecovererCustomizer);
146146
Consumer<DeadLetterPublishingRecovererFactory> dlprfConsumer = configureDeadLetterPublishingContainerFactory();
147147
Assert.notNull(dlprfConsumer, "configureDeadLetterPublishingContainerFactory must not return null");
@@ -159,24 +159,28 @@ protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPubl
159159

160160
/**
161161
* Internal method for processing the {@link ListenerContainerFactoryConfigurer}.
162-
* Consider overriding {@link #configureListenerContainerFactoryConfigurer()}
163-
* if further customization is required.
164-
* @param listenerContainerFactoryConfigurer the {@link ListenerContainerFactoryConfigurer} instance.
162+
* Consider overriding {@link #configureListenerContainerFactoryConfigurer()} if
163+
* further customization is required.
164+
* @param listenerContainerFactoryConfigurer the
165+
* {@link ListenerContainerFactoryConfigurer} instance.
165166
*/
166-
private void processListenerContainerFactoryConfigurer(ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer) {
167+
private void processListenerContainerFactoryConfigurer(
168+
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer) {
169+
167170
CustomizersConfigurer customizersConfigurer = new CustomizersConfigurer();
168171
configureCustomizers(customizersConfigurer);
169172
BlockingRetriesConfigurer blockingRetriesConfigurer = new BlockingRetriesConfigurer();
170173
configureBlockingRetries(blockingRetriesConfigurer);
171174
JavaUtils.INSTANCE
172-
.acceptIfNotNull(blockingRetriesConfigurer.backOff,
175+
.acceptIfNotNull(blockingRetriesConfigurer.getBackOff(),
173176
listenerContainerFactoryConfigurer::setBlockingRetriesBackOff)
174-
.acceptIfNotNull(blockingRetriesConfigurer.retryableExceptions,
177+
.acceptIfNotNull(blockingRetriesConfigurer.getRetryableExceptions(),
175178
listenerContainerFactoryConfigurer::setBlockingRetryableExceptions)
176-
.acceptIfNotNull(customizersConfigurer.errorHandlerCustomizer,
179+
.acceptIfNotNull(customizersConfigurer.getErrorHandlerCustomizer(),
177180
listenerContainerFactoryConfigurer::setErrorHandlerCustomizer)
178-
.acceptIfNotNull(customizersConfigurer.listenerContainerCustomizer,
181+
.acceptIfNotNull(customizersConfigurer.getListenerContainerCustomizer(),
179182
listenerContainerFactoryConfigurer::setContainerCustomizer);
183+
listenerContainerFactoryConfigurer.setRetainStandardFatal(true);
180184
Consumer<ListenerContainerFactoryConfigurer> lcfcConfigurer = configureListenerContainerFactoryConfigurer();
181185
Assert.notNull(lcfcConfigurer, "configureListenerContainerFactoryConfigurer must not return null.");
182186
lcfcConfigurer.accept(listenerContainerFactoryConfigurer);
@@ -341,6 +345,15 @@ public BlockingRetriesConfigurer backOff(BackOff backoff) {
341345
this.backOff = backoff;
342346
return this;
343347
}
348+
349+
BackOff getBackOff() {
350+
return this.backOff;
351+
}
352+
353+
Class<? extends Exception>[] getRetryableExceptions() {
354+
return this.retryableExceptions;
355+
}
356+
344357
}
345358

346359
/**
@@ -387,6 +400,19 @@ public CustomizersConfigurer customizeDeadLetterPublishingRecoverer(Consumer<Dea
387400
this.deadLetterPublishingRecovererCustomizer = dlprCustomizer;
388401
return this;
389402
}
403+
404+
Consumer<DefaultErrorHandler> getErrorHandlerCustomizer() {
405+
return this.errorHandlerCustomizer;
406+
}
407+
408+
Consumer<ConcurrentMessageListenerContainer<?, ?>> getListenerContainerCustomizer() {
409+
return this.listenerContainerCustomizer;
410+
}
411+
412+
Consumer<DeadLetterPublishingRecoverer> getDeadLetterPublishingRecovererCustomizer() {
413+
return this.deadLetterPublishingRecovererCustomizer;
414+
}
415+
390416
}
391417

392418
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

+19
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
7171
import org.springframework.kafka.support.KafkaHeaders;
7272
import org.springframework.kafka.support.SendResult;
73+
import org.springframework.kafka.support.converter.ConversionException;
7374
import org.springframework.kafka.support.serializer.DeserializationException;
7475
import org.springframework.kafka.support.serializer.SerializationUtils;
7576
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -898,4 +899,22 @@ void nonCompliantProducerFactory() throws Exception {
898899
assertThat(timeoutCaptor.getValue()).isEqualTo(Duration.ofSeconds(125).toMillis());
899900
}
900901

902+
@SuppressWarnings({ "unchecked", "rawtypes" })
903+
@Test
904+
void blockingRetryRuntimeException() {
905+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
906+
CompletableFuture future = mock(CompletableFuture.class);
907+
given(template.send(any(ProducerRecord.class))).willReturn(future);
908+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
909+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
910+
recoverer.defaultFalse(true);
911+
recoverer.addRetryableExceptions(RuntimeException.class);
912+
recoverer.accept(record, new ListenerExecutionFailedException("test", "group",
913+
new TimestampedException(
914+
new ListenerExecutionFailedException("test", new ConversionException("test", null)))));
915+
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
916+
verify(template).send(producerRecordCaptor.capture());
917+
ProducerRecord outRecord = producerRecordCaptor.getValue();
918+
}
919+
901920
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

+37
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868

6969
/**
7070
* @author Tomaz Fernandes
71+
* @author Gary Russell
7172
* @since 2.7
7273
*/
7374
@ExtendWith(MockitoExtension.class)
@@ -295,6 +296,42 @@ void shouldUseGivenBackOffAndExceptions() {
295296

296297
}
297298

299+
@Test
300+
void shouldUseGivenBackOffAndExceptionsKeepStandard() {
301+
302+
// given
303+
given(container.getContainerProperties()).willReturn(containerProperties);
304+
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
305+
given(containerProperties.getMessageListener()).willReturn(listener);
306+
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
307+
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
308+
BackOff backOffMock = mock(BackOff.class);
309+
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
310+
given(backOffMock.start()).willReturn(backOffExecutionMock);
311+
312+
ListenerContainerFactoryConfigurer configurer =
313+
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
314+
deadLetterPublishingRecovererFactory, clock);
315+
configurer.setBlockingRetriesBackOff(backOffMock);
316+
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
317+
configurer.setRetainStandardFatal(true);
318+
319+
// when
320+
KafkaListenerContainerFactory<?> decoratedFactory =
321+
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
322+
decoratedFactory.createListenerContainer(endpoint);
323+
324+
// then
325+
then(backOffMock).should().start();
326+
then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture());
327+
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
328+
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
329+
DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler;
330+
assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue();
331+
assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue();
332+
assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isFalse();
333+
334+
}
298335

299336
@Test
300337
void shouldThrowIfBackOffOrRetryablesAlreadySet() {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetrie
143143
then(lcfc).should().setErrorHandlerCustomizer(errorHandlerCustomizer);
144144
assertThatThrownBy(lcfc::setBlockingRetryableExceptions).isInstanceOf(IllegalStateException.class);
145145
then(lcfc).should().setBlockingRetriesBackOff(backoff);
146+
then(lcfc).should().setRetainStandardFatal(true);
146147
then(dlprfCustomizer).should().accept(dlprf);
147148
then(rtconfigurer).should().accept(topicConfigurer);
148149
then(lcfcConsumer).should().accept(lcfc);

0 commit comments

Comments
 (0)