GH-2239: RetryableTopic Refactoring #2328
Just a couple nits, LGTM as is, thanks!
@@ -510,29 +508,11 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
}

private RetryTopicConfigurer getRetryTopicConfigurer() {
    bootstrapRetryTopicIfNecessary();
    return this.beanFactory.containsBean("internalRetryTopicConfigurer")
We can remove these `String` bean names and leave only the `RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME`.
*/
@FunctionalInterface
Just curious: why remove the annotation?
Thanks for pointing this out - originally I added the new method as abstract, but I changed it to `default` later.
Since it is a new interface, I think I'll make the other method `default` too; while the methods are related, we shouldn't force the user to implement both if they only want the new one.
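To illustrate the point about `default` methods, here is a minimal sketch. It uses hypothetical stand-in types (`Container`, `Partition`, `BackOffCallbacks` and `RecordingCallbacks` are illustration names, not the real spring-kafka types). Once both callbacks are `default`, the interface has no abstract method left, so `@FunctionalInterface` can no longer be applied, and an implementer overrides only the callback it needs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the real container and partition types.
final class Container { }

final class Partition {
    final String topic;
    final int id;

    Partition(String topic, int id) {
        this.topic = topic;
        this.id = id;
    }
}

// Both callbacks are default no-ops, so there is no abstract method and
// @FunctionalInterface would be rejected by the compiler on this interface.
interface BackOffCallbacks {

    default void onNextBackOff(Container containerOrNull, Exception exception, long nextBackOff) {
    }

    default void onNextBackOff(Container container, Partition partition, long nextBackOff) {
    }
}

// An implementer that only cares about the partition-based callback.
final class RecordingCallbacks implements BackOffCallbacks {
    final List<String> events = new ArrayList<>();

    @Override
    public void onNextBackOff(Container container, Partition partition, long nextBackOff) {
        this.events.add(partition.topic + "-" + partition.id + ":" + nextBackOff);
    }
}
```

Calling the exception-based overload on `RecordingCallbacks` falls through to the inherited no-op, so only partition-based calls are recorded.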
@@ -51,4 +53,9 @@ public void onNextBackOff(@Nullable MessageListenerContainer container, Exceptio
    }
}

@Override
public void onNextBackOff(MessageListenerContainer container, TopicPartition partition, long nextBackOff) {
Given the interface states `container` is `@Nullable`, maybe a null check is in order here?
It's `@Nullable` on the other method, but not this one.
Thanks @garyrussell, this looks great! There are a few other deprecations that I'm not sure if you missed or just decided not to deal with in this PR, such as:
Of course, these shouldn't change much overall, and can be part of a different
Looked at the new deprecations removal and LGTM, thanks! All this slimming down will definitely make maintenance easier going forward.
I left some review comments, but I guess all of them are optional and may be revised in other issues/PRs.
Thank you for a great overhaul! 😄
This way you can tune the precision and performance for the retry topics if you need to.

NOTE: You can have separate `ListenerContainerFactory` instances for the main and retry topics - this way you can have different settings to better suit your needs, such as having a higher polling timeout setting for the main topics and a lower one for the retry topics.
Starting with version 2.9, it is no longer necessary to tune the precision because a task scheduler is used to resume the partition and wake the consumer, if necessary.
Why say this if the rest of the paragraph has been removed?
Maybe remove the whole paragraph altogether?
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
    customizersConfigurer.customizeErrorHandler(eh -> {
        ((DefaultErrorHandler) eh).setSeekAfterError(false);
Looks like there is some inconsistency in the API:
- The `CustomizersConfigurer` has `customizeErrorHandler(Consumer<CommonErrorHandler> errorHandlerCustomizer)`.
- The `ListenerContainerFactoryConfigurer` does:
DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
...
this.errorHandlerCustomizer.accept(errorHandler);
Maybe the first one has to be changed to always accept a `DefaultErrorHandler` instead?
I see you have fixed the target API for this: `setErrorHandlerCustomizer(Consumer<DefaultErrorHandler> errorHandlerCustomizer)`. Therefore this cast is redundant in this doc.
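As a rough illustration of why the cast becomes redundant, the sketch below uses hypothetical stand-in types (`CommonHandler`, `DefaultHandler`, `Customizers` and `CastFreeCustomizerDemo` are made up for this example and are not the spring-kafka classes). When the customizer is declared as a `Consumer` of the concrete handler type, the lambda parameter already exposes the concrete setters.

```java
import java.util.function.Consumer;

// Hypothetical stand-ins; not the real spring-kafka classes.
interface CommonHandler { }

final class DefaultHandler implements CommonHandler {
    private boolean seekAfterError = true;

    void setSeekAfterError(boolean seekAfterError) {
        this.seekAfterError = seekAfterError;
    }

    boolean isSeekAfterError() {
        return this.seekAfterError;
    }
}

final class Customizers {
    private Consumer<DefaultHandler> errorHandlerCustomizer = eh -> { };

    // Typed to the concrete handler, so callers see its setters directly.
    void customizeErrorHandler(Consumer<DefaultHandler> customizer) {
        this.errorHandlerCustomizer = customizer;
    }

    DefaultHandler createHandler() {
        DefaultHandler handler = new DefaultHandler();
        this.errorHandlerCustomizer.accept(handler);
        return handler;
    }
}

final class CastFreeCustomizerDemo {
    static DefaultHandler configure() {
        Customizers customizers = new Customizers();
        // No ((DefaultHandler) eh) cast is needed here.
        customizers.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
        return customizers.createHandler();
    }
}
```

Had `customizeErrorHandler` accepted a `Consumer<CommonHandler>` instead, the lambda would need the downcast before calling `setSeekAfterError`.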
    this.beanFactory.getBean("internalRetryTopicBootstrapper",
            org.springframework.kafka.retrytopic.RetryTopicBootstrapper.class).bootstrapRetryTopic();
}
return this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME,
Shouldn't we cache the result of this call?
I see that `getRetryTopicConfigurer()` is called for every single `@KafkaListener` in the project...
try {
    return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
            KafkaOperations.class);
    return this.beanFactory.getBean(DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME, KafkaOperations.class);
This is a semantic cycle with Spring Boot.
Why is a plain `getBean(KafkaOperations.class)` not enough?
spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerRegistry.java
getListenerContainerFromContext(context).pausePartition(topicPartition);
this.backOffHandler.onNextBackOff(getListenerContainerFromContext(context), topicPartition, backOffTime);
throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " +
        "backing off for approx. %s millis.", context.getTopicPartition().partition(),
Looks like you have already extracted a `topicPartition` variable...
.../java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java
...ng-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java
...kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java
spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicSchedulerWrapper.java
f9cbe4e to 35eb5d0
New back off manager (and factory) that uses a task scheduler to resume the paused partitions. Revert change to deprecated `PartitionPausingBackoffManager`. Log resume.
Also fix unrelated race in EKIT. Only allow one `RetryTopicConfigurationSupport` bean.
Resolves spring-projects#2239
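The scheduler-based resume described in this commit message can be sketched roughly as follows. This is not the PR's implementation: it is a minimal, self-contained illustration that assumes partitions are identified by plain strings and uses a JDK `ScheduledExecutorService` in place of Spring's task scheduler. The class names `ScheduledResumeSketch` and `ScheduledResumeDemo` are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; partitions are plain strings such as "orders-retry-0".
final class ScheduledResumeSketch {
    private final Set<String> paused = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService scheduler;

    ScheduledResumeSketch(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    // Pause now and let the scheduler resume once the back off has elapsed.
    // The returned latch exists only so the demo can wait for the resume.
    CountDownLatch backOff(String partition, long backOffMillis) {
        CountDownLatch resumed = new CountDownLatch(1);
        this.paused.add(partition);
        this.scheduler.schedule(() -> {
            this.paused.remove(partition);
            resumed.countDown();
        }, backOffMillis, TimeUnit.MILLISECONDS);
        return resumed;
    }

    boolean isPaused(String partition) {
        return this.paused.contains(partition);
    }
}

final class ScheduledResumeDemo {
    // Returns true if the partition was paused at first and resumed in time.
    static boolean demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledResumeSketch sketch = new ScheduledResumeSketch(scheduler);
            CountDownLatch resumed = sketch.backOff("orders-retry-0", 200);
            boolean pausedAtFirst = sketch.isPaused("orders-retry-0");
            boolean resumedInTime = resumed.await(5, TimeUnit.SECONDS);
            return pausedAtFirst && resumedInTime && !sketch.isPaused("orders-retry-0");
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
        finally {
            scheduler.shutdownNow();
        }
    }
}
```

Because the resume fires from the scheduler instead of being detected on the next poll, the resume time in this sketch does not depend on how the polling interval is tuned.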
@@ -508,8 +510,16 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
}

private RetryTopicConfigurer getRetryTopicConfigurer() {
    return this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME,
            RetryTopicConfigurer.class);
    try {
Isn't this missing something like `if (this.topicConfigurer == null)`?
doh. Thanks.
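For reference, the guarded caching discussed in this thread might look like the sketch below. It is illustrative only: `CachedConfigurerLookup` is a hypothetical name, the `Supplier` stands in for the bean factory lookup, and the sketch assumes single-threaded access (it is not thread-safe as written).

```java
import java.util.function.Supplier;

// Hypothetical sketch; the Supplier stands in for the bean factory lookup.
final class CachedConfigurerLookup<T> {
    private final Supplier<T> lookup;
    private T cached;

    CachedConfigurerLookup(Supplier<T> lookup) {
        this.lookup = lookup;
    }

    // Not thread-safe; assumes all calls happen on a single thread.
    T get() {
        if (this.cached == null) { // the guard the review comment asks about
            this.cached = this.lookup.get();
        }
        return this.cached;
    }
}
```

Without the `null` guard, every call would repeat the lookup and overwrite the field, so the cache would have no effect.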
Just one minor nit-pick for the doc.
Thanks
Cherry-picked to
Use a Task Scheduler to resume partitions, avoiding the need for timing adjustment.
cherry-pick to 2.9.x
I will make a What's New commit after the review/merge because it will certainly conflict.
I have left the commits separate to avoid you having to re-review what you have already looked at (if you so desire).
cc/ @tomazfernandes