Skip to content

Commit 05f1fd8

Browse files
authored
GH-8573: Fix KafkaMessageSource samples in docs (#8575)
Fixes #8573 * Also add a Kotlin DSL sample **Cherry-pick to `6.0.x` & `5.5.x`**
1 parent 1c90a60 commit 05f1fd8

File tree

1 file changed

+38
-12
lines changed

1 file changed

+38
-12
lines changed

src/reference/asciidoc/kafka.adoc

+38-12
Original file line numberDiff line numberDiff line change
@@ -360,22 +360,37 @@ The `KafkaMessageSource` provides a pollable channel adapter implementation.
360360
----
361361
@Bean
362362
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
363-
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, "myTopic")
364-
.groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000)))
363+
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
364+
e -> e.poller(Pollers.fixedDelay(5000)))
365365
.handle(System.out::println)
366366
.get();
367367
}
368368
----
369+
[source, kotlin, role="secondary"]
370+
.Kotlin
371+
----
372+
@Bean
373+
fun sourceFlow(cf: ConsumerFactory<String, String>) =
374+
integrationFlow(Kafka.inboundChannelAdapter(cf,
375+
ConsumerProperties(TEST_TOPIC3).also {
376+
it.groupId = "kotlinMessageSourceGroup"
377+
}),
378+
{ poller(Pollers.fixedDelay(100)) }) {
379+
handle { m ->
380+
381+
}
382+
}
383+
----
369384
[source, java, role="secondary"]
370385
.Java
371386
----
372387
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
373388
@Bean
374389
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
375-
KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
376-
source.setGroupId("myGroupId");
377-
source.setClientId("myClientId");
378-
return source;
390+
ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
391+
consumerProperties.setGroupId("myGroupId");
392+
consumerProperties.setClientId("myClientId");
393+
retunr new KafkaMessageSource<>(cf, consumerProperties);
379394
}
380395
----
381396
[source, xml, role="secondary"]
@@ -384,18 +399,29 @@ public KafkaMessageSource<String, String> source(ConsumerFactory<String, String>
384399
<int-kafka:inbound-channel-adapter
385400
id="adapter1"
386401
consumer-factory="consumerFactory"
402+
consumer-properties="consumerProperties1"
387403
ack-factory="ackFactory"
388-
topics="topic1"
389404
channel="inbound"
390-
client-id="client"
391-
group-id="group"
392405
message-converter="converter"
393406
payload-type="java.lang.String"
394407
raw-header="true"
395-
auto-startup="false"
396-
rebalance-listener="rebal">
408+
auto-startup="false">
397409
<int:poller fixed-delay="5000"/>
398410
</int-kafka:inbound-channel-adapter>
411+
412+
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
413+
<constructor-arg>
414+
<map>
415+
<entry key="max.poll.records" value="1"/>
416+
</map>
417+
</constructor-arg>
418+
</bean>
419+
420+
<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
421+
<constructor-arg name="topics" value="topic1"/>
422+
<property name="groupId" value="group"/>
423+
<property name="clientId" value="client"/>
424+
</bean>
399425
----
400426
====
401427

@@ -421,7 +447,7 @@ IMPORTANT: The gateway does not accept requests until the reply container has be
421447
It is suggested that you add a `ConsumerRebalanceListener` to the template's reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway.
422448

423449
The `KafkaProducerMessageHandler` `sendTimeoutExpression` default is `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
424-
This has been changed for consistency because you may get unexpected behavior (Spring may timeout the `send()`, while it is actually, eventually, successful).
450+
This has been changed for consistency because you may get unexpected behavior (Spring may time out the `send()`, while it is actually, eventually, successful).
425451
IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures.
426452

427453
[[kafka-outbound-gateway-configuration]]

0 commit comments

Comments
 (0)