Skip to content

Commit f526b23

Browse files
committed
Harmonize WebSocket message broker to use Executor
This commit harmonizes the configuration of the WebSocket message broker to use Executor rather than TaskExecutor as only the former is enforced. This lets custom configuration to use a wider range of implementations. Closes gh-32129
1 parent 2fc8b13 commit f526b23

File tree

5 files changed

+78
-79
lines changed

5 files changed

+78
-79
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java

+22-22
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.concurrent.Executor;
2425
import java.util.function.Supplier;
2526

2627
import org.springframework.beans.factory.BeanInitializationException;
@@ -30,7 +31,6 @@
3031
import org.springframework.context.SmartLifecycle;
3132
import org.springframework.context.annotation.Bean;
3233
import org.springframework.context.event.SmartApplicationListener;
33-
import org.springframework.core.task.TaskExecutor;
3434
import org.springframework.lang.Nullable;
3535
import org.springframework.messaging.MessageHandler;
3636
import org.springframework.messaging.converter.ByteArrayMessageConverter;
@@ -158,7 +158,7 @@ public ApplicationContext getApplicationContext() {
158158

159159
@Bean
160160
public AbstractSubscribableChannel clientInboundChannel(
161-
@Qualifier("clientInboundChannelExecutor") TaskExecutor executor) {
161+
@Qualifier("clientInboundChannelExecutor") Executor executor) {
162162

163163
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor);
164164
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
@@ -170,9 +170,9 @@ public AbstractSubscribableChannel clientInboundChannel(
170170
}
171171

172172
@Bean
173-
public TaskExecutor clientInboundChannelExecutor() {
173+
public Executor clientInboundChannelExecutor() {
174174
ChannelRegistration registration = getClientInboundChannelRegistration();
175-
TaskExecutor executor = getTaskExecutor(registration, "clientInboundChannel-", this::defaultTaskExecutor);
175+
Executor executor = getExecutor(registration, "clientInboundChannel-", this::defaultExecutor);
176176
if (executor instanceof ExecutorConfigurationSupport executorSupport) {
177177
executorSupport.setPhase(getPhase());
178178
}
@@ -209,7 +209,7 @@ protected void configureClientInboundChannel(ChannelRegistration registration) {
209209

210210
@Bean
211211
public AbstractSubscribableChannel clientOutboundChannel(
212-
@Qualifier("clientOutboundChannelExecutor") TaskExecutor executor) {
212+
@Qualifier("clientOutboundChannelExecutor") Executor executor) {
213213

214214
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor);
215215
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
@@ -221,9 +221,9 @@ public AbstractSubscribableChannel clientOutboundChannel(
221221
}
222222

223223
@Bean
224-
public TaskExecutor clientOutboundChannelExecutor() {
224+
public Executor clientOutboundChannelExecutor() {
225225
ChannelRegistration registration = getClientOutboundChannelRegistration();
226-
TaskExecutor executor = getTaskExecutor(registration, "clientOutboundChannel-", this::defaultTaskExecutor);
226+
Executor executor = getExecutor(registration, "clientOutboundChannel-", this::defaultExecutor);
227227
if (executor instanceof ExecutorConfigurationSupport executorSupport) {
228228
executorSupport.setPhase(getPhase());
229229
}
@@ -250,11 +250,11 @@ protected void configureClientOutboundChannel(ChannelRegistration registration)
250250
@Bean
251251
public AbstractSubscribableChannel brokerChannel(
252252
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
253-
@Qualifier("brokerChannelExecutor") TaskExecutor executor) {
253+
@Qualifier("brokerChannelExecutor") Executor executor) {
254254

255255
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
256256
ChannelRegistration registration = registry.getBrokerChannelRegistration();
257-
ExecutorSubscribableChannel channel = (registration.hasTaskExecutor() ?
257+
ExecutorSubscribableChannel channel = (registration.hasExecutor() ?
258258
new ExecutorSubscribableChannel(executor) : new ExecutorSubscribableChannel());
259259
registration.interceptors(new ImmutableMessageChannelInterceptor());
260260
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
@@ -263,38 +263,38 @@ public AbstractSubscribableChannel brokerChannel(
263263
}
264264

265265
@Bean
266-
public TaskExecutor brokerChannelExecutor(
266+
public Executor brokerChannelExecutor(
267267
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
268268

269269
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
270270
ChannelRegistration registration = registry.getBrokerChannelRegistration();
271-
TaskExecutor executor = getTaskExecutor(registration, "brokerChannel-", () -> {
271+
Executor executor = getExecutor(registration, "brokerChannel-", () -> {
272272
// Should never be used
273-
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
274-
threadPoolTaskExecutor.setCorePoolSize(0);
275-
threadPoolTaskExecutor.setMaxPoolSize(1);
276-
threadPoolTaskExecutor.setQueueCapacity(0);
277-
return threadPoolTaskExecutor;
273+
ThreadPoolTaskExecutor fallbackExecutor = new ThreadPoolTaskExecutor();
274+
fallbackExecutor.setCorePoolSize(0);
275+
fallbackExecutor.setMaxPoolSize(1);
276+
fallbackExecutor.setQueueCapacity(0);
277+
return fallbackExecutor;
278278
});
279279
if (executor instanceof ExecutorConfigurationSupport executorSupport) {
280280
executorSupport.setPhase(getPhase());
281281
}
282282
return executor;
283283
}
284284

285-
private TaskExecutor defaultTaskExecutor() {
285+
private Executor defaultExecutor() {
286286
return new TaskExecutorRegistration().getTaskExecutor();
287287
}
288288

289-
private static TaskExecutor getTaskExecutor(ChannelRegistration registration,
290-
String threadNamePrefix, Supplier<TaskExecutor> fallback) {
289+
private static Executor getExecutor(ChannelRegistration registration,
290+
String threadNamePrefix, Supplier<Executor> fallback) {
291291

292-
return registration.getTaskExecutor(fallback,
292+
return registration.getExecutor(fallback,
293293
executor -> setThreadNamePrefix(executor, threadNamePrefix));
294294
}
295295

296-
private static void setThreadNamePrefix(TaskExecutor taskExecutor, String name) {
297-
if (taskExecutor instanceof CustomizableThreadCreator ctc) {
296+
private static void setThreadNamePrefix(Executor executor, String name) {
297+
if (executor instanceof CustomizableThreadCreator ctc) {
298298
ctc.setThreadNamePrefix(name);
299299
}
300300
}

spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java

+17-18
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.List;
22+
import java.util.concurrent.Executor;
2223
import java.util.function.Consumer;
2324
import java.util.function.Supplier;
2425

25-
import org.springframework.core.task.TaskExecutor;
2626
import org.springframework.lang.Nullable;
2727
import org.springframework.messaging.support.ChannelInterceptor;
2828
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -41,7 +41,7 @@ public class ChannelRegistration {
4141
private TaskExecutorRegistration registration;
4242

4343
@Nullable
44-
private TaskExecutor executor;
44+
private Executor executor;
4545

4646
private final List<ChannelInterceptor> interceptors = new ArrayList<>();
4747

@@ -67,14 +67,14 @@ public TaskExecutorRegistration taskExecutor(@Nullable ThreadPoolTaskExecutor ta
6767
}
6868

6969
/**
70-
* Configure the given {@link TaskExecutor} for this message channel,
70+
* Configure the given {@link Executor} for this message channel,
7171
* taking precedence over a {@linkplain #taskExecutor() task executor
7272
* registration} if any.
73-
* @param taskExecutor the task executor to use
73+
* @param executor the executor to use
7474
* @since 6.1.4
7575
*/
76-
public ChannelRegistration executor(TaskExecutor taskExecutor) {
77-
this.executor = taskExecutor;
76+
public ChannelRegistration executor(Executor executor) {
77+
this.executor = executor;
7878
return this;
7979
}
8080

@@ -89,7 +89,7 @@ public ChannelRegistration interceptors(ChannelInterceptor... interceptors) {
8989
}
9090

9191

92-
protected boolean hasTaskExecutor() {
92+
protected boolean hasExecutor() {
9393
return (this.registration != null || this.executor != null);
9494
}
9595

@@ -98,18 +98,17 @@ protected boolean hasInterceptors() {
9898
}
9999

100100
/**
101-
* Return the {@link TaskExecutor} to use. If no task executor has been
102-
* configured, the {@code fallback} supplier is used to provide a fallback
103-
* instance.
101+
* Return the {@link Executor} to use. If no executor has been configured,
102+
* the {@code fallback} supplier is used to provide a fallback instance.
104103
* <p>
105-
* If the {@link TaskExecutor} to use is suitable for further customizations,
104+
* If the {@link Executor} to use is suitable for further customizations,
106105
* the {@code customizer} consumer is invoked.
107-
* @param fallback a supplier of a fallback task executor in case none is configured
106+
* @param fallback a supplier of a fallback executor in case none is configured
108107
* @param customizer further customizations
109-
* @return the task executor to use
110-
* @since 6.1.4
108+
* @return the executor to use
109+
* @since 6.2
111110
*/
112-
protected TaskExecutor getTaskExecutor(Supplier<TaskExecutor> fallback, Consumer<TaskExecutor> customizer) {
111+
protected Executor getExecutor(Supplier<Executor> fallback, Consumer<Executor> customizer) {
113112
if (this.executor != null) {
114113
return this.executor;
115114
}
@@ -119,9 +118,9 @@ else if (this.registration != null) {
119118
return registeredTaskExecutor;
120119
}
121120
else {
122-
TaskExecutor taskExecutor = fallback.get();
123-
customizer.accept(taskExecutor);
124-
return taskExecutor;
121+
Executor fallbackExecutor = fallback.get();
122+
customizer.accept(fallbackExecutor);
123+
return fallbackExecutor;
125124
}
126125
}
127126

spring-messaging/src/test/java/org/springframework/messaging/simp/config/ChannelRegistrationTests.java

+28-28
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package org.springframework.messaging.simp.config;
1818

19+
import java.util.concurrent.Executor;
1920
import java.util.function.Consumer;
2021
import java.util.function.Supplier;
2122

2223
import org.junit.jupiter.api.Test;
2324

24-
import org.springframework.core.task.TaskExecutor;
2525
import org.springframework.messaging.support.ChannelInterceptor;
2626
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2727

@@ -38,20 +38,20 @@
3838
*/
3939
class ChannelRegistrationTests {
4040

41-
private final Supplier<TaskExecutor> fallback = mock();
41+
private final Supplier<Executor> fallback = mock();
4242

43-
private final Consumer<TaskExecutor> customizer = mock();
43+
private final Consumer<Executor> customizer = mock();
4444

4545
@Test
4646
void emptyRegistrationUsesFallback() {
47-
TaskExecutor fallbackTaskExecutor = mock(TaskExecutor.class);
48-
given(this.fallback.get()).willReturn(fallbackTaskExecutor);
47+
Executor fallbackExecutor = mock(Executor.class);
48+
given(this.fallback.get()).willReturn(fallbackExecutor);
4949
ChannelRegistration registration = new ChannelRegistration();
50-
assertThat(registration.hasTaskExecutor()).isFalse();
51-
TaskExecutor actual = registration.getTaskExecutor(this.fallback, this.customizer);
52-
assertThat(actual).isSameAs(fallbackTaskExecutor);
50+
assertThat(registration.hasExecutor()).isFalse();
51+
Executor actual = registration.getExecutor(this.fallback, this.customizer);
52+
assertThat(actual).isSameAs(fallbackExecutor);
5353
verify(this.fallback).get();
54-
verify(this.customizer).accept(fallbackTaskExecutor);
54+
verify(this.customizer).accept(fallbackExecutor);
5555
}
5656

5757
@Test
@@ -65,45 +65,45 @@ void emptyRegistrationDoesNotHaveInterceptors() {
6565
void taskRegistrationCreatesDefaultInstance() {
6666
ChannelRegistration registration = new ChannelRegistration();
6767
registration.taskExecutor();
68-
assertThat(registration.hasTaskExecutor()).isTrue();
69-
TaskExecutor taskExecutor = registration.getTaskExecutor(this.fallback, this.customizer);
70-
assertThat(taskExecutor).isInstanceOf(ThreadPoolTaskExecutor.class);
68+
assertThat(registration.hasExecutor()).isTrue();
69+
Executor executor = registration.getExecutor(this.fallback, this.customizer);
70+
assertThat(executor).isInstanceOf(ThreadPoolTaskExecutor.class);
7171
verifyNoInteractions(this.fallback);
72-
verify(this.customizer).accept(taskExecutor);
72+
verify(this.customizer).accept(executor);
7373
}
7474

7575
@Test
7676
void taskRegistrationWithExistingThreadPoolTaskExecutor() {
77-
ThreadPoolTaskExecutor existingTaskExecutor = mock(ThreadPoolTaskExecutor.class);
77+
ThreadPoolTaskExecutor existingExecutor = mock(ThreadPoolTaskExecutor.class);
7878
ChannelRegistration registration = new ChannelRegistration();
79-
registration.taskExecutor(existingTaskExecutor);
80-
assertThat(registration.hasTaskExecutor()).isTrue();
81-
TaskExecutor taskExecutor = registration.getTaskExecutor(this.fallback, this.customizer);
82-
assertThat(taskExecutor).isSameAs(existingTaskExecutor);
79+
registration.taskExecutor(existingExecutor);
80+
assertThat(registration.hasExecutor()).isTrue();
81+
Executor executor = registration.getExecutor(this.fallback, this.customizer);
82+
assertThat(executor).isSameAs(existingExecutor);
8383
verifyNoInteractions(this.fallback);
84-
verify(this.customizer).accept(taskExecutor);
84+
verify(this.customizer).accept(executor);
8585
}
8686

8787
@Test
8888
void configureExecutor() {
8989
ChannelRegistration registration = new ChannelRegistration();
90-
TaskExecutor taskExecutor = mock(TaskExecutor.class);
91-
registration.executor(taskExecutor);
92-
assertThat(registration.hasTaskExecutor()).isTrue();
93-
TaskExecutor taskExecutor1 = registration.getTaskExecutor(this.fallback, this.customizer);
94-
assertThat(taskExecutor1).isSameAs(taskExecutor);
90+
Executor executor = mock(Executor.class);
91+
registration.executor(executor);
92+
assertThat(registration.hasExecutor()).isTrue();
93+
Executor actualExecutor = registration.getExecutor(this.fallback, this.customizer);
94+
assertThat(actualExecutor).isSameAs(executor);
9595
verifyNoInteractions(this.fallback, this.customizer);
9696
}
9797

9898
@Test
9999
void configureExecutorTakesPrecedenceOverTaskRegistration() {
100100
ChannelRegistration registration = new ChannelRegistration();
101-
TaskExecutor taskExecutor = mock(TaskExecutor.class);
102-
registration.executor(taskExecutor);
101+
Executor executor = mock(Executor.class);
102+
registration.executor(executor);
103103
ThreadPoolTaskExecutor ignored = mock(ThreadPoolTaskExecutor.class);
104104
registration.taskExecutor(ignored);
105-
assertThat(registration.hasTaskExecutor()).isTrue();
106-
assertThat(registration.getTaskExecutor(this.fallback, this.customizer)).isSameAs(taskExecutor);
105+
assertThat(registration.hasExecutor()).isTrue();
106+
assertThat(registration.getExecutor(this.fallback, this.customizer)).isSameAs(executor);
107107
verifyNoInteractions(ignored, this.fallback, this.customizer);
108108

109109
}

spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Set;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.Executor;
2526

2627
import org.junit.jupiter.api.Test;
2728

@@ -31,7 +32,6 @@
3132
import org.springframework.context.annotation.Configuration;
3233
import org.springframework.context.support.StaticApplicationContext;
3334
import org.springframework.core.Ordered;
34-
import org.springframework.core.task.TaskExecutor;
3535
import org.springframework.lang.Nullable;
3636
import org.springframework.messaging.Message;
3737
import org.springframework.messaging.MessageChannel;
@@ -599,20 +599,20 @@ public TestController subscriptionController() {
599599

600600
@Override
601601
@Bean
602-
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
602+
public AbstractSubscribableChannel clientInboundChannel(Executor clientInboundChannelExecutor) {
603603
return new TestChannel();
604604
}
605605

606606
@Override
607607
@Bean
608-
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
608+
public AbstractSubscribableChannel clientOutboundChannel(Executor clientOutboundChannelExecutor) {
609609
return new TestChannel();
610610
}
611611

612612
@Override
613613
@Bean
614614
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
615-
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
615+
AbstractSubscribableChannel clientOutboundChannel, Executor brokerChannelExecutor) {
616616
return new TestChannel();
617617
}
618618
}
@@ -688,21 +688,21 @@ protected void configureMessageBroker(MessageBrokerRegistry registry) {
688688

689689
@Override
690690
@Bean
691-
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
691+
public AbstractSubscribableChannel clientInboundChannel(Executor clientInboundChannelExecutor) {
692692
// synchronous
693693
return new ExecutorSubscribableChannel(null);
694694
}
695695

696696
@Override
697697
@Bean
698-
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
698+
public AbstractSubscribableChannel clientOutboundChannel(Executor clientOutboundChannelExecutor) {
699699
return new TestChannel();
700700
}
701701

702702
@Override
703703
@Bean
704704
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
705-
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
705+
AbstractSubscribableChannel clientOutboundChannel, Executor brokerChannelExecutor) {
706706
// synchronous
707707
return new ExecutorSubscribableChannel(null);
708708
}

0 commit comments

Comments
 (0)