Skip to content

Commit 29ced81

Browse files
Improve API and docs
Now retryable exceptions can be set directly in the lcfc class. Improved the docs on how to combine blocking and non-blocking behaviors. Added what's new entry for this feature.
1 parent a7e069b commit 29ced81

File tree

6 files changed

+116
-40
lines changed

6 files changed

+116
-40
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+75-29
Original file line numberDiff line numberDiff line change
@@ -328,35 +328,6 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat
328328

329329
NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
330330

331-
[[retry-topic-combine-blocking]]
332-
===== Combine blocking and non-blocking retries
333-
334-
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
335-
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
336-
337-
To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method as follows.
338-
The default policy is FixedBackOff, with ten retries and no delay between them.
339-
Optionally, you can also set a different back off policy.
340-
341-
====
342-
[source, java]
343-
----
344-
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
345-
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
346-
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
347-
@Qualifier(RetryTopicInternalBeanNames
348-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
349-
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
350-
lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
351-
.addRetryableExceptions(MyBlockingRetryException.class);
352-
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
353-
return lcfc;
354-
}
355-
----
356-
====
357-
358-
NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
359-
360331

361332
===== Include and Exclude Topics
362333

@@ -459,6 +430,80 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
459430
----
460431
====
461432

433+
[[retry-topic-combine-blocking]]
434+
==== Combining blocking and non-blocking retries
435+
436+
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
437+
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
438+
439+
To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method in the `ListenerContainerFactoryConfigurer` bean as follows.
440+
The default policy is `FixedBackOff`, with nine retries and no delay between them.
441+
Optionally, you can provide your own back off policy.
442+
443+
====
444+
[source, java]
445+
----
446+
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
447+
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
448+
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
449+
@Qualifier(RetryTopicInternalBeanNames
450+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
451+
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
452+
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
453+
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
454+
return lcfc;
455+
}
456+
----
457+
====
458+
459+
If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, such as:
460+
461+
====
462+
[source, java]
463+
----
464+
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
465+
----
466+
====
467+
468+
NOTE: In combination with the global retryable topic's fatal exceptions classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
469+
470+
Here's an example with both configurations working together:
471+
472+
====
473+
[source, java]
474+
----
475+
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
476+
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
477+
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
478+
@Qualifier(RetryTopicInternalBeanNames
479+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
480+
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
481+
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
482+
return lcfc;
483+
}
484+
485+
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
486+
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
487+
@Qualifier(RetryTopicInternalBeanNames
488+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
489+
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
490+
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
491+
return ddtr;
492+
}
493+
494+
----
495+
====
496+
497+
In this example:
498+
499+
* `ShouldRetryOnlyBlockingException.class` would retry only via blocking and, if all retries fail, would go straight to the DLT.
500+
* `ShouldRetryViaBothException.class` would retry via blocking, and if all blocking retries fail would be forwarded to the next retry topic for another set of attempts.
501+
* `ShouldSkipBothRetriesException.class` would never be retried in any way and would go straight to the DLT if the first processing attempt failed.
502+
503+
IMPORTANT: Note that the blocking retries behavior is allowlist - you add the exceptions you do want to retry that way; while the non-blocking retries classification is geared towards FATAL exceptions and as such is denylist - you add the exceptions you don't want to do non-blocking retries, but to send directly to the DLT instead.
504+
505+
IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration.
506+
462507
==== Topic Naming
463508

464509
Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
@@ -775,6 +820,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti
775820
return retryTopicConfigurer;
776821
}
777822
----
823+
====
778824

779825
[[change-kboe-logging-level]]
780826
==== Changing KafkaBackOffException Logging Level

Diff for: spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+4-1
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,8 @@ See <<retry-topic-lcf>> for more information.
8888
There's now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT.
8989
Refer to <<retry-topic-ex-classifier>> to see how to manage it.
9090

