diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
index 031f06c700..9b3e06c29c 100644
--- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
+++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -430,6 +430,80 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
 ----
 ====
 
+[[retry-topic-combine-blocking]]
+==== Combining Blocking and Non-Blocking Retries
+
+Starting with version 2.8.4, you can configure the framework to use both blocking and non-blocking retries in conjunction.
+For example, some exceptions, such as a `DatabaseAccessException`, are likely to cause errors for the next records as well, so it makes sense to retry the same record a few times before sending it to the retry topic, or straight to the DLT.
+
+To configure blocking retries, add the exceptions you want to retry that way through the `setBlockingRetryableExceptions` method of the `ListenerContainerFactoryConfigurer` bean, as follows.
+The default back off policy is a `FixedBackOff` with nine retries and no delay between them.
+Optionally, you can provide your own back off policy through the `setBlockingRetriesBackOff` method.
+
+====
+[source, java]
+----
+@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
+public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
+		DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
+		@Qualifier(RetryTopicInternalBeanNames
+				.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
+	ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
+	lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
+	lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
+	return lcfc;
+}
+----
+====
+
+If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, for example:
+
+====
+[source, java]
+----
+lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
+----
+====
+
+NOTE: In combination with the global retryable topic's fatal exceptions classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
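+
+To illustrate, the `myClassificationsMap` and `myDefaultValue` used above could be built as in the following sketch; `MyBlockingRetryException` and `MyFatalException` are assumed application exceptions, not framework classes:
+
+====
+[source, java]
+----
+Map<Class<? extends Throwable>, Boolean> myClassificationsMap = new HashMap<>();
+myClassificationsMap.put(MyBlockingRetryException.class, true); // retry this exception via blocking
+myClassificationsMap.put(MyFatalException.class, false); // never retry this exception via blocking
+boolean myDefaultValue = false; // exceptions absent from the map are not retried via blocking
+----
+====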
+
+Here's an example with both configurations working together:
+
+====
+[source, java]
+----
+@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
+public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
+		DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
+		@Qualifier(RetryTopicInternalBeanNames
+				.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
+	ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
+	lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
+	return lcfc;
+}
+
+@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
+public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
+		@Qualifier(RetryTopicInternalBeanNames
+				.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
+	DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
+	ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
+	return ddtr;
+}
+----
+====
+
+In this example:
+
+* `ShouldRetryOnlyBlockingException.class` would retry only via blocking and, if all retries fail, would go straight to the DLT.
+* `ShouldRetryViaBothException.class` would retry via blocking and, if all blocking retries fail, would be forwarded to the next retry topic for another set of attempts.
+* `ShouldSkipBothRetriesException.class` would never be retried in any way and would go straight to the DLT if the first processing attempt fails.
+
+IMPORTANT: Note that the blocking retries behavior is allowlist-based - you add the exceptions you do want to retry that way.
+In contrast, the non-blocking retries classification is geared towards fatal exceptions, so it is denylist-based - you add the exceptions you do not want to retry non-blocking, but to send directly to the DLT instead.
+
+IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration.
+
 ==== Topic Naming
 
 Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
@@ -746,6 +820,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
 	return retryTopicConfigurer;
 }
 ----
+====
 
 [[change-kboe-logging-level]]
 ==== Changing KafkaBackOffException Logging Level
diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc
index cc5efc7b79..988d72e6ba 100644
--- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc
+++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -88,5 +88,8 @@ See <> for more information.
 There's now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT.
 Refer to <<retry-topic-global-fatal-exceptions>> to see how to manage it.
 
+You can now use blocking and non-blocking retries in conjunction.
+See <<retry-topic-combine-blocking>> for more information.
+
 The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level.
-See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
+See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java
index ba8e3a6224..02e4dd7285 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java
@@ -131,7 +131,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
 			Consumer<?, ?> consumer, MessageListenerContainer container) {
 
 		SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
-				getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
+				getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
 	}
 
 	@Override
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
index 4b5626794a..723b4949bf 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
@@ -22,6 +22,7 @@
 import org.springframework.classify.BinaryExceptionClassifier;
 import org.springframework.kafka.support.converter.ConversionException;
 import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.lang.Nullable;
 import org.springframework.messaging.converter.MessageConversionException;
 import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
 import org.springframework.util.Assert;
@@ -180,12 +181,14 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionType) {
 	 *
 	 * All others will be retried, unless {@link #defaultFalse()} has been called.
 	 * @param exceptionType the exception type.
-	 * @return true if the removal was successful.
+	 * @return the classification of the exception if removal was successful;
+	 * null otherwise.
 	 * @since 2.8.4
 	 * @see #addNotRetryableExceptions(Class...)
 	 * @see #setClassifications(Map, boolean)
 	 */
-	public boolean removeClassification(Class<? extends Exception> exceptionType) {
+	@Nullable
+	public Boolean removeClassification(Class<? extends Exception> exceptionType) {
 		return this.classifier.getClassified().remove(exceptionType);
 	}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
index 276e1c649b..4f3db37ab7 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
@@ -22,6 +22,7 @@
 import java.util.function.BiPredicate;
 
 import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import org.springframework.core.log.LogAccessor;
@@ -126,12 +127,26 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
 	 * @since 2.7
 	 */
 	protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
+		return getRecoveryStrategy(records, null, thrownException);
+	}
+
+	/**
+	 * Return a {@link RecoveryStrategy} to call to determine whether the first record in the
+	 * list should be skipped.
+	 * @param records the records.
+	 * @param recoveryConsumer the consumer.
+	 * @param thrownException the exception.
+	 * @return the {@link RecoveryStrategy}.
+	 * @since 2.8.4
+	 */
+	protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
+			@Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
 		if (getClassifier().classify(thrownException)) {
 			return this.failureTracker::recovered;
 		}
 		else {
 			try {
-				this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
+				this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
 				this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
 			}
 			catch (Exception ex) {
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
index 4ef6e55488..0aaa60c2d5 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ void clearThreadState() {
 		this.failures.remove();
 	}
 
-	BiConsumer<ConsumerRecord<?, ?>, Exception> getRecoverer() {
+	ConsumerAwareRecordRecoverer getRecoverer() {
 		return this.recoverer;
 	}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
index 495b0bed12..b1793b2492 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
@@ -17,6 +17,7 @@
 package org.springframework.kafka.retrytopic;
 
 import java.time.Clock;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -42,7 +43,7 @@
 import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
 import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.util.Assert;
-import org.springframework.util.backoff.FixedBackOff;
+import org.springframework.util.backoff.BackOff;
 
 /**
 *
@@ -81,6 +82,10 @@ public class ListenerContainerFactoryConfigurer {
 
 	private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;
 
+	private BackOff providedBlockingBackOff = null;
+
+	private Class<? extends Exception>[] blockingExceptionTypes = null;
+
 	private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> { };
 
@@ -158,6 +163,42 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerProperties(
 		return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
 	}
 
+	/**
+	 * Set a {@link BackOff} to be used with blocking retries.
+	 * If the BackOff execution returns STOP, the record will be forwarded
+	 * to the next retry topic or to the DLT, depending on how the non-blocking retries
+	 * are configured.
+	 * @param blockingBackOff the BackOff policy to be used by blocking retries.
+	 * @since 2.8.4
+	 * @see DefaultErrorHandler
+	 */
+	public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
+		Assert.notNull(blockingBackOff, "The provided BackOff cannot be null");
+		Assert.state(this.providedBlockingBackOff == null, () ->
+				"Blocking retries back off has already been set. Current: "
+						+ this.providedBlockingBackOff
+						+ ". You provided: " + blockingBackOff);
+		this.providedBlockingBackOff = blockingBackOff;
+	}
+
+	/**
+	 * Specify the exceptions to be retried via blocking.
+	 * @param exceptionTypes the exceptions that should be retried.
+	 * @since 2.8.4
+	 * @see DefaultErrorHandler
+	 */
+	@SafeVarargs
+	@SuppressWarnings("varargs")
+	public final void setBlockingRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
+		Assert.notNull(exceptionTypes, "The exception types cannot be null");
+		Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements");
+		Assert.state(this.blockingExceptionTypes == null,
+				() -> "Blocking retryable exceptions have already been set. "
+						+ "Current ones: " + Arrays.toString(this.blockingExceptionTypes)
+						+ ". You provided: " + Arrays.toString(exceptionTypes));
+		this.blockingExceptionTypes = exceptionTypes;
+	}
+
 	private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
 			ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
 			boolean isSetContainerProperties) {
@@ -193,14 +234,23 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerCustomizer) {
 
 	protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
 			Configuration configuration) {
-		DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
-				new FixedBackOff(0, 0));
+		DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
+		errorHandler.defaultFalse();
 		errorHandler.setCommitRecovered(true);
 		errorHandler.setLogLevel(KafkaException.Level.DEBUG);
+		if (this.blockingExceptionTypes != null) {
+			errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
+		}
 		this.errorHandlerCustomizer.accept(errorHandler);
 		return errorHandler;
 	}
 
+	protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
+		return this.providedBlockingBackOff != null
+				?
new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff) + : new DefaultErrorHandler(deadLetterPublishingRecoverer); + } + protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer container, Configuration configuration, boolean isSetContainerProperties) { AcknowledgingConsumerAwareMessageListener listener = checkAndCast(container.getContainerProperties() diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java index e5a6606dad..48f24c19a2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java @@ -17,12 +17,14 @@ package org.springframework.kafka.retrytopic; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -59,13 +61,18 @@ import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.converter.ConversionException; +import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; +import org.springframework.util.backoff.FixedBackOff; /** * @author Tomaz Fernandes * @since 2.7 */ @ExtendWith(MockitoExtension.class) -@SuppressWarnings({"unchecked", "rawtypes"}) +@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) class ListenerContainerFactoryConfigurerTests { @Mock @@ -404,10 +411,63 @@ void shouldDecorateFactory() { .createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer)); assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId); then(listener).should(times(1)).onMessage(data, ack, consumer); - then(this.configurerContainerCustomizer).should(times(1)).accept(container); } + @Test + void shouldUseGivenBackOffAndExceptions() { + + // given + given(container.getContainerProperties()).willReturn(containerProperties); + given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer); + given(containerProperties.getMessageListener()).willReturn(listener); + given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration); + willReturn(container).given(containerFactory).createListenerContainer(endpoint); + BackOff backOffMock = mock(BackOff.class); + BackOffExecution backOffExecutionMock = mock(BackOffExecution.class); + given(backOffMock.start()).willReturn(backOffExecutionMock); + + ListenerContainerFactoryConfigurer configurer = + new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, + deadLetterPublishingRecovererFactory, clock); + 
configurer.setBlockingRetriesBackOff(backOffMock); + configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class); + + // when + KafkaListenerContainerFactory decoratedFactory = + configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer()); + decoratedFactory.createListenerContainer(endpoint); + + // then + then(backOffMock).should().start(); + then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture()); + CommonErrorHandler errorHandler = errorHandlerCaptor.getValue(); + assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue(); + DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler; + assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue(); + assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue(); + assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isNull(); + + } + + + @Test + void shouldThrowIfBackOffOrRetryablesAlreadySet() { + // given + BackOff backOff = new FixedBackOff(); + ListenerContainerFactoryConfigurer configurer = + new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, + deadLetterPublishingRecovererFactory, clock); + configurer.setBlockingRetriesBackOff(backOff); + configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class); + + // when / then + assertThatThrownBy(() -> configurer.setBlockingRetriesBackOff(backOff)).isInstanceOf(IllegalStateException.class); + assertThatThrownBy(() -> configurer.setBlockingRetryableExceptions(ConversionException.class, DeserializationException.class)) + .isInstanceOf(IllegalStateException.class); + } + + @Test void shouldCacheFactoryInstances() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java new file mode 100644 index 0000000000..0c190b71b9 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -0,0 +1,498 @@ +/* + * Copyright 2021-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.retrytopic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.DltHandler; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaConsumerBackoffManager; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.converter.ConversionException; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.retry.annotation.Backoff; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.backoff.FixedBackOff; + + +/** + * @author Tomaz Fernandes + * @since 2.8.4 + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +public class RetryTopicExceptionRoutingIntegrationTests { + + private static final Logger logger = LoggerFactory.getLogger(RetryTopicExceptionRoutingIntegrationTests.class); + + public final static String BLOCKING_AND_TOPIC_RETRY = "blocking-and-topic-retry"; + public final static String ONLY_RETRY_VIA_BLOCKING = "only-retry-blocking-topic"; + public final static String ONLY_RETRY_VIA_TOPIC = "only-retry-topic"; + public final static String USER_FATAL_EXCEPTION_TOPIC = "user-fatal-topic"; + public final static String FRAMEWORK_FATAL_EXCEPTION_TOPIC = "framework-fatal-topic"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private CountDownLatchContainer latchContainer; + + @Test + void shouldRetryViaBlockingAndTopics() { + logger.debug("Sending message to topic " + BLOCKING_AND_TOPIC_RETRY); + kafkaTemplate.send(BLOCKING_AND_TOPIC_RETRY, "Test message to " + BLOCKING_AND_TOPIC_RETRY); + 
assertThat(awaitLatch(latchContainer.blockingAndTopicsLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.dltProcessorLatch)).isTrue(); + } + + @Test + void shouldRetryOnlyViaBlocking() { + logger.debug("Sending message to topic " + ONLY_RETRY_VIA_BLOCKING); + kafkaTemplate.send(ONLY_RETRY_VIA_BLOCKING, "Test message to "); + assertThat(awaitLatch(latchContainer.onlyRetryViaBlockingLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.annotatedDltOnlyBlockingLatch)).isTrue(); + } + + @Test + void shouldRetryOnlyViaTopic() { + logger.debug("Sending message to topic " + ONLY_RETRY_VIA_TOPIC); + kafkaTemplate.send(ONLY_RETRY_VIA_TOPIC, "Test message to " + ONLY_RETRY_VIA_TOPIC); + assertThat(awaitLatch(latchContainer.onlyRetryViaTopicLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.dltProcessorWithErrorLatch)).isTrue(); + } + + @Test + public void shouldGoStraightToDltIfUserProvidedFatal() { + logger.debug("Sending message to topic " + USER_FATAL_EXCEPTION_TOPIC); + kafkaTemplate.send(USER_FATAL_EXCEPTION_TOPIC, "Test message to " + USER_FATAL_EXCEPTION_TOPIC); + assertThat(awaitLatch(latchContainer.fatalUserLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.annotatedDltUserFatalLatch)).isTrue(); + } + + @Test + public void shouldGoStraightToDltIfFrameworkProvidedFatal() { + logger.debug("Sending message to topic " + FRAMEWORK_FATAL_EXCEPTION_TOPIC); + kafkaTemplate.send(FRAMEWORK_FATAL_EXCEPTION_TOPIC, "Testing topic with annotation 1"); + assertThat(awaitLatch(latchContainer.fatalFrameworkLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.annotatedDltFrameworkFatalLatch)).isTrue(); + } + + private static void countdownIfCorrectInvocations(AtomicInteger invocations, int expected, CountDownLatch latch) { + int actual = invocations.get(); + if (actual == expected) { + latch.countDown(); + } + else { + logger.error("Wrong number of Listener invocations: expected {} actual {}", expected, actual); + } + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(30, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } + + static class BlockingAndTopicRetriesListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaListener(id = "firstTopicId", topics = BLOCKING_AND_TOPIC_RETRY) + public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Message {} received in topic {}", message, receivedTopic); + container.blockingAndTopicsLatch.countDown(); + container.blockingAndTopicsListenerInvocations.incrementAndGet(); + throw new ShouldRetryViaBothException("Woooops... in topic " + receivedTopic); + } + } + + static class DltProcessor { + + @Autowired + CountDownLatchContainer container; + + public void processDltMessage(Object message) { + countdownIfCorrectInvocations(container.blockingAndTopicsListenerInvocations, 12, + container.dltProcessorLatch); + } + } + + static class OnlyRetryViaTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaListener(topics = ONLY_RETRY_VIA_TOPIC) + public void listenAgain(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Message {} received in topic {} ", message, receivedTopic); + container.onlyRetryViaTopicLatch.countDown(); + container.onlyRetryViaTopicListenerInvocations.incrementAndGet(); + throw new ShouldRetryOnlyByTopicException("Another woooops... 
" + receivedTopic); + } + } + + static class DltProcessorWithError { + + @Autowired + CountDownLatchContainer container; + + public void processDltMessage(Object message) { + countdownIfCorrectInvocations(container.onlyRetryViaTopicListenerInvocations, + 3, container.dltProcessorWithErrorLatch); + throw new RuntimeException("Dlt Error!"); + } + } + + static class OnlyRetryBlockingListener { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(exclude = ShouldRetryOnlyBlockingException.class, traversingCauses = "true", + backoff = @Backoff(50), kafkaTemplate = "kafkaTemplate") + @KafkaListener(topics = ONLY_RETRY_VIA_BLOCKING) + public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + container.onlyRetryViaBlockingLatch.countDown(); + container.onlyRetryViaBlockingListenerInvocations.incrementAndGet(); + logger.debug("Message {} received in topic {} ", message, receivedTopic); + throw new ShouldRetryOnlyBlockingException("User provided fatal exception!" + receivedTopic); + } + + @DltHandler + public void annotatedDltMethod(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Received message in Dlt method " + receivedTopic); + countdownIfCorrectInvocations(container.onlyRetryViaBlockingListenerInvocations, 4, + container.annotatedDltOnlyBlockingLatch); + } + } + + static class UserFatalTopicListener { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(backoff = @Backoff(50), kafkaTemplate = "kafkaTemplate") + @KafkaListener(topics = USER_FATAL_EXCEPTION_TOPIC) + public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + container.fatalUserLatch.countDown(); + container.userFatalListenerInvocations.incrementAndGet(); + logger.debug("Message {} received in topic {} ", message, receivedTopic); + throw new ShouldSkipBothRetriesException("User provided fatal exception!" + receivedTopic); + } + + @DltHandler + public void annotatedDltMethod(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Received message in Dlt method " + receivedTopic); + countdownIfCorrectInvocations(container.userFatalListenerInvocations, 1, + container.annotatedDltUserFatalLatch); + } + } + + static class FrameworkFatalTopicListener { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, backoff = @Backoff(50)) + @KafkaListener(topics = FRAMEWORK_FATAL_EXCEPTION_TOPIC) + public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + container.fatalFrameworkLatch.countDown(); + container.fatalFrameworkListenerInvocations.incrementAndGet(); + logger.debug("Message {} received in second annotated topic {} ", message, receivedTopic); + throw new ConversionException("Woooops... in topic " + receivedTopic, new RuntimeException("Test RTE")); + } + + @DltHandler + public void annotatedDltMethod(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Received message in annotated Dlt method!"); + countdownIfCorrectInvocations(container.fatalFrameworkListenerInvocations, 1, + container.annotatedDltFrameworkFatalLatch); + throw new ConversionException("Woooops... 
in topic " + receivedTopic, new RuntimeException("Test RTE")); + } + } + + static class CountDownLatchContainer { + + CountDownLatch blockingAndTopicsLatch = new CountDownLatch(12); + CountDownLatch onlyRetryViaBlockingLatch = new CountDownLatch(4); + CountDownLatch onlyRetryViaTopicLatch = new CountDownLatch(3); + CountDownLatch fatalUserLatch = new CountDownLatch(1); + CountDownLatch fatalFrameworkLatch = new CountDownLatch(1); + CountDownLatch annotatedDltOnlyBlockingLatch = new CountDownLatch(1); + CountDownLatch annotatedDltUserFatalLatch = new CountDownLatch(1); + CountDownLatch annotatedDltFrameworkFatalLatch = new CountDownLatch(1); + CountDownLatch dltProcessorLatch = new CountDownLatch(1); + CountDownLatch dltProcessorWithErrorLatch = new CountDownLatch(1); + + AtomicInteger blockingAndTopicsListenerInvocations = new AtomicInteger(); + AtomicInteger onlyRetryViaTopicListenerInvocations = new AtomicInteger(); + AtomicInteger onlyRetryViaBlockingListenerInvocations = new AtomicInteger(); + AtomicInteger userFatalListenerInvocations = new AtomicInteger(); + AtomicInteger fatalFrameworkListenerInvocations = new AtomicInteger(); + + } + + @SuppressWarnings("serial") + public static class ShouldRetryOnlyByTopicException extends RuntimeException { + public ShouldRetryOnlyByTopicException(String msg) { + super(msg); + } + } + + @SuppressWarnings("serial") + public static class ShouldSkipBothRetriesException extends RuntimeException { + public ShouldSkipBothRetriesException(String msg) { + super(msg); + } + } + + @SuppressWarnings("serial") + public static class ShouldRetryOnlyBlockingException extends RuntimeException { + public ShouldRetryOnlyBlockingException(String msg) { + super(msg); + } + } + + @SuppressWarnings("serial") + public static class ShouldRetryViaBothException extends RuntimeException { + public ShouldRetryViaBothException(String msg) { + super(msg); + } + } + + @Configuration + static class RetryTopicConfigurations { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + @Bean + public RetryTopicConfiguration blockingAndTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .includeTopic(BLOCKING_AND_TOPIC_RETRY) + .dltHandlerMethod("dltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + public RetryTopicConfiguration onlyTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .includeTopic(ONLY_RETRY_VIA_TOPIC) + .useSingleTopicForFixedDelays() + .doNotRetryOnDltFailure() + .dltHandlerMethod("dltProcessorWithError", DLT_METHOD_NAME) + .create(template); + } + + @Bean + public BlockingAndTopicRetriesListener blockingAndTopicRetriesListener() { + return new BlockingAndTopicRetriesListener(); + } + + @Bean + public OnlyRetryViaTopicListener onlyRetryViaTopicListener() { + return new OnlyRetryViaTopicListener(); + } + + @Bean + public UserFatalTopicListener userFatalTopicListener() { + return new UserFatalTopicListener(); + } + + @Bean + public OnlyRetryBlockingListener onlyRetryBlockingListener() { + return new OnlyRetryBlockingListener(); + } + + @Bean + public FrameworkFatalTopicListener frameworkFatalTopicListener() { + return new FrameworkFatalTopicListener(); + } + + @Bean + CountDownLatchContainer latchContainer() { + return new CountDownLatchContainer(); + } + + @Bean + DltProcessor dltProcessor() { + return new DltProcessor(); + } + + @Bean + DltProcessorWithError dltProcessorWithError() { + return new 
DltProcessorWithError(); + } + + @Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) + public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, + DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); + + lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3)); + lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class); + return lcfc; + } + + @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME) + public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext); + ddtr.addNotRetryableExceptions(ShouldSkipBothRetriesException.class); + return ddtr; + } + + } + + @Configuration + public static class KafkaProducerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.broker.getBrokersAsString()); + configProps.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + configProps.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + } + + @EnableKafka + @Configuration + public static class KafkaConsumerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); + return new KafkaAdmin(configs); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.broker.getBrokersAsString()); + props.put( + ConsumerConfig.GROUP_ID_CONFIG, + "groupId"); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory retryTopicListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ContainerProperties props = factory.getContainerProperties(); + props.setIdleEventInterval(100L); + props.setPollTimeout(50L); + props.setIdlePartitionEventInterval(100L); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer( + container -> container.getContainerProperties().setIdlePartitionEventInterval(100L)); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory 
consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + return factory; + } + + } +} diff --git a/spring-kafka/src/test/resources/log4j2-test.xml b/spring-kafka/src/test/resources/log4j2-test.xml index 9f33afab77..01973cc185 100644 --- a/spring-kafka/src/test/resources/log4j2-test.xml +++ b/spring-kafka/src/test/resources/log4j2-test.xml @@ -8,6 +8,7 @@ +