Skip to content

Commit 01549a6

Browse files
GH-2116: Add blocking retries to RT (#2124)
* GH-2116: Add blocking retries to RT Before we hardcoded a no-ops back off in the DefaultErrorHandler used in the Retryable Topics feature. Adds a setter to let the user provide their own back off policy and configure blocking retries in conjunction with RT. * Change DHE in LCFC to defaultFalse With this we no longer need a no ops back off. Some minor adjustments were needed to maintain behavior when the logic gets to DLPR. * Change DHE in LCFC to defaultFalse With this we no longer need a no ops back off. Some minor adjustments were needed to maintain behavior when the logic gets to DLPR. * Improve API and docs Now retryable exceptions can be set directly in the lcfc class. Improved the docs on how to combine blocking and non-blocking behaviors. Added what's new entry for this feature. * Improve ExceptionClassifier JavaDoc Also add assertions to the LCFC new methods to warn the user if they already set the blocking configurations.
1 parent dad2061 commit 01549a6

File tree

10 files changed

+717
-12
lines changed

10 files changed

+717
-12
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+75
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,80 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
430430
----
431431
====
432432

433+
[[retry-topic-combine-blocking]]
434+
==== Combining blocking and non-blocking retries
435+
436+
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
437+
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
438+
439+
To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method in the `ListenerContainerFactoryConfigurer` bean as follows.
440+
The default policy is `FixedBackOff`, with nine retries and no delay between them.
441+
Optionally, you can provide your own back off policy.
442+
443+
====
444+
[source, java]
445+
----
446+
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
447+
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
448+
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
449+
@Qualifier(RetryTopicInternalBeanNames
450+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
451+
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
452+
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
453+
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
454+
return lcfc;
455+
}
456+
----
457+
====
458+
459+
If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, such as:
460+
461+
====
462+
[source, java]
463+
----
464+
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
465+
----
466+
====
467+
468+
NOTE: In combination with the global retryable topic's fatal exceptions classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
469+
470+
Here's an example with both configurations working together:
471+
472+
====
473+
[source, java]
474+
----
475+
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
476+
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
477+
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
478+
@Qualifier(RetryTopicInternalBeanNames
479+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
480+
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
481+
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
482+
return lcfc;
483+
}
484+
485+
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
486+
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
487+
@Qualifier(RetryTopicInternalBeanNames
488+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
489+
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
490+
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
491+
return ddtr;
492+
}
493+
494+
----
495+
====
496+
497+
In this example:
498+
499+
* `ShouldRetryOnlyBlockingException.class` would retry only via blocking and, if all retries fail, would go straight to the DLT.
500+
* `ShouldRetryViaBothException.class` would retry via blocking, and if all blocking retries fail would be forwarded to the next retry topic for another set of attempts.
501+
* `ShouldSkipBothRetriesException.class` would never be retried in any way and would go straight to the DLT if the first processing attempt failed.
502+
503+
IMPORTANT: Note that the blocking retries behavior is allowlist - you add the exceptions you do want to retry that way; while the non-blocking retries classification is geared towards FATAL exceptions and as such is denylist - you add the exceptions you don't want to do non-blocking retries, but to send directly to the DLT instead.
504+
505+
IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration.
506+
433507
==== Topic Naming
434508

435509
Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
@@ -746,6 +820,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti
746820
return retryTopicConfigurer;
747821
}
748822
----
823+
====
749824

750825
[[change-kboe-logging-level]]
751826
==== Changing KafkaBackOffException Logging Level

Diff for: spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+4-1
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,8 @@ See <<retry-topic-lcf>> for more information.
8888
There's now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT.
8989
Refer to <<retry-topic-ex-classifier>> to see how to manage it.
9090

91+
You can now use blocking and non-blocking retries in conjunction.
92+
See <<retry-topic-combine-blocking>> for more information.
93+
9194
The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level.
92-
See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
95+
See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
131131
Consumer<?, ?> consumer, MessageListenerContainer container) {
132132

133133
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
134-
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
134+
getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
135135
}
136136

