Skip to content

Commit 1bec420

Browse files
authored
Do not block by default (#8580)
Currently, many timeouts in the project are like `-1` or other negative value with a meaning to wait indefinitely. According to distributed systems design and bad demo developing experience it is not OK to block forever. * Rework most of the timeouts in the framework to be `30` seconds. Only one remained as `1` seconds is a `PollingConsumer` where it is better to not block even for those 30 seconds when no messages in the queue, but let the polling task be rescheduled. * Remove the `MessagingGatewaySupport.replyTimeout` propagation down to the `PollingConsumer` correlator where it was a `-1` before and blocked the polling thread on the `Queue.poll()`. This fixed the problem with a single thread in a pool for auto-configured `TaskScheduler`. Now with 1 seconds wait time we are able to switch to other scheduled tasks even with only 1 thread in the pool
1 parent fcb06ba commit 1bec420

File tree

58 files changed

+284
-340
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+284
-340
lines changed

spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd

+3-5
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,7 @@ property set to TRUE.
721721
</xsd:appinfo>
722722
</xsd:annotation>
723723
</xsd:attribute>
724-
<xsd:attribute name="reply-timeout" type="xsd:string">
724+
<xsd:attribute name="reply-timeout" type="xsd:string" default="30000">
725725
<xsd:annotation>
726726
<xsd:documentation><![CDATA[
727727
Allows you to specify how long this gateway will wait for
@@ -738,9 +738,7 @@ property set to TRUE.
738738
The "reply-timeout" attribute maps to the "sendTimeout" property of the
739739
underlying 'MessagingTemplate' instance (org.springframework.integration.core.MessagingTemplate).
740740
741-
The attribute will default, if not specified, to '-1', meaning that
742-
by default, the Gateway will wait indefinitely. The value is
743-
specified in milliseconds.
741+
The value is specified in milliseconds.
744742
]]></xsd:documentation>
745743
</xsd:annotation>
746744
</xsd:attribute>
@@ -752,7 +750,7 @@ property set to TRUE.
752750
which itself is a Map.
753751
This can only be provided if the 'header-mapper' reference is not being set directly. The values in
754752
this list can also be simple patterns to be matched against the header names (e.g. "foo*" or "*foo").
755-
A special token 'STANDARD_REPLY_HEADERS' represents all the standard AMQP headers (replyTo, correlationId etc);
753+
A special token 'STANDARD_REPLY_HEADERS' represents all the standard AMQP headers (replyTo, correlationId etc.);
756754
it is included by default. If you wish to add your own headers, you must also include this token if you wish the
757755
standard headers to also be mapped. To map all non-standard headers the 'NON_STANDARD_HEADERS' token can be used.
758756
]]></xsd:documentation>

spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,7 +61,7 @@
6161
/**
6262
* Specify the maximum amount of time in milliseconds to wait when sending a reply
6363
* {@link org.springframework.messaging.Message} to the {@link #outputChannel()}.
64-
* Defaults to {@code -1} - blocking indefinitely.
64+
* Defaults to {@code 30} seconds.
6565
* It is applied only if the output channel has some 'sending' limitations, e.g.
6666
* {@link org.springframework.integration.channel.QueueChannel} with
6767
* a fixed 'capacity' and is currently full.
@@ -78,7 +78,7 @@
7878
* or {@code replyChannel} from message headers. Messages are expired when their containing
7979
* {@link org.springframework.integration.store.MessageGroup} expires. One of the ways of expiring MessageGroups
8080
* is by configuring a {@link org.springframework.integration.store.MessageGroupStoreReaper}.
81-
* However MessageGroups can alternatively be expired by simply calling
81+
* However, MessageGroups can alternatively be expired by simply calling
8282
* {@code MessageGroupStore.expireMessageGroup(groupId)}. That could be accomplished via a ControlBus operation
8383
* or by simply invoking that method if you have a reference to the
8484
* {@link org.springframework.integration.store.MessageGroupStore} instance.

spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -100,7 +100,7 @@
100100
/**
101101
* Specify the maximum amount of time in milliseconds to wait when sending a reply
102102
* {@link org.springframework.messaging.Message} to the {@link #outputChannel()}.
103-
* Defaults to {@code -1} - blocking indefinitely.
103+
* Defaults to {@code 30} seconds.
104104
* It is applied only if the output channel has some 'sending' limitations, e.g.
105105
* {@link org.springframework.integration.channel.QueueChannel} with
106106
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.

spring-integration-core/src/main/java/org/springframework/integration/annotation/Gateway.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.integration.context.IntegrationContextUtils;
26+
2527
/**
2628
* Indicates that an interface method is capable of mapping its parameters
2729
* to a message or message payload. These method-level annotations are detected
@@ -82,7 +84,7 @@
8284
* @return the timeout.
8385
* @see #requestTimeoutExpression()
8486
*/
85-
long requestTimeout() default Long.MIN_VALUE;
87+
long requestTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT;
8688

