Version 2.9 changed the mechanism to bootstrap infrastructure beans; see Configuration for the two mechanisms that are now required to bootstrap the feature.
Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
Since 2.7 Spring for Apache Kafka offers support for that via the @RetryableTopic annotation and RetryTopicConfiguration class to simplify that bootstrapping.
If message processing fails, the message is forwarded to a retry topic with a back off timestamp. The retry topic consumer then checks the timestamp and if it’s not due it pauses the consumption for that topic’s partition. When it is due the partition consumption is resumed, and the message is consumed again. If the message processing fails again the message will be forwarded to the next retry topic, and the pattern is repeated until a successful processing occurs, or the attempts are exhausted, and the message is sent to the Dead Letter Topic (if configured).
To illustrate, if you have a "main-topic" topic and want to set up non-blocking retry with an exponential backoff of 1000ms, a multiplier of 2, and 4 max attempts, the framework creates the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics and configures the respective consumers. It takes care of both creating the topics and setting up the listeners.
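The naming in this example can be reproduced with a few lines of plain Java. The sketch below is not the framework's code: the class and method names are hypothetical, it assumes the default "-retry" and "-dlt" suffixes with delay-based suffixing, and it ignores any maximum delay.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTopicNameSketch {

    // Hypothetical helper: lists the topics needed for an exponential back off.
    // maxAttempts includes the first delivery, so there are maxAttempts - 1 retry topics.
    static List<String> topicsFor(String mainTopic, long initialDelayMs, double multiplier, int maxAttempts) {
        List<String> topics = new ArrayList<>();
        double delay = initialDelayMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            topics.add(mainTopic + "-retry-" + (long) delay);
            delay *= multiplier;
        }
        topics.add(mainTopic + "-dlt");
        return topics;
    }

    public static void main(String[] args) {
        // prints [main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000, main-topic-dlt]
        System.out.println(topicsFor("main-topic", 1000, 2, 4));
    }
}
```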
Important
|
By using this strategy you lose Kafka’s ordering guarantees for that topic. |
Important
|
You can set the AckMode mode you prefer, but RECORD is suggested.
|
Important
|
At this time this functionality doesn’t support class level @KafkaListener annotations.
|
All message processing and backing off is handled by the consumer thread, and, as such, delay precision is guaranteed on a best-effort basis. If one message’s processing takes longer than the next message’s back off period for that consumer, the next message’s delay will be higher than expected. Also, for short delays (about 1s or less), the maintenance work the thread has to do, such as committing offsets, may delay the message processing execution. The precision can also be affected if the retry topic’s consumer is handling more than one partition, because we rely on waking up the consumer from polling and having full pollTimeouts to make timing adjustments.
That said, for consumers handling a single partition, a message should in most situations be processed very close to its due time.
Important
|
It is guaranteed that a message will never be processed before its due time. |
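The due-time check described above can be pictured as a small calculation. This is only an illustrative sketch with hypothetical names, not the framework's implementation: it computes how long a partition would stay paused before a record becomes due.

```java
import java.time.Duration;
import java.time.Instant;

public class BackOffDueTimeSketch {

    // Returns how long the partition should stay paused before the record may be processed.
    // A zero duration means the record is due and can be handed to the listener.
    static Duration remainingPause(Instant dueTime, Instant now) {
        return now.isBefore(dueTime) ? Duration.between(now, dueTime) : Duration.ZERO;
    }

    public static void main(String[] args) {
        Instant due = Instant.parse("2024-01-01T00:00:05Z");
        System.out.println(remainingPause(due, Instant.parse("2024-01-01T00:00:03Z"))); // PT2S
        System.out.println(remainingPause(due, Instant.parse("2024-01-01T00:00:07Z"))); // PT0S
    }
}
```

Because the pause never ends before the due time, a record can be late but not early, which matches the guarantee stated above.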
Starting with version 2.9, for default configuration, the @EnableKafkaRetryTopic
annotation should be used in a @Configuration
annotated class.
This enables the feature to bootstrap properly and gives access to injecting some of the feature’s components to be looked up at runtime.
Note
|
It is not necessary to also add @EnableKafka if you add this annotation, because @EnableKafkaRetryTopic is meta-annotated with @EnableKafka.
|
Also, starting with that version, for more advanced configuration of the feature’s components and global features, the RetryTopicConfigurationSupport
class should be extended in a @Configuration
class, and the appropriate methods overridden.
For more details refer to Configuring Global Settings and Features.
By default, the containers for the retry topics will have the same concurrency as the main container.
Starting with version 3.0, you can set a different concurrency for the retry containers (either on the annotation, or in RetryTopicConfigurationBuilder).
Important
|
Only one of the above techniques can be used, and only one @Configuration class can extend RetryTopicConfigurationSupport .
|
To configure the retry topic and dlt for a @KafkaListener
annotated method, you just have to add the @RetryableTopic
annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
You can specify a method in the same class to process the dlt messages by annotating it with the @DltHandler
annotation.
If no DltHandler method is provided a default consumer is created which only logs the consumption.
@DltHandler
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
Note
|
If you don’t specify a kafkaTemplate name a bean with name defaultRetryTopicKafkaTemplate will be looked up.
If no bean is found an exception is thrown.
|
Starting with version 3.0, the @RetryableTopic
annotation can be used as a meta-annotation on custom annotations; for example:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
You can also configure the non-blocking retry support by creating RetryTopicConfiguration
beans in a @Configuration
annotated class.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
This will create retry topics and a dlt, as well as the corresponding consumers, for all topics in methods annotated with @KafkaListener using the default configurations. The KafkaTemplate instance is required for message forwarding.
To achieve more fine-grained control over how to handle non-blocking retrials for each topic, more than one RetryTopicConfiguration
bean can be provided.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics(List.of("my-topic", "my-other-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics(List.of("my-topic", "my-other-topic"))
.retryOn(MyException.class)
.create(template);
}
Note
|
The retry topics' and dlt’s consumers will be assigned to a consumer group with a group id that is the combination of the one you provide in the groupId parameter of the @KafkaListener annotation with the topic’s suffix.
If you don’t provide any, they’ll all belong to the same group, and a rebalance on a retry topic will cause an unnecessary rebalance on the main topic.
|
Important
|
If the consumer is configured with an ErrorHandlingDeserializer to handle deserialization exceptions, it is important to configure the KafkaTemplate and its producer with a serializer that can handle normal objects as well as raw byte[] values, which result from deserialization exceptions.
The generic value type of the template should be Object .
One technique is to use the DelegatingByTypeSerializer ; an example follows:
|
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
Important
|
Multiple @KafkaListener annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
It’s best to use a single RetryTopicConfiguration bean for configuration of such topics; if multiple @RetryableTopic annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic’s listeners and the other annotations' values will be ignored.
|
Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API).
This does not change the RetryTopicConfiguration
beans approach - only infrastructure components' configurations.
Now the RetryTopicConfigurationSupport
class should be extended in a (single) @Configuration
class, and the proper methods overridden.
An example follows:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
Important
|
When using this configuration approach, do not use the @EnableKafkaRetryTopic annotation; otherwise the context fails to start because of duplicated beans.
Use the simple @EnableKafka annotation instead.
|
When autoCreateTopics
is true, the main and retry topics will be created with the specified number of partitions and replication factor.
Starting with version 3.0, the default replication factor is -1
, meaning use the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.
To override these values for a particular topic (e.g. the main topic or DLT), simply add a NewTopic
@Bean
with the required properties; that will override the auto creation properties.
Important
|
By default, records are published to the retry topic(s) using the original partition of the received record. If the retry topics have fewer partitions than the main topic, you should configure the framework appropriately; an example follows. |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
The parameters to the function are the consumer record and the name of the next topic.
You can return a specific partition number, or null
to indicate that the KafkaProducer
should determine the partition.
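As an illustration of the decision such a resolver makes, the plain-Java sketch below keeps the original partition when the next topic has it and otherwise returns null. It is a simplification under stated assumptions: the real function receives the consumer record and the next topic's name, whereas here both are replaced by plain integers, and the partition count of the next topic is assumed to be known by the application. All names are hypothetical.

```java
import java.util.function.BiFunction;

public class PartitionResolverSketch {

    // Hypothetical resolver: (original partition, partition count of the next topic) -> target partition.
    // Returning null stands for "let the producer determine the partition".
    static BiFunction<Integer, Integer, Integer> resolver() {
        return (originalPartition, nextTopicPartitionCount) ->
                originalPartition < nextTopicPartitionCount ? originalPartition : null;
    }

    public static void main(String[] args) {
        System.out.println(resolver().apply(1, 3)); // 1
        System.out.println(resolver().apply(5, 3)); // null
    }
}
```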
By default, all values of retry headers (number of attempts, timestamps) are retained when a record transitions through the retry topics.
Starting with version 2.9.6, if you want to retain just the last value of these headers, use the configureDeadLetterPublishingContainerFactory()
method shown above to set the factory’s retainAllRetryHeaderValues
property to false
.
The feature is designed to be used with @KafkaListener
; however, several users have requested information on how to configure non-blocking retries programmatically.
The following Spring Boot application provides an example of how to do so.
@SpringBootApplication
public class Application extends RetryTopicConfigurationSupport {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
return RetryTopicConfigurationBuilder.newInstance()
.maxAttempts(4)
.autoCreateTopicsWith(2, (short) 1)
.create(template);
}
@Bean
TaskScheduler scheduler() {
return new ThreadPoolTaskScheduler();
}
@Bean
@Order(0)
SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
Listener listener, KafkaListenerEndpointRegistry registry) {
return () -> {
KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
EndpointProcessor endpointProcessor = endpoint -> {
// customize as needed (e.g. apply attributes to retry endpoints).
if (!endpoint.equals(mainEndpoint)) {
endpoint.setConcurrency(1);
}
// these are required
endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
endpoint.setTopics("topic");
endpoint.setId("id");
endpoint.setGroupId("group");
};
mainEndpoint.setBean(listener);
try {
mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
}
catch (NoSuchMethodException | SecurityException ex) {
throw new IllegalStateException(ex);
}
mainEndpoint.setConcurrency(2);
mainEndpoint.setTopics("topic");
mainEndpoint.setId("id");
mainEndpoint.setGroupId("group");
configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
"kafkaListenerContainerFactory");
};
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic", "test");
};
}
}
@Component
class Listener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(KafkaUtils.format(record));
throw new RuntimeException("test");
}
}
Important
|
Auto creation of topics will only occur if the configuration is processed before the application context is refreshed, as in the above example. To configure containers at runtime, the topics will need to be created using some other technique. |
Most of the features are available both for the @RetryableTopic
annotation and the RetryTopicConfiguration
beans.
The BackOff configuration relies on the BackOffPolicy
interface from the Spring Retry
project.
It includes:
-
Fixed Back Off
-
Exponential Back Off
-
Random Exponential Back Off
-
Uniform Random Back Off
-
No Back Off
-
Custom Back Off
@RetryableTopic(attempts = "5",
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(4)
.create(template);
}
You can also provide a custom implementation of Spring Retry’s SleepingBackOffPolicy
interface:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackOff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
Note
|
The default backoff policy is FixedBackOffPolicy with a maximum of 3 attempts and 1000ms intervals.
|
Note
|
There is a 30-second default maximum delay for the ExponentialBackOffPolicy .
If your back off policy requires delays with values bigger than that, adjust the maxDelay property accordingly.
|
Important
|
The first attempt counts against maxAttempts , so if you provide a maxAttempts value of 4 there’ll be the original attempt plus 3 retries.
|
You can set the global timeout for the retrying process. If that time is reached, the next time the consumer throws an exception the message goes straight to the DLT, or just ends the processing if no DLT is available.
@RetryableTopic(backoff = @Backoff(2000), timeout = "5000")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(2000)
.timeoutAfter(5000)
.create(template);
}
Note
|
The default is having no timeout set, which can also be achieved by providing -1 as the timeout value. |
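The effect of the global timeout on a failed record can be sketched as a simple decision. The code below is a simplified model with hypothetical names, not the framework's resolver: it assumes the timeout is measured from the first attempt, that retry attempts are not yet exhausted, and that a negative value means no timeout.

```java
public class RetryTimeoutSketch {

    enum Destination { NEXT_RETRY_TOPIC, DLT, NONE }

    // Decides where a record goes after a failed attempt, given the global timeout.
    static Destination onFailure(long firstAttemptMs, long nowMs, long timeoutMs, boolean dltConfigured) {
        boolean timedOut = timeoutMs >= 0 && nowMs - firstAttemptMs >= timeoutMs;
        if (!timedOut) {
            return Destination.NEXT_RETRY_TOPIC;
        }
        // Past the timeout: go straight to the DLT, or end processing when there is none.
        return dltConfigured ? Destination.DLT : Destination.NONE;
    }

    public static void main(String[] args) {
        System.out.println(onFailure(0, 4000, 5000, true));  // NEXT_RETRY_TOPIC
        System.out.println(onFailure(0, 6000, 5000, true));  // DLT
        System.out.println(onFailure(0, 6000, 5000, false)); // NONE
        System.out.println(onFailure(0, 6000, -1, true));    // NEXT_RETRY_TOPIC
    }
}
```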
You can specify which exceptions you want to retry on and which not to. You can also set it to traverse the causes to lookup nested exceptions.
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // Will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
Note
|
The default behavior is retrying on all exceptions and not traversing causes. |
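To make the effect of traversing causes concrete, here is a minimal plain-Java sketch of an include-list check. It is not the framework's classifier; the class, the method, and the nested exception type are hypothetical stand-ins.

```java
import java.util.Set;

public class RetryClassificationSketch {

    static class MyRetryException extends RuntimeException { }

    // Returns true when the thrown exception (or, optionally, one of its causes)
    // is an instance of a type on the include list.
    static boolean shouldRetry(Throwable thrown, Set<Class<? extends Throwable>> include, boolean traverseCauses) {
        Throwable current = thrown;
        while (current != null) {
            for (Class<? extends Throwable> type : include) {
                if (type.isInstance(current)) {
                    return true;
                }
            }
            if (!traverseCauses) {
                return false;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Set<Class<? extends Throwable>> include = Set.of(MyRetryException.class);
        Throwable wrapped = new RuntimeException(new MyRetryException());
        System.out.println(shouldRetry(wrapped, include, true));  // true
        System.out.println(shouldRetry(wrapped, include, false)); // false
    }
}
```

Without cause traversal only the outermost exception is inspected, so a wrapped MyRetryException is not matched.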
Since 2.8.3 there’s a global list of fatal exceptions which will cause the record to be sent to the DLT without any retries.
See [default-eh] for the default list of fatal exceptions.
You can add or remove exceptions to and from this list by overriding the manageNonBlockingFatalExceptions
method in a @Configuration
class that extends RetryTopicConfigurationSupport
.
See Configuring Global Settings and Features for more information.
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
Note
|
To disable fatal exceptions' classification, just clear the provided list. |
You can decide which topics will and will not be handled by a RetryTopicConfiguration
bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics), .excludeTopic(String topic) and .excludeTopics(Collection<String> topics) methods.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
Note
|
The default behavior is to include all topics. |
Unless otherwise specified the framework will auto create the required topics using NewTopic
beans that are consumed by the KafkaAdmin
bean.
You can specify the number of partitions and the replication factor with which the topics will be created, and you can turn this feature off.
Starting with version 3.0, the default replication factor is -1
, meaning use the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.
Important
|
Note that if you’re not using Spring Boot you’ll have to provide a KafkaAdmin bean in order to use this feature. |
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, (short) 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
Note
|
By default the topics are autocreated with one partition and a replication factor of -1 (meaning use the broker default). If your broker version is earlier than 2.4, you will need to set an explicit value. |
When considering how to manage failure headers (original headers and exception headers), the framework delegates to the DeadLetterPublishingRecoverer to decide whether to append or replace the headers.
By default, it explicitly sets appendOriginalHeaders to false and leaves stripPreviousExceptionHeaders to the default used by the DeadLetterPublishingRecoverer.
This means that only the first "original" and last exception headers are retained with the default configuration. This is to avoid creation of excessively large messages (due to the stack trace header, for example) when many retry steps are involved.
See [dlpr-headers] for more information.
To reconfigure the framework to use different settings for these properties, configure a DeadLetterPublishingRecoverer
customizer by overriding the configureCustomizers
method in a @Configuration
class that extends RetryTopicConfigurationSupport
.
See Configuring Global Settings and Features for more details.
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a headersFunction to the factory: factory.setHeadersFunction((rec, ex) -> { … }).
By default, any headers added will be cumulative, because Kafka headers can contain multiple values.
Starting with version 2.9.5, if the Headers
returned by the function contains a header of type DeadLetterPublishingRecoverer.SingleRecordHeader
, then any existing values for that header will be removed and only the new single value will remain.
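The difference between cumulative headers and a single-valued header can be modeled with an ordinary map of lists. This sketch does not use the Kafka Headers API; the class, method, and header name are hypothetical, and the boolean flag stands in for the "single record header" marker described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeaderAccumulationSketch {

    // Adds a header value; when 'single' is true, earlier values for the key are dropped first.
    static void addHeader(Map<String, List<String>> headers, String key, String value, boolean single) {
        List<String> values = headers.computeIfAbsent(key, k -> new ArrayList<>());
        if (single) {
            values.clear();
        }
        values.add(value);
    }

    public static void main(String[] args) {
        Map<String, List<String>> headers = new LinkedHashMap<>();
        addHeader(headers, "attempt-note", "first", false);
        addHeader(headers, "attempt-note", "second", false);
        System.out.println(headers); // {attempt-note=[first, second]}
        addHeader(headers, "attempt-note", "third", true);
        System.out.println(headers); // {attempt-note=[third]}
    }
}
```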
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as DatabaseAccessException
, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
To configure blocking retries, override the configureBlockingRetries
method in a @Configuration
class that extends RetryTopicConfigurationSupport
and add the exceptions you want to retry, along with the BackOff
to be used.
The default BackOff
is a FixedBackOff
with no delay and 9 attempts.
See Configuring Global Settings and Features for more information.
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetryException.class, MyOtherBlockingRetryException.class)
.backOff(new FixedBackOff(3000, 5));
}
Note
|
In combination with the global retryable topic’s fatal exceptions classification, you can configure the framework for any behavior you’d like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind. |
Here’s an example with both configurations working together:
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
.backOff(new FixedBackOff(50, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(ShouldSkipBothRetriesException.class);
}
In this example:
-
ShouldRetryOnlyBlockingException.class
would retry only via blocking and, if all retries fail, would go straight to the DLT. -
ShouldRetryViaBothException.class
would retry via blocking, and if all blocking retries fail would be forwarded to the next retry topic for another set of attempts. -
ShouldSkipBothRetriesException.class
would never be retried in any way and would go straight to the DLT if the first processing attempt failed.
Important
|
Note that the blocking retries behavior is allowlist - you add the exceptions you do want to retry that way; while the non-blocking retries classification is geared towards FATAL exceptions and as such is denylist - you add the exceptions you don’t want to do non-blocking retries, but to send directly to the DLT instead. |
Important
|
The non-blocking exception classification behavior also depends on the specific topic’s configuration. |
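The interplay of the allowlist and the denylist can be summarized as a small decision function. The sketch below is a simplified model with hypothetical names, not the framework's logic: it matches exception classes exactly, and it assumes that ShouldRetryOnlyBlockingException is also excluded from non-blocking retries (modeled here by putting it on the fatal list), which is what makes it go straight to the DLT after its blocking retries.

```java
import java.util.Set;

public class RetryRoutingSketch {

    static class ShouldRetryOnlyBlockingException extends RuntimeException { }
    static class ShouldRetryViaBothException extends RuntimeException { }
    static class ShouldSkipBothRetriesException extends RuntimeException { }

    // Allowlist: exceptions that get blocking retries.
    static final Set<Class<? extends Throwable>> BLOCKING = Set.of(
            ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);

    // Denylist: exceptions that never go through the retry topics (assumption, see lead-in).
    static final Set<Class<? extends Throwable>> NON_BLOCKING_FATAL = Set.of(
            ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);

    // Describes the path a record takes when processing fails with the given exception type.
    static String route(Class<? extends Throwable> type) {
        boolean blocking = BLOCKING.contains(type);
        boolean nonBlocking = !NON_BLOCKING_FATAL.contains(type);
        if (blocking && nonBlocking) {
            return "blocking retries, then retry topics, then DLT";
        }
        if (blocking) {
            return "blocking retries, then DLT";
        }
        return nonBlocking ? "retry topics, then DLT" : "DLT";
    }

    public static void main(String[] args) {
        System.out.println(route(ShouldRetryOnlyBlockingException.class)); // blocking retries, then DLT
        System.out.println(route(ShouldRetryViaBothException.class));      // blocking retries, then retry topics, then DLT
        System.out.println(route(ShouldSkipBothRetriesException.class));   // DLT
        System.out.println(route(IllegalStateException.class));            // retry topics, then DLT
    }
}
```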
Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
Examples:
"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …, "my-topic-dlt"
"my-other-topic" → "my-other-topic-myRetrySuffix-1000", "my-other-topic-myRetrySuffix-2000", …, "my-other-topic-myDltSuffix"
Note
|
The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, …, retry-n.
Therefore, by default the number of retry topics is the configured maxAttempts minus 1.
|
You can configure the suffixes, choose whether to append the attempt index or delay, use a single retry topic when using fixed backoff, and use a single retry topic for the attempts with the maxInterval when using exponential backoffs.
You can specify the suffixes that will be used by the retry and dlt topics.
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
Note
|
The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively. |
You can either append the topic’s index or delay values after the suffix.
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
Note
|
The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic’s index. |
If you’re using fixed delay policies such as FixedBackOffPolicy
or NoBackOffPolicy
you can use a single topic to accomplish the non-blocking retries.
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
Note
|
FixedDelayStrategy is now deprecated, and will be replaced by SameIntervalTopicReuseStrategy in a future release.
|
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
Note
|
The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, … |
If you’re using exponential backoff policy (ExponentialBackOffPolicy
), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured maxInterval
.
This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the maxInterval
value appended.
Note
|
By opting to use a single topic for the retries with the maxInterval delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large number of topics.
|
The default behavior is to work with a number of retry topics equal to the configured maxAttempts
minus 1, and when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topics (corresponding to the maxInterval
delay) being suffixed with an additional index.
For instance, when configuring the exponential backoff with initialInterval=1000
, multiplier=2
, and maxInterval=16000
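, in order to keep trying for one hour, one would need to configure maxAttempts
as 230 (the first attempt, 4 retries whose delays add up to 15 seconds, and 225 retries at 16 seconds each), and by default the needed retry topics would be: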
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000-0
-
-retry-16000-1
-
-retry-16000-2
-
…
-
-retry-16000-224
When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be:
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000
@RetryableTopic(attempts = "230",
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 16000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 16000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
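The attempt and topic counts for this one-hour scenario can be checked with a short calculation. The following sketch is plain arithmetic with hypothetical names, not framework code; it assumes each retry waits exactly its configured delay and ignores processing time.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class OneHourRetrySketch {

    // Counts the retries needed until the summed delays reach the target duration.
    static int retriesToCover(long initialMs, double multiplier, long maxMs, long targetMs) {
        long total = 0;
        double delay = initialMs;
        int retries = 0;
        while (total < targetMs) {
            total += (long) Math.min(delay, maxMs);
            delay = Math.min(delay * multiplier, maxMs);
            retries++;
        }
        return retries;
    }

    // Counts the distinct delay values, which is the number of retry topics
    // when attempts with the same interval share one topic.
    static int distinctDelays(long initialMs, double multiplier, long maxMs, int retries) {
        Set<Long> delays = new LinkedHashSet<>();
        double delay = initialMs;
        for (int i = 0; i < retries; i++) {
            delays.add((long) Math.min(delay, maxMs));
            delay = Math.min(delay * multiplier, maxMs);
        }
        return delays.size();
    }

    public static void main(String[] args) {
        int retries = retriesToCover(1000, 2, 16000, 3_600_000);
        System.out.println("maxAttempts = " + (retries + 1));                                  // maxAttempts = 230
        System.out.println("retry topics by default = " + retries);                            // retry topics by default = 229
        System.out.println("retry topics with reuse = " + distinctDelays(1000, 2, 16000, retries)); // retry topics with reuse = 5
    }
}
```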
More complex naming strategies can be accomplished by registering a bean that implements RetryTopicNamesProviderFactory
.
The default implementation is SuffixingRetryTopicNamesProviderFactory
and a different implementation can be registered in the following way:
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
As an example the following implementation, in addition to the standard suffix, adds a prefix to retry/dl topics names:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}
Starting with version 3.0, it is now possible to configure multiple listeners on the same topic(s). In order to do this, you must use custom topic naming to isolate the retry topics from each other. This is best shown with an example:
@RetryableTopic(...
retryTopicSuffix = "-listener1", dltTopicSuffix = "-listener1-dlt",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(id = "listener1", groupId = "group1", topics = TWO_LISTENERS_TOPIC, ...)
void listen1(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
...
}
@RetryableTopic(...
retryTopicSuffix = "-listener2", dltTopicSuffix = "-listener2-dlt",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(id = "listener2", groupId = "group2", topics = TWO_LISTENERS_TOPIC, ...)
void listen2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
...
}
The topicSuffixingStrategy
is optional.
The framework will configure and use a separate set of retry topics for each listener.
The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails.
You can specify the method used to process the DLT for the topic, as well as the behavior if that processing fails.
To do that, use the @DltHandler annotation on a method of the class that contains the @RetryableTopic annotation(s).
Note that the same method will be used for all the @RetryableTopic annotated methods within that class.
    @RetryableTopic
    @KafkaListener(topics = "my-annotated-topic")
    public void processMessage(MyPojo message) {
        // ... message processing
    }

    @DltHandler
    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
The DLT handler method can also be provided through the RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) method, passing as arguments the bean name and method name that should process the DLT’s messages.
    @Bean
    public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
                .create(template);
    }

    @Component
    public class MyCustomDltProcessor {

        private final MyDependency myDependency;

        public MyCustomDltProcessor(MyDependency myDependency) {
            this.myDependency = myDependency;
        }

        public void processDltMessage(MyPojo message) {
            // ... message processing, persistence, etc
        }
    }
Note
|
If no DLT handler is provided, the default RetryTopicConfigurer.LoggingDltListenerHandlerMethod is used. |
Starting with version 2.8, if you don’t want to consume from the DLT in this application at all, including by the default handler (or you wish to defer consumption), you can control whether or not the DLT container starts, independent of the container factory’s autoStartup property.
When using the @RetryableTopic annotation, set the autoStartDltHandler property to false; when using the configuration builder, use .autoStartDltHandler(false).
You can later start the DLT handler via the KafkaListenerEndpointRegistry.
Should the DLT processing fail, there are two possible behaviors available: ALWAYS_RETRY_ON_ERROR and FAIL_ON_ERROR.
In the former, the record is forwarded back to the DLT so that it doesn’t block the processing of other DLT records. In the latter, the consumer ends the execution without forwarding the message.
    @RetryableTopic(dltStrategy = DltStrategy.FAIL_ON_ERROR)
    @KafkaListener(topics = "my-annotated-topic")
    public void processMessage(MyPojo message) {
        // ... message processing
    }
    @Bean
    public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
                .doNotRetryOnDltFailure()
                .create(template);
    }
Note
|
The default behavior is ALWAYS_RETRY_ON_ERROR.
|
Important
|
Starting with version 2.8.3, ALWAYS_RETRY_ON_ERROR will NOT route a record back to the DLT if the record causes a fatal exception to be thrown, such as a DeserializationException, because, generally, such exceptions will always be thrown.
|
Exceptions that are considered fatal are:
- DeserializationException
- MessageConversionException
- ConversionException
- MethodArgumentResolutionException
- NoSuchMethodException
- ClassCastException
You can add exceptions to and remove exceptions from this list using methods on the DestinationTopicResolver bean.
See Exception Classifier for more information.
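The interaction between the two strategies and the fatal exception list can be summarized in a small plain-Java model. It is a simplified sketch, not the framework's code: the DltFailureRouting class and its Strategy enum are hypothetical stand-ins, only the two JDK exceptions from the list are included so that the example compiles without dependencies, and only the thrown exception itself is checked, without inspecting its causes.

```java
// Hypothetical, framework-free model of the DLT failure routing decision.
class DltFailureRouting {

    // Stand-in for the two failure strategies described above.
    enum Strategy { ALWAYS_RETRY_ON_ERROR, FAIL_ON_ERROR }

    // Only the JDK exceptions from the fatal list; the framework-specific
    // ones are left out so the sketch has no dependencies.
    static final Class<?>[] FATAL = { NoSuchMethodException.class, ClassCastException.class };

    // Simplification: checks the exception itself, not its cause chain.
    static boolean isFatal(Throwable failure) {
        for (Class<?> type : FATAL) {
            if (type.isInstance(failure)) {
                return true;
            }
        }
        return false;
    }

    // A record goes back to the DLT only when retrying is enabled and the failure is not fatal.
    static boolean routeBackToDlt(Strategy strategy, Throwable failure) {
        return strategy == Strategy.ALWAYS_RETRY_ON_ERROR && !isFatal(failure);
    }

    public static void main(String[] args) {
        System.out.println(routeBackToDlt(Strategy.ALWAYS_RETRY_ON_ERROR, new IllegalStateException("transient")));
        System.out.println(routeBackToDlt(Strategy.ALWAYS_RETRY_ON_ERROR, new ClassCastException("bad payload")));
        System.out.println(routeBackToDlt(Strategy.FAIL_ON_ERROR, new IllegalStateException("transient")));
    }
}
```

In this model only the first call returns true: a non-fatal failure under ALWAYS_RETRY_ON_ERROR is the single case in which the record is sent to the DLT again.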
You can also choose not to configure a DLT for the topic. In this case, after the retries are exhausted, processing simply ends.
    @RetryableTopic(dltStrategy = DltStrategy.NO_DLT)
    @KafkaListener(topics = "my-annotated-topic")
    public void processMessage(MyPojo message) {
        // ... message processing
    }
    @Bean
    public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .doNotConfigureDlt()
                .create(template);
    }
By default, the retry topic configuration uses the container factory provided in the @KafkaListener annotation, but you can specify a different one to be used to create the retry topic and DLT listener containers.
For the @RetryableTopic annotation, you can provide the factory’s bean name; with the RetryTopicConfiguration bean, you can provide either the bean name or the instance itself.
    @RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
    @KafkaListener(topics = "my-annotated-topic")
    public void processMessage(MyPojo message) {
        // ... message processing
    }

    @Bean
    public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
            ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {

        return RetryTopicConfigurationBuilder
                .newInstance()
                .listenerFactory(factory)
                .create(template);
    }

    @Bean
    public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .listenerFactory("my-retry-topic-factory")
                .create(template);
    }
Important
|
Since 2.8.3 you can use the same factory for retryable and non-retryable topics. |
If you need to revert the factory configuration behavior to that prior to 2.8.3, you can override the configureRetryTopicConfigurer method of a @Configuration class that extends RetryTopicConfigurationSupport, as explained in Configuring Global Settings and Features, and set useLegacyFactoryConfigurer to true, such as:
    @Override
    protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
        return rtc -> rtc.useLegacyFactoryConfigurer(true);
    }
Since 2.9, you can access information regarding the topic chain at runtime by injecting the provided DestinationTopicContainer bean.
This interface provides methods to look up the next topic in the chain or the DLT for a topic if configured, as well as useful properties such as the topic’s name, delay and type.
As a real-world use case, a console application can use this information to resend a record from the DLT to the first retry topic in the chain after the cause of the failed processing (for example, a bug or inconsistent state) has been resolved.
Important
|
The DestinationTopic provided by the DestinationTopicContainer#getNextDestinationTopicFor() method corresponds to the next topic registered in the chain for the input topic.
The actual topic the message will be forwarded to may differ due to different factors such as exception classification, number of attempts or single-topic fixed-delay strategies.
Use the DestinationTopicResolver interface if you need to take these factors into account.
|
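The notion of "the next topic registered in the chain" can be pictured with a plain-Java model that treats the chain as an ordered list. This is only a sketch: the TopicChainLookup class and its nextRegistered method are hypothetical and are not the DestinationTopicContainer interface, and the topic names are taken from the main-topic example earlier in this chapter. Like the registered chain itself, the model looks only at position and ignores exception classification and attempt counts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical, framework-free model of a registered topic chain.
class TopicChainLookup {

    private final List<String> chain;

    // Topics are given in the order in which they are registered in the chain.
    TopicChainLookup(String... topicsInOrder) {
        this.chain = Arrays.asList(topicsInOrder);
    }

    // Returns the topic registered right after the given one, if any.
    // Purely positional: no exception classification or attempt counting.
    Optional<String> nextRegistered(String topic) {
        int position = chain.indexOf(topic);
        boolean hasNext = position >= 0 && position + 1 < chain.size();
        return hasNext ? Optional.of(chain.get(position + 1)) : Optional.empty();
    }

    public static void main(String[] args) {
        TopicChainLookup lookup = new TopicChainLookup(
                "main-topic", "main-topic-retry-1000", "main-topic-retry-2000",
                "main-topic-retry-4000", "main-topic-dlt");
        // First retry topic: where a tool might resend a record taken from the DLT.
        System.out.println(lookup.nextRegistered("main-topic"));
        // The DLT is the end of the chain, so nothing follows it.
        System.out.println(lookup.nextRegistered("main-topic-dlt"));
    }
}
```

For the resend use case, looking up the topic after main-topic yields main-topic-retry-1000, the first retry topic in this example chain.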
When a message in the retry topic is not due for consumption, a KafkaBackOffException is thrown.
Such exceptions are logged by default at DEBUG level, but you can change this behavior by setting an error handler customizer in the ListenerContainerFactoryConfigurer in a @Configuration class.
For example, to change the logging level to WARN you might add:
    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        customizersConfigurer.customizeErrorHandler(defaultErrorHandler ->
                defaultErrorHandler.setLogLevel(KafkaException.Level.WARN));
    }