Skip to content

Commit 522f6b4

Browse files
garyrussellartembilan
authored andcommitted
GH-601: @KafkaListener: Support List<Message<Foo>>
Resolves #601
1 parent 6bd58b2 commit 522f6b4

File tree

6 files changed

+95
-15
lines changed

6 files changed

+95
-15
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,7 +61,7 @@ public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessage
6161

6262
private static final Message<KafkaNull> NULL_MESSAGE = new GenericMessage<>(KafkaNull.INSTANCE);
6363

64-
private BatchMessageConverter messageConverter = new BatchMessagingMessageConverter();
64+
private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
6565

6666
private KafkaListenerErrorHandler errorHandler;
6767

@@ -79,7 +79,10 @@ public BatchMessagingMessageListenerAdapter(Object bean, Method method, KafkaLis
7979
* @param messageConverter the converter.
8080
*/
8181
public void setBatchMessageConverter(BatchMessageConverter messageConverter) {
82-
this.messageConverter = messageConverter;
82+
this.batchMessageConverter = messageConverter;
83+
if (messageConverter.getRecordMessageConverter() != null) {
84+
setMessageConverter(messageConverter.getRecordMessageConverter());
85+
}
8386
}
8487

8588
/**
@@ -89,7 +92,7 @@ public void setBatchMessageConverter(BatchMessageConverter messageConverter) {
8992
* being able to convert {@link org.springframework.messaging.Message}.
9093
*/
9194
protected final BatchMessageConverter getBatchMessageConverter() {
92-
return this.messageConverter;
95+
return this.batchMessageConverter;
9396
}
9497

9598
/**

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -428,9 +428,12 @@ else if (parameterizedType.getRawType().equals(List.class)
428428
this.isConsumerRecordList = paramType.equals(ConsumerRecord.class)
429429
|| (paramType instanceof ParameterizedType
430430
&& ((ParameterizedType) paramType).getRawType().equals(ConsumerRecord.class));
431-
this.isMessageList = paramType.equals(Message.class)
432-
|| (paramType instanceof ParameterizedType
433-
&& ((ParameterizedType) paramType).getRawType().equals(Message.class));
431+
boolean messageHasGeneric = paramType instanceof ParameterizedType
432+
&& ((ParameterizedType) paramType).getRawType().equals(Message.class);
433+
this.isMessageList = paramType.equals(Message.class) || messageHasGeneric;
434+
if (messageHasGeneric) {
435+
genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0];
436+
}
434437
}
435438
}
436439
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.clients.producer.ProducerRecord;
2525

2626
import org.springframework.kafka.support.Acknowledgment;
27+
import org.springframework.lang.Nullable;
2728
import org.springframework.messaging.Message;
2829

2930
/**
@@ -53,4 +54,15 @@ Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowle
5354
*/
5455
List<ProducerRecord<?, ?>> fromMessage(Message<?> message, String defaultTopic);
5556

57+
/**
58+
* Return the record converter used by this batch converter, if configured,
59+
* or null.
60+
* @return the converter or null.
61+
* @since 2.1.5
62+
*/
63+
@Nullable
64+
default RecordMessageConverter getRecordMessageConverter() {
65+
return null;
66+
}
67+
5668
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -117,6 +117,11 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
117117
this.headerMapper = headerMapper;
118118
}
119119

120+
@Override
121+
public RecordMessageConverter getRecordMessageConverter() {
122+
return this.recordConverter;
123+
}
124+
120125
@Override
121126
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment,
122127
Consumer<?, ?> consumer, Type type) {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

+38-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
4343
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
4444
import org.springframework.kafka.test.rule.KafkaEmbedded;
4545
import org.springframework.kafka.test.utils.KafkaTestUtils;
46+
import org.springframework.messaging.Message;
4647
import org.springframework.messaging.handler.annotation.Header;
4748
import org.springframework.messaging.support.GenericMessage;
4849
import org.springframework.test.annotation.DirtiesContext;
@@ -62,7 +63,7 @@ public class BatchListenerConversionTests {
6263
private static final String DEFAULT_TEST_GROUP_ID = "blc";
6364

6465
@ClassRule // one topic to preserve order
65-
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "blc1", "blc2");
66+
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "blc1", "blc2", "blc3");
6667

6768
@Autowired
6869
private Config config;
@@ -90,6 +91,20 @@ private void doTest(Listener listener, String topic) throws InterruptedException
9091
assertThat((listener.receivedPartitions).get(0)).isEqualTo(0);
9192
}
9293

