Skip to content

Commit 8f1d58a

Browse files
Change DHE in LCFC to defaultFalse
With this we no longer need a no-op back off. Some minor adjustments were needed to maintain behavior when the logic gets to DLPR.
1 parent 7998944 commit 8f1d58a

File tree

7 files changed

+575
-7
lines changed

7 files changed

+575
-7
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+30
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,36 @@ You can add or remove exceptions using the `addNotRetryableException` and `remov
360360
NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
361361

362362

363+
[[retry-topic-combine-blocking]]
364+
===== Combine blocking and non-blocking retries
365+
366+
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
367+
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
368+
369+
To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method as follows.
370+
The default policy is `FixedBackOff`, with nine retries (ten delivery attempts) and no delay between them.
371+
Optionally, you can also set a different back off policy.
372+
373+
====
374+
[source, java]
375+
----
376+
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
377+
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
378+
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
379+
@Qualifier(RetryTopicInternalBeanNames
380+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
381+
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
382+
lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
383+
.addRetryableExceptions(MyBlockingRetryException.class));
384+
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
385+
return lcfc;
386+
}
387+
----
388+
====
389+
390+
NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
391+
392+
363393
===== Include and Exclude Topics
364394

365395
You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics), .excludeTopic(String topic), and .excludeTopics(Collection<String> topics) methods.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
131131
Consumer<?, ?> consumer, MessageListenerContainer container) {
132132

133133
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
134-
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
134+
getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
135135
}
136136

137137
@Override

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.function.BiPredicate;
2323

2424
import org.apache.commons.logging.LogFactory;
25+
import org.apache.kafka.clients.consumer.Consumer;
2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
2627

2728
import org.springframework.core.log.LogAccessor;
@@ -126,12 +127,26 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
126127
* @since 2.7
127128
*/
128129
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
130+
return getRecoveryStrategy(records, null, thrownException);
131+
}
132+
133+
/**
134+
* Return a {@link RecoveryStrategy} to call to determine whether the first record in the
135+
* list should be skipped.
136+
* @param records the records.
137+
* @param recoveryConsumer the consumer.
138+
* @param thrownException the exception.
139+
* @return the {@link RecoveryStrategy}.
140+
* @since 2.8.4
141+
*/
142+
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
143+
@Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
129144
if (getClassifier().classify(thrownException)) {
130145
return this.failureTracker::recovered;
131146
}
132147
else {
133148
try {
134-
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
149+
this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
135150
this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
136151
}
137152
catch (Exception ex) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ void clearThreadState() {
240240
this.failures.remove();
241241
}
242242

243-
BiConsumer<ConsumerRecord<?, ?>, Exception> getRecoverer() {
243+
ConsumerAwareRecordRecoverer getRecoverer() {
244244
return this.recoverer;
245245
}
246246

Diff for: spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

+25-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
4343
import org.springframework.kafka.support.TopicPartitionOffset;
4444
import org.springframework.util.Assert;
45-
import org.springframework.util.backoff.FixedBackOff;
45+
import org.springframework.util.backoff.BackOff;
4646

4747
/**
4848
*
@@ -81,6 +81,8 @@ public class ListenerContainerFactoryConfigurer {
8181

8282
private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;
8383

84+
private BackOff providedBlockingBackOff = null;
85+
8486
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
8587
};
8688

@@ -158,6 +160,20 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr
158160
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
159161
}
160162

163+
/**
164+
* Set a {@link BackOff} to be used with blocking retries.
165+
* You can specify the exceptions to be retried using the method
166+
* {@link org.springframework.kafka.listener.ExceptionClassifier#addRetryableExceptions(Class[])}
167+
* By default, no exceptions are retried via blocking.
168+
* @param blockingBackOff the BackOff policy to be used by blocking retries.
169+
* @since 2.8.4
170+
* @see DefaultErrorHandler
171+
*/
172+
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
173+
Assert.notNull(blockingBackOff, "The provided BackOff cannot be null");
174+
this.providedBlockingBackOff = blockingBackOff;
175+
}
176+
161177
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
162178
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
163179
boolean isSetContainerProperties) {
@@ -193,14 +209,20 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
193209

194210
protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
195211
Configuration configuration) {
196-
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
197-
new FixedBackOff(0, 0));
212+
DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
213+
errorHandler.defaultFalse();
198214
errorHandler.setCommitRecovered(true);
199215
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
200216
this.errorHandlerCustomizer.accept(errorHandler);
201217
return errorHandler;
202218
}
203219

220+
protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
221+
return this.providedBlockingBackOff != null
222+
? new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff)
223+
: new DefaultErrorHandler(deadLetterPublishingRecoverer);
224+
}
225+
204226
protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container,
205227
Configuration configuration, boolean isSetContainerProperties) {
206228
AcknowledgingConsumerAwareMessageListener<?, ?> listener = checkAndCast(container.getContainerProperties()

0 commit comments

Comments
 (0)