-
Notifications
You must be signed in to change notification settings - Fork 1.6k
GH-2496: Reuse retry topic for maxInterval delay #2497
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
bdbc580
c5becce
d0d5c23
4c6cf75
20865a6
500d7dc
d466fe5
f7ff884
6cd53ef
5c2bc89
7ccdf16
c038603
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -146,7 +146,8 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPo | |||||||
---- | ||||||||
==== | ||||||||
|
||||||||
NOTE: The retry topics' and dlt's consumers will be assigned to a consumer group with a group id that is the combination of the one with you provide in the `groupId` parameter of the `@KafkaListener` annotation with the topic's suffix. If you don't provide any they'll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic. | ||||||||
NOTE: The retry topics' and dlt's consumers will be assigned to a consumer group with a group id that is the combination of the one with you provide in the `groupId` parameter of the `@KafkaListener` annotation with the topic's suffix. | ||||||||
If you don't provide any they'll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic. | ||||||||
|
||||||||
IMPORTANT: If the consumer is configured with an <<error-handling-deserializer,`ErrorHandlingDeserializer`>>, to handle deserilialization exceptions, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions. | ||||||||
The generic value type of the template should be `Object`. | ||||||||
|
@@ -401,39 +402,6 @@ If your back off policy requires delays with values bigger than that, adjust the | |||||||
|
||||||||
IMPORTANT: The first attempt counts against `maxAttempts`, so if you provide a `maxAttempts` value of 4 there'll be the original attempt plus 3 retries. | ||||||||
|
||||||||
===== Single Topic Fixed Delay Retries | ||||||||
|
||||||||
If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries. | ||||||||
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended. | ||||||||
|
||||||||
==== | ||||||||
[source, java] | ||||||||
---- | ||||||||
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) | ||||||||
@KafkaListener(topics = "my-annotated-topic") | ||||||||
public void processMessage(MyPojo message) { | ||||||||
// ... message processing | ||||||||
} | ||||||||
---- | ||||||||
==== | ||||||||
|
||||||||
==== | ||||||||
[source, java] | ||||||||
---- | ||||||||
@Bean | ||||||||
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) { | ||||||||
return RetryTopicConfigurationBuilder | ||||||||
.newInstance() | ||||||||
.fixedBackoff(3000) | ||||||||
.maxAttempts(5) | ||||||||
.useSingleTopicForFixedDelays() | ||||||||
.create(template); | ||||||||
} | ||||||||
---- | ||||||||
==== | ||||||||
|
||||||||
NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ... | ||||||||
|
||||||||
===== Global timeout | ||||||||
|
||||||||
You can set the global timeout for the retrying process. | ||||||||
|
@@ -683,7 +651,7 @@ IMPORTANT: Note that the blocking retries behavior is allowlist - you add the ex | |||||||
|
||||||||
IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration. | ||||||||
|
||||||||
==== Topic Naming | ||||||||
==== Topic Amount | ||||||||
|
||||||||
Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic. | ||||||||
|
||||||||
|
@@ -693,6 +661,12 @@ Examples: | |||||||
|
||||||||
"my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix". | ||||||||
|
||||||||
NOTE: The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, ..., retry-n. | ||||||||
Therefore, by default the amount of retry topics is the configured `maxAttempts` minus 1. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
You can <<retry-topics-and-dlt-suffixes,configure the suffixes>>, choose whether to append <<append-index-or-delay,the attempt index or delay>>, use a <<single-topic-fixed-delay,single retry topic when using fixed backoff>>, and use a <<single-topic-maxinterval-delay,single retry topic for the attempts with the maxInterval>> when using exponential backoffs. | ||||||||
|
||||||||
[[retry-topics-and-dlt-suffixes]] | ||||||||
===== Retry Topics and Dlt Suffixes | ||||||||
|
||||||||
You can specify the suffixes that will be used by the retry and dlt topics. | ||||||||
|
@@ -724,6 +698,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> t | |||||||
|
||||||||
NOTE: The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively. | ||||||||
|
||||||||
[[append-index-or-delay]] | ||||||||
===== Appending the Topic's Index or Delay | ||||||||
|
||||||||
You can either append the topic's index or delay values after the suffix. | ||||||||
|
@@ -754,9 +729,106 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa | |||||||
|
||||||||
NOTE: The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic's index. | ||||||||
|
||||||||
[[single-topic-fixed-delay]] | ||||||||
===== Single Topic for Fixed Delay Retries | ||||||||
|
||||||||
If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries. | ||||||||
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended. | ||||||||
|
||||||||
NOTE: `FixedDelayStrategy` is now deprecated, and will be replaced by `SameIntervalTopicReuseStrategy` in a future release. | ||||||||
|
||||||||
==== | ||||||||
[source, java] | ||||||||
---- | ||||||||
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) | ||||||||
@KafkaListener(topics = "my-annotated-topic") | ||||||||
public void processMessage(MyPojo message) { | ||||||||
// ... message processing | ||||||||
} | ||||||||
---- | ||||||||
==== | ||||||||
|
||||||||
==== | ||||||||
[source, java] | ||||||||
---- | ||||||||
@Bean | ||||||||
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) { | ||||||||
return RetryTopicConfigurationBuilder | ||||||||
.newInstance() | ||||||||
.fixedBackoff(3000) | ||||||||
.maxAttempts(5) | ||||||||
.useSingleTopicForFixedDelays() | ||||||||
.create(template); | ||||||||
} | ||||||||
---- | ||||||||
==== | ||||||||
|
||||||||
NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ... | ||||||||
|
||||||||
|
||||||||
[[single-topic-maxinterval-delay]] | ||||||||
===== Single Topic for maxInterval Exponential Delay | ||||||||
|
||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you know this, but there's a one sentence per line rule for docs, this line has 2. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I did not know that (first time with asciidoc). Maybe due to the file having been changed since then, the highlighted section does not show a multi-sentence line, but I will search here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I thought you did because everything else was ok :)
I probably highlighted the wrong line, sorry. It's the line right next to that one ("If you're using exponential backoff policy") There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use one sentence per line to isolate future changes to a single sentence. |
||||||||
If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured `maxInterval`. | ||||||||
|
||||||||
This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the `maxInterval` value appended. | ||||||||
|
||||||||
NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics. | ||||||||
|
||||||||
The default behavior is to work with an amount of retry topics equal to the configured `maxAttempts` minus 1, and when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topics (corresponding to the `maxInterval` delay) being suffixed with an additional index. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
For instance, when configuring the exponential backoff with `initialInterval=1000`, `multiplier=2`, and `maxInterval=16000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be: | ||||||||
|
||||||||
* -retry-1000 | ||||||||
* -retry-2000 | ||||||||
* -retry-4000 | ||||||||
* -retry-8000 | ||||||||
* -retry-16000-0 | ||||||||
* -retry-16000-1 | ||||||||
* -retry-16000-2 | ||||||||
* ... | ||||||||
* -retry-16000-224 | ||||||||
|
||||||||
When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be: | ||||||||
|
||||||||
* -retry-1000 | ||||||||
* -retry-2000 | ||||||||
* -retry-4000 | ||||||||
* -retry-8000 | ||||||||
* -retry-16000 | ||||||||
|
||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
==== | ||||||||
[source, java] | ||||||||
---- | ||||||||
@RetryableTopic(attempts = 230, | ||||||||
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 16000), | ||||||||
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC) | ||||||||
@KafkaListener(topics = "my-annotated-topic") | ||||||||
public void processMessage(MyPojo message) { | ||||||||
// ... message processing | ||||||||
} | ||||||||
---- | ||||||||
==== | ||||||||
|
||||||||
==== | ||||||||
[source, java] | ||||||||
---- | ||||||||
@Bean | ||||||||
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) { | ||||||||
return RetryTopicConfigurationBuilder | ||||||||
.newInstance() | ||||||||
.exponentialBackoff(1000, 2, 16000) | ||||||||
.maxAttempts(230) | ||||||||
.useSingleTopicForSameIntervals() | ||||||||
.create(template); | ||||||||
} | ||||||||
---- | ||||||||
==== | ||||||||
|
||||||||
===== Custom naming strategies | ||||||||
|
||||||||
More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way: | ||||||||
More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. | ||||||||
The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way: | ||||||||
|
||||||||
==== | ||||||||
[source, java] | ||||||||
|
@@ -836,7 +908,9 @@ The framework will configure and use a separate set of retry topics for each lis | |||||||
|
||||||||
==== Dlt Strategies | ||||||||
|
||||||||
The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails. | ||||||||
The framework provides a few strategies for working with DLTs. | ||||||||
You can provide a method for DLT processing, use the default logging method, or have no DLT at all. | ||||||||
Also you can choose what happens if DLT processing fails. | ||||||||
|
||||||||
===== Dlt Processing Method | ||||||||
|
||||||||
|
@@ -1045,7 +1119,8 @@ Use the `DestinationTopicResolver` interface if you need to weigh in these facto | |||||||
[[change-kboe-logging-level]] | ||||||||
==== Changing KafkaBackOffException Logging Level | ||||||||
|
||||||||
When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown. Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class. | ||||||||
When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown. | ||||||||
Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class. | ||||||||
|
||||||||
For example, to change the logging level to WARN you might add: | ||||||||
|
||||||||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,5 @@ | ||||||||
/* | ||||||||
* Copyright 2018-2022 the original author or authors. | ||||||||
* Copyright 2018-2023 the original author or authors. | ||||||||
* | ||||||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||
* you may not use this file except in compliance with the License. | ||||||||
|
@@ -23,8 +23,8 @@ | |||||||
import java.lang.annotation.Target; | ||||||||
|
||||||||
import org.springframework.kafka.retrytopic.DltStrategy; | ||||||||
import org.springframework.kafka.retrytopic.FixedDelayStrategy; | ||||||||
import org.springframework.kafka.retrytopic.RetryTopicConstants; | ||||||||
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; | ||||||||
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; | ||||||||
import org.springframework.retry.annotation.Backoff; | ||||||||
|
||||||||
|
@@ -38,6 +38,7 @@ | |||||||
* @author Tomaz Fernandes | ||||||||
* @author Gary Russell | ||||||||
* @author Fabio da Silva Jr. | ||||||||
* @author João Lima | ||||||||
* @since 2.7 | ||||||||
* | ||||||||
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer | ||||||||
|
@@ -177,6 +178,17 @@ | |||||||
*/ | ||||||||
TopicSuffixingStrategy topicSuffixingStrategy() default TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE; | ||||||||
|
||||||||
|
||||||||
/** | ||||||||
* Topic reuse strategy for sequential attempts made with a same backoff interval. | ||||||||
* | ||||||||
* <p>Note: for fixed backoffs, when this is configured as | ||||||||
* {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over | ||||||||
* the configuration in {@link #fixedDelayTopicStrategy()}. | ||||||||
* @return the strategy. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
*/ | ||||||||
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; | ||||||||
|
||||||||
/** | ||||||||
* Whether or not create a DLT, and redeliver to the DLT if delivery fails or just give up. | ||||||||
* @return the dlt strategy. | ||||||||
|
@@ -186,8 +198,10 @@ | |||||||
/** | ||||||||
* Whether to use a single or multiple topics when using a fixed delay. | ||||||||
* @return the fixed delay strategy. | ||||||||
* @deprecated in favor of {@link #sameIntervalTopicReuseStrategy()}. | ||||||||
*/ | ||||||||
FixedDelayStrategy fixedDelayTopicStrategy() default FixedDelayStrategy.MULTIPLE_TOPICS; | ||||||||
@Deprecated | ||||||||
org.springframework.kafka.retrytopic.FixedDelayStrategy fixedDelayTopicStrategy() default org.springframework.kafka.retrytopic.FixedDelayStrategy.MULTIPLE_TOPICS; | ||||||||
|
||||||||
/** | ||||||||
* Override the container factory's {@code autoStartup} property for just the DLT container. | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2018-2022 the original author or authors. | ||
* Copyright 2018-2023 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -35,6 +35,7 @@ | |
import org.springframework.kafka.listener.ExceptionClassifier; | ||
import org.springframework.kafka.listener.ListenerExecutionFailedException; | ||
import org.springframework.kafka.listener.TimestampedException; | ||
import org.springframework.kafka.retrytopic.DestinationTopic.Type; | ||
import org.springframework.lang.Nullable; | ||
import org.springframework.util.Assert; | ||
|
||
|
@@ -132,8 +133,10 @@ && isNotFatalException(e) | |
: destinationTopicHolder.getNextDestination(); | ||
} | ||
|
||
@SuppressWarnings("deprecation") | ||
private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) { | ||
return destinationTopicHolder.getSourceDestination().isSingleTopicRetry() | ||
return ((destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) || | ||
(destinationTopicHolder.getSourceDestination().isSingleTopicRetry())) | ||
? destinationTopicHolder.getSourceDestination() | ||
: destinationTopicHolder.getNextDestination(); | ||
} | ||
|
@@ -192,13 +195,26 @@ public void addDestinationTopics(String mainListenerId, List<DestinationTopic> d | |
throw new IllegalStateException("Cannot add new destinations, " | ||
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed."); | ||
} | ||
validateDestinations(destinationsToAdd); | ||
synchronized (this.sourceDestinationsHolderMap) { | ||
Map<String, DestinationTopicHolder> map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId, | ||
id -> new HashMap<>()); | ||
map.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd)); | ||
} | ||
} | ||
|
||
private void validateDestinations(List<DestinationTopic> destinationsToAdd) { | ||
for (int i = 0; i < destinationsToAdd.size(); i++) { | ||
DestinationTopic destination = destinationsToAdd.get(i); | ||
if (destination.isReusableRetryTopic()) { | ||
Assert.isTrue((i == (destinationsToAdd.size() - 1) || | ||
((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))), | ||
String.format("In the destination topic chain, the type %s can only be " | ||
+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This invariant was implemented because right now, the idea is to have topic reuse only for exponential backoff, therefore for the But anyway, no existing backoff policy should have attempts sequences with a same interval in the middle of the chain. |
||
} | ||
} | ||
} | ||
|
||
private Map<String, DestinationTopicHolder> correlatePairSourceAndDestinationValues( | ||
List<DestinationTopic> destinationList) { | ||
return IntStream | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,6 +68,18 @@ public boolean isNoOpsTopic() { | |
return Type.NO_OPS.equals(this.properties.type); | ||
} | ||
|
||
public boolean isReusableRetryTopic() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a breaking change - we can keep both methods and deprecate the old one. |
||
return Type.REUSABLE_RETRY_TOPIC.equals(this.properties.type); | ||
} | ||
|
||
/** | ||
* Whether this is a single retry topic. | ||
* | ||
* @return whether this is a single retry topic. | ||
* @deprecated in favor of using {@link DestinationTopic.Type#REUSABLE_RETRY_TOPIC} | ||
* and {@link #isReusableRetryTopic()}. | ||
*/ | ||
@Deprecated | ||
public boolean isSingleTopicRetry() { | ||
return Type.SINGLE_TOPIC_RETRY.equals(this.properties.type); | ||
} | ||
|
@@ -213,7 +225,8 @@ public boolean isDltTopic() { | |
} | ||
|
||
public boolean isRetryTopic() { | ||
return Type.RETRY.equals(this.type) || Type.SINGLE_TOPIC_RETRY.equals(this.type); | ||
return Type.RETRY.equals(this.type) || Type.SINGLE_TOPIC_RETRY.equals(this.type) | ||
|| Type.REUSABLE_RETRY_TOPIC.equals(this.type); | ||
} | ||
|
||
public String suffix() { | ||
|
@@ -284,6 +297,14 @@ public boolean isMainEndpoint() { | |
} | ||
|
||
enum Type { | ||
MAIN, RETRY, SINGLE_TOPIC_RETRY, DLT, NO_OPS | ||
MAIN, RETRY, | ||
/** | ||
* A single retry topic for all retries. | ||
* | ||
* @deprecated Use {@code REUSABLE_RETRY_TOPIC} instead. | ||
*/ | ||
@Deprecated | ||
SINGLE_TOPIC_RETRY, | ||
REUSABLE_RETRY_TOPIC, DLT, NO_OPS | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please make them all one per line and add |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.