Skip to content

Commit d9c23e3

Browse files
garyrussellartembilan
authored andcommitted
GH-725: Fix return Collection<Message<?>>
Fixes #725 Fixes #726 Handle return type `Collection<Message<?>>`. Allow empty `@SendTo` when the return type is `Message<?>` (or a collection thereof). * Polishing according PR comments
1 parent d215f4d commit d9c23e3

File tree

7 files changed

+235
-26
lines changed

7 files changed

+235
-26
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.expression.common.TemplateParserContext;
3636
import org.springframework.expression.spel.standard.SpelExpressionParser;
3737
import org.springframework.kafka.KafkaException;
38+
import org.springframework.kafka.support.KafkaUtils;
3839
import org.springframework.lang.Nullable;
3940
import org.springframework.messaging.Message;
4041
import org.springframework.messaging.handler.annotation.Header;
@@ -66,6 +67,8 @@ public class DelegatingInvocableHandler {
6667

6768
private final Map<InvocableHandlerMethod, Expression> handlerSendTo = new HashMap<>();
6869

70+
private final Map<InvocableHandlerMethod, Boolean> handlerReturnsMessage = new HashMap<>();
71+
6972
private final Object bean;
7073

7174
private final BeanExpressionResolver resolver;
@@ -124,10 +127,7 @@ public Object invoke(Message<?> message, Object... providedArgs) throws Exceptio
124127
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
125128
Object result = handler.invoke(message, providedArgs);
126129
Expression replyTo = this.handlerSendTo.get(handler);
127-
if (replyTo != null) {
128-
result = new InvocationResult(result, replyTo);
129-
}
130-
return result;
130+
return new InvocationResult(result, replyTo, this.handlerReturnsMessage.get(handler));
131131
}
132132

133133
/**
@@ -162,6 +162,7 @@ private void setupReplyTo(InvocableHandlerMethod handler) {
162162
if (replyTo != null) {
163163
this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, PARSER_CONTEXT));
164164
}
165+
this.handlerReturnsMessage.put(handler, KafkaUtils.returnTypeMessageOrCollectionOf(method));
165166
}
166167

167168
private String extractSendTo(String element, SendTo ann) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ public final class InvocationResult {
3030

3131
private final Expression sendTo;
3232

33-
public InvocationResult(Object result, Expression sendTo) {
33+
private final boolean messageReturnType;
34+
35+
public InvocationResult(Object result, Expression sendTo, boolean messageReturnType) {
3436
this.result = result;
3537
this.sendTo = sendTo;
38+
this.messageReturnType = messageReturnType;
3639
}
3740

3841
public Object getResult() {
@@ -43,6 +46,10 @@ public Expression getSendTo() {
4346
return this.sendTo;
4447
}
4548

49+
public boolean isMessageReturnType() {
50+
return this.messageReturnType;
51+
}
52+
4653
@Override
4754
public String toString() {
4855
return this.result.toString();

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

+22-9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.kafka.listener.MessageListener;
5050
import org.springframework.kafka.support.Acknowledgment;
5151
import org.springframework.kafka.support.KafkaHeaders;
52+
import org.springframework.kafka.support.KafkaUtils;
5253
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5354
import org.springframework.kafka.support.converter.RecordMessageConverter;
5455
import org.springframework.lang.Nullable;
@@ -104,6 +105,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
104105

105106
private boolean hasAckParameter;
106107

108+
private boolean messageReturnType;
109+
107110
public MessagingMessageListenerAdapter(Object bean, Method method) {
108111
this.bean = bean;
109112
this.inferredType = determineInferredType(method);
@@ -288,11 +291,13 @@ protected void handleResult(Object resultArg, Object request, Object source) {
288291
this.logger.debug("Listener method returned result [" + resultArg
289292
+ "] - generating response message for it");
290293
}
291-
Object result = resultArg instanceof InvocationResult ? ((InvocationResult) resultArg).getResult() : resultArg;
294+
boolean isInvocationResult = resultArg instanceof InvocationResult;
295+
Object result = isInvocationResult ? ((InvocationResult) resultArg).getResult() : resultArg;
292296
String replyTopic = evaluateReplyTopic(request, source, resultArg);
293297
Assert.state(replyTopic == null || this.replyTemplate != null,
294298
"a KafkaTemplate is required to support replies");
295-
sendResponse(result, replyTopic, source);
299+
sendResponse(result, replyTopic, source, isInvocationResult
300+
? ((InvocationResult) resultArg).isMessageReturnType() : this.messageReturnType);
296301
}
297302

298303
private String evaluateReplyTopic(Object request, Object source, Object result) {
@@ -311,12 +316,13 @@ private String evaluateTopic(Object request, Object source, Object result, Expre
311316
return sendTo.getValue(String.class);
312317
}
313318
else {
314-
Object value = sendTo.getValue(this.evaluationContext, new ReplyExpressionRoot(request, source, result));
319+
Object value = sendTo == null ? null
320+
: sendTo.getValue(this.evaluationContext, new ReplyExpressionRoot(request, source, result));
315321
boolean isByteArray = value instanceof byte[];
316-
if (!(value instanceof String || isByteArray)) {
322+
if (!(value == null || value instanceof String || isByteArray)) {
317323
throw new IllegalStateException(
318324
"replyTopic expression must evaluate to a String or byte[], it is: "
319-
+ (value == null ? null : value.getClass().getName()));
325+
+ value.getClass().getName());
320326
}
321327
if (isByteArray) {
322328
return new String((byte[]) value, StandardCharsets.UTF_8);
@@ -334,7 +340,7 @@ private String evaluateTopic(Object request, Object source, Object result, Expre
334340
*/
335341
@Deprecated
336342
protected void sendResponse(Object result, String topic) {
337-
sendResponse(result, topic, null);
343+
sendResponse(result, topic, null, false);
338344
}
339345

