Skip to content

sendTimeout appears to be ignored in SourcePollingChannelAdapterSpec vs. creating adapters manually #9863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
jtkb opened this issue Feb 26, 2025 · 1 comment

Comments

@jtkb
Copy link

jtkb commented Feb 26, 2025

When I create an IntegrationFlow setting the sendTimeout value on SourcePollingChannelAdapterSpec is being ignored causing the polling thread to block. This does not happen if I create a SourcePollingChannelAdapter programmatically. For example, programmatically with a @Configuration class:

@Configuration
public class JdbcPollingConfig {

    @Bean
    protected MessageChannel jdbcInboundChannel() {
        return new QueueChannel(1);
    }

    @Bean
    public ThreadPoolTaskScheduler jdbcTaskExecutor() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(1);
        taskScheduler.initialize();
        return taskScheduler;
    }

    @Bean
    protected JdbcPollingChannelAdapter jdbcPollingChannelAdapter(final DataSource dataSource) {
        final JdbcPollingChannelAdapter adapter =
                new JdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
        return adapter;
    }

    @Bean
    protected SourcePollingChannelAdapter sourcePollingChannelAdapter(
            final JdbcPollingChannelAdapter adapter,
            final MessageChannel jdbcInboundChannel,
            final ThreadPoolTaskScheduler jdbcTaskExecutor) {
        final SourcePollingChannelAdapter spcAdapter = new SourcePollingChannelAdapter();
        spcAdapter.setSource(adapter);
        spcAdapter.setOutputChannel(jdbcInboundChannel);
        spcAdapter.setSendTimeout(500); // Sets the send-to-channel timeout - throws exception on timeout though.
        spcAdapter.setTaskScheduler(jdbcTaskExecutor);
        spcAdapter.setTrigger( new PeriodicTrigger(Duration.ofSeconds(2L)));
        return spcAdapter;
    }

}

With the channel queue as configured, this configuration throws an exception as anticipated because there is no consumer of the channel jdbcInboundChannel. Contrast this using an IntegrationFlow configuration:

@Configuration
public class JdbcDSLConfig {

    @Bean
    public QueueChannelSpec jdbcInboundChannel() {
        return MessageChannels.queue(1);
    }

    @Bean
    public MessageSource<Object> jdbcMessageSource(final DataSource dataSource) {
        return new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
    }

    @Bean
    public ThreadPoolTaskExecutor jdbcExecutor() {
        final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1);
        taskExecutor.setMaxPoolSize(1);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    public IntegrationFlow jdbcInboundFlow(final MessageSource<Object> jdbcMessageSource,
                                           final QueueChannelSpec jdbcInboundChannel,
                                           @Qualifier("jdbcExecutor") final ThreadPoolTaskExecutor jdbcExecutor) {
        return IntegrationFlow.from(jdbcMessageSource,
                        c -> c.poller(Pollers
                                .fixedDelay(2000)
                                .sendTimeout(1) // this has no effect.
                                .taskExecutor(jdbcExecutor)))
                .channel(jdbcInboundChannel)
                .get();

    }
}

This configuration blocks on the second polling of the database even with a sendTimeout set in the configuration.

I have tested this with Spring-boot 3.4.0 & 3.2.0

@jtkb jtkb added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Feb 26, 2025
@artembilan artembilan added this to the 6.5.0-M3 milestone Feb 26, 2025
@artembilan artembilan added in: core for: backport-to-6.3.x for: backport-to-6.4.x and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Feb 26, 2025
@artembilan
Copy link
Member

spring-builds pushed a commit that referenced this issue Feb 26, 2025
Fixes: #9863
Issue link: #9863

The `PollerMetadata.sendTimeout` has been migrated to the `SourcePollingChannelAdapter.setSendTimeout()` long time ago.
Right now this option is not propagated anywhere

* Introduce missed `SourcePollingChannelAdapterSpec.sendTimeout`
* Deprecate (for removal) `PollerMetadata.sendTimeout`
* Verify `SourcePollingChannelAdapterSpec.sendTimeout` option in the `ReactiveStreamsTests`

(cherry picked from commit 0af3b25)
artembilan added a commit that referenced this issue Feb 26, 2025
Fixes: #9863
Issue link: #9863

The `PollerMetadata.sendTimeout` has been migrated to the `SourcePollingChannelAdapter.setSendTimeout()` long time ago.
Right now this option is not propagated anywhere

* Introduce missed `SourcePollingChannelAdapterSpec.sendTimeout`
* Deprecate (for removal) `PollerMetadata.sendTimeout`
* Verify `SourcePollingChannelAdapterSpec.sendTimeout` option in the `ReactiveStreamsTests`

(cherry picked from commit 0af3b25)

# Conflicts:
#	spring-integration-core/src/main/java/org/springframework/integration/dsl/SourcePollingChannelAdapterSpec.java
artembilan added a commit that referenced this issue Feb 26, 2025
Issue link: #9863

The `PollerMetadata.sendTimeout` has been deprecated in previous versions
in favor of ``

* Remove `PollerMetadata.sendTimeout` and `PollerSpec.sendTimeout`
in favor of `SourcePollingChannelAdapterSpec.sendTimeout`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants