Skip to content

Do not block by default #8580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ property set to TRUE.
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="reply-timeout" type="xsd:string">
<xsd:attribute name="reply-timeout" type="xsd:string" default="30000">
<xsd:annotation>
<xsd:documentation><![CDATA[
Allows you to specify how long this gateway will wait for
Expand All @@ -738,9 +738,7 @@ property set to TRUE.
The "reply-timeout" attribute maps to the "sendTimeout" property of the
underlying 'MessagingTemplate' instance (org.springframework.integration.core.MessagingTemplate).

The attribute will default, if not specified, to '-1', meaning that
by default, the Gateway will wait indefinitely. The value is
specified in milliseconds.
The value is specified in milliseconds.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand All @@ -752,7 +750,7 @@ property set to TRUE.
which itself is a Map.
This can only be provided if the 'header-mapper' reference is not being set directly. The values in
this list can also be simple patterns to be matched against the header names (e.g. "foo*" or "*foo").
A special token 'STANDARD_REPLY_HEADERS' represents all the standard AMQP headers (replyTo, correlationId etc);
A special token 'STANDARD_REPLY_HEADERS' represents all the standard AMQP headers (replyTo, correlationId etc.);
it is included by default. If you wish to add your own headers, you must also include this token if you wish the
standard headers to also be mapped. To map all non-standard headers the 'NON_STANDARD_HEADERS' token can be used.
]]></xsd:documentation>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,7 @@
/**
* Specify the maximum amount of time in milliseconds to wait when sending a reply
* {@link org.springframework.messaging.Message} to the {@link #outputChannel()}.
* Defaults to {@code -1} - blocking indefinitely.
* Defaults to {@code 30} seconds.
* It is applied only if the output channel has some 'sending' limitations, e.g.
* {@link org.springframework.integration.channel.QueueChannel} with
* a fixed 'capacity' and is currently full.
Expand All @@ -78,7 +78,7 @@
* or {@code replyChannel} from message headers. Messages are expired when their containing
* {@link org.springframework.integration.store.MessageGroup} expires. One of the ways of expiring MessageGroups
* is by configuring a {@link org.springframework.integration.store.MessageGroupStoreReaper}.
* However MessageGroups can alternatively be expired by simply calling
* However, MessageGroups can alternatively be expired by simply calling
* {@code MessageGroupStore.expireMessageGroup(groupId)}. That could be accomplished via a ControlBus operation
* or by simply invoking that method if you have a reference to the
* {@link org.springframework.integration.store.MessageGroupStore} instance.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -100,7 +100,7 @@
/**
* Specify the maximum amount of time in milliseconds to wait when sending a reply
* {@link org.springframework.messaging.Message} to the {@link #outputChannel()}.
* Defaults to {@code -1} - blocking indefinitely.
* Defaults to {@code 30} seconds.
* It is applied only if the output channel has some 'sending' limitations, e.g.
* {@link org.springframework.integration.channel.QueueChannel} with
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.integration.context.IntegrationContextUtils;

/**
* Indicates that an interface method is capable of mapping its parameters
* to a message or message payload. These method-level annotations are detected
Expand Down Expand Up @@ -82,7 +84,7 @@
* @return the timeout.
* @see #requestTimeoutExpression()
*/
long requestTimeout() default Long.MIN_VALUE;
long requestTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT;

/**
* Specify a SpEL Expression to determine the timeout (ms) when sending to the request
Expand All @@ -101,10 +103,10 @@
* @return the timeout.
* @see #replyTimeoutExpression()
*/
long replyTimeout() default Long.MIN_VALUE;
long replyTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT;

/**
* Specify a SpEL Expression to determine the the time (ms) that the thread sending
* Specify a SpEL Expression to determine the time (ms) that the thread sending
* the request will wait for a reply. The timer starts when the thread returns to the
* gateway, not when the request message is sent. Overrides the encompassing gateway's
* default reply timeout. Overrides {@link #replyTimeout()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.annotation.Target;

import org.springframework.core.annotation.AliasFor;
import org.springframework.integration.context.IntegrationContextUtils;

/**
* A stereotype annotation to provide an Integration Messaging Gateway Proxy
Expand Down Expand Up @@ -103,17 +104,17 @@
* See {@link Gateway#requestTimeout()} for per-method configuration.
* @return the suggested timeout in milliseconds, if any
*/
String defaultRequestTimeout() default "-9223372036854775808";
String defaultRequestTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT_STRING;