137137
@Override

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.classify.BinaryExceptionClassifier;
2323
import org.springframework.kafka.support.converter.ConversionException;
2424
import org.springframework.kafka.support.serializer.DeserializationException;
25+
import org.springframework.lang.Nullable;
2526
import org.springframework.messaging.converter.MessageConversionException;
2627
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
2728
import org.springframework.util.Assert;
@@ -180,12 +181,14 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
180181
* </ul>
181182
* All others will be retried, unless {@link #defaultFalse()} has been called.
182183
* @param exceptionType the exception type.
183-
* @return true if the removal was successful.
184+
* @return the classification of the exception if removal was successful;
185+
* null otherwise.
184186
* @since 2.8.4
185187
* @see #addNotRetryableExceptions(Class...)
186188
* @see #setClassifications(Map, boolean)
187189
*/
188-
public boolean removeClassification(Class<? extends Exception> exceptionType) {
190+
@Nullable
191+
public Boolean removeClassification(Class<? extends Exception> exceptionType) {
189192
return this.classifier.getClassified().remove(exceptionType);
190193
}
191194

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.function.BiPredicate;
2323

2424
import org.apache.commons.logging.LogFactory;
25+
import org.apache.kafka.clients.consumer.Consumer;
2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
2627

2728
import org.springframework.core.log.LogAccessor;
@@ -126,12 +127,26 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
126127
* @since 2.7
127128
*/
128129
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
130+
return getRecoveryStrategy(records, null, thrownException);
131+
}
132+
133+
/**
134+
* Return a {@link RecoveryStrategy} to call to determine whether the first record in the
135+
* list should be skipped.
136+
* @param records the records.
137+
* @param recoveryConsumer the consumer.
138+
* @param thrownException the exception.
139+
* @return the {@link RecoveryStrategy}.
140+
* @since 2.8.4
141+
*/
142+
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
143+
@Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
129144
if (getClassifier().classify(thrownException)) {
130145
return this.failureTracker::recovered;
131146
}
132147
else {
133148
try {
134-
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
149+
this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
135150
this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
136151
}
137152
catch (Exception ex) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ void clearThreadState() {
240240
this.failures.remove();
241241
}
242242

243-
BiConsumer<ConsumerRecord<?, ?>, Exception> getRecoverer() {
243+
ConsumerAwareRecordRecoverer getRecoverer() {
244244
return this.recoverer;
245245
}
246246

Diff for: spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

+53-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.retrytopic;
1818

1919
import java.time.Clock;
20+
import java.util.Arrays;
2021
import java.util.Comparator;
2122
import java.util.HashSet;
2223
import java.util.List;
@@ -42,7 +43,7 @@
4243
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
4344
import org.springframework.kafka.support.TopicPartitionOffset;
4445
import org.springframework.util.Assert;
45-
import org.springframework.util.backoff.FixedBackOff;
46+
import org.springframework.util.backoff.BackOff;
4647

4748
/**
4849
*
@@ -81,6 +82,10 @@ public class ListenerContainerFactoryConfigurer {
8182

8283
private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;
8384

85+
private BackOff providedBlockingBackOff = null;
86+
87+
private Class<? extends Exception>[] blockingExceptionTypes = null;
88+
8489
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
8590
};
8691

@@ -158,6 +163,42 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr
158163
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
159164
}
160165

166+
/**
167+
* Set a {@link BackOff} to be used with blocking retries.
168+
* If the BackOff execution returns STOP, the record will be forwarded
169+
* to the next retry topic or to the DLT, depending on how the non-blocking retries
170+
* are configured.
171+
* @param blockingBackOff the BackOff policy to be used by blocking retries.
172+
* @since 2.8.4
173+
* @see DefaultErrorHandler
174+
*/
175+
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
176+
Assert.notNull(blockingBackOff, "The provided BackOff cannot be null");
177+
Assert.state(this.providedBlockingBackOff == null, () ->
178+
"Blocking retries back off has already been set. Current: "
179+
+ this.providedBlockingBackOff
180+
+ " You provided: " + blockingBackOff);
181+
this.providedBlockingBackOff = blockingBackOff;
182+
}
183+
184+
/**
185+
* Specify the exceptions to be retried via blocking.
186+
* @param exceptionTypes the exceptions that should be retried.
187+
* @since 2.8.4
188+
* @see DefaultErrorHandler
189+
*/
190+
@SafeVarargs
191+
@SuppressWarnings("varargs")
192+
public final void setBlockingRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
193+
Assert.notNull(exceptionTypes, "The exception types cannot be null");
194+
Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements");
195+
Assert.state(this.blockingExceptionTypes == null,
196+
() -> "Blocking retryable exceptions have already been set."
197+
+ "Current ones: " + Arrays.toString(this.blockingExceptionTypes)
198+
+ " You provided: " + Arrays.toString(exceptionTypes));
199+
this.blockingExceptionTypes = exceptionTypes;
200+
}
201+
161202
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
162203
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
163204
boolean isSetContainerProperties) {
@@ -193,14 +234,23 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
193234

194235
protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
195236
Configuration configuration) {
196-
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
197-
new FixedBackOff(0, 0));
237+
DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
238+
errorHandler.defaultFalse();
198239
errorHandler.setCommitRecovered(true);
199240
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
241+
if (this.blockingExceptionTypes != null) {
242+
errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
243+
}
200244
this.errorHandlerCustomizer.accept(errorHandler);
201245
return errorHandler;
202246
}
203247

