Skip to content

Commit 6416235

Browse files
garyrussellartembilan
authored andcommitted
GH-2295: No Resolvers with ConsumerRecordMetadata
Resolves #2295 A subclass of `InvocableHandlerMethod` was used to signal that a method needs a `ConsumerRecordMetadata` to be constructed. However, the constructor that takes an existing IHM, does not fully clone the argument passed into it. Then, the payload resolver was not invoked to resolve that parameter. Use a map of booleans instead, to signal the creation of the metadata. **cherry-pick to 2.9.x, 2.8.x, 2.7.x**
1 parent 6fd78f2 commit 6416235

File tree

2 files changed

+20
-27
lines changed

2 files changed

+20
-27
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

+11-23
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.springframework.messaging.Message;
4141
import org.springframework.messaging.MessageHeaders;
4242
import org.springframework.messaging.converter.MessageConverter;
43-
import org.springframework.messaging.handler.HandlerMethod;
4443
import org.springframework.messaging.handler.annotation.Header;
4544
import org.springframework.messaging.handler.annotation.SendTo;
4645
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
@@ -75,6 +74,8 @@ public class DelegatingInvocableHandler {
7574

7675
private final Map<InvocableHandlerMethod, Boolean> handlerReturnsMessage = new ConcurrentHashMap<>();
7776

77+
private final Map<InvocableHandlerMethod, Boolean> handlerMetadataAware = new ConcurrentHashMap<>();
78+
7879
private final Object bean;
7980

8081
private final BeanExpressionResolver resolver;
@@ -137,11 +138,12 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
137138
@Nullable BeanExpressionContext beanExpressionContext,
138139
@Nullable BeanFactory beanFactory, @Nullable Validator validator) {
139140

140-
this.handlers = new ArrayList<>();
141+
this.handlers = new ArrayList<>(handlers);
141142
for (InvocableHandlerMethod handler : handlers) {
142-
this.handlers.add(wrapIfNecessary(handler));
143+
checkSpecial(handler);
143144
}
144-
this.defaultHandler = wrapIfNecessary(defaultHandler);
145+
this.defaultHandler = defaultHandler;
146+
checkSpecial(defaultHandler);
145147
this.bean = bean;
146148
this.resolver = beanExpressionResolver;
147149
this.beanExpressionContext = beanExpressionContext;
@@ -151,18 +153,17 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
151153
this.validator = validator == null ? null : new PayloadValidator(validator);
152154
}
153155

154-
@Nullable
155-
private InvocableHandlerMethod wrapIfNecessary(@Nullable InvocableHandlerMethod handler) {
156+
private void checkSpecial(@Nullable InvocableHandlerMethod handler) {
156157
if (handler == null) {
157-
return null;
158+
return;
158159
}
159160
Parameter[] parameters = handler.getMethod().getParameters();
160161
for (Parameter parameter : parameters) {
161162
if (parameter.getType().equals(ConsumerRecordMetadata.class)) {
162-
return new DelegatingInvocableHandler.MetadataAwareInvocableHandlerMethod(handler);
163+
this.handlerMetadataAware.put(handler, true);
164+
return;
163165
}
164166
}
165-
return handler;
166167
}
167168

168169
/**
@@ -191,7 +192,7 @@ public Object invoke(Message<?> message, Object... providedArgs) throws Exceptio
191192
}
192193
}
193194
Object result;
194-
if (handler instanceof MetadataAwareInvocableHandlerMethod) {
195+
if (Boolean.TRUE.equals(this.handlerMetadataAware.get(handler))) {
195196
Object[] args = new Object[providedArgs.length + 1];
196197
args[0] = AdapterUtils.buildConsumerRecordMetadataFromArray(providedArgs);
197198
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
@@ -350,19 +351,6 @@ public boolean hasDefaultHandler() {
350351
return this.defaultHandler != null;
351352
}
352353

353-
/**
354-
* A handler method that is aware of {@link ConsumerRecordMetadata}.
355-
*
356-
* @since 2.5
357-
*/
358-
private static final class MetadataAwareInvocableHandlerMethod extends InvocableHandlerMethod {
359-
360-
MetadataAwareInvocableHandlerMethod(HandlerMethod handlerMethod) {
361-
super(handlerMethod);
362-
}
363-
364-
}
365-
366354
private static final class PayloadValidator extends PayloadMethodArgumentResolver {
367355

368356
PayloadValidator(Validator validator) {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
108108
import org.springframework.kafka.listener.ListenerExecutionFailedException;
109109
import org.springframework.kafka.listener.MessageListenerContainer;
110+
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
110111
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
111112
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
112113
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -431,6 +432,7 @@ public void testMulti() throws Exception {
431432

432433
template.send("annotated8", 0, 1, "junk");
433434
assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue();
435+
assertThat(this.multiListener.meta).isNotNull();
434436
}
435437

436438
@Test
@@ -2213,18 +2215,21 @@ public CountDownLatch getLatch2() {
22132215
@KafkaListener(id = "multi", topics = "annotated8", errorHandler = "consumeMultiMethodException")
22142216
static class MultiListenerBean {
22152217

2216-
private final CountDownLatch latch1 = new CountDownLatch(1);
2218+
final CountDownLatch latch1 = new CountDownLatch(1);
22172219

2218-
private final CountDownLatch latch2 = new CountDownLatch(1);
2220+
final CountDownLatch latch2 = new CountDownLatch(1);
2221+
2222+
final CountDownLatch errorLatch = new CountDownLatch(1);
22192223

2220-
private final CountDownLatch errorLatch = new CountDownLatch(1);
2224+
volatile ConsumerRecordMetadata meta;
22212225

22222226
@KafkaHandler
2223-
public void bar(@NonNull String bar) {
2227+
public void bar(@NonNull String bar, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
22242228
if ("junk".equals(bar)) {
22252229
throw new RuntimeException("intentional");
22262230
}
22272231
else {
2232+
this.meta = meta;
22282233
this.latch1.countDown();
22292234
}
22302235
}

0 commit comments

Comments
 (0)