Skip to content

Commit 5489b15

Browse files
artembilangaryrussell
authored andcommitted
Fix maxMessagesPerPoll for SourcePollingChAdapter (#8747)
* Fix maxMessagesPerPoll for SourcePollingChAdapter The `AbstractMethodAnnotationPostProcessor` does not check for `PollerMetadata.MAX_MESSAGES_UNBOUNDED` before setting `maxMessagesPerPoll` into a `SourcePollingChannelAdapter` which in this case must be `1` Also fix `SourcePollingChannelAdapterFactoryBean` to not mutate the provided `PollerMetadata` (which might be global default) with a new `maxMessagesPerPoll` **Cherry-pick to `6.1.x` & `6.0.x`** * * Fix `this.` prefix in `SourcePollingChannelAdapterFactoryBean`
1 parent 18aff78 commit 5489b15

File tree

3 files changed

+18
-6
lines changed

3 files changed

+18
-6
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/config/AbstractMethodAnnotationPostProcessor.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,14 @@ protected void configurePollingEndpoint(AbstractPollingEndpoint pollingEndpoint,
688688
pollingEndpoint.setTaskExecutor(pollerMetadata.getTaskExecutor());
689689
pollingEndpoint.setTrigger(pollerMetadata.getTrigger());
690690
pollingEndpoint.setAdviceChain(pollerMetadata.getAdviceChain());
691-
pollingEndpoint.setMaxMessagesPerPoll(pollerMetadata.getMaxMessagesPerPoll());
691+
long maxMessagesPerPoll = pollerMetadata.getMaxMessagesPerPoll();
692+
if (maxMessagesPerPoll == PollerMetadata.MAX_MESSAGES_UNBOUNDED &&
693+
pollingEndpoint instanceof SourcePollingChannelAdapter) {
694+
// the default is 1 since a source might return
695+
// a non-null and non-interruptible value every time it is invoked
696+
maxMessagesPerPoll = 1;
697+
}
698+
pollingEndpoint.setMaxMessagesPerPoll(maxMessagesPerPoll);
692699
pollingEndpoint.setErrorHandler(pollerMetadata.getErrorHandler());
693700
if (pollingEndpoint instanceof PollingConsumer) {
694701
((PollingConsumer) pollingEndpoint).setReceiveTimeout(pollerMetadata.getReceiveTimeout());

Diff for: spring-integration-core/src/main/java/org/springframework/integration/config/SourcePollingChannelAdapterFactoryBean.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,13 @@ private void initializeAdapter() {
181181
Assert.notNull(this.pollerMetadata, () -> "No poller has been defined for channel-adapter '"
182182
+ this.beanName + "', and no default poller is available within the context.");
183183
}
184-
if (this.pollerMetadata.getMaxMessagesPerPoll() == Integer.MIN_VALUE) {
184+
long maxMessagesPerPoll = this.pollerMetadata.getMaxMessagesPerPoll();
185+
if (maxMessagesPerPoll == PollerMetadata.MAX_MESSAGES_UNBOUNDED) {
185186
// the default is 1 since a source might return
186187
// a non-null and non-interruptible value every time it is invoked
187-
this.pollerMetadata.setMaxMessagesPerPoll(1);
188+
maxMessagesPerPoll = 1;
188189
}
189-
spca.setMaxMessagesPerPoll(this.pollerMetadata.getMaxMessagesPerPoll());
190+
spca.setMaxMessagesPerPoll(maxMessagesPerPoll);
190191
if (this.sendTimeout != null) {
191192
spca.setSendTimeout(this.sendTimeout);
192193
}

Diff for: spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.springframework.integration.endpoint.MethodInvokingMessageSource;
105105
import org.springframework.integration.endpoint.PollingConsumer;
106106
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
107+
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
107108
import org.springframework.integration.expression.SpelPropertyAccessorRegistrar;
108109
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
109110
import org.springframework.integration.handler.ServiceActivatingHandler;
@@ -416,12 +417,15 @@ public void testAnnotatedServiceActivator() throws Exception {
416417

417418
assertThat(this.counterChannel.receive(10)).isNull();
418419

419-
SmartLifecycle countSA = this.context.getBean("annotationTestService.count.inboundChannelAdapter",
420-
SmartLifecycle.class);
420+
SourcePollingChannelAdapter countSA =
421+
this.context.getBean("annotationTestService.count.inboundChannelAdapter",
422+
SourcePollingChannelAdapter.class);
421423
assertThat(countSA.isAutoStartup()).isFalse();
422424
assertThat(countSA.getPhase()).isEqualTo(23);
423425
countSA.start();
424426

427+
assertThat(countSA.getMaxMessagesPerPoll()).isEqualTo(1);
428+
425429
for (int i = 0; i < 10; i++) {
426430
Message<?> message = this.counterChannel.receive(10_000);
427431
assertThat(message).isNotNull();

0 commit comments

Comments
 (0)