91+
You can now use blocking and non-blocking retries in conjunction.
92+
See <<retry-topic-combine-blocking>> for more information.
93+
9194
The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level.
92-
See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
95+
See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.classify.BinaryExceptionClassifier;
2323
import org.springframework.kafka.support.converter.ConversionException;
2424
import org.springframework.kafka.support.serializer.DeserializationException;
25+
import org.springframework.lang.Nullable;
2526
import org.springframework.messaging.converter.MessageConversionException;
2627
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
2728
import org.springframework.util.Assert;
@@ -185,7 +186,8 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
185186
* @see #addNotRetryableExceptions(Class...)
186187
* @see #setClassifications(Map, boolean)
187188
*/
188-
public boolean removeClassification(Class<? extends Exception> exceptionType) {
189+
@Nullable
190+
public Boolean removeClassification(Class<? extends Exception> exceptionType) {
189191
return this.classifier.getClassified().remove(exceptionType);
190192
}
191193

Diff for: spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

+22-3
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class ListenerContainerFactoryConfigurer {
8383

8484
private BackOff providedBlockingBackOff = null;
8585

86+
private Class<? extends Exception>[] blockingExceptionTypes = null;
87+
8688
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
8789
};
8890

@@ -162,9 +164,9 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr
162164

163165
/**
164166
* Set a {@link BackOff} to be used with blocking retries.
165-
* You can specify the exceptions to be retried using the method
166-
* {@link org.springframework.kafka.listener.ExceptionClassifier#addRetryableExceptions(Class[])}
167-
* By default, no exceptions are retried via blocking.
167+
* If the BackOff execution returns STOP, the record will be forwarded
168+
* to the next retry topic or to the DLT, depending on how the non-blocking retries
169+
* are configured.
168170
* @param blockingBackOff the BackOff policy to be used by blocking retries.
169171
* @since 2.8.4
170172
* @see DefaultErrorHandler
@@ -174,6 +176,20 @@ public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
174176
this.providedBlockingBackOff = blockingBackOff;
175177
}
176178

179+
/**
180+
* Specify the exceptions to be retried via blocking.
181+
* @param exceptionTypes the exceptions that should be retried.
182+
* @since 2.8.4
183+
* @see DefaultErrorHandler
184+
*/
185+
@SafeVarargs
186+
@SuppressWarnings("varargs")
187+
public final void setBlockingRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
188+
Assert.notNull(exceptionTypes, "The exception types cannot be null");
189+
Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements");
190+
this.blockingExceptionTypes = exceptionTypes;
191+
}
192+
177193
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
178194
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
179195
boolean isSetContainerProperties) {
@@ -213,6 +229,9 @@ protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer de
213229
errorHandler.defaultFalse();
214230
errorHandler.setCommitRecovered(true);
215231
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
232+
if (this.blockingExceptionTypes != null) {
233+
errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
234+
}
216235
this.errorHandlerCustomizer.accept(errorHandler);
217236
return errorHandler;
218237
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
6161
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
6262
import org.springframework.kafka.support.Acknowledgment;
63+
import org.springframework.kafka.support.converter.ConversionException;
6364
import org.springframework.util.backoff.BackOff;
6465
import org.springframework.util.backoff.BackOffExecution;
6566

@@ -407,12 +408,11 @@ void shouldDecorateFactory() {
407408
.createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
408409
assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
409410
then(listener).should(times(1)).onMessage(data, ack, consumer);
410-
411411
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
412412
}
413413

414414
@Test
415-
void shouldUseGivenBackOff() {
415+
void shouldUseGivenBackOffAndExceptions() {
416416

417417
// given
418418
given(container.getContainerProperties()).willReturn(containerProperties);
@@ -427,8 +427,8 @@ void shouldUseGivenBackOff() {
427427
ListenerContainerFactoryConfigurer configurer =
428428
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
429429
deadLetterPublishingRecovererFactory, clock);
430-
431430
configurer.setBlockingRetriesBackOff(backOffMock);
431+
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
432432

433433
// when
434434
KafkaListenerContainerFactory<?> decoratedFactory =
@@ -437,6 +437,14 @@ void shouldUseGivenBackOff() {
437437

438438
// then
439439
then(backOffMock).should().start();
440+
then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture());
441+
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
442+
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
443+
DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler;
444+
assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue();
445+
assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue();
446+
assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isNull();
447+
440448
}
441449

442450
@Test

Diff for: spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.springframework.kafka.core.KafkaTemplate;
5353
import org.springframework.kafka.core.ProducerFactory;
5454
import org.springframework.kafka.listener.ContainerProperties;
55-
import org.springframework.kafka.listener.DefaultErrorHandler;
5655
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
5756
import org.springframework.kafka.support.KafkaHeaders;
5857
import org.springframework.kafka.support.converter.ConversionException;
@@ -391,8 +390,7 @@ public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafka
391390
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
392391

393392
lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3));
394-
lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh)
395-
.addRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class));
393+
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
396394
return lcfc;
397395
}
398396

0 commit comments

Comments
 (0)