Skip to content

GH-2496: Reuse retry topic for maxInterval delay #2497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
133 changes: 99 additions & 34 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -398,39 +398,6 @@ If your back off policy requires delays with values bigger than that, adjust the

IMPORTANT: The first attempt counts against `maxAttempts`, so if you provide a `maxAttempts` value of 4 there'll be the original attempt plus 3 retries.

===== Single Topic Fixed Delay Retries

If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries.
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.

====
[source, java]
----
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
----
====

====
[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.build();
}
----
====

NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ...

===== Global timeout

You can set the global timeout for the retrying process.
Expand Down Expand Up @@ -677,7 +644,7 @@ IMPORTANT: Note that the blocking retries behavior is allowlist - you add the ex

IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration.

==== Topic Naming
==== Topic Amount and Naming
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to change the section title.


Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.

Expand All @@ -687,6 +654,11 @@ Examples:

"my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix".

NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ..., retry-n. Therefore, by default the amount of retry topics is the configured `maxAttempts` minus 1.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ..., retry-n. Therefore, by default the amount of retry topics is the configured `maxAttempts` minus 1.
NOTE: The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, ..., retry-n.
Therefore, by default the number of retry topics is the configured `maxAttempts` minus 1.


You can <<retry-topics-and-dlt-suffixes,configure the suffixes>>, choose whether to append <<append-index-or-delay,the attempt index or delay>>, use a <<single-topic-fixed-delay,single retry topic when using fixed backoff>>, and use a <<single-topic-maxinterval-delay,single retry topic for the attempts with the maxInterval>> when using exponential backoffs.

[[retry-topics-and-dlt-suffixes]]
===== Retry Topics and Dlt Suffixes

You can specify the suffixes that will be used by the retry and dlt topics.
Expand Down Expand Up @@ -718,6 +690,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> t

NOTE: The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively.

[[append-index-or-delay]]
===== Appending the Topic's Index or Delay

You can either append the topic's index or delay values after the suffix.
Expand Down Expand Up @@ -748,6 +721,98 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa

NOTE: The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic's index.

[[single-topic-fixed-delay]]
===== Single Topic for Fixed Delay Retries

If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries.
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.

====
[source, java]
----
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
----
====

====
[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.build();
}
----
====

NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ...


[[single-topic-maxinterval-delay]]
===== Single Topic for maxInterval Exponential Delay

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you know this, but there's a one sentence per line rule for docs, this line has 2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you know this, but there's a one sentence per line rule for docs, this line has 2.

I did not know that (first time with asciidoc). Maybe due to the file having been changed since then, the highlighted section does not show a multi-sentence line, but I will search here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not know that (first time with asciidoc).

I thought you did because everything else was ok :)

Maybe due to the file having been changed since then, the highlighted section does not show a multi-sentence line, but I will search here.

I probably highlighted the wrong line, sorry. It's the line right next to that one ("If you're using exponential backoff policy")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use one sentence per line to isolate future changes to a single sentence.

If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured `maxInterval`. This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the `maxInterval` value appended.

NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics.

The default behavior is to work with an amount of retry topics equal to the configured `maxAttempts` minus 1, and when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topics (corresponding to the `maxInterval` delay) being suffixed with an additional index.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The default behavior is to work with an amount of retry topics equal to the configured `maxAttempts` minus 1, and when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topics (corresponding to the `maxInterval` delay) being suffixed with an additional index.
The default behavior is to work with the number of retry topics equal to the configured `maxAttempts` minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index.