/**
* Allows to specify how long this gateway will wait for the reply {@code Message}
* before returning. By default, it will wait indefinitely. {@code null} is returned if
* the gateway times out. Value is specified in milliseconds; it can be a simple long
* before returning. The {@code null} is returned if the gateway times out.
* Value is specified in milliseconds; it can be a simple long
* value or a SpEL expression; array variable #args is available.
* See {@link Gateway#replyTimeout()} for per-method configuration.
* @return the suggested timeout in milliseconds, if any
*/
String defaultReplyTimeout() default "-9223372036854775808";
String defaultReplyTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT_STRING;

/**
* Provide a reference to an implementation of {@link java.util.concurrent.Executor}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -123,7 +123,7 @@
/**
* Specify the maximum amount of time in milliseconds to wait when sending a reply
* {@link org.springframework.messaging.Message} to the {@code outputChannel}.
* Defaults to {@code -1} - blocking indefinitely.
* Defaults to {@code 30} seconds.
* It is applied only if the output channel has some 'sending' limitations, e.g.
* {@link org.springframework.integration.channel.QueueChannel} with
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -86,7 +86,7 @@
/**
* Specify the maximum amount of time in milliseconds to wait when sending a reply
* {@link org.springframework.messaging.Message} to the {@code outputChannel}.
* Defaults to {@code -1} - blocking indefinitely.
* Defaults to {@code 30} seconds.
* It is applied only if the output channel has some 'sending' limitations, e.g.
* {@link org.springframework.integration.channel.QueueChannel} with
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -88,7 +88,7 @@
/**
* Specify the maximum amount of time in milliseconds to wait when sending a reply
* {@link org.springframework.messaging.Message} to the {@code outputChannel}.
* Defaults to {@code -1} - blocking indefinitely.
* Defaults to {@code 30} seconds.
* It is applied only if the output channel has some 'sending' limitations, e.g.
* {@link org.springframework.integration.channel.QueueChannel} with
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,7 +64,7 @@
/**
* Specify the maximum amount of time in milliseconds to wait when sending a reply
* {@link org.springframework.messaging.Message} to the {@code outputChannel}.
* Defaults to {@code -1} - blocking indefinitely.
* Defaults to {@code 30} seconds.
* It is applied only if the output channel has some 'sending' limitations, e.g.
* {@link org.springframework.integration.channel.QueueChannel} with
* fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,14 +50,14 @@ boolean hasDefaultOption() {
protected void postProcess(BeanDefinitionBuilder builder, Element element, ParserContext parserContext) {
List<Element> mappingElements = DomUtils.getChildElementsByTagName(element, "mapping");
if (!CollectionUtils.isEmpty(mappingElements)) {
ManagedMap<String, String> channelMappings = new ManagedMap<String, String>();
ManagedMap<String, String> channelMappings = new ManagedMap<>();
for (Element mappingElement : mappingElements) {
channelMappings.put(mappingElement.getAttribute("value"), mappingElement.getAttribute("channel"));
}
builder.addPropertyValue("channelMappings", channelMappings);
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "default-output-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "timeout");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "resolution-required");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "apply-sequence");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ignore-send-failures");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -101,6 +101,18 @@ public abstract class IntegrationContextUtils {

public static final String LIST_MESSAGE_HANDLER_FACTORY_BEAN_NAME = "integrationListMessageHandlerMethodFactory";

/**
* The default timeout for blocking operations like send and receive messages.
* @since 6.1
*/
public static final long DEFAULT_TIMEOUT = 30000L;

/**
* A string representation for {@link #DEFAULT_TIMEOUT}, e.g. for annotation attributes.
* @since 6.1
*/
public static final String DEFAULT_TIMEOUT_STRING = "" + DEFAULT_TIMEOUT;

