GH-2116: Add blocking retries to RT #2124

Merged: 5 commits, Feb 24, 2022
Changes from 3 commits
29 changes: 29 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -328,6 +328,35 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat

NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
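
For illustration, a minimal sketch of that call (assuming `resolver` is the `DefaultDestinationTopicResolver` bean from the example above, and `Collections` is `java.util.Collections`):

====
[source, java]
----
// Sketch only: replace the default classifications with an empty map;
// the `true` default value then classifies every exception as retryable,
// so no exceptions are treated as fatal.
resolver.setClassifications(Collections.emptyMap(), true);
----
====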

[[retry-topic-combine-blocking]]
===== Combine blocking and non-blocking retries

Starting in version 2.8.4, you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you can have a set of exceptions that would likely also trigger errors on the next records, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.

To configure blocking retries, add the exceptions you want to retry via the `addRetryableExceptions` method, as follows.
The default policy is `FixedBackOff`, with nine retries and no delay between them.
Optionally, you can also set a different back off policy.

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
        @Qualifier(RetryTopicInternalBeanNames
                .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(
            kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
            .addRetryableExceptions(MyBlockingRetryException.class));
    lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
    return lcfc;
}
----
====
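
With this configuration, a `MyBlockingRetryException` triggers up to five in-place retries, 500ms apart; if the back off execution returns STOP (retries exhausted), the record then goes to the next retry topic (if present).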

NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
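
For example, a minimal sketch of the fatal side of that combination (`MyFatalException` is a placeholder name), marking an exception so it skips both retry kinds and goes straight to the DLT:

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
        @Qualifier(RetryTopicInternalBeanNames
                .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
    // Placeholder exception: classified as fatal, so neither blocking
    // nor non-blocking retries are attempted for it.
    ddtr.addNotRetryableExceptions(MyFatalException.class);
    return ddtr;
}
----
====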


===== Include and Exclude Topics

@@ -131,7 +131,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
Consumer<?, ?> consumer, MessageListenerContainer container) {

SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
}

@Override
@@ -22,6 +22,7 @@
import java.util.function.BiPredicate;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.log.LogAccessor;
@@ -126,12 +127,26 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
* @since 2.7
*/
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
return getRecoveryStrategy(records, null, thrownException);
}

/**
* Return a {@link RecoveryStrategy} to call to determine whether the first record in the
* list should be skipped.
* @param records the records.
* @param recoveryConsumer the consumer.
* @param thrownException the exception.
* @return the {@link RecoveryStrategy}.
* @since 2.8.4
*/
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
@Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
if (getClassifier().classify(thrownException)) {
return this.failureTracker::recovered;
}
else {
try {
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
}
catch (Exception ex) {
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ void clearThreadState() {
this.failures.remove();
}

BiConsumer<ConsumerRecord<?, ?>, Exception> getRecoverer() {
ConsumerAwareRecordRecoverer getRecoverer() {
return this.recoverer;
}

@@ -42,7 +42,7 @@
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.util.backoff.BackOff;

/**
*
@@ -81,6 +81,8 @@ public class ListenerContainerFactoryConfigurer {

private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;

private BackOff providedBlockingBackOff = null;

private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
};

@@ -158,6 +160,20 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
}

/**
* Set a {@link BackOff} to be used with blocking retries.
* You can specify the exceptions to be retried using the method
* {@link org.springframework.kafka.listener.ExceptionClassifier#addRetryableExceptions(Class[])}.
* By default, no exceptions are retried via blocking.

Contributor commented:
We should add something like "If the backoff execution returns STOP (retries are exhausted), the record will then go to the next retry topic (if present)."

Contributor (author) replied:
Yes, I think the aggregated behavior of blocking and non-blocking retries can be better documented in general. I'll see what I can come up with.

* @param blockingBackOff the BackOff policy to be used by blocking retries.
* @since 2.8.4
* @see DefaultErrorHandler
*/
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
Assert.notNull(blockingBackOff, "The provided BackOff cannot be null");
this.providedBlockingBackOff = blockingBackOff;
}

private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
boolean isSetContainerProperties) {
@@ -193,14 +209,20 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC

protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
Configuration configuration) {
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
new FixedBackOff(0, 0));
DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
errorHandler.defaultFalse();
errorHandler.setCommitRecovered(true);
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
this.errorHandlerCustomizer.accept(errorHandler);
return errorHandler;
}

protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return this.providedBlockingBackOff != null
? new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff)
: new DefaultErrorHandler(deadLetterPublishingRecoverer);

Contributor commented:
This isn't right; the default fixed back off is (0, 9), we need (0, 0).

Why not just set the field to new FixedBackOff(0, 0)? We then don't need this test. Rename the field too.

Contributor (author) replied:
Well, considering all exceptions will return false when classified by default, it doesn't really matter what the back off policy is - we can just leave the default one, and the user then only has to add the retryable exceptions, which makes the default or provided back off kick in.

Anyway, we can set the no-op back off too, no worries. Either way, I think the modifications in the other classes to pass the consumer to the DLPR are still needed, since we might go that route for exception classifications that return false.

Contributor replied:
Oh, OK; makes sense to leave it.

}

protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container,
Configuration configuration, boolean isSetContainerProperties) {
AcknowledgingConsumerAwareMessageListener<?, ?> listener = checkAndCast(container.getContainerProperties()
@@ -23,6 +23,7 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

@@ -59,6 +60,8 @@
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

/**
* @author Tomaz Fernandes
@@ -408,6 +411,34 @@ void shouldDecorateFactory() {
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
}

@Test
void shouldUseGivenBackOff() {

// given
given(container.getContainerProperties()).willReturn(containerProperties);
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
given(containerProperties.getMessageListener()).willReturn(listener);
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
BackOff backOffMock = mock(BackOff.class);
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
given(backOffMock.start()).willReturn(backOffExecutionMock);

ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);

configurer.setBlockingRetriesBackOff(backOffMock);

// when
KafkaListenerContainerFactory<?> decoratedFactory =
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
decoratedFactory.createListenerContainer(endpoint);

// then
then(backOffMock).should().start();
}

@Test
void shouldCacheFactoryInstances() {
