Skip to content

Commit ad61cef

Browse files
committed
GH-9754: Add discardIndividuallyOnExpiry to aggregator
Fixes: #9754 Right now a correlation handler can discard messages in the expired group one by one. In some scenarios it would be useful to have single message in discard for the whole group. * Expose `discardIndividuallyOnExpiry` for the `AbstractCorrelatingMessageHandler`, and `AggregatorFactoryBean`, and respective `CorrelationHandlerSpec` for DSL. This new option takes action only if a `discardChannel` is provided, and `sendPartialResultOnExpiry` is not set to `true`. When `discardIndividuallyOnExpiry` is false, the messages in the expired group are packed into a list for payload of a discarding single message. * Test and document the new feature
1 parent 85b1418 commit ad61cef

File tree

6 files changed

+99
-10
lines changed

6 files changed

+99
-10
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020
import java.time.Instant;
21+
import java.util.ArrayList;
2122
import java.util.Collection;
2223
import java.util.Collections;
2324
import java.util.Comparator;
@@ -63,6 +64,7 @@
6364
import org.springframework.messaging.MessageDeliveryException;
6465
import org.springframework.messaging.MessageHandlingException;
6566
import org.springframework.messaging.core.DestinationResolutionException;
67+
import org.springframework.messaging.support.GenericMessage;
6668
import org.springframework.util.Assert;
6769
import org.springframework.util.CollectionUtils;
6870
import org.springframework.util.ObjectUtils;
@@ -80,7 +82,7 @@
8082
* {@link ReleaseStrategy}, and {@link MessageGroupProcessor} implementations as
8183
* you require.
8284
* <p>
83-
* By default the {@link CorrelationStrategy} will be a
85+
* By default, the {@link CorrelationStrategy} will be a
8486
* {@link HeaderAttributeCorrelationStrategy} and the {@link ReleaseStrategy} will be a
8587
* {@link SequenceSizeReleaseStrategy}.
8688
* <p>
@@ -129,6 +131,8 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
129131

130132
private boolean sendPartialResultOnExpiry;
131133

134+
private boolean discardIndividuallyOnExpiry = true;
135+
132136
private boolean sequenceAware;
133137

134138
private LockRegistry lockRegistry = new DefaultLockRegistry();
@@ -262,6 +266,18 @@ public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
262266
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
263267
}
264268

269+
/**
270+
* Set to {@code false} to send to discard channel a whole expired group as a single message.
271+
* This option makes sense only if {@link #sendPartialResultOnExpiry} is set to {@code false} (default).
272+
* And also if {@link #discardChannel} is injected.
273+
* @param discardIndividuallyOnExpiry false to discard the whole group as one message.
274+
* @since 6.5
275+
* @see #sendPartialResultOnExpiry
276+
*/
277+
public void setDiscardIndividuallyOnExpiry(boolean discardIndividuallyOnExpiry) {
278+
this.discardIndividuallyOnExpiry = discardIndividuallyOnExpiry;
279+
}
280+
265281
/**
266282
* By default, when a MessageGroupStoreReaper is configured to expire partial
267283
* groups, empty groups are also removed. Empty groups exist after a group
@@ -876,8 +892,17 @@ protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock)
876892
if (this.releaseLockBeforeSend) {
877893
lock.unlock();
878894
}
879-
group.getMessages()
880-
.forEach(this::discardMessage);
895+
MessageChannel messageChannel = getDiscardChannel();
896+
if (messageChannel != null) {
897+
if (this.discardIndividuallyOnExpiry) {
898+
group.getMessages()
899+
.forEach(this::discardMessage);
900+
}
901+
else {
902+
List<Message<?>> messagesInGroupToDiscard = new ArrayList<>(group.getMessages());
903+
discardMessage(new GenericMessage<>(messagesInGroupToDiscard));
904+
}
905+
}
881906
}
882907
if (this.applicationEventPublisher != null) {
883908
this.applicationEventPublisher.publishEvent(

Diff for: spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,6 +86,8 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
8686

8787
private Boolean sendPartialResultOnExpiry;
8888

89+
private Boolean discardIndividuallyOnExpiry;
90+
8991
private Long minimumTimeoutForEmptyGroups;
9092

9193
private Boolean expireGroupsUponTimeout;
@@ -195,6 +197,16 @@ public void setGroupConditionSupplier(BiFunction<Message<?>, String, String> gro
195197
this.groupConditionSupplier = groupConditionSupplier;
196198
}
197199

200+
/**
201+
* Set to {@code false} to send to discard channel a whole expired group as a single message.
202+
* @param discardIndividuallyOnExpiry false to discard the whole group as one message.
203+
* @since 6.5
204+
* @see org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler#setDiscardIndividuallyOnExpiry(boolean)
205+
*/
206+
public void setDiscardIndividuallyOnExpiry(Boolean discardIndividuallyOnExpiry) {
207+
this.discardIndividuallyOnExpiry = discardIndividuallyOnExpiry;
208+
}
209+
198210
@Override
199211
protected AggregatingMessageHandler createHandler() {
200212
MessageGroupProcessor outputProcessor;
@@ -242,7 +254,8 @@ protected AggregatingMessageHandler createHandler() {
242254
.acceptIfNotNull(this.expireDuration,
243255
(duration) -> aggregator.setExpireDuration(Duration.ofMillis(duration)))
244256
.acceptIfNotNull(this.groupConditionSupplier, aggregator::setGroupConditionSupplier)
245-
.acceptIfNotNull(this.expireTimeout, aggregator::setExpireTimeout);
257+
.acceptIfNotNull(this.expireTimeout, aggregator::setExpireTimeout)
258+
.acceptIfNotNull(this.discardIndividuallyOnExpiry, aggregator::setDiscardIndividuallyOnExpiry);
246259

247260
return aggregator;
248261
}

Diff for: spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -85,6 +85,19 @@ public S sendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
8585
return _this();
8686
}
8787

