Skip to content

Commit 3a9fadf

Browse files
committed
Enable virtual threads for Kafka listener
Closes gh-36396
1 parent 7c5ec73 commit 3a9fadf

File tree

4 files changed

+73
-0
lines changed

4 files changed

+73
-0
lines changed

Diff for: spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

+13
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
2323
import org.springframework.boot.context.properties.PropertyMapper;
24+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2425
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
2526
import org.springframework.kafka.core.ConsumerFactory;
2627
import org.springframework.kafka.core.KafkaTemplate;
@@ -42,6 +43,7 @@
4243
* @author Gary Russell
4344
* @author Eddú Meléndez
4445
* @author Thomas Kåsene
46+
* @author Moritz Halbritter
4547
* @since 1.5.0
4648
*/
4749
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
@@ -70,6 +72,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
7072

7173
private Function<MessageListenerContainer, String> threadNameSupplier;
7274

75+
private SimpleAsyncTaskExecutor listenerTaskExecutor;
76+
7377
/**
7478
* Set the {@link KafkaProperties} to use.
7579
* @param properties the properties
@@ -168,6 +172,14 @@ void setThreadNameSupplier(Function<MessageListenerContainer, String> threadName
168172
this.threadNameSupplier = threadNameSupplier;
169173
}
170174

175+
/**
176+
* Set the executor for threads that poll the consumer.
177+
* @param listenerTaskExecutor task executor
178+
*/
179+
void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) {
180+
this.listenerTaskExecutor = listenerTaskExecutor;
181+
}
182+
171183
/**
172184
* Configure the specified Kafka listener container factory. The factory can be
173185
* further tuned and default settings can be overridden.
@@ -226,6 +238,7 @@ private void configureContainer(ContainerProperties container) {
226238
map.from(properties::isImmediateStop).to(container::setStopImmediate);
227239
map.from(this.transactionManager).to(container::setTransactionManager);
228240
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
241+
map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor);
229242
}
230243

231244
}

Diff for: spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

+20
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
import org.springframework.beans.factory.ObjectProvider;
2222
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
2323
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
24+
import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading;
25+
import org.springframework.boot.autoconfigure.thread.Threading;
2426
import org.springframework.context.annotation.Bean;
2527
import org.springframework.context.annotation.Configuration;
28+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2629
import org.springframework.kafka.annotation.EnableKafka;
2730
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
2831
import org.springframework.kafka.config.ContainerCustomizer;
@@ -49,6 +52,7 @@
4952
* @author Gary Russell
5053
* @author Eddú Meléndez
5154
* @author Thomas Kåsene
55+
* @author Moritz Halbritter
5256
*/
5357
@Configuration(proxyBeanMethods = false)
5458
@ConditionalOnClass(EnableKafka.class)
@@ -107,7 +111,23 @@ class KafkaAnnotationDrivenConfiguration {
107111

108112
@Bean
109113
@ConditionalOnMissingBean
114+
@ConditionalOnThreading(Threading.PLATFORM)
110115
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
116+
return configurer();
117+
}
118+
119+
@Bean(name = "kafkaListenerContainerFactoryConfigurer")
120+
@ConditionalOnMissingBean
121+
@ConditionalOnThreading(Threading.VIRTUAL)
122+
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurerVirtualThreads() {
123+
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = configurer();
124+
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("kafka-");
125+
executor.setVirtualThreads(true);
126+
configurer.setListenerTaskExecutor(executor);
127+
return configurer;
128+
}
129+
130+
private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
111131
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
112132
configurer.setKafkaProperties(this.properties);
113133
configurer.setBatchMessageConverter(this.batchMessageConverter);

Diff for: spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java

+10
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import org.junit.jupiter.api.BeforeEach;
2222
import org.junit.jupiter.api.Test;
2323

24+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2425
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
2526
import org.springframework.kafka.core.ConsumerFactory;
2627
import org.springframework.kafka.listener.MessageListenerContainer;
2728

29+
import static org.assertj.core.api.Assertions.assertThat;
2830
import static org.mockito.BDDMockito.then;
2931
import static org.mockito.Mockito.mock;
3032
import static org.mockito.Mockito.spy;
@@ -70,4 +72,12 @@ void shouldApplyChangeConsumerThreadName() {
7072
then(this.factory).should().setChangeConsumerThreadName(true);
7173
}
7274

75+
@Test
76+
void shouldApplyListenerTaskExecutor() {
77+
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
78+
this.configurer.setListenerTaskExecutor(executor);
79+
this.configurer.configure(this.factory, this.consumerFactory);
80+
assertThat(this.factory.getContainerProperties().getListenerTaskExecutor()).isEqualTo(executor);
81+
}
82+
7383
}

Diff for: spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

+30
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,20 @@
4040
import org.apache.kafka.streams.StreamsConfig;
4141
import org.assertj.core.api.InstanceOfAssertFactories;
4242
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.api.condition.EnabledForJreRange;
44+
import org.junit.jupiter.api.condition.JRE;
4345
import org.junit.jupiter.params.ParameterizedTest;
4446
import org.junit.jupiter.params.provider.ValueSource;
4547

4648
import org.springframework.boot.autoconfigure.AutoConfigurations;
4749
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
4850
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
4951
import org.springframework.boot.test.context.runner.ContextConsumer;
52+
import org.springframework.boot.testsupport.assertj.SimpleAsyncTaskExecutorAssert;
5053
import org.springframework.context.annotation.Bean;
5154
import org.springframework.context.annotation.Configuration;
55+
import org.springframework.core.task.AsyncTaskExecutor;
56+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
5257
import org.springframework.kafka.annotation.EnableKafkaStreams;
5358
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
5459
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
@@ -570,6 +575,31 @@ void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() {
570575
});
571576
}
572577

578+
@Test
579+
void shouldUsePlatformThreadsByDefault() {
580+
this.contextRunner.run((context) -> {
581+
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
582+
.getBean(ConcurrentKafkaListenerContainerFactory.class);
583+
assertThat(factory).isNotNull();
584+
AsyncTaskExecutor listenerTaskExecutor = factory.getContainerProperties().getListenerTaskExecutor();
585+
assertThat(listenerTaskExecutor).isNull();
586+
});
587+
}
588+
589+
@Test
590+
@EnabledForJreRange(min = JRE.JAVA_21)
591+
void shouldUseVirtualThreadsIfEnabled() {
592+
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
593+
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
594+
.getBean(ConcurrentKafkaListenerContainerFactory.class);
595+
assertThat(factory).isNotNull();
596+
AsyncTaskExecutor listenerTaskExecutor = factory.getContainerProperties().getListenerTaskExecutor();
597+
assertThat(listenerTaskExecutor).isInstanceOf(SimpleAsyncTaskExecutor.class);
598+
SimpleAsyncTaskExecutorAssert.assertThat((SimpleAsyncTaskExecutor) listenerTaskExecutor)
599+
.usesVirtualThreads();
600+
});
601+
}
602+
573603
@SuppressWarnings("unchecked")
574604
@Test
575605
void listenerProperties() {

0 commit comments

Comments
 (0)