Skip to content

Commit 4d4a1a6

Browse files
garyrussellartembilan
authored andcommitted
GH-2295: No Resolvers with ConsumerRecordMetadata
Resolves #2295 A subclass of `InvocableHandlerMethod` was used to signal that a method needs a `ConsumerRecordMetadata` to be constructed. However, the constructor that takes an existing IHM, does not fully clone the argument passed into it. Then, the payload resolver was not invoked to resolve that parameter. Use a map of booleans instead, to signal the creation of the metadata. **cherry-pick to 2.9.x, 2.8.x, 2.7.x**
1 parent 2112c8d commit 4d4a1a6

File tree

2 files changed

+20
-27
lines changed

2 files changed

+20
-27
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

+11-23
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.springframework.messaging.Message;
4141
import org.springframework.messaging.MessageHeaders;
4242
import org.springframework.messaging.converter.MessageConverter;
43-
import org.springframework.messaging.handler.HandlerMethod;
4443
import org.springframework.messaging.handler.annotation.Header;
4544
import org.springframework.messaging.handler.annotation.SendTo;
4645
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
@@ -75,6 +74,8 @@ public class DelegatingInvocableHandler {
7574

7675
private final Map<InvocableHandlerMethod, Boolean> handlerReturnsMessage = new ConcurrentHashMap<>();
7776

77+
private final Map<InvocableHandlerMethod, Boolean> handlerMetadataAware = new ConcurrentHashMap<>();
78+
7879
private final Object bean;
7980

8081
private final BeanExpressionResolver resolver;
@@ -102,11 +103,12 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
102103
@Nullable BeanExpressionContext beanExpressionContext,
103104
@Nullable BeanFactory beanFactory, @Nullable Validator validator) {
104105

105-
this.handlers = new ArrayList<>();
106+
this.handlers = new ArrayList<>(handlers);
106107
for (InvocableHandlerMethod handler : handlers) {
107-
this.handlers.add(wrapIfNecessary(handler));
108+
checkSpecial(handler);
108109
}
109-
this.defaultHandler = wrapIfNecessary(defaultHandler);
110+
this.defaultHandler = defaultHandler;
111+
checkSpecial(defaultHandler);
110112
this.bean = bean;
111113
this.resolver = beanExpressionResolver;
112114
this.beanExpressionContext = beanExpressionContext;
@@ -116,18 +118,17 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
116118
this.validator = validator == null ? null : new PayloadValidator(validator);
117119
}
118120

119-
@Nullable
120-
private InvocableHandlerMethod wrapIfNecessary(@Nullable InvocableHandlerMethod handler) {
121+
private void checkSpecial(@Nullable InvocableHandlerMethod handler) {
121122
if (handler == null) {
122-
return null;
123+
return;
123124
}
124125
Parameter[] parameters = handler.getMethod().getParameters();
125126
for (Parameter parameter : parameters) {
126127
if (parameter.getType().equals(ConsumerRecordMetadata.class)) {
127-
return new DelegatingInvocableHandler.MetadataAwareInvocableHandlerMethod(handler);
128+
this.handlerMetadataAware.put(handler, true);
129+
return;
128130
}
129131
}
130-
return handler;
131132
}
132133

133134
/**
@@ -156,7 +157,7 @@ public Object invoke(Message<?> message, Object... providedArgs) throws Exceptio
156157
}
157158
}
158159
Object result;
159-
if (handler instanceof MetadataAwareInvocableHandlerMethod) {
160+
if (Boolean.TRUE.equals(this.handlerMetadataAware.get(handler))) {
160161
Object[] args = new Object[providedArgs.length + 1];
161162
args[0] = AdapterUtils.buildConsumerRecordMetadataFromArray(providedArgs);
162163
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
@@ -315,19 +316,6 @@ public boolean hasDefaultHandler() {
315316
return this.defaultHandler != null;
316317
}
317318

318-
/**
319-
* A handler method that is aware of {@link ConsumerRecordMetadata}.
320-
*
321-
* @since 2.5
322-
*/
323-
private static final class MetadataAwareInvocableHandlerMethod extends InvocableHandlerMethod {
324-
325-
MetadataAwareInvocableHandlerMethod(HandlerMethod handlerMethod) {
326-
super(handlerMethod);
327-
}
328-
329-
}
330-
331319
private static final class PayloadValidator extends PayloadMethodArgumentResolver {
332320

333321
PayloadValidator(Validator validator) {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
109109
import org.springframework.kafka.listener.ListenerExecutionFailedException;
110110
import org.springframework.kafka.listener.MessageListenerContainer;
111+
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
111112
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
112113
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
113114
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -447,6 +448,7 @@ public void testMulti() throws Exception {
447448

448449
template.send("annotated8", 0, 1, "junk");
449450
assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue();
451+
assertThat(this.multiListener.meta).isNotNull();
450452
}
451453

452454
@Test
@@ -2302,18 +2304,21 @@ public CountDownLatch getLatch2() {
23022304
@KafkaListener(id = "multi", topics = "annotated8", errorHandler = "consumeMultiMethodException")
23032305
static class MultiListenerBean {
23042306

2305-
private final CountDownLatch latch1 = new CountDownLatch(1);
2307+
final CountDownLatch latch1 = new CountDownLatch(1);
23062308

2307-
private final CountDownLatch latch2 = new CountDownLatch(1);
2309+
final CountDownLatch latch2 = new CountDownLatch(1);
2310+
2311+
final CountDownLatch errorLatch = new CountDownLatch(1);
23082312

2309-
private final CountDownLatch errorLatch = new CountDownLatch(1);
2313+
volatile ConsumerRecordMetadata meta;
23102314

23112315
@KafkaHandler
2312-
public void bar(@NonNull String bar) {
2316+
public void bar(@NonNull String bar, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
23132317
if ("junk".equals(bar)) {
23142318
throw new RuntimeException("intentional");
23152319
}
23162320
else {
2321+
this.meta = meta;
23172322
this.latch1.countDown();
23182323
}
23192324
}

0 commit comments

Comments
 (0)