Skip to content

Thread name prefix is not always set when using virtual threads #39958

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {

/**
* Default Name of the thread created for simple rabbit listener.
*/
public static final String THREADNAME_RABBIT_SIMPLE = "rabbit-simple-";

/**
* Default Name of the thread created for direct rabbit listener.
*/
public static final String THREADNAME_RABBIT_DIRECT = "rabbit-direct-";

private final ObjectProvider<MessageConverter> messageConverter;

private final ObjectProvider<MessageRecoverer> messageRecoverer;
Expand Down Expand Up @@ -76,7 +86,7 @@ SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFact
@ConditionalOnThreading(Threading.VIRTUAL)
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_SIMPLE));
return configurer;
}

Expand Down Expand Up @@ -105,7 +115,7 @@ DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFact
@ConditionalOnThreading(Threading.VIRTUAL)
DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() {
DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_DIRECT));
return configurer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@
@Import(PulsarConfiguration.class)
public class PulsarAutoConfiguration {

/**
* Default Name of the thread created for pulsar consumer.
*/
public static final String THREADNAME_PULSAR_CONSUMER = "pulsar-consumer-";

/**
* Default Name of the thread created for pulsar task executor.
*/
public static final String THREADNAME_PULSAR_TASKEXECUTOR = "pulsar-taskexecutor-";

private PulsarProperties properties;

private PulsarPropertiesMapper propertiesMapper;
Expand Down Expand Up @@ -158,7 +168,7 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor());
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_CONSUMER));
}
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
Expand Down Expand Up @@ -189,7 +199,8 @@ DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReader
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver);
if (Threading.VIRTUAL.isActive(environment)) {
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor());
readerContainerProperties
.setReaderTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_TASKEXECUTOR));
}
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLSocketFactory;
Expand Down Expand Up @@ -545,12 +546,36 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() {

@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreads() {
void shouldConfigureVirtualThreadsForSimpleListener() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_SIMPLE + "[0-9]*");

});
}

@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreadsForDirectListener() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactory = context.getBean(
"directRabbitListenerContainerFactoryConfigurer",
DirectRabbitListenerContainerFactoryConfigurer.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_DIRECT + "[0-9]*");

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Caffeine;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.test.util.ReflectionTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -499,6 +501,11 @@ void whenVirtualThreadsAreEnabledOnJava21AndLaterListenerContainerShouldUseVirtu
.getBean(ConcurrentPulsarListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_CONSUMER + "[0-9]*");
});
}

Expand Down Expand Up @@ -554,6 +561,11 @@ void whenVirtualThreadsAreEnabledOnJava21AndLaterReaderShouldUseVirtualThreads()
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_TASKEXECUTOR + "[0-9]*");
});
}

Expand Down