94+
@Test
95+
public void testBatchOfPojoMessages() throws Exception {
96+
String topic = "blc3";
97+
this.template.send(new GenericMessage<>(
98+
new Foo("bar"), Collections.singletonMap(KafkaHeaders.TOPIC, topic)));
99+
this.template.send(new GenericMessage<>(
100+
new Foo("baz"), Collections.singletonMap(KafkaHeaders.TOPIC, topic)));
101+
Listener3 listener = this.config.listener3();
102+
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
103+
assertThat(listener.received.size()).isGreaterThan(0);
104+
assertThat(listener.received.get(0).getPayload()).isInstanceOf(Foo.class);
105+
assertThat(listener.received.get(0).getPayload().getBar()).isEqualTo("bar");
106+
}
107+
93108
@Configuration
94109
@EnableKafka
95110
public static class Config {
@@ -149,6 +164,11 @@ public Listener listener2() {
149164
return new Listener("blc2");
150165
}
151166

167+
@Bean
168+
public Listener3 listener3() {
169+
return new Listener3();
170+
}
171+
152172
}
153173

154174
public static class Listener {
@@ -191,6 +211,22 @@ public String getTopic() {
191211

192212
}
193213

214+
public static class Listener3 {
215+
216+
private final CountDownLatch latch1 = new CountDownLatch(1);
217+
218+
private List<Message<Foo>> received;
219+
220+
@KafkaListener(topics = "blc3", groupId = "blc3")
221+
public void listen1(List<Message<Foo>> foos) {
222+
if (this.received == null) {
223+
this.received = foos;
224+
}
225+
this.latch1.countDown();
226+
}
227+
228+
}
229+
194230
public static class Foo {
195231

196232
public String bar;

Diff for: src/reference/asciidoc/kafka.adoc

+25-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
[[kafka]]
32
=== Using Spring for Apache Kafka
43

@@ -863,6 +862,11 @@ public void listen15(List<Message<?>> list, Acknowledgment ack) {
863862
}
864863
----
865864

865+
No conversion is performed on the payloads in this case.
866+
867+
If the `BatchMessagingMessageConverter` is configured with a `RecordMessageConverter`, you can also add a generic type to the `Message` parameter and the payloads will be converted.
868+
See <<payload-conversion-with-batch>> for more information.
869+
866870
You can also receive a list of `ConsumerRecord<?, ?>` objects but it must be the only parameter (aside from an optional `Acknowledgment` when using manual commits) defined on the method:
867871

868872
[source, java]
@@ -1382,6 +1386,8 @@ When a container is paused, it continues to `poll()` the consumer, avoiding a re
13821386
[[serdes]]
13831387
==== Serialization/Deserialization and Message Conversion
13841388

1389+
===== Overview
1390+
13851391
Apache Kafka provides a high-level API for serializing/deserializing record values as well as their keys.
13861392
It is present with the `org.apache.kafka.common.serialization.Serializer<T>` and
13871393
`org.apache.kafka.common.serialization.Deserializer<T>` abstractions with some built-in implementations.
@@ -1460,7 +1466,10 @@ With a class-level `@KafkaListener`, the payload type is used to select which `@
14601466

14611467
NOTE: When using the `StringJsonMessageConverter`, you should use a `StringDeserializer` in the kafka consumer configuration and `StringSerializer` in the kafka producer configuration, when using Spring Integration or the `KafkaTemplate.send(Message<?> message)` method.
14621468

1463-
Starting with _version 1.3.2_ you can also use a `StringJsonMessageConverter` within a `BatchMessagingMessageConverter` for converting batch messages, when using a batch listener container factory.
1469+
[[payload-conversion-with-batch]]
1470+
===== Payload Conversion with Batch Listeners
1471+
1472+
Starting with _version 1.3.2_, you can also use a `StringJsonMessageConverter` within a `BatchMessagingMessageConverter` for converting batch messages, when using a batch listener container factory.
14641473

14651474
By default, the type for the conversion is inferred from the listener argument.
14661475
If you configure the `StringJsonMessageConverter` with a `DefaultJackson2TypeMapper` that has its `TypePrecedence` set to `TYPE_ID` (instead of the default `INFERRED`), then the converter will use type information in headers (if present) instead.
@@ -1498,8 +1507,20 @@ public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offse
14981507

14991508
Notice that you can still access the batch headers too.
15001509

1501-
Starting with _versions 2.1.1_, the `org.springframework.core.convert.ConversionService` used by the default
1502-
`o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory` to reslove parameters for the invocation
1510+
If the batch converter has a record converter that supports it, you can also receive a list of messages where the payloads are converted according to the generic type:
1511+
1512+
[source, java]
1513+
----
1514+
@KafkaListener(topics = "blc3", groupId = "blc3")
1515+
public void listen1(List<Message<Foo>> fooMessages) {
1516+
...
1517+
}
1518+
----
1519+
1520+
===== ConversionService Customization
1521+
1522+
Starting with _version 2.1.1_, the `org.springframework.core.convert.ConversionService` used by the default
1523+
`o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory` to resolve parameters for the invocation
15031524
of a listener method is supplied with all beans implementing any of the following interfaces:
15041525

15051526
- `org.springframework.core.convert.converter.Converter`

0 commit comments

Comments
 (0)