88+
/**
89+
* Set to {@code false} to send to discard channel a whole expired group as a single message.
90+
* This option makes sense only if {@link #sendPartialResultOnExpiry(boolean)} is set to {@code false} (default).
91+
* And also if {@link #discardChannel(MessageChannel)} is injected.
92+
* @param discardIndividuallyOnExpiry false to discard whole expired group as a single message.
93+
* @return the handler spec.
94+
* @since 6.5
95+
*/
96+
public S discardIndividuallyOnExpiry(boolean discardIndividuallyOnExpiry) {
97+
this.handler.setDiscardIndividuallyOnExpiry(discardIndividuallyOnExpiry);
98+
return _this();
99+
}
100+
88101
/**
89102
* @param minimumTimeoutForEmptyGroups the minimumTimeoutForEmptyGroups
90103
* @return the handler spec.

Diff for: spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java

+28
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.Executors;
2828
import java.util.concurrent.TimeUnit;
2929

30+
import org.assertj.core.api.InstanceOfAssertFactories;
3031
import org.junit.jupiter.api.Disabled;
3132
import org.junit.jupiter.api.Test;
3233

@@ -531,4 +532,31 @@ public void testPurgeOrphanedGroupsScheduled() {
531532
taskScheduler.destroy();
532533
}
533534

535+
@Test
536+
public void expiredGroupIsDiscardedAsOneMessage() throws InterruptedException {
537+
AggregatingMessageHandler handler = new AggregatingMessageHandler(group -> group);
538+
handler.setReleaseStrategy(group -> false);
539+
QueueChannel discardChannel = new QueueChannel();
540+
handler.setDiscardChannel(discardChannel);
541+
handler.setExpireTimeout(1);
542+
handler.setDiscardIndividuallyOnExpiry(false);
543+
544+
Message<String> message1 = MessageBuilder.withPayload("test1").setCorrelationId("test").build();
545+
Message<String> message2 = MessageBuilder.withPayload("test2").setCorrelationId("test").build();
546+
547+
handler.handleMessageInternal(message1);
548+
handler.handleMessageInternal(message2);
549+
550+
// Slight delay to let the group be treated as expired.
551+
Thread.sleep(100);
552+
553+
handler.purgeOrphanedGroups();
554+
555+
Message<?> receive = discardChannel.receive(10000);
556+
assertThat(receive)
557+
.extracting(Message::getPayload)
558+
.asInstanceOf(InstanceOfAssertFactories.LIST)
559+
.containsOnly(message1, message2);
560+
}
561+
534562
}

Diff for: src/reference/antora/modules/ROOT/pages/aggregator.adoc

+8-4
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ Any new messages for this group are sent to the discard channel (if defined).
213213
Setting `expire-groups-upon-completion` to `true` (the default is `false`) removes the entire group, and any new messages (with the same correlation ID as the removed group) form a new group.
214214
You can release partial sequences by using a `MessageGroupStoreReaper` together with `send-partial-result-on-expiry` being set to `true`.
215215

216+
Starting with version 6.5, the correlation handler can also be configured with a `discardIndividuallyOnExpiry` option to discard the whole group as a single message.
217+
Essentially, the payload of this message is a list of messages from the expired group.
218+
Works only if `sendPartialResultOnExpiry` is set to `false` (default) and `dicardChannel` is provided.
219+
216220
IMPORTANT: To facilitate discarding of late-arriving messages, the aggregator must maintain state about the group after it has been released.
217221
This can eventually cause out-of-memory conditions.
218222
To avoid such situations, you should consider configuring a `MessageGroupStoreReaper` to remove the group metadata.
@@ -519,7 +523,7 @@ Empty groups can be removed later by using a `MessageGroupStoreReaper` in combin
519523
`expire-groups-upon-completion` relates to "`normal`" completion when the `ReleaseStrategy` releases the group.
520524
This defaults to `false`.
521525
522-
If a group is not completed normally but is released or discarded because of a timeout, the group is normally expired.
526+
If a group is not complete normally but is released or discarded because of a timeout, the group is normally expired.
523527
Since version 4.1, you can control this behavior by using `expire-groups-upon-timeout`.
524528
It defaults to `true` for backwards compatibility.
525529
@@ -531,12 +535,12 @@ Timed-out groups are either discarded or a partial release occurs (based on `sen
531535
Since version 5.0, empty groups are also scheduled for removal after `empty-group-min-timeout`.
532536
If `expireGroupsUponCompletion == false` and `minimumTimeoutForEmptyGroups > 0`, the task to remove the group is scheduled when normal or partial sequences release happens.
533537
534-
Starting with version 5.4, the aggregator (and resequencer) can be configured to expire orphaned groups (groups in a persistent message store that might not otherwise be released).
538+
Starting with version 5.4, the aggregator (and resequencer) can be configured to expire orphaned groups (those in a persistent message store that might not otherwise be released).
535539
The `expireTimeout` (if greater than `0`) indicates that groups older than this value in the store should be purged.
536540
The `purgeOrphanedGroups()` method is called on start up and, together with the provided `expireDuration`, periodically within a scheduled task.
537541
This method is also can be called externally at any time.
538542
The expiration logic is fully delegated to the `forceComplete(MessageGroup)` functionality according to the provided expiration options mentioned above.
539-
Such a periodic purge functionality is useful when a message store is needed to be cleaned up from those old groups which are not going to be released any more with regular message arrival logic.
543+
Such a periodic purge functionality is useful when a message store is needed to be cleaned up from those old groups which are not going to be released anymore with regular message arrival logic.
540544
In most cases this happens after an application restart, when using a persistent message group store.
541545
The functionality is similar to the `MessageGroupStoreReaper` with a scheduled task, but provides a convenient way to deal with old groups within specific components, when using group timeout instead of a reaper.
542546
The `MessageGroupStore` must be provided exclusively for the current correlation endpoint.
@@ -695,7 +699,7 @@ Otherwise, it is discarded.
695699
There is a difference between `groupTimeout` behavior and `MessageGroupStoreReaper` (see xref:aggregator.adoc#aggregator-xml[Configuring an Aggregator with XML]).
696700
The reaper initiates forced completion for all `MessageGroup` s in the `MessageGroupStore` periodically.
697701
The `groupTimeout` does it for each `MessageGroup` individually if a new message does not arrive during the `groupTimeout`.
698-
Also, the reaper can be used to remove empty groups (empty groups are retained in order to discard late messages if `expire-groups-upon-completion` is false).
702+
Also, the reaper can be used to remove empty groups (those retained in order to discard late messages if `expire-groups-upon-completion` is false).
699703

700704
Starting with version 5.5, the `groupTimeoutExpression` can be evaluated to a `java.util.Date` instance.
701705
This can be useful in cases like determining a scheduled task moment based on the group creation time (`MessageGroup.getTimestamp()`) instead of a current message arrival as it is calculated when `groupTimeoutExpression` is evaluated to `long`:

Diff for: src/reference/antora/modules/ROOT/pages/whats-new.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ The `AbstractCorrelatingMessageHandler` does not throw an `IllegalArgumentExcept
2727
Instead, such a collection is wrapped into a single reply message.
2828
See xref:aggregator.adoc[Aggregator] for more information.
2929

30+
[[x6.4-correlation-changes]]
31+
== The `discardIndividuallyOnExpiry` Option For Correlation Handlers
32+
33+
The aggregator and resequencer can now discard the whole expired group as a single message via setting `discardIndividuallyOnExpiry` to `false`.
34+
See xref:aggregator.adoc#releasestrategy[ReleaseStrategy] for more information.
35+
3036
[[x6.4-message-store-with-locks]]
3137
== The `LockRegistry` in the `MessageStore`
3238

0 commit comments

Comments
 (0)