Skip to content

Commit 6e9fd47

Browse files
committed
GH-9706: Allow collection of payloads as an aggregator result
Fixes: #9706 Previously, if a `MessageGroupProcessor` returns a collection of payloads, the `AbstractCorrelatingMessageHandler` has failed with the `IllegalArgumentException` stating that only collection of messages is possible. From now on such a restriction is eliminated and returned collection of payloads is emitted as a single reply message from the aggregator. * Add `AbstractCorrelatingMessageHandler.isResultCollectionOfMessages()` to return `true` only if result is a collection of messages, treating them as a "partial sequence". * Deprecate `AbstractCorrelatingMessageHandler.verifyResultCollectionConsistsOfMessages()` since it is out of use now.
1 parent 54fbce4 commit 6e9fd47

File tree

4 files changed

+30
-8
lines changed

4 files changed

+30
-8
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -904,8 +904,7 @@ protected Collection<Message<?>> completeGroup(Message<?> message, Object correl
904904
this.logger.debug(() -> "Completing group with correlationKey [" + correlationKey + "]");
905905

906906
result = this.outputProcessor.processMessageGroup(group);
907-
if (result instanceof Collection<?>) {
908-
verifyResultCollectionConsistsOfMessages((Collection<?>) result);
907+
if (isResultCollectionOfMessages(result)) {
909908
partialSequence = (Collection<Message<?>>) result;
910909
}
911910

@@ -944,12 +943,26 @@ private static boolean compareSequences(Message<?> msg1, Message<?> msg2) {
944943

945944
}
946945

946+
/**
947+
* Probably the method is {@code protected} by mistake.
948+
* @param elements the group processor result.
949+
* @deprecated without replacement - out of use from now on.
950+
*/
951+
@Deprecated(since = "6.5", forRemoval = true)
947952
protected void verifyResultCollectionConsistsOfMessages(Collection<?> elements) {
948953
Class<?> commonElementType = CollectionUtils.findCommonElementType(elements);
949954
Assert.isAssignable(Message.class, commonElementType, () ->
950955
"The expected collection of Messages contains non-Message element: " + commonElementType);
951956
}
952957

958+
private static boolean isResultCollectionOfMessages(Object result) {
959+
if (result instanceof Collection<?> resultCollection) {
960+
Class<?> commonElementType = CollectionUtils.findCommonElementType(resultCollection);
961+
return commonElementType != null && Message.class.isAssignableFrom(commonElementType);
962+
}
963+
return false;
964+
}
965+
953966
protected Object obtainGroupTimeout(MessageGroup group) {
954967
if (this.groupTimeoutExpression != null) {
955968
Object timeout = this.groupTimeoutExpression.getValue(this.evaluationContext, group);

Diff for: spring-integration-core/src/test/java/org/springframework/integration/dsl/extensions/IntegrationFlowExtensionTests.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,8 +18,8 @@
1818

1919
import java.util.Arrays;
2020
import java.util.function.Consumer;
21-
import java.util.stream.Collectors;
2221

22+
import org.assertj.core.api.InstanceOfAssertFactories;
2323
import org.junit.jupiter.api.Test;
2424

2525
import org.springframework.beans.factory.annotation.Autowired;
@@ -66,7 +66,8 @@ public void testCustomFlowDefinition() {
6666
assertThat(replyMessage)
6767
.isNotNull()
6868
.extracting(Message::getPayload)
69-
.isEqualTo("ONE, TWO, THREE");
69+
.asInstanceOf(InstanceOfAssertFactories.LIST)
70+
.containsOnly("ONE", "TWO", "THREE");
7071
}
7172

7273
@Configuration
@@ -109,8 +110,7 @@ public static class CustomAggregatorSpec extends AggregatorSpec {
109110
group.getMessages()
110111
.stream()
111112
.map(Message::getPayload)
112-
.map(String.class::cast)
113-
.collect(Collectors.joining(", ")));
113+
.toList());
114114
}
115115

116116
}

Diff for: src/reference/antora/modules/ROOT/pages/aggregator.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ Starting with version 6.0, the splitting behaviour, described above, works only
144144
Otherwise, with any other `MessageGroupProcessor` implementation that returns a `Collection<Message>`, only a single reply message is emitted with the whole collection of messages as its payload.
145145
Such logic is dictated by the canonical purpose of an aggregator - collect request messages by some key and produce a single grouped message.
146146

147+
Prior to version 6.5, if a `MessageGroupProcessor` (usually lambda from DSL) returns a collection of payloads, the `AbstractCorrelatingMessageHandler` has failed with the `IllegalArgumentException` stating that only collection of messages is possible.
148+
From now on such a restriction is eliminated and returned collection of payloads is emitted as a single reply message from the aggregator with just headers from the last request message.
149+
If headers aggregation is required alongside with a collection of payloads, an `AbstractAggregatingMessageGroupProcessor` implementations are recommended to be used instead of plain `MessageGroupProcessor` functional interface.
150+
147151
[[releasestrategy]]
148152
=== `ReleaseStrategy`
149153

Diff for: src/reference/antora/modules/ROOT/pages/whats-new.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,8 @@ The deprecated previously usage of `org.springframework.util.concurrent.Listenab
2121
The previously deprecated SpEL-based Control Bus components have been removed in favor of functionality around `ControlBusCommandRegistry`.
2222
The `<control-bus use-registry="">` attribute is deprecated now without replacement since only `ControlBusCommandRegistry` functionality is available.
2323
The Java DSL `controlBusOnRegistry()` operator is deprecated in favor of restored `controlBus()` which is fully based now on the `ControlBusCommandRegistry`.
24+
See xref:control-bus.adoc[Control Bus] for more information.
25+
26+
The `AbstractCorrelatingMessageHandler` does not throw an `IllegalArgumentException` for the collection of payloads as a result of the `MessageGroupProcessor`.
27+
Instead, such a collection is wrapped into a single reply message.
28+
See xref:aggregator.adoc[Aggregator] for more information.

0 commit comments

Comments
 (0)