Aggregator: discardChannel() to have an optional flag to discard the whole group, instead of individual messages #9754

Closed
shintasmith opened this issue Jan 13, 2025 · 0 comments

Expected Behavior

We are using the Aggregator pattern in Spring Integration 6.3.3 to group messages together, configured as follows:

        return IntegrationFlow.from(petStoreSubscriptionChannel)
                .enrichHeaders(spec -> spec.header("petStore.FlowName", "PetStoreIntake"))
                .aggregate(aggregatorSpec -> aggregatorSpec
                        // .messageStore(jdbcMessageStore)
                        .outputProcessor(petOutputProcessor)
                        .expireGroupsUponCompletion(true)
                        .groupTimeout(300 * 1000)
                        .sendPartialResultOnExpiry(true)
                        .correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
                        .releaseStrategy(group -> group.size() >= 3))
                .handle(petGroupHandler, "handle")
                .get();

(Link to full repo: here, link to IntegrationFlow where the Aggregator spec is defined)

We need to process a partial (incomplete) group differently from a complete group. To do that, we want to set .sendPartialResultOnExpiry(false) and configure a discard channel that handles the incomplete group. The problem is that the discard channel receives the individual Messages of the discarded group rather than a single MessageGroup.

We would like the ability to receive discarded messages as a partial/incomplete group.

Current Behavior

Currently, in AbstractCorrelatingMessageHandler.expireGroup(), messages are discarded individually, instead of as a group.

https://github.com/spring-projects/spring-integration/blob/6.3.x/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java#L878-L879
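
To make the difference concrete, here is a minimal, dependency-free Java model of the two discard styles. It is not Spring Integration code: DiscardSketch, discardIndividually, and discardAsGroup are hypothetical names, and the returned list merely stands in for what a discard channel would receive.

```java
import java.util.ArrayList;
import java.util.List;

public class DiscardSketch {

    // Models the current behavior: every message of the expired group
    // reaches the discard channel as a separate item.
    static <T> List<Object> discardIndividually(List<T> expiredGroup) {
        List<Object> discardChannel = new ArrayList<>();
        for (T message : expiredGroup) {
            discardChannel.add(message);
        }
        return discardChannel;
    }

    // Models the requested behavior: the expired group reaches the
    // discard channel once, as a single item holding all its messages.
    static <T> List<Object> discardAsGroup(List<T> expiredGroup) {
        List<Object> discardChannel = new ArrayList<>();
        discardChannel.add(List.copyOf(expiredGroup));
        return discardChannel;
    }

    public static void main(String[] args) {
        // Hypothetical partial group: two messages, below the release size of 3.
        List<String> partialGroup = List.of("poodle-1", "poodle-2");
        System.out.println(discardIndividually(partialGroup).size()); // prints 2
        System.out.println(discardAsGroup(partialGroup).size());      // prints 1
    }
}
```

With a large expired group, the first style produces one downstream invocation per message, while the second produces exactly one.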

Context

Our unit of work in the handler (Service Activator) is a group of messages. We do not want to process the discarded messages individually, because there could be a great many of them.

Our current workaround is to call:

group.complete()

in our Release Strategy. We then implement a MessageGroupProcessor that checks group.isComplete() and inserts a boolean flag into the header. The Service Activator checks this header: if the flag indicates the group is complete, normal processing happens; otherwise, processing takes a different path that handles the incomplete group (for example, logging and alerting).
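
The control flow of this workaround can be sketched in plain Java, without any Spring dependency. Everything here is a stand-in chosen for illustration: the Group class imitates the complete() and isComplete() methods of MessageGroup, the map imitates a message with headers, and the "groupComplete" header name is an assumption, not the name used in our project.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WorkaroundSketch {

    // Stand-in for a message group with a completion marker.
    static final class Group {
        final List<String> payloads;
        private boolean complete;

        Group(List<String> payloads) {
            this.payloads = payloads;
        }

        void complete() {
            this.complete = true;
        }

        boolean isComplete() {
            return this.complete;
        }
    }

    // Release strategy: marks the group complete only when it releases on size.
    static boolean canRelease(Group group) {
        if (group.payloads.size() >= 3) {
            group.complete();
            return true;
        }
        return false;
    }

    // Output processor: copies the completion marker into a header
    // ("groupComplete" is a hypothetical header name).
    static Map<String, Object> process(Group group) {
        Map<String, Object> message = new HashMap<>();
        message.put("payload", group.payloads);
        message.put("groupComplete", group.isComplete());
        return message;
    }

    // Service activator: branches on the header.
    static String handle(Map<String, Object> message) {
        return Boolean.TRUE.equals(message.get("groupComplete"))
                ? "processed"
                : "alerted";
    }

    public static void main(String[] args) {
        // Full group: released by the release strategy, so it is marked complete.
        Group full = new Group(List.of("a", "b", "c"));
        canRelease(full);
        System.out.println(handle(process(full)));    // prints "processed"

        // Partial group: emitted on timeout, never marked complete.
        Group partial = new Group(List.of("a"));
        canRelease(partial);
        System.out.println(handle(process(partial))); // prints "alerted"
    }
}
```

The sketch relies on the partial group reaching the output processor through expiry (with sendPartialResultOnExpiry(true)) rather than through the release strategy, which is why its completion marker stays false.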

Our issue is also posted on Stack Overflow here, where Artem suggested opening a GitHub issue.

@shintasmith shintasmith added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Jan 13, 2025
@artembilan artembilan added this to the 6.5.0-M1 milestone Jan 13, 2025
@artembilan artembilan added in: core and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Jan 13, 2025