340346
/**
@@ -343,11 +349,12 @@ protected void sendResponse(Object result, String topic) {
343349
* @param result the result.
344350
* @param topic the topic.
345351
* @param source the source (input).
352+
* @param messageReturnType true if we are returning message(s).
346353
* @since 2.1.3
347354
*/
348355
@SuppressWarnings("unchecked")
349-
protected void sendResponse(Object result, String topic, @Nullable Object source) {
350-
if (topic == null) {
356+
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
357+
if (!messageReturnType && topic == null) {
351358
if (this.logger.isDebugEnabled()) {
352359
this.logger.debug("No replyTopic to handle the reply: " + result);
353360
}
@@ -358,7 +365,12 @@ else if (result instanceof Message) {
358365
else {
359366
if (result instanceof Collection) {
360367
((Collection<V>) result).forEach(v -> {
361-
this.replyTemplate.send(topic, v);
368+
if (v instanceof Message) {
369+
this.replyTemplate.send((Message<?>) v);
370+
}
371+
else {
372+
this.replyTemplate.send(topic, v);
373+
}
362374
});
363375
}
364376
else {
@@ -486,6 +498,7 @@ else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class))
486498
() -> String.format(stateMessage, "List<ConsumerRecord>"));
487499
Assert.state(!this.isMessageList || validParametersForBatch,
488500
() -> String.format(stateMessage, "List<Message<?>>"));
501+
this.messageReturnType = KafkaUtils.returnTypeMessageOrCollectionOf(method);
489502
return genericParameterType;
490503
}
491504

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import java.lang.reflect.Method;
20+
import java.lang.reflect.ParameterizedType;
21+
import java.lang.reflect.Type;
22+
import java.util.Collection;
23+
24+
import org.springframework.messaging.Message;
25+
26+
/**
27+
* Utility methods.
28+
*
29+
* @author Gary Russell
30+
*
31+
* @since 2.2
32+
*
33+
*/
34+
public final class KafkaUtils {
35+
36+
/**
37+
* Return true if the method return type is {@link Message} or
38+
* {@code Collection<Message<?>>}.
39+
* @param method the method.
40+
* @return true if it returns message(s).
41+
*/
42+
public static boolean returnTypeMessageOrCollectionOf(Method method) {
43+
Type returnType = method.getGenericReturnType();
44+
if (returnType.equals(Message.class)) {
45+
return true;
46+
}
47+
if (returnType instanceof ParameterizedType) {
48+
ParameterizedType prt = (ParameterizedType) returnType;
49+
Type rawType = prt.getRawType();
50+
if (rawType.equals(Message.class)) {
51+
return true;
52+
}
53+
if (rawType.equals(Collection.class)) {
54+
Type collectionType = prt.getActualTypeArguments()[0];
55+
if (collectionType.equals(Message.class)) {
56+
return true;
57+
}
58+
return collectionType instanceof ParameterizedType
59+
&& ((ParameterizedType) collectionType).getRawType().equals(Message.class);
60+
}
61+
}
62+
return false;
63+
64+
}
65+
66+
private KafkaUtils() {
67+
super();
68+
}
69+
70+
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

+60-3
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.TimeUnit;
27+
import java.util.stream.Collectors;
2628

2729
import org.apache.kafka.clients.consumer.ConsumerConfig;
2830
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -48,7 +50,9 @@
4850
import org.springframework.kafka.test.utils.KafkaTestUtils;
4951
import org.springframework.messaging.Message;
5052
import org.springframework.messaging.handler.annotation.Header;
53+
import org.springframework.messaging.handler.annotation.SendTo;
5154
import org.springframework.messaging.support.GenericMessage;
55+
import org.springframework.messaging.support.MessageBuilder;
5256
import org.springframework.test.annotation.DirtiesContext;
5357
import org.springframework.test.context.ContextConfiguration;
5458
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -66,7 +70,8 @@ public class BatchListenerConversionTests {
6670
private static final String DEFAULT_TEST_GROUP_ID = "blc";
6771

6872
@ClassRule // one topic to preserve order
69-
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "blc1", "blc2", "blc3");
73+
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "blc1", "blc2", "blc3",
74+
"blc4", "blc5");
7075

7176
@Autowired
7277
private Config config;
@@ -108,22 +113,40 @@ public void testBatchOfPojoMessages() throws Exception {
108113
assertThat(listener.received.get(0).getPayload().getBar()).isEqualTo("bar");
109114
}
110115

116+
@Test
117+
public void testBatchReplies() throws Exception {
118+
Listener4 listener = this.config.listener4();
119+
String topic = "blc4";
120+
this.template.send(new GenericMessage<>(
121+
new Foo("bar"), Collections.singletonMap(KafkaHeaders.TOPIC, topic)));
122+
this.template.send(new GenericMessage<>(
123+
new Foo("baz"), Collections.singletonMap(KafkaHeaders.TOPIC, topic)));
124+
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
125+
assertThat(listener.received.size()).isGreaterThan(0);
126+
assertThat(listener.received.get(0)).isInstanceOf(Foo.class);
127+
assertThat(listener.received.get(0).bar).isEqualTo("bar");
128+
assertThat(listener.replies.size()).isGreaterThan(0);
129+
assertThat(listener.replies.get(0)).isInstanceOf(Foo.class);
130+
assertThat(listener.replies.get(0).bar).isEqualTo("BAR");
131+
}
132+
111133
@Configuration
112134
@EnableKafka
113135
public static class Config {
114136

115137
@Bean
116138
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
117-
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
139+
ConcurrentKafkaListenerContainerFactory<Integer, Foo> factory =
118140
new ConcurrentKafkaListenerContainerFactory<>();
119141
factory.setConsumerFactory(consumerFactory());
120142
factory.setBatchListener(true);
121143
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
144+
factory.setReplyTemplate(template());
122145
return factory;
123146
}
124147

125148
@Bean
126-
public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
149+
public DefaultKafkaConsumerFactory<Integer, Foo> consumerFactory() {
127150
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
128151
}
129152

@@ -175,6 +198,11 @@ public Listener3 listener3() {
175198
return new Listener3();
176199
}
177200

201+
@Bean
202+
public Listener4 listener4() {
203+
return new Listener4();
204+
}
205+
178206
}
179207

180208
public static class Listener {
@@ -234,6 +262,35 @@ public void listen1(List<Message<Foo>> foos) {
234262

235263
}
236264

265+
public static class Listener4 {
266+
267+
private final CountDownLatch latch1 = new CountDownLatch(1);
268+
269+
private List<Foo> received;
270+
271+
private List<Foo> replies;
272+
273+
@KafkaListener(topics = "blc4", groupId = "blc4")
274+
@SendTo
275+
public Collection<Message<?>> listen1(List<Foo> foos) {
276+
if (this.received == null) {
277+
this.received = foos;
278+
}
279+
return foos.stream().map(f -> MessageBuilder.withPayload(new Foo(f.getBar().toUpperCase()))
280+
.setHeader(KafkaHeaders.TOPIC, "blc5")
281+
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
282+
.build())
283+
.collect(Collectors.toList());
284+
}
285+
286+
@KafkaListener(topics = "blc5", groupId = "blc5")
287+
public void listen2(List<Foo> foos) {
288+
this.replies = foos;
289+
this.latch1.countDown();
290+
}
291+
292+
}
293+
237294
public static class Foo {
238295

239296
public String bar;

0 commit comments

Comments
 (0)