8789
/**
8890
* Specify a SpEL Expression to determine the timeout (ms) when sending to the request
@@ -101,10 +103,10 @@
101103
* @return the timeout.
102104
* @see #replyTimeoutExpression()
103105
*/
104-
long replyTimeout() default Long.MIN_VALUE;
106+
long replyTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT;
105107

106108
/**
107-
* Specify a SpEL Expression to determine the the time (ms) that the thread sending
109+
* Specify a SpEL Expression to determine the time (ms) that the thread sending
108110
* the request will wait for a reply. The timer starts when the thread returns to the
109111
* gateway, not when the request message is sent. Overrides the encompassing gateway's
110112
* default reply timeout. Overrides {@link #replyTimeout()}.

spring-integration-core/src/main/java/org/springframework/integration/annotation/MessagingGateway.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.lang.annotation.Target;
2323

2424
import org.springframework.core.annotation.AliasFor;
25+
import org.springframework.integration.context.IntegrationContextUtils;
2526

2627
/**
2728
* A stereotype annotation to provide an Integration Messaging Gateway Proxy
@@ -103,17 +104,17 @@
103104
* See {@link Gateway#requestTimeout()} for per-method configuration.
104105
* @return the suggested timeout in milliseconds, if any
105106
*/
106-
String defaultRequestTimeout() default "-9223372036854775808";
107+
String defaultRequestTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT_STRING;
107108

108109
/**
109110
* Allows to specify how long this gateway will wait for the reply {@code Message}
110-
* before returning. By default, it will wait indefinitely. {@code null} is returned if
111-
* the gateway times out. Value is specified in milliseconds; it can be a simple long
111+
* before returning. The {@code null} is returned if the gateway times out.
112+
* Value is specified in milliseconds; it can be a simple long
112113
* value or a SpEL expression; array variable #args is available.
113114
* See {@link Gateway#replyTimeout()} for per-method configuration.
114115
* @return the suggested timeout in milliseconds, if any
115116
*/
116-
String defaultReplyTimeout() default "-9223372036854775808";
117+
String defaultReplyTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT_STRING;
117118

