-
Notifications
You must be signed in to change notification settings - Fork 1.6k
GH-2496: Reuse retry topic for maxInterval delay #2497
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
bdbc580
c5becce
d0d5c23
4c6cf75
20865a6
500d7dc
d466fe5
f7ff884
6cd53ef
5c2bc89
7ccdf16
c038603
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -245,6 +245,11 @@ public RetryTopicConfigurationBuilder setTopicSuffixingStrategy(TopicSuffixingSt | |||||||
|
||||||||
/** | ||||||||
* Configure the {@link SameIntervalTopicReuseStrategy}. | ||||||||
* | ||||||||
* <p>Note: for fixed backoffs, when this is configured as | ||||||||
* {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over | ||||||||
* the configuration done through | ||||||||
* {@link #useSingleTopicForFixedDelays(FixedDelayStrategy)}. | ||||||||
* @param sameIntervalTopicReuseStrategy the strategy. | ||||||||
* @return the builder. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
*/ | ||||||||
|
@@ -404,10 +409,10 @@ public RetryTopicConfigurationBuilder fixedBackOff(int interval) { | |||||||
/** | ||||||||
* Configure the use of a single retry topic with fixed delays. | ||||||||
* @return the builder. | ||||||||
* @deprecated in a future release, configuration for single retry topic with fixed delays will have to be done with {@link #useSingleTopicForSameIntervals()}. | ||||||||
* @deprecated in favor of {@link #useSingleTopicForSameIntervals()}. | ||||||||
* @see FixedDelayStrategy#SINGLE_TOPIC | ||||||||
*/ | ||||||||
@Deprecated(forRemoval = true) // in 3.1 | ||||||||
@Deprecated | ||||||||
public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays() { | ||||||||
this.fixedDelayStrategy = FixedDelayStrategy.SINGLE_TOPIC; | ||||||||
return this; | ||||||||
|
@@ -418,9 +423,10 @@ public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays() { | |||||||
* {@link FixedDelayStrategy#MULTIPLE_TOPICS}. | ||||||||
* @param delayStrategy the delay strategy. | ||||||||
* @return the builder. | ||||||||
* @deprecated in a future release, retry topic reuse configuration for fixed delays will have to be done with {@link #sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy)}. | ||||||||
* @deprecated in favor of | ||||||||
* {@link #sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy)}. | ||||||||
*/ | ||||||||
@Deprecated(forRemoval = true) // in 3.1 | ||||||||
@Deprecated | ||||||||
public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays(FixedDelayStrategy delayStrategy) { | ||||||||
this.fixedDelayStrategy = delayStrategy; | ||||||||
return this; | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -238,6 +238,48 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { | |
assertThat(dltTopic.getDestinationPartitions()).isEqualTo(numPartitions); | ||
} | ||
|
||
@Test | ||
void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { | ||
|
||
// when | ||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); | ||
backOffPolicy.setBackOffPeriod(1000); | ||
int maxAttempts = 5; | ||
|
||
List<Long> backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); | ||
|
||
List<DestinationTopic.Properties> propertiesList = | ||
new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, | ||
classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, | ||
dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, | ||
-1).createProperties(); | ||
|
||
List<DestinationTopic> destinationTopicList = propertiesList | ||
.stream() | ||
.map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) | ||
.collect(Collectors.toList()); | ||
|
||
// then | ||
assertThat(propertiesList.size() == 3).isTrue(); | ||
|
||
DestinationTopic mainDestinationTopic = destinationTopicList.get(0); | ||
assertThat(mainDestinationTopic.isMainTopic()).isTrue(); | ||
|
||
DestinationTopic.Properties firstRetryProperties = propertiesList.get(1); | ||
assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix); | ||
DestinationTopic retryDestinationTopic = destinationTopicList.get(1); | ||
assertThat(retryDestinationTopic.isSingleTopicRetry()).isTrue(); | ||
assertThat(retryDestinationTopic.isReusableRetryTopic()).isFalse(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here, it is a matter of convention. We might consider every |
||
assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(1000); | ||
|
||
DestinationTopic.Properties dltProperties = propertiesList.get(2); | ||
assertThat(dltProperties.suffix()).isEqualTo(dltSuffix); | ||
assertThat(dltProperties.isDltTopic()).isTrue(); | ||
DestinationTopic dltTopic = destinationTopicList.get(2); | ||
assertThat(dltTopic.getDestinationDelay()).isEqualTo(0); | ||
assertThat(dltTopic.getDestinationPartitions()).isEqualTo(numPartitions); | ||
} | ||
|
||
@Test | ||
void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.