/**
* @param beanFactory BeanFactory for lookup, must not be null.
* @return The {@link MetadataStore} bean whose name is "metadataStore".
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,7 +145,7 @@ public GatewayProxySpec requestTimeout(long requestTimeout) {

/**
* Allows to specify how long this gateway will wait for the reply {@code Message}
* before returning. By default, it will wait indefinitely. {@code null} is returned if
* before returning. By default, it will wait 30 seconds. {@code null} is returned if
* the gateway times out. Value is specified in milliseconds.
* @param replyTimeout the timeout for replies in milliseconds.
* @return current {@link GatewayProxySpec}.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ public ScatterGatherSpec gatherChannel(MessageChannel gatherChannel) {
* Specify a timeout (in milliseconds) for the
* {@link org.springframework.messaging.PollableChannel#receive(long)} operation
* to wait for gathering results to output.
* Defaults to {@code -1} - to wait indefinitely.
* Defaults to {@code 30} seconds.
* @param gatherTimeout the {@link org.springframework.messaging.PollableChannel} receive timeout.
* @return the current {@link ScatterGatherSpec} instance.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -782,14 +782,9 @@ private Expression extractRequestTimeoutFromAnnotationOrMetadata(@Nullable Gatew
Expression requestTimeout = this.defaultRequestTimeout;

if (gatewayAnnotation != null) {
/*
* INT-2636 Unspecified annotation attributes should not
* override the default values supplied by explicit configuration.
* There is a small risk that someone has used Long.MIN_VALUE explicitly
* to indicate an indefinite timeout on a gateway method and that will
* no longer work as expected; they will need to use, say, -1 instead.
*/
if (requestTimeout == null || gatewayAnnotation.requestTimeout() != Long.MIN_VALUE) {
if (requestTimeout == null ||
gatewayAnnotation.requestTimeout() != IntegrationContextUtils.DEFAULT_TIMEOUT) {

requestTimeout = new ValueExpression<>(gatewayAnnotation.requestTimeout());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment in timeouts().

if (StringUtils.hasText(gatewayAnnotation.requestTimeoutExpression())) {
Expand All @@ -813,14 +808,7 @@ private Expression extractReplyTimeoutFromAnnotationOrMetadata(@Nullable Gateway
Expression replyTimeout = this.defaultReplyTimeout;

if (gatewayAnnotation != null) {
/*
* INT-2636 Unspecified annotation attributes should not
* override the default values supplied by explicit configuration.
* There is a small risk that someone has used Long.MIN_VALUE explicitly
* to indicate an indefinite timeout on a gateway method and that will
* no longer work as expected; they will need to use, say, -1 instead.
*/
if (replyTimeout == null || gatewayAnnotation.replyTimeout() != Long.MIN_VALUE) {
if (replyTimeout == null || gatewayAnnotation.replyTimeout() != IntegrationContextUtils.DEFAULT_TIMEOUT) {
replyTimeout = new ValueExpression<>(gatewayAnnotation.replyTimeout());
}
if (StringUtils.hasText(gatewayAnnotation.replyTimeoutExpression())) {
Expand Down Expand Up @@ -968,31 +956,27 @@ private void channels(@Nullable String requestChannelName, @Nullable String repl

private void timeouts(@Nullable Expression requestTimeout, @Nullable Expression replyTimeout,
GatewayMethodInboundMessageMapper messageMapper, MethodInvocationGateway gateway) {
if (requestTimeout == null) {
gateway.setRequestTimeout(-1);
}
else if (requestTimeout instanceof ValueExpression) {
Long timeout = requestTimeout.getValue(Long.class);
if (timeout != null) {
gateway.setRequestTimeout(timeout);
if (requestTimeout != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need a null check here and use the default; GenericMessagingTemplate has -1 as its default.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! See MessagingGatewaySupport:

private long replyTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
...
template.setReceiveTimeout(this.replyTimeout);

So, if we don't call setReplyTimeout(), then that default value is used in the GenericMessagingTemplate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - ok.

if (requestTimeout instanceof ValueExpression) {
Long timeout = requestTimeout.getValue(Long.class);
if (timeout != null) {
gateway.setRequestTimeout(timeout);
}
}
}
else {
messageMapper.setSendTimeoutExpression(requestTimeout);
}
if (replyTimeout == null) {
gateway.setReplyTimeout(-1);
}
else if (replyTimeout instanceof ValueExpression) {
Long timeout = replyTimeout.getValue(Long.class);
if (timeout != null) {
gateway.setReplyTimeout(timeout);
else {
messageMapper.setSendTimeoutExpression(requestTimeout);
}
}
else {
messageMapper.setReplyTimeoutExpression(replyTimeout);
}
if (replyTimeout != null) {
if (replyTimeout instanceof ValueExpression) {
Long timeout = replyTimeout.getValue(Long.class);
if (timeout != null) {
gateway.setReplyTimeout(timeout);
}
}
else {
messageMapper.setReplyTimeoutExpression(replyTimeout);
}
gateway.setReceiveTimeoutExpression(replyTimeout);
}
}
Expand Down
Loading