@@ -37,8 +37,10 @@ import org.springframework.integration.MessageRejectedException
37
37
import org.springframework.integration.channel.QueueChannel
38
38
import org.springframework.integration.config.EnableIntegration
39
39
import org.springframework.integration.dsl.IntegrationFlow
40
- import org.springframework.integration.dsl.IntegrationFlows
41
40
import org.springframework.integration.dsl.Pollers
41
+ import org.springframework.integration.dsl.kotlin.filterReified
42
+ import org.springframework.integration.dsl.kotlin.integrationFlow
43
+ import org.springframework.integration.dsl.kotlin.split
42
44
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer
43
45
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter
44
46
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler
@@ -87,7 +89,7 @@ import java.util.stream.Stream
87
89
@SpringJUnitConfig
88
90
@DirtiesContext
89
91
@EmbeddedKafka(topics = [KafkaDslKotlinTests .TEST_TOPIC1 , KafkaDslKotlinTests .TEST_TOPIC2 ,
90
- KafkaDslKotlinTests .TEST_TOPIC3 , KafkaDslKotlinTests .TEST_TOPIC4 , KafkaDslKotlinTests .TEST_TOPIC5 ])
92
+ KafkaDslKotlinTests .TEST_TOPIC3 , KafkaDslKotlinTests .TEST_TOPIC4 , KafkaDslKotlinTests .TEST_TOPIC5 ])
91
93
class KafkaDslKotlinTests {
92
94
93
95
companion object {
@@ -159,8 +161,8 @@ class KafkaDslKotlinTests {
159
161
assertThat(receive!! .payload).isEqualTo(" FOO" )
160
162
val headers = receive.headers
161
163
assertThat(headers.containsKey(KafkaHeaders .ACKNOWLEDGMENT )).isTrue()
162
- val acknowledgment = headers.get( KafkaHeaders .ACKNOWLEDGMENT , Acknowledgment :: class .java)
163
- acknowledgment? .acknowledge()
164
+ val acknowledgment = headers[ KafkaHeaders .ACKNOWLEDGMENT ] as Acknowledgment
165
+ acknowledgment.acknowledge()
164
166
assertThat(headers[KafkaHeaders .RECEIVED_TOPIC ]).isEqualTo(TEST_TOPIC1 )
165
167
assertThat(headers[KafkaHeaders .RECEIVED_MESSAGE_KEY ]).isEqualTo(i + 1 )
166
168
assertThat(headers[KafkaHeaders .RECEIVED_PARTITION_ID ]).isEqualTo(0 )
@@ -176,8 +178,8 @@ class KafkaDslKotlinTests {
176
178
assertThat(receive!! .payload).isEqualTo(" FOO" )
177
179
val headers = receive.headers
178
180
assertThat(headers.containsKey(KafkaHeaders .ACKNOWLEDGMENT )).isTrue()
179
- val acknowledgment = headers.get( KafkaHeaders .ACKNOWLEDGMENT , Acknowledgment :: class .java)
180
- acknowledgment? .acknowledge()
181
+ val acknowledgment = headers[ KafkaHeaders .ACKNOWLEDGMENT ] as Acknowledgment
182
+ acknowledgment.acknowledge()
181
183
assertThat(headers[KafkaHeaders .RECEIVED_TOPIC ]).isEqualTo(TEST_TOPIC2 )
182
184
assertThat(headers[KafkaHeaders .RECEIVED_MESSAGE_KEY ]).isEqualTo(i + 1 )
183
185
assertThat(headers[KafkaHeaders .RECEIVED_PARTITION_ID ]).isEqualTo(0 )
@@ -232,7 +234,7 @@ class KafkaDslKotlinTests {
232
234
@Bean
233
235
fun consumerFactory (): ConsumerFactory <Int , String > {
234
236
val props = KafkaTestUtils .consumerProps(" test1" , " false" , this .embeddedKafka)
235
- props.put( ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , " earliest" )
237
+ props[ ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ] = " earliest"
236
238
return DefaultKafkaConsumerFactory (props)
237
239
}
238
240
@@ -241,7 +243,7 @@ class KafkaDslKotlinTests {
241
243
242
244
@Bean
243
245
fun topic1ListenerFromKafkaFlow () =
244
- IntegrationFlows .from (
246
+ integrationFlow (
245
247
Kafka .messageDrivenChannelAdapter(consumerFactory(),
246
248
KafkaMessageDrivenChannelAdapter .ListenerMode .record, TEST_TOPIC1 )
247
249
.configureListenerContainer {
@@ -251,50 +253,52 @@ class KafkaDslKotlinTests {
251
253
.recoveryCallback(ErrorMessageSendingRecoverer (errorChannel(),
252
254
RawRecordHeaderErrorMessageStrategy ()))
253
255
.retryTemplate(RetryTemplate ())
254
- .filterInRetry(true ))
255
- .filter(Message ::class .java, { m -> m.getHeaders().get(KafkaHeaders .RECEIVED_MESSAGE_KEY , Integer ::class .java)!! < 101 },
256
- { f -> f.throwExceptionOnRejection(true ) })
257
- .transform<String , String > { it.toUpperCase() }
258
- .channel { c -> c.queue(" listeningFromKafkaResults1" ) }
259
- .get()
256
+ .filterInRetry(true )) {
257
+ it.filterReified<Message <* >>(
258
+ { m -> (m.headers[KafkaHeaders .RECEIVED_MESSAGE_KEY ] as Int ) < 101 },
259
+ { f -> f.throwExceptionOnRejection(true ) })
260
+ .transform<String , String > { it.toUpperCase() }
261
+ .channel { c -> c.queue(" listeningFromKafkaResults1" ) }
262
+ }
260
263
261
264
@Bean
262
265
fun topic2ListenerFromKafkaFlow () =
263
- IntegrationFlows .from (
266
+ integrationFlow (
264
267
Kafka .messageDrivenChannelAdapter(consumerFactory(),
265
268
KafkaMessageDrivenChannelAdapter .ListenerMode .record, TEST_TOPIC2 )
266
269
.configureListenerContainer { it.ackMode(ContainerProperties .AckMode .MANUAL ) }
267
270
.recoveryCallback(ErrorMessageSendingRecoverer (errorChannel(),
268
271
RawRecordHeaderErrorMessageStrategy ()))
269
272
.retryTemplate(RetryTemplate ())
270
- .filterInRetry(true ))
271
- .filter( Message :: class .java,
272
- { m -> m.getHeaders().get( KafkaHeaders .RECEIVED_MESSAGE_KEY , Integer :: class .java) !! < 101 },
273
- { it.throwExceptionOnRejection(true ) })
274
- .transform<String , String > { it.toUpperCase() }
275
- .channel { c -> c.queue(" listeningFromKafkaResults2" ) }
276
- .get()
273
+ .filterInRetry(true )) {
274
+ it.filterReified< Message < * >>(
275
+ { m -> (m.headers[ KafkaHeaders .RECEIVED_MESSAGE_KEY ] as Int ) < 101 },
276
+ { it.throwExceptionOnRejection(true ) })
277
+ .transform<String , String > { it.toUpperCase() }
278
+ .channel { c -> c.queue(" listeningFromKafkaResults2" ) }
279
+ }
277
280
278
281
@Bean
279
282
fun producerFactory (): DefaultKafkaProducerFactory <Int , String > {
280
283
val props = KafkaTestUtils .producerProps(this .embeddedKafka)
281
- props.put( ProducerConfig .MAX_BLOCK_MS_CONFIG , " 10000" )
284
+ props[ ProducerConfig .MAX_BLOCK_MS_CONFIG ] = " 10000"
282
285
return DefaultKafkaProducerFactory (props)
283
286
}
284
287
285
288
@Bean
286
289
fun sendToKafkaFlow () =
287
- IntegrationFlow { f ->
288
- f.split<String >({ p -> Stream .generate { p }.limit(101 ) }, null )
289
- .publishSubscribeChannel { c ->
290
- c.subscribe { sf ->
291
- sf.handle(
292
- kafkaMessageHandler(producerFactory(), TEST_TOPIC1 )
293
- .timestampExpression(" T(Long).valueOf('1487694048633')" )
294
- ) { it.id(" kafkaProducer1" ) }
295
- }
296
- .subscribe { sf ->
297
- sf.handle(
290
+ IntegrationFlow {
291
+ it.split<String >({ p -> Stream .generate { p }.limit(101 ) })
292
+ .publishSubscribeChannel {
293
+ it
294
+ .subscribe {
295
+ it.handle(
296
+ kafkaMessageHandler(producerFactory(), TEST_TOPIC1 )
297
+ .timestampExpression(" T(Long).valueOf('1487694048633')" )
298
+ ) { it.id(" kafkaProducer1" ) }
299
+ }
300
+ .subscribe {
301
+ it.handle(
298
302
kafkaMessageHandler(producerFactory(), TEST_TOPIC2 )
299
303
.timestamp<Any > { 1487694048644L }
300
304
) { it.id(" kafkaProducer2" ) }
@@ -310,21 +314,20 @@ class KafkaDslKotlinTests {
310
314
.messageKey<Any > { m -> m.headers[IntegrationMessageHeaderAccessor .SEQUENCE_NUMBER ] }
311
315
.headerMapper(mapper())
312
316
.sync(true )
313
- .partitionId<Any > { _ -> 0 }
317
+ .partitionId<Any > { 0 }
314
318
.topicExpression(" headers[kafka_topic] ?: '$topic '" )
315
- .configureKafkaTemplate { t -> t .id(" kafkaTemplate:$topic " ) }
319
+ .configureKafkaTemplate { it .id(" kafkaTemplate:$topic " ) }
316
320
317
321
318
322
@Bean
319
323
fun sourceFlow () =
320
- IntegrationFlows
321
- .from(Kafka .inboundChannelAdapter(consumerFactory(), ConsumerProperties (TEST_TOPIC3 )))
322
- { e -> e.poller(Pollers .fixedDelay(100 )) }
323
- .handle { p ->
324
- this .fromSource = p.getPayload()
325
- this .sourceFlowLatch.countDown()
326
- }
327
- .get()
324
+ integrationFlow(Kafka .inboundChannelAdapter(consumerFactory(), ConsumerProperties (TEST_TOPIC3 )),
325
+ { e -> e.poller(Pollers .fixedDelay(100 )) }) {
326
+ it.handle { m ->
327
+ this .fromSource = m.payload
328
+ this .sourceFlowLatch.countDown()
329
+ }
330
+ }
328
331
329
332
@Bean
330
333
fun replyingKafkaTemplate () =
@@ -335,10 +338,10 @@ class KafkaDslKotlinTests {
335
338
336
339
@Bean
337
340
fun outboundGateFlow () =
338
- IntegrationFlows .from( Gate :: class .java)
339
- .handle(Kafka .outboundGateway(replyingKafkaTemplate())
340
- .sync(true ))
341
- .get()
341
+ integrationFlow< Gate > {
342
+ it .handle(Kafka .outboundGateway(replyingKafkaTemplate())
343
+ .sync(true ))
344
+ }
342
345
343
346
private fun replyContainer (): GenericMessageListenerContainer <Int , String > {
344
347
val containerProperties = ContainerProperties (TEST_TOPIC5 )
@@ -359,10 +362,9 @@ class KafkaDslKotlinTests {
359
362
360
363
@Bean
361
364
fun serverGateway () =
362
- IntegrationFlows .from(
363
- Kafka .inboundGateway(consumerFactory(), containerProperties(), producerFactory()))
364
- .transform<String , String > { it.toUpperCase() }
365
- .get()
365
+ integrationFlow(Kafka .inboundGateway(consumerFactory(), containerProperties(), producerFactory())) {
366
+ it.transform<String , String > { it.toUpperCase() }
367
+ }
366
368
367
369
private fun containerProperties () =
368
370
ContainerProperties (TEST_TOPIC4 )
0 commit comments