Skip to content

Commit d85c5e3

Browse files
GH-8704: Add global property for defaultTimeout (#8706)
* GH-8704: Add global property for `defaultTimeout` Fixes #8704 The default timeout for requests and replies in the integration endpoints is 30 seconds to avoid indefinite blocking in threads. Sometime those 30 seconds is not enough. * Introduce a `spring.integration.endpoints.defaultTimeout` global property to allow overriding all the timeouts to desired value. The negative number indicates an indefinite waiting time: similar to what was there before introducing 30 seconds by default * Fix language in docs Co-authored-by: Gary Russell <[email protected]> --------- Co-authored-by: Gary Russell <[email protected]>
1 parent e4bacc3 commit d85c5e3

File tree

23 files changed

+144
-82
lines changed

23 files changed

+144
-82
lines changed

Diff for: spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundGatewayParserTests.java

+16-20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,8 +18,7 @@
1818

1919
import java.lang.reflect.Field;
2020

21-
import org.junit.Test;
22-
import org.junit.runner.RunWith;
21+
import org.junit.jupiter.api.Test;
2322
import org.mockito.Mockito;
2423

2524
import org.springframework.amqp.core.Address;
@@ -40,11 +39,11 @@
4039
import org.springframework.integration.test.util.TestUtils;
4140
import org.springframework.messaging.MessageChannel;
4241
import org.springframework.test.annotation.DirtiesContext;
43-
import org.springframework.test.context.ContextConfiguration;
44-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
42+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4543
import org.springframework.util.ReflectionUtils;
4644

4745
import static org.assertj.core.api.Assertions.assertThat;
46+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
4847
import static org.mockito.ArgumentMatchers.isNull;
4948

5049
/**
@@ -55,8 +54,7 @@
5554
*
5655
* @since 2.1
5756
*/
58-
@ContextConfiguration
59-
@RunWith(SpringJUnit4ClassRunner.class)
57+
@SpringJUnitConfig
6058
@DirtiesContext
6159
public class AmqpInboundGatewayParserTests {
6260

@@ -66,16 +64,16 @@ public class AmqpInboundGatewayParserTests {
6664
@Test
6765
public void customMessageConverter() {
6866
Object gateway = context.getBean("gateway");
69-
MessageConverter gatewayConverter = TestUtils.getPropertyValue(gateway, "amqpMessageConverter", MessageConverter.class);
70-
MessageConverter templateConverter = TestUtils.getPropertyValue(gateway, "amqpTemplate.messageConverter", MessageConverter.class);
67+
MessageConverter gatewayConverter =
68+
TestUtils.getPropertyValue(gateway, "amqpMessageConverter", MessageConverter.class);
69+
MessageConverter templateConverter =
70+
TestUtils.getPropertyValue(gateway, "amqpTemplate.messageConverter", MessageConverter.class);
7171
TestConverter testConverter = context.getBean("testConverter", TestConverter.class);
7272
assertThat(gatewayConverter).isSameAs(testConverter);
7373
assertThat(templateConverter).isSameAs(testConverter);
7474
assertThat(TestUtils.getPropertyValue(gateway, "autoStartup")).isEqualTo(Boolean.TRUE);
7575
assertThat(TestUtils.getPropertyValue(gateway, "phase")).isEqualTo(0);
76-
assertThat(TestUtils.getPropertyValue(gateway, "replyTimeout", Long.class)).isEqualTo(Long.valueOf(1234L));
77-
assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.receiveTimeout", Long.class))
78-
.isEqualTo(Long.valueOf(1234L));
76+
assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.receiveTimeout")).isEqualTo(1234L);
7977
assertThat(TestUtils.getPropertyValue(gateway, "messageListenerContainer.missingQueuesFatal", Boolean.class))
8078
.isTrue();
8179
}
@@ -145,14 +143,12 @@ public void verifyUsageWithHeaderMapper() throws Exception {
145143

146144
@Test
147145
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
148-
try {
149-
new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml",
150-
this.getClass()).close();
151-
}
152-
catch (BeanDefinitionParsingException e) {
153-
assertThat(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
154-
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'")).isTrue();
155-
}
146+
assertThatExceptionOfType(BeanDefinitionParsingException.class)
147+
.isThrownBy(() ->
148+
new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml",
149+
getClass()))
150+
.withMessageStartingWith("Configuration problem: The 'header-mapper' attribute " +
151+
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'");
156152
}
157153

158154
private static class TestConverter extends SimpleMessageConverter {

Diff for: spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java

+31-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
* <li> {@code spring.integration.endpoints.noAutoStartup=}
3838
* <li> {@code spring.integration.channels.error.requireSubscribers=true}
3939
* <li> {@code spring.integration.channels.error.ignoreFailures=true}
40+
* <li> {@code spring.integration.endpoints.defaultTimeout=30000}
4041
* </ul>
4142
*
4243
* @author Artem Bilan
@@ -112,6 +113,12 @@ public final class IntegrationProperties {
112113
*/
113114
public static final String ENDPOINTS_NO_AUTO_STARTUP = INTEGRATION_PROPERTIES_PREFIX + "endpoints.noAutoStartup";
114115

116+
/**
117+
* Specifies the default timeout for blocking operations like send and receive messages.
118+
* @since 6.2
119+
*/
120+
public static final String ENDPOINTS_DEFAULT_TIMEOUT = INTEGRATION_PROPERTIES_PREFIX + "endpoints.defaultTimeout";
121+
115122
private static final Properties DEFAULTS;
116123

117124
private boolean channelsAutoCreate = true;
@@ -132,6 +139,8 @@ public final class IntegrationProperties {
132139

133140
private String[] noAutoStartupEndpoints = {};
134141

142+
private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
143+
135144
private volatile Properties properties;
136145

137146
static {
@@ -293,6 +302,23 @@ public String[] getNoAutoStartupEndpoints() {
293302
return Arrays.copyOf(this.noAutoStartupEndpoints, this.noAutoStartupEndpoints.length);
294303
}
295304

305+
/**
306+
* Return the value of {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
307+
* @return the value of {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
308+
* @since 6.2
309+
*/
310+
public long getEndpointsDefaultTimeout() {
311+
return this.endpointsDefaultTimeout;
312+
}
313+
314+
/**
315+
* Configure a value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
316+
* @param endpointsDefaultTimeout the value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
317+
*/
318+
public void setEndpointsDefaultTimeout(long endpointsDefaultTimeout) {
319+
this.endpointsDefaultTimeout = endpointsDefaultTimeout;
320+
}
321+
296322
/**
297323
* Represent the current instance as a {@link Properties}.
298324
* @return the {@link Properties} representation.
@@ -312,6 +338,7 @@ public Properties toProperties() {
312338
props.setProperty(READ_ONLY_HEADERS, StringUtils.arrayToCommaDelimitedString(this.readOnlyHeaders));
313339
props.setProperty(ENDPOINTS_NO_AUTO_STARTUP,
314340
StringUtils.arrayToCommaDelimitedString(this.noAutoStartupEndpoints));
341+
props.setProperty(ENDPOINTS_DEFAULT_TIMEOUT, "" + this.endpointsDefaultTimeout);
315342

316343
this.properties = props;
317344
}
@@ -348,7 +375,9 @@ public static IntegrationProperties parse(Properties properties) {
348375
StringUtils.commaDelimitedListToStringArray(value)))
349376
.acceptIfHasText(properties.getProperty(ENDPOINTS_NO_AUTO_STARTUP),
350377
(value) -> integrationProperties.setNoAutoStartupEndpoints(
351-
StringUtils.commaDelimitedListToStringArray(value)));
378+
StringUtils.commaDelimitedListToStringArray(value)))
379+
.acceptIfHasText(properties.getProperty(ENDPOINTS_DEFAULT_TIMEOUT),
380+
(value) -> integrationProperties.setEndpointsDefaultTimeout(Long.parseLong(value)));
352381
return integrationProperties;
353382
}
354383

Diff for: spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.springframework.integration.IntegrationPatternType;
3636
import org.springframework.integration.MessageTimeoutException;
3737
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
38-
import org.springframework.integration.context.IntegrationContextUtils;
3938
import org.springframework.integration.core.MessagingTemplate;
4039
import org.springframework.integration.endpoint.AbstractEndpoint;
4140
import org.springframework.integration.endpoint.EventDrivenConsumer;
@@ -124,7 +123,9 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint
124123

125124
private String errorChannelName;
126125

127-
private long replyTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
126+
private boolean requestTimeoutSet;
127+
128+
private boolean replyTimeoutSet;
128129

129130
private InboundMessageMapper<Object> requestMapper = new DefaultRequestMapper();
130131

@@ -167,8 +168,6 @@ public MessagingGatewaySupport() {
167168
public MessagingGatewaySupport(boolean errorOnTimeout) {
168169
ConvertingMessagingTemplate template = new ConvertingMessagingTemplate();
169170
template.setMessageConverter(this.messageConverter);
170-
template.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
171-
template.setReceiveTimeout(this.replyTimeout);
172171
this.messagingTemplate = template;
173172
this.errorOnTimeout = errorOnTimeout;
174173
}
@@ -252,6 +251,7 @@ public void setErrorChannelName(String errorChannelName) {
252251
*/
253252
public void setRequestTimeout(long requestTimeout) {
254253
this.messagingTemplate.setSendTimeout(requestTimeout);
254+
this.requestTimeoutSet = true;
255255
}
256256

257257
/**
@@ -260,8 +260,8 @@ public void setRequestTimeout(long requestTimeout) {
260260
* @param replyTimeout the timeout value in milliseconds
261261
*/
262262
public void setReplyTimeout(long replyTimeout) {
263-
this.replyTimeout = replyTimeout;
264263
this.messagingTemplate.setReceiveTimeout(replyTimeout);
264+
this.replyTimeoutSet = true;
265265
}
266266

267267
/**
@@ -406,6 +406,13 @@ protected void onInit() {
406406
}
407407
this.messageConverter.setBeanFactory(beanFactory);
408408
}
409+
long endpointsDefaultTimeout = getIntegrationProperties().getEndpointsDefaultTimeout();
410+
if (!this.requestTimeoutSet) {
411+
this.messagingTemplate.setSendTimeout(endpointsDefaultTimeout);
412+
}
413+
if (!this.replyTimeoutSet) {
414+
this.messagingTemplate.setReceiveTimeout(endpointsDefaultTimeout);
415+
}
409416
this.initialized = true;
410417
}
411418

Diff for: spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,15 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan
8989

9090
private boolean noHeadersPropagation;
9191

92-
{
93-
this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
94-
}
92+
private boolean sendTimeoutSet;
9593

9694
/**
9795
* Set the timeout for sending reply Messages.
9896
* @param sendTimeout The send timeout.
9997
*/
10098
public void setSendTimeout(long sendTimeout) {
10199
this.messagingTemplate.setSendTimeout(sendTimeout);
100+
this.sendTimeoutSet = true;
102101
}
103102

104103
@Override
@@ -189,7 +188,7 @@ protected final void updateNotPropagatedHeaders(String[] headers, boolean merge)
189188
@Override
190189
public Collection<String> getNotPropagatedHeaders() {
191190
return this.notPropagatedHeaders != null
192-
? Collections.unmodifiableSet(new HashSet<>(Arrays.asList(this.notPropagatedHeaders)))
191+
? Set.of(this.notPropagatedHeaders)
193192
: Collections.emptyList();
194193
}
195194

@@ -217,6 +216,9 @@ protected void onInit() {
217216
}
218217
this.messagingTemplate.setDestinationResolver(getChannelResolver());
219218
setAsyncIfCan();
219+
if (!this.sendTimeoutSet) {
220+
this.messagingTemplate.setSendTimeout(getIntegrationProperties().getEndpointsDefaultTimeout());
221+
}
220222
}
221223

222224
private void setAsyncIfCan() {

Diff for: spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.springframework.core.convert.ConversionService;
2626
import org.springframework.core.convert.support.DefaultConversionService;
2727
import org.springframework.integration.IntegrationPatternType;
28-
import org.springframework.integration.context.IntegrationContextUtils;
2928
import org.springframework.integration.core.MessagingTemplate;
3029
import org.springframework.integration.handler.AbstractMessageHandler;
3130
import org.springframework.integration.support.management.IntegrationManagedResource;
@@ -63,9 +62,7 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler imple
6362

6463
private volatile boolean applySequence;
6564

66-
{
67-
this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
68-
}
65+
private boolean sendTimeoutSet;
6966

7067
/**
7168
* Set the default channel where Messages should be sent if channel resolution
@@ -115,10 +112,11 @@ public void setDefaultOutputChannelName(String defaultOutputChannelName) {
115112
*/
116113
public void setSendTimeout(long timeout) {
117114
this.messagingTemplate.setSendTimeout(timeout);
115+
this.sendTimeoutSet = true;
118116
}
119117

120118
/**
121-
* Specify whether send failures for one or more of the recipients should be ignored. By default this is
119+
* Specify whether send failures for one or more of the recipients should be ignored. By default, this is
122120
* <code>false</code> meaning that an Exception will be thrown whenever a send fails. To override this and suppress
123121
* Exceptions, set the value to <code>true</code>.
124122
* @param ignoreSendFailures true to ignore send failures.
@@ -174,6 +172,10 @@ protected void onInit() {
174172
if (beanFactory != null) {
175173
this.messagingTemplate.setBeanFactory(beanFactory);
176174
}
175+
176+
if (!this.sendTimeoutSet) {
177+
this.messagingTemplate.setSendTimeout(getIntegrationProperties().getEndpointsDefaultTimeout());
178+
}
177179
}
178180

179181
/**

Diff for: spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class ScatterGatherHandler extends AbstractReplyProducingMessageHandler i
6767

6868
private String errorChannelName = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME;
6969

70-
private long gatherTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
70+
private Long gatherTimeout;
7171

7272
private AbstractEndpoint gatherEndpoint;
7373

@@ -119,6 +119,10 @@ public IntegrationPatternType getIntegrationPatternType() {
119119

120120
@Override
121121
protected void doInit() {
122+
if (this.gatherTimeout == null) {
123+
this.gatherTimeout = getIntegrationProperties().getEndpointsDefaultTimeout();
124+
}
125+
122126
BeanFactory beanFactory = getBeanFactory();
123127
if (this.gatherChannel == null) {
124128
this.gatherChannel =

Diff for: spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties

+1
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ spring.integration.messagingTemplate.throwExceptionOnLateReply=false
88
# Defaults to MessageHeaders.ID and MessageHeaders.TIMESTAMP
99
spring.integration.readOnly.headers=
1010
spring.integration.endpoints.noAutoStartup=
11+
spring.integration.endpoints.defaultTimeout=30000

Diff for: spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ public void initializeSubject() {
7171
outputChannel = mock(MessageChannel.class);
7272
handler = new AggregatingMessageHandler(processor, store, correlationStrategy, ReleaseStrategy);
7373
handler.setOutputChannel(outputChannel);
74+
handler.setBeanFactory(mock());
75+
handler.afterPropertiesSet();
7476
}
7577

76-
7778
@Test
7879
public void bufferCompletesNormally() {
7980
String correlationKey = "key";
@@ -95,7 +96,7 @@ public void bufferCompletesNormally() {
9596
}
9697

9798
@Test
98-
public void bufferCompletesWithException() throws Exception {
99+
public void bufferCompletesWithException() {
99100

100101
doAnswer(new ThrowsException(new RuntimeException("Planned test exception")))
101102
.when(processor).processMessageGroup(isA(SimpleMessageGroup.class));

Diff for: spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void testDefaultResequencerProperties() {
5959
ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler",
6060
ResequencingMessageHandler.class);
6161
assertThat(getPropertyValue(resequencer, "outputChannel")).isNull();
62-
assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(30000L);
62+
assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(45000L);
6363
assertThat(getPropertyValue(resequencer, "sendPartialResultOnExpiry"))
6464
.as("The ResequencerEndpoint is not configured with the appropriate 'send partial results on " +
6565
"timeout'" +

Diff for: spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void testAnnotationWithDefaultSettings() {
5151
assertThat(getPropertyValue(aggregator, "releaseStrategy") instanceof SimpleSequenceSizeReleaseStrategy)
5252
.isTrue();
5353
assertThat(getPropertyValue(aggregator, "outputChannel")).isNull();
54-
assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(30000L);
54+
assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(45000L);
5555
assertThat(getPropertyValue(aggregator, "sendPartialResultOnExpiry")).isEqualTo(false);
5656
context.close();
5757
}
@@ -72,7 +72,7 @@ public void testAnnotationWithCustomSettings() {
7272
}
7373

7474
@Test
75-
public void testAnnotationWithCustomReleaseStrategy() throws Exception {
75+
public void testAnnotationWithCustomReleaseStrategy() {
7676
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(
7777
new String[] {"classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml"});
7878
final String endpointName = "endpointWithDefaultAnnotationAndCustomReleaseStrategy";
@@ -90,7 +90,7 @@ public void testAnnotationWithCustomReleaseStrategy() throws Exception {
9090
}
9191

9292
@Test
93-
public void testAnnotationWithCustomCorrelationStrategy() throws Exception {
93+
public void testAnnotationWithCustomCorrelationStrategy() {
9494
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(
9595
new String[] {"classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml"});
9696
final String endpointName = "endpointWithCorrelationStrategy";

Diff for: spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class HeaderEnricherParserTests {
4949
void sendTimeoutDefault() {
5050
Object endpoint = context.getBean("headerEnricherWithDefaults");
5151
long sendTimeout = TestUtils.getPropertyValue(endpoint, "handler.messagingTemplate.sendTimeout", Long.class);
52-
assertThat(sendTimeout).isEqualTo(30000L);
52+
assertThat(sendTimeout).isEqualTo(45000L);
5353
}
5454

5555
@Test

0 commit comments

Comments
 (0)