248+
protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
249+
return this.providedBlockingBackOff != null
250+
? new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff)
251+
: new DefaultErrorHandler(deadLetterPublishingRecoverer);
252+
}
253+
204254
protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container,
205255
Configuration configuration, boolean isSetContainerProperties) {
206256
AcknowledgingConsumerAwareMessageListener<?, ?> listener = checkAndCast(container.getContainerProperties()

Diff for: spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

+62-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
package org.springframework.kafka.retrytopic;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2021
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.anyLong;
2223
import static org.mockito.ArgumentMatchers.eq;
2324
import static org.mockito.BDDMockito.given;
2425
import static org.mockito.BDDMockito.then;
2526
import static org.mockito.BDDMockito.willReturn;
27+
import static org.mockito.Mockito.mock;
2628
import static org.mockito.Mockito.never;
2729
import static org.mockito.Mockito.times;
2830

@@ -59,13 +61,18 @@
5961
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
6062
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
6163
import org.springframework.kafka.support.Acknowledgment;
64+
import org.springframework.kafka.support.converter.ConversionException;
65+
import org.springframework.kafka.support.serializer.DeserializationException;
66+
import org.springframework.util.backoff.BackOff;
67+
import org.springframework.util.backoff.BackOffExecution;
68+
import org.springframework.util.backoff.FixedBackOff;
6269

6370
/**
6471
* @author Tomaz Fernandes
6572
* @since 2.7
6673
*/
6774
@ExtendWith(MockitoExtension.class)
68-
@SuppressWarnings({"unchecked", "rawtypes"})
75+
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
6976
class ListenerContainerFactoryConfigurerTests {
7077

7178
@Mock
@@ -404,10 +411,63 @@ void shouldDecorateFactory() {
404411
.createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
405412
assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
406413
then(listener).should(times(1)).onMessage(data, ack, consumer);
407-
408414
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
409415
}
410416

417+
@Test
418+
void shouldUseGivenBackOffAndExceptions() {
419+
420+
// given
421+
given(container.getContainerProperties()).willReturn(containerProperties);
422+
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
423+
given(containerProperties.getMessageListener()).willReturn(listener);
424+
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
425+
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
426+
BackOff backOffMock = mock(BackOff.class);
427+
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
428+
given(backOffMock.start()).willReturn(backOffExecutionMock);
429+
430+
ListenerContainerFactoryConfigurer configurer =
431+
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
432+
deadLetterPublishingRecovererFactory, clock);
433+
configurer.setBlockingRetriesBackOff(backOffMock);
434+
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
435+
436+
// when
437+
KafkaListenerContainerFactory<?> decoratedFactory =
438+
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
439+
decoratedFactory.createListenerContainer(endpoint);
440+
441+
// then
442+
then(backOffMock).should().start();
443+
then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture());
444+
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
445+
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
446+
DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler;
447+
assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue();
448+
assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue();
449+
assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isNull();
450+
451+
}
452+
453+
454+
@Test
455+
void shouldThrowIfBackOffOrRetryablesAlreadySet() {
456+
// given
457+
BackOff backOff = new FixedBackOff();
458+
ListenerContainerFactoryConfigurer configurer =
459+
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
460+
deadLetterPublishingRecovererFactory, clock);
461+
configurer.setBlockingRetriesBackOff(backOff);
462+
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
463+
464+
// when / then
465+
assertThatThrownBy(() -> configurer.setBlockingRetriesBackOff(backOff)).isInstanceOf(IllegalStateException.class);
466+
assertThatThrownBy(() -> configurer.setBlockingRetryableExceptions(ConversionException.class, DeserializationException.class))
467+
.isInstanceOf(IllegalStateException.class);
468+
}
469+
470+
411471
@Test
412472
void shouldCacheFactoryInstances() {
413473

0 commit comments

Comments
 (0)