For instance, when configuring the exponential backoff with `initialInterval=1000`, `multiplier=2`, and `maxInterval=16000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be:

* -retry-1000
* -retry-2000
* -retry-4000
* -retry-8000
* -retry-16000-0
* -retry-16000-1
* -retry-16000-2
* ...
* -retry-16000-224

When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be:

* -retry-1000
* -retry-2000
* -retry-4000
* -retry-8000
* -retry-16000

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This will be the default in a future release.

====
[source, java]
----
@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 16000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
----
====

====
[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 16000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.build();
}
----
====

===== Custom naming strategies

More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
import org.springframework.kafka.retrytopic.RetryTopicConstants;
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.retry.annotation.Backoff;

Expand Down Expand Up @@ -177,6 +178,13 @@
*/
TopicSuffixingStrategy topicSuffixingStrategy() default TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE;


/**
* Topic reuse strategy for sequential attempts made with a same backoff interval.
* @return the strategy.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @return the strategy.
* @return the strategy.
* @since 3.0.4

*/
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS;

/**
* Whether or not create a DLT, and redeliver to the DLT if delivery fails or just give up.
* @return the dlt strategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
.dltProcessingFailureStrategy(annotation.dltStrategy())
.autoStartDltHandler(autoStartDlt)
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
.sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy())
.timeoutAfter(timeout)
.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.springframework.kafka.listener.ExceptionClassifier;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.TimestampedException;
import org.springframework.kafka.retrytopic.DestinationTopic.Type;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -133,7 +134,7 @@ && isNotFatalException(e)
}

private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) {
return destinationTopicHolder.getSourceDestination().isSingleTopicRetry()
return destinationTopicHolder.getSourceDestination().isReusableRetryTopic()
? destinationTopicHolder.getSourceDestination()
: destinationTopicHolder.getNextDestination();
}
Expand Down Expand Up @@ -192,13 +193,26 @@ public void addDestinationTopics(String mainListenerId, List<DestinationTopic> d
throw new IllegalStateException("Cannot add new destinations, "
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed.");
}
validateDestinations(destinationsToAdd);
synchronized (this.sourceDestinationsHolderMap) {
Map<String, DestinationTopicHolder> map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId,
id -> new HashMap<>());
map.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd));
}
}

private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
for (int i = 0; i < destinationsToAdd.size(); i++) {
DestinationTopic destination = destinationsToAdd.get(i);
if (destination.isReusableRetryTopic()) {
Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
String.format("In the destination topic chain, the type %s can only be "
+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This invariant was implemented because right now, the idea is to have topic reuse only for exponential backoff, therefore for the maxDelay retries, which stay in the end of the chain.
In order to allow reuse of attempts with the same interval in the middle of the chain, more sophistication would be needed, maybe to have DestinationTopic.Properties to carry the maxAttempts of the specific topic.

But anyway, no existing backoff policy should have attempts sequences with a same interval in the middle of the chain.

}
}
}

private Map<String, DestinationTopicHolder> correlatePairSourceAndDestinationValues(
List<DestinationTopic> destinationList) {
return IntStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public boolean isNoOpsTopic() {
return Type.NO_OPS.equals(this.properties.type);
}

public boolean isSingleTopicRetry() {
return Type.SINGLE_TOPIC_RETRY.equals(this.properties.type);
public boolean isReusableRetryTopic() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a breaking change - we can keep both methods and deprecate the old one.

return Type.REUSABLE_RETRY_TOPIC.equals(this.properties.type);
}

public boolean isMainTopic() {
Expand Down Expand Up @@ -280,6 +280,6 @@ public boolean isMainEndpoint() {
}

enum Type {
MAIN, RETRY, SINGLE_TOPIC_RETRY, DLT, NO_OPS
MAIN, RETRY, REUSABLE_RETRY_TOPIC, DLT, NO_OPS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't do this in a patch release; need to keep (and deprecate) the old value and make it an alias of the new one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't do this in a patch release; need to keep (and deprecate) the old value and make it an alias of the new one.

I did not push this now. This was in the first commit at Dec 4, 2022.

It is one of the basis of the logic reuse for both kinds of backoff. `DefaultDestinationTopicResolver' will not have a single concept anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?? I only see 5 commits total, and none of the subsequent patches reverted it - this is still the state of the code in the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant is that the original change was not pushed in the last commits. It was in the PR since the beginning.

I am working to revert it right now.

}
}
Loading