Skip to content

Commit 794aa0a

Browse files
artembilangaryrussell
authored andcommitted
Fix Kotlin DSL delegation (#8658)
The `ConsumerEndpointSpec` extensions for Kotlin don't delegate to the provided `endpointFactoryBean` * Introduce `KotlinConsumerEndpointSpec` extension for `ConsumerEndpointSpec` with the proper delegation to the provided spec * Use `KotlinConsumerEndpointSpec` in the Kotlin-specific `Spec` classes **Cherry-pick to `6.1.x`, `6.0.x` & `5.5.x`**
1 parent 42ed81f commit 794aa0a

File tree

6 files changed

+131
-11
lines changed

6 files changed

+131
-11
lines changed

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/AbstractKotlinRouterSpec.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ import org.springframework.messaging.MessageChannel
2828
*
2929
* @since 5.3
3030
*/
31-
abstract class AbstractKotlinRouterSpec<S : AbstractRouterSpec<S, R>, R : AbstractMessageRouter>(
32-
open val delegate: AbstractRouterSpec<S, R>)
33-
: ConsumerEndpointSpec<S, R>(delegate.handler) {
31+
abstract class AbstractKotlinRouterSpec<S : AbstractRouterSpec<S, R>, R : AbstractMessageRouter>(override val delegate: S)
32+
: KotlinConsumerEndpointSpec<S, R>(delegate) {
3433

3534
fun ignoreSendFailures(ignoreSendFailures: Boolean) {
3635
this.delegate.ignoreSendFailures(ignoreSendFailures)
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl
18+
19+
import org.aopalliance.aop.Advice
20+
import org.aopalliance.intercept.MethodInterceptor
21+
import org.reactivestreams.Publisher
22+
import org.springframework.integration.scheduling.PollerMetadata
23+
import org.springframework.messaging.Message
24+
import org.springframework.messaging.MessageHandler
25+
import org.springframework.scheduling.TaskScheduler
26+
import org.springframework.transaction.TransactionManager
27+
import org.springframework.transaction.interceptor.TransactionInterceptor
28+
import reactor.core.publisher.Flux
29+
import reactor.core.publisher.Mono
30+
31+
/**
32+
* A [ConsumerEndpointSpec] wrapped for Kotlin DSL.
33+
*
34+
* @property delegate the [ConsumerEndpointSpec] this instance is delegating to.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 5.5.19
39+
*/
40+
abstract class KotlinConsumerEndpointSpec<S : ConsumerEndpointSpec<S, H>, H : MessageHandler>(open val delegate: S)
41+
: ConsumerEndpointSpec<S, H>(delegate.handler) {
42+
43+
override fun phase(phase: Int): S {
44+
return this.delegate.phase(phase)
45+
}
46+
47+
override fun autoStartup(autoStartup: Boolean): S {
48+
return this.delegate.autoStartup(autoStartup)
49+
}
50+
51+
override fun poller(pollerMetadata: PollerMetadata): S {
52+
return this.delegate.poller(pollerMetadata)
53+
}
54+
55+
override fun reactive(): S {
56+
return this.delegate.reactive()
57+
}
58+
59+
fun reactive(reactiveCustomizer: (Flux<Message<*>>) -> Publisher<Message<*>>) {
60+
this.delegate.reactive(reactiveCustomizer)
61+
}
62+
63+
override fun role(role: String): S {
64+
return this.delegate.role(role)
65+
}
66+
67+
override fun taskScheduler(taskScheduler: TaskScheduler): S {
68+
return this.delegate.taskScheduler(taskScheduler)
69+
}
70+
71+
override fun handleMessageAdvice(vararg interceptors: MethodInterceptor?): S {
72+
return this.delegate.handleMessageAdvice(*interceptors)
73+
}
74+
75+
override fun advice(vararg advice: Advice?): S {
76+
return this.delegate.advice(*advice)
77+
}
78+
79+
override fun transactional(transactionManager: TransactionManager): S {
80+
return this.delegate.transactional(transactionManager)
81+
}
82+
83+
override fun transactional(transactionManager: TransactionManager, handleMessageAdvice: Boolean): S {
84+
return this.delegate.transactional(transactionManager, handleMessageAdvice)
85+
}
86+
87+
override fun transactional(transactionInterceptor: TransactionInterceptor): S {
88+
return this.delegate.transactional(transactionInterceptor)
89+
}
90+
91+
override fun transactional(): S {
92+
return this.delegate.transactional()
93+
}
94+
95+
override fun transactional(handleMessageAdvice: Boolean): S {
96+
return this.delegate.transactional(handleMessageAdvice)
97+
}
98+
99+
fun <T : Any?, V : Any?> customizeMonoReply(replyCustomizer: (Message<*>, Mono<T>) -> Publisher<V>) {
100+
this.delegate.customizeMonoReply(replyCustomizer)
101+
}
102+
103+
override fun id(id: String?): S {
104+
return this.delegate.id(id)
105+
}
106+
107+
override fun poller(pollerMetadataSpec: PollerSpec): S {
108+
return this.delegate.poller(pollerMetadataSpec)
109+
}
110+
111+
fun poller(pollers: (PollerFactory) -> PollerSpec) {
112+
this.delegate.poller(pollers)
113+
}
114+
115+
}

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinEnricherSpec.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import org.springframework.messaging.MessageChannel
3030
*
3131
* @since 5.3
3232
*/
33-
class KotlinEnricherSpec(val delegate: EnricherSpec)
34-
: ConsumerEndpointSpec<EnricherSpec, ContentEnricher>(delegate.handler) {
33+
class KotlinEnricherSpec(override val delegate: EnricherSpec)
34+
: KotlinConsumerEndpointSpec<EnricherSpec, ContentEnricher>(delegate) {
3535

3636
fun requestChannel(requestChannel: MessageChannel) {
3737
this.delegate.requestChannel(requestChannel)

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinFilterEndpointSpec.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ import org.springframework.messaging.MessageChannel
2828
*
2929
* @since 5.3
3030
*/
31-
class KotlinFilterEndpointSpec(val delegate: FilterEndpointSpec)
32-
: ConsumerEndpointSpec<FilterEndpointSpec, MessageFilter>(delegate.handler) {
31+
class KotlinFilterEndpointSpec(override val delegate: FilterEndpointSpec)
32+
: KotlinConsumerEndpointSpec<FilterEndpointSpec, MessageFilter>(delegate) {
3333

3434
fun throwExceptionOnRejection(throwExceptionOnRejection: Boolean) {
3535
this.delegate.throwExceptionOnRejection(throwExceptionOnRejection)

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ import org.springframework.messaging.MessageChannel
2828
*
2929
* @since 5.3
3030
*/
31-
class KotlinSplitterEndpointSpec<H : AbstractMessageSplitter>(val delegate: SplitterEndpointSpec<H>)
32-
: ConsumerEndpointSpec<KotlinSplitterEndpointSpec<H>, H>(delegate.handler) {
31+
class KotlinSplitterEndpointSpec<H : AbstractMessageSplitter>(override val delegate: SplitterEndpointSpec<H>)
32+
: KotlinConsumerEndpointSpec<SplitterEndpointSpec<H>, H>(delegate) {
3333

3434
fun applySequence(applySequence: Boolean) {
3535
this.delegate.applySequence(applySequence)

spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.springframework.integration.channel.QueueChannel
3030
import org.springframework.integration.config.EnableIntegration
3131
import org.springframework.integration.core.MessagingTemplate
3232
import org.springframework.integration.dsl.context.IntegrationFlowContext
33+
import org.springframework.integration.endpoint.AbstractEndpoint
3334
import org.springframework.integration.endpoint.MessageProcessorMessageSource
3435
import org.springframework.integration.handler.LoggingHandler
3536
import org.springframework.integration.scheduling.PollerMetadata
@@ -69,7 +70,7 @@ class KotlinDslTests {
6970

7071
@Test
7172
fun `convert extension`() {
72-
assertThat(this.beanFactory.containsBean("kotlinConverter"))
73+
assertThat(this.beanFactory.containsBean("kotlinConverter")).isTrue()
7374

7475
val replyChannel = QueueChannel()
7576
val date = Date()
@@ -93,6 +94,8 @@ class KotlinDslTests {
9394
@Test
9495
fun `uppercase function`() {
9596
assertThat(beanFactory.containsBean("objectToStringTransformer")).isTrue()
97+
assertThat(this.beanFactory.containsBean("splitterEndpoint")).isTrue()
98+
assertThat(this.beanFactory.getBean("splitterEndpoint", AbstractEndpoint::class.java).phase).isEqualTo(257)
9699
assertThat(this.upperCaseFunction.apply("test".toByteArray())).isEqualTo("TEST")
97100
}
98101

@@ -247,7 +250,10 @@ class KotlinDslTests {
247250
transform(Transformers.objectToString()) { id("objectToStringTransformer") }
248251
transform<String> { it.uppercase() }
249252
split<Message<*>> { it.payload }
250-
split<String>({ it }) { id("splitterEndpoint") }
253+
split<String>({ it }) {
254+
id("splitterEndpoint")
255+
phase(257)
256+
}
251257
resequence()
252258
aggregate {
253259
id("aggregator")

0 commit comments

Comments
 (0)