118119
/**
119120
* Provide a reference to an implementation of {@link java.util.concurrent.Executor}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -123,7 +123,7 @@
123123
/**
124124
* Specify the maximum amount of time in milliseconds to wait when sending a reply
125125
* {@link org.springframework.messaging.Message} to the {@code outputChannel}.
126-
* Defaults to {@code -1} - blocking indefinitely.
126+
* Defaults to {@code 30} seconds.
127127
* It is applied only if the output channel has some 'sending' limitations, e.g.
128128
* {@link org.springframework.integration.channel.QueueChannel} with
129129
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.

spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,7 +86,7 @@
8686
/**
8787
* Specify the maximum amount of time in milliseconds to wait when sending a reply
8888
* {@link org.springframework.messaging.Message} to the {@code outputChannel}.
89-
* Defaults to {@code -1} - blocking indefinitely.
89+
* Defaults to {@code 30} seconds.
9090
* It is applied only if the output channel has some 'sending' limitations, e.g.
9191
* {@link org.springframework.integration.channel.QueueChannel} with
9292
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.

spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,7 +88,7 @@
8888
/**
8989
* Specify the maximum amount of time in milliseconds to wait when sending a reply
9090
* {@link org.springframework.messaging.Message} to the {@code outputChannel}.
91-
* Defaults to {@code -1} - blocking indefinitely.
91+
* Defaults to {@code 30} seconds.
9292
* It is applied only if the output channel has some 'sending' limitations, e.g.
9393
* {@link org.springframework.integration.channel.QueueChannel} with
9494
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.

spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -64,7 +64,7 @@
6464
/**
6565
* Specify the maximum amount of time in milliseconds to wait when sending a reply
6666
* {@link org.springframework.messaging.Message} to the {@code outputChannel}.
67-
* Defaults to {@code -1} - blocking indefinitely.
67+
* Defaults to {@code 30} seconds.
6868
* It is applied only if the output channel has some 'sending' limitations, e.g.
6969
* {@link org.springframework.integration.channel.QueueChannel} with
7070
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.

spring-integration-core/src/main/java/org/springframework/integration/config/xml/DefaultRouterParser.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,14 +50,14 @@ boolean hasDefaultOption() {
5050
protected void postProcess(BeanDefinitionBuilder builder, Element element, ParserContext parserContext) {
5151
List<Element> mappingElements = DomUtils.getChildElementsByTagName(element, "mapping");
5252
if (!CollectionUtils.isEmpty(mappingElements)) {
53-
ManagedMap<String, String> channelMappings = new ManagedMap<String, String>();
53+
ManagedMap<String, String> channelMappings = new ManagedMap<>();
5454
for (Element mappingElement : mappingElements) {
5555
channelMappings.put(mappingElement.getAttribute("value"), mappingElement.getAttribute("channel"));
5656
}
5757
builder.addPropertyValue("channelMappings", channelMappings);
5858
}
5959
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "default-output-channel");
60-
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "timeout");
60+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout");
6161
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "resolution-required");
6262
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "apply-sequence");
6363
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ignore-send-failures");

spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -101,6 +101,18 @@ public abstract class IntegrationContextUtils {
101101

102102
public static final String LIST_MESSAGE_HANDLER_FACTORY_BEAN_NAME = "integrationListMessageHandlerMethodFactory";
103103

104+
/**
105+
* The default timeout for blocking operations like send and receive messages.
106+
* @since 6.1
107+
*/
108+
public static final long DEFAULT_TIMEOUT = 30000L;
109+
110+
/**
111+
* A string representation for {@link #DEFAULT_TIMEOUT}, e.g. for annotation attributes.
112+
* @since 6.1
113+
*/
114+
public static final String DEFAULT_TIMEOUT_STRING = "" + DEFAULT_TIMEOUT;
115+
104116
/**
105117
* @param beanFactory BeanFactory for lookup, must not be null.
106118
* @return The {@link MetadataStore} bean whose name is "metadataStore".

spring-integration-core/src/main/java/org/springframework/integration/dsl/GatewayProxySpec.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -145,7 +145,7 @@ public GatewayProxySpec requestTimeout(long requestTimeout) {
145145

146146
/**
147147
* Allows to specify how long this gateway will wait for the reply {@code Message}
148-
* before returning. By default, it will wait indefinitely. {@code null} is returned if
148+
* before returning. By default, it will wait 30 seconds. {@code null} is returned if
149149
* the gateway times out. Value is specified in milliseconds.
150150
* @param replyTimeout the timeout for replies in milliseconds.
151151
* @return current {@link GatewayProxySpec}.

spring-integration-core/src/main/java/org/springframework/integration/dsl/ScatterGatherSpec.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,7 +50,7 @@ public ScatterGatherSpec gatherChannel(MessageChannel gatherChannel) {
5050
* Specify a timeout (in milliseconds) for the
5151
* {@link org.springframework.messaging.PollableChannel#receive(long)} operation
5252
* to wait for gathering results to output.
53-
* Defaults to {@code -1} - to wait indefinitely.
53+
* Defaults to {@code 30} seconds.
5454
* @param gatherTimeout the {@link org.springframework.messaging.PollableChannel} receive timeout.
5555
* @return the current {@link ScatterGatherSpec} instance.
5656
*/

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

+22-38
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -782,14 +782,9 @@ private Expression extractRequestTimeoutFromAnnotationOrMetadata(@Nullable Gatew
782782
Expression requestTimeout = this.defaultRequestTimeout;
783783

784784
if (gatewayAnnotation != null) {
785-
/*
786-
* INT-2636 Unspecified annotation attributes should not
787-
* override the default values supplied by explicit configuration.
788-
* There is a small risk that someone has used Long.MIN_VALUE explicitly
789-
* to indicate an indefinite timeout on a gateway method and that will
790-
* no longer work as expected; they will need to use, say, -1 instead.
791-
*/
792-
if (requestTimeout == null || gatewayAnnotation.requestTimeout() != Long.MIN_VALUE) {
785+
if (requestTimeout == null ||
786+
gatewayAnnotation.requestTimeout() != IntegrationContextUtils.DEFAULT_TIMEOUT) {
787+
793788
requestTimeout = new ValueExpression<>(gatewayAnnotation.requestTimeout());
794789
}
795790
if (StringUtils.hasText(gatewayAnnotation.requestTimeoutExpression())) {
@@ -813,14 +808,7 @@ private Expression extractReplyTimeoutFromAnnotationOrMetadata(@Nullable Gateway
813808
Expression replyTimeout = this.defaultReplyTimeout;
814809

815810
if (gatewayAnnotation != null) {
816-
/*
817-
* INT-2636 Unspecified annotation attributes should not
818-
* override the default values supplied by explicit configuration.
819-
* There is a small risk that someone has used Long.MIN_VALUE explicitly
820-
* to indicate an indefinite timeout on a gateway method and that will
821-
* no longer work as expected; they will need to use, say, -1 instead.
822-
*/
823-
if (replyTimeout == null || gatewayAnnotation.replyTimeout() != Long.MIN_VALUE) {
811+
if (replyTimeout == null || gatewayAnnotation.replyTimeout() != IntegrationContextUtils.DEFAULT_TIMEOUT) {
824812
replyTimeout = new ValueExpression<>(gatewayAnnotation.replyTimeout());
825813
}
826814
if (StringUtils.hasText(gatewayAnnotation.replyTimeoutExpression())) {
@@ -968,31 +956,27 @@ private void channels(@Nullable String requestChannelName, @Nullable String repl
968956

969957
private void timeouts(@Nullable Expression requestTimeout, @Nullable Expression replyTimeout,
970958
GatewayMethodInboundMessageMapper messageMapper, MethodInvocationGateway gateway) {
971-
if (requestTimeout == null) {
972-
gateway.setRequestTimeout(-1);
973-
}
974-
else if (requestTimeout instanceof ValueExpression) {
975-
Long timeout = requestTimeout.getValue(Long.class);
976-
if (timeout != null) {
977-
gateway.setRequestTimeout(timeout);
959+
if (requestTimeout != null) {
960+
if (requestTimeout instanceof ValueExpression) {
961+
Long timeout = requestTimeout.getValue(Long.class);
962+
if (timeout != null) {
963+
gateway.setRequestTimeout(timeout);
964+
}
978965
}
979-
}
980-
else {
981-
messageMapper.setSendTimeoutExpression(requestTimeout);
982-
}
983-
if (replyTimeout == null) {
984-
gateway.setReplyTimeout(-1);
985-
}
986-
else if (replyTimeout instanceof ValueExpression) {
987-
Long timeout = replyTimeout.getValue(Long.class);
988-
if (timeout != null) {
989-
gateway.setReplyTimeout(timeout);
966+
else {
967+
messageMapper.setSendTimeoutExpression(requestTimeout);
990968
}
991969
}
992-
else {
993-
messageMapper.setReplyTimeoutExpression(replyTimeout);
994-
}
995970
if (replyTimeout != null) {
971+
if (replyTimeout instanceof ValueExpression) {
972+
Long timeout = replyTimeout.getValue(Long.class);
973+
if (timeout != null) {
974+
gateway.setReplyTimeout(timeout);
975+
}
976+
}
977+
else {
978+
messageMapper.setReplyTimeoutExpression(replyTimeout);
979+
}
996980
gateway.setReceiveTimeoutExpression(replyTimeout);
997981
}
998982
}

0 commit comments

Comments
 (0)