Skip to content

An application using default spring boot configuration fails at the startup when there are multiple @KafkaListener with @RetryableTopic #2554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
cenkakin opened this issue Jan 20, 2023 · 1 comment · Fixed by #2555

Comments

@cenkakin
Copy link
Contributor

cenkakin commented Jan 20, 2023

In what version(s) of Spring for Apache Kafka are you seeing this issue?

Since 2.9.0

Describe the bug
An application using default spring boot auto configuration beans fails at the startup when there are two kafka listeners with retry topic configuration. It throws the following exception:

Caused by: org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'defaultRetryTopicKafkaTemplate' is expected to be of type 'org.springframework.kafka.core.KafkaOperations' but was actually of type 'org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper'
	at org.springframework.beans.factory.support.AbstractBeanFactory.adaptBeanInstance(AbstractBeanFactory.java:408)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:389)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:205)
	at org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.getKafkaTemplate(RetryableTopicAnnotationProcessor.java:216)
	at org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.processAnnotation(RetryableTopicAnnotationProcessor.java:150)
	at org.springframework.kafka.annotation.RetryTopicConfigurationProvider.findRetryConfigurationFor(RetryTopicConfigurationProvider.java:101)
	at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:511)
	at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:490)
	at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:391)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:435)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1754)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:599)
	... 92 more

After some investigation, if I don't miss anything I found that the problem stems from this simple mistake:

public static final String DEFAULT_KAFKA_TEMPLATE_BEAN_NAME =
"defaultRetryTopicKafkaTemplate";
/**
* The bean name of the internally registered scheduler wrapper, if needed.
*/
public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME =
"defaultRetryTopicKafkaTemplate";

DEFAULT_KAFKA_TEMPLATE_BEAN_NAME and DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME are the same. That leads to replacing KafkaTemplate bean with RetryTopicSchedulerWrapper here in the first kafka listener initialization:

gac.registerBean(RetryTopicBeanNames.DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME,
RetryTopicSchedulerWrapper.class, () -> newSchedW);
schedW = gac.getBean(RetryTopicSchedulerWrapper.class);

This line throws an error on the second RetryableTopic annotation process since it retrieves RetryTopicSchedulerWrapper rather than KafkaTemplate:

return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
KafkaOperations.class);

We just need to change the value of DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME variable for the fix.

As a workaround for the current version, an application can inject its TaskScheduler bean or specify kafkaTemplate in @RetryableTopic.

To Reproduce

  • Add multiple @KafkaListener with @RetryableTopic
  • Use minimal configuration
    -- Don't inject your own TaskScheduler
    -- Don't specify kafkaTemplate in RetryableTopic

Expected behavior

Do not fail at the startup and configure the beans successfully.

Sample

A sample test case with spring boot and testcontainers:
https://github.com/cenkakin/spring-kafka-bug-demo/blob/main/src/test/java/com/github/cenk/springkafkabugdemo/SpringKafkaBugDemoTests.java

Also added another test case to my spring-kafka fork.
cenkakin@56640ed

Fix
The test passes in this branch:
main...cenkakin:spring-kafka:fix-multiple-listener-with-retry-bug

I can tidy up the code and create a PR if you like.

@artembilan
Copy link
Member

Yes, @cenkakin , we will very like to see and accept a PR with the fix.
Thank you!

cenkakin added a commit to cenkakin/spring-kafka that referenced this issue Jan 21, 2023
cenkakin added a commit to cenkakin/spring-kafka that referenced this issue Jan 21, 2023
cenkakin added a commit to cenkakin/spring-kafka that referenced this issue Jan 23, 2023
cenkakin added a commit to cenkakin/spring-kafka that referenced this issue Jan 23, 2023
artembilan pushed a commit that referenced this issue Jan 23, 2023
Fixes #2557

* Remove new IT and modify RetryTopicSameContainerFactoryIntegrationTests instead
* Re-add BasicKafkaListener

# Conflicts:
#	spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants