Skip to content

Commit d5501d8

Browse files
artembilangaryrussell
authored andcommitted
GH-3734: Support MessageHistory on Kafka deser
Fixes #3734 The `MessageHistory` is not in the trusted packages of the `DefaultKafkaHeaderMapper` therefore it fails when `MessageHistory.read()` is performed. * Configure default `DefaultKafkaHeaderMapper` in the `SubscribableKafkaChannel`, `KafkaInboundGateway`, `KafkaMessageDrivenChannelAdapter` and `KafkaMessageSource` to also trust packages exposed via `JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES` which include a `MessageHistory`, too. * Verify in some integration Kafka tests to be sure that `MessageHistory` is deserialized properly in the transferred headers * Rework some tests to use `@EmbeddedKafka` instead of `@BeforeAll/@AfterAll` **Cherry-pick to `5.5.x`**
1 parent bba83c7 commit d5501d8

File tree

10 files changed

+179
-119
lines changed

10 files changed

+179
-119
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/json/JacksonJsonUtils.java

+23-21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.support.json;
1818

1919
import java.io.IOException;
20+
import java.io.Serial;
2021
import java.util.Arrays;
2122
import java.util.Collection;
2223
import java.util.LinkedHashSet;
@@ -51,6 +52,20 @@
5152
*/
5253
public final class JacksonJsonUtils {
5354

55+
/**
56+
* The packages to trust on JSON deserialization by default.
57+
*/
58+
public static final List<String> DEFAULT_TRUSTED_PACKAGES =
59+
List.of(
60+
"java.util",
61+
"java.lang",
62+
"org.springframework.messaging.support",
63+
"org.springframework.integration.support",
64+
"org.springframework.integration.message",
65+
"org.springframework.integration.store",
66+
"org.springframework.integration.history"
67+
);
68+
5469
private JacksonJsonUtils() {
5570
}
5671

@@ -99,17 +114,16 @@ public static ObjectMapper messagingAwareMapper(String... trustedPackages) {
99114

100115
/**
101116
* An implementation of {@link ObjectMapper.DefaultTypeResolverBuilder}
102-
* that wraps a default {@link TypeIdResolver} to the {@link AllowlistTypeIdResolver}.
117+
* that wraps a default {@link TypeIdResolver} to the {@link AllowListTypeIdResolver}.
103118
*
104119
* @author Rob Winch
105120
* @author Artem Bilan
106121
* @author Filip Hanik
107122
* @author Gary Russell
108-
*
109-
* @since 4.3.11
110123
*/
111124
private static final class AllowListTypeResolverBuilder extends ObjectMapper.DefaultTypeResolverBuilder {
112125

126+
@Serial
113127
private static final long serialVersionUID = 1L;
114128

115129
private final String[] trustedPackages;
@@ -133,8 +147,9 @@ protected TypeIdResolver idResolver(MapperConfig<?> config,
133147
JavaType baseType,
134148
PolymorphicTypeValidator subtypeValidator,
135149
Collection<NamedType> subtypes, boolean forSer, boolean forDeser) {
150+
136151
TypeIdResolver result = super.idResolver(config, baseType, subtypeValidator, subtypes, forSer, forDeser);
137-
return new AllowlistTypeIdResolver(result, this.trustedPackages);
152+
return new AllowListTypeIdResolver(result, this.trustedPackages);
138153
}
139154

140155
}
@@ -146,27 +161,14 @@ protected TypeIdResolver idResolver(MapperConfig<?> config,
146161
*
147162
* @author Rob Winch
148163
* @author Artem Bilan
149-
*
150-
* @since 4.3.11
151164
*/
152-
private static final class AllowlistTypeIdResolver implements TypeIdResolver {
153-
154-
private static final List<String> TRUSTED_PACKAGES =
155-
Arrays.asList(
156-
"java.util",
157-
"java.lang",
158-
"org.springframework.messaging.support",
159-
"org.springframework.integration.support",
160-
"org.springframework.integration.message",
161-
"org.springframework.integration.store",
162-
"org.springframework.integration.history"
163-
);
165+
private static final class AllowListTypeIdResolver implements TypeIdResolver {
164166

165167
private final TypeIdResolver delegate;
166168

167-
private final Set<String> trustedPackages = new LinkedHashSet<>(TRUSTED_PACKAGES);
169+
private final Set<String> trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES);
168170

169-
AllowlistTypeIdResolver(TypeIdResolver delegate, String... trustedPackages) {
171+
AllowListTypeIdResolver(TypeIdResolver delegate, String... trustedPackages) {
170172
this.delegate = delegate;
171173
if (trustedPackages != null) {
172174
for (String trustedPackage : trustedPackages) {

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,13 +22,18 @@
2222
import org.springframework.integration.dispatcher.MessageDispatcher;
2323
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2424
import org.springframework.integration.dispatcher.UnicastingDispatcher;
25+
import org.springframework.integration.support.json.JacksonJsonUtils;
2526
import org.springframework.integration.support.management.ManageableSmartLifecycle;
2627
import org.springframework.kafka.config.KafkaListenerContainerFactory;
2728
import org.springframework.kafka.core.KafkaOperations;
2829
import org.springframework.kafka.listener.ContainerProperties;
2930
import org.springframework.kafka.listener.MessageListenerContainer;
3031
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
3132
import org.springframework.kafka.support.Acknowledgment;
33+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
34+
import org.springframework.kafka.support.JacksonPresent;
35+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
36+
import org.springframework.kafka.support.converter.RecordMessageConverter;
3237
import org.springframework.messaging.MessageHandler;
3338
import org.springframework.messaging.SubscribableChannel;
3439
import org.springframework.util.Assert;
@@ -49,6 +54,8 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su
4954

5055
private final KafkaListenerContainerFactory<?> factory;
5156

57+
private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
58+
5259
private MessageDispatcher dispatcher;
5360

5461
private MessageListenerContainer container;
@@ -71,6 +78,24 @@ public SubscribableKafkaChannel(KafkaOperations<?, ?> template, KafkaListenerCon
7178
super(template, channelTopic);
7279
Assert.notNull(factory, "'factory' cannot be null");
7380
this.factory = factory;
81+
82+
if (JacksonPresent.isJackson2Present()) {
83+
var messageConverter = new MessagingMessageConverter();
84+
var headerMapper = new DefaultKafkaHeaderMapper();
85+
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
86+
messageConverter.setHeaderMapper(headerMapper);
87+
this.recordListener.setMessageConverter(messageConverter);
88+
}
89+
}
90+
91+
92+
/**
93+
* Set the {@link RecordMessageConverter} to the listener.
94+
* @param messageConverter the converter.
95+
* @since 6.0
96+
*/
97+
public void setMessageConverter(RecordMessageConverter messageConverter) {
98+
this.recordListener.setMessageConverter(messageConverter);
7499
}
75100

76101
@Override
@@ -113,7 +138,7 @@ protected void onInit() {
113138
String groupId = getGroupId();
114139
ContainerProperties containerProperties = this.container.getContainerProperties();
115140
containerProperties.setGroupId(groupId != null ? groupId : getBeanName());
116-
containerProperties.setMessageListener(new IntegrationRecordMessageListener());
141+
containerProperties.setMessageListener(this.recordListener);
117142
}
118143

119144
protected MessageDispatcher createDispatcher() {

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,19 @@
3636
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3737
import org.springframework.integration.support.ErrorMessageUtils;
3838
import org.springframework.integration.support.MessageBuilder;
39+
import org.springframework.integration.support.json.JacksonJsonUtils;
3940
import org.springframework.kafka.core.KafkaTemplate;
4041
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
4142
import org.springframework.kafka.listener.ConsumerSeekAware;
4243
import org.springframework.kafka.listener.ContainerProperties;
4344
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
4445
import org.springframework.kafka.support.Acknowledgment;
46+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
47+
import org.springframework.kafka.support.JacksonPresent;
4548
import org.springframework.kafka.support.KafkaHeaders;
4649
import org.springframework.kafka.support.converter.ConversionException;
4750
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
51+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
4852
import org.springframework.kafka.support.converter.RecordMessageConverter;
4953
import org.springframework.lang.Nullable;
5054
import org.springframework.messaging.Message;
@@ -108,6 +112,13 @@ public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> messageListene
108112
this.messageListenerContainer.setAutoStartup(false);
109113
this.kafkaTemplate = kafkaTemplate;
110114
setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
115+
if (JacksonPresent.isJackson2Present()) {
116+
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
117+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
118+
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
119+
messageConverter.setHeaderMapper(headerMapper);
120+
this.listener.setMessageConverter(messageConverter);
121+
}
111122
}
112123

113124
/**
@@ -169,6 +180,7 @@ public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
169180
*/
170181
public void setOnPartitionsAssignedSeekCallback(
171182
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
183+
172184
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
173185
}
174186

@@ -192,9 +204,6 @@ protected void onInit() {
192204
}
193205
}
194206
ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
195-
Object existing = containerProperties.getMessageListener();
196-
Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing
197-
+ ")");
198207
containerProperties.setMessageListener(this.listener);
199208
this.containerDeliveryAttemptPresent = containerProperties.isDeliveryAttemptHeader();
200209
}

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

+18-11
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package org.springframework.integration.kafka.inbound;
1818

1919
import java.nio.ByteBuffer;
20+
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.atomic.AtomicInteger;
2324
import java.util.function.BiConsumer;
24-
import java.util.stream.Collectors;
2525

2626
import org.apache.kafka.clients.consumer.Consumer;
2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -37,6 +37,7 @@
3737
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
3838
import org.springframework.integration.support.ErrorMessageUtils;
3939
import org.springframework.integration.support.MessageBuilder;
40+
import org.springframework.integration.support.json.JacksonJsonUtils;
4041
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
4142
import org.springframework.kafka.listener.BatchMessageListener;
4243
import org.springframework.kafka.listener.ConsumerSeekAware;
@@ -48,11 +49,14 @@
4849
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
4950
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
5051
import org.springframework.kafka.support.Acknowledgment;
52+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
53+
import org.springframework.kafka.support.JacksonPresent;
5154
import org.springframework.kafka.support.KafkaHeaders;
5255
import org.springframework.kafka.support.converter.BatchMessageConverter;
5356
import org.springframework.kafka.support.converter.ConversionException;
5457
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
5558
import org.springframework.kafka.support.converter.MessageConverter;
59+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5660
import org.springframework.kafka.support.converter.RecordMessageConverter;
5761
import org.springframework.lang.Nullable;
5862
import org.springframework.messaging.Message;
@@ -106,8 +110,6 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo
106110

107111
private boolean containerDeliveryAttemptPresent;
108112

109-
private boolean doFilterInRetry;
110-
111113
/**
112114
* Construct an instance with mode {@link ListenerMode#record}.
113115
* @param messageListenerContainer the container.
@@ -131,6 +133,15 @@ public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> m
131133
this.messageListenerContainer.setAutoStartup(false);
132134
this.mode = mode;
133135
setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
136+
137+
if (JacksonPresent.isJackson2Present()) {
138+
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
139+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
140+
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
141+
messageConverter.setHeaderMapper(headerMapper);
142+
this.recordListener.setMessageConverter(messageConverter);
143+
this.batchListener.setMessageConverter(messageConverter);
144+
}
134145
}
135146

136147
/**
@@ -149,7 +160,6 @@ else if (messageConverter instanceof BatchMessageConverter) {
149160
throw new IllegalArgumentException(
150161
"Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'");
151162
}
152-
153163
}
154164

155165
/**
@@ -280,14 +290,11 @@ protected void onInit() {
280290
super.onInit();
281291

282292
ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
283-
Object existing = containerProperties.getMessageListener();
284-
Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing
285-
+ ")");
286293
if (this.mode.equals(ListenerMode.record)) {
287294
MessageListener<K, V> listener = this.recordListener;
288295

289-
this.doFilterInRetry = this.filterInRetry && this.retryTemplate != null
290-
&& this.recordFilterStrategy != null;
296+
boolean doFilterInRetry =
297+
this.filterInRetry && this.retryTemplate != null && this.recordFilterStrategy != null;
291298

292299
if (this.retryTemplate != null) {
293300
MessageChannel errorChannel = getErrorChannel();
@@ -296,7 +303,7 @@ protected void onInit() {
296303
}
297304
this.retryTemplate.registerListener(this.recordListener);
298305
}
299-
if (!this.doFilterInRetry && this.recordFilterStrategy != null) {
306+
if (!doFilterInRetry && this.recordFilterStrategy != null) {
300307
listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy,
301308
this.ackDiscarded);
302309
}
@@ -597,7 +604,7 @@ private Message<?> toMessage(List<ConsumerRecord<K, V>> records, Acknowledgment
597604
}
598605
catch (RuntimeException ex) {
599606
Exception exception = new ConversionException("Failed to convert to message",
600-
records.stream().collect(Collectors.toList()), ex);
607+
new ArrayList<>(records), ex);
601608
MessageChannel errorChannel = getErrorChannel();
602609
if (errorChannel != null) {
603610
getMessagingTemplate().send(errorChannel, buildErrorMessage(message, exception));

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

+9
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,15 @@
5151
import org.springframework.integration.core.Pausable;
5252
import org.springframework.integration.endpoint.AbstractMessageSource;
5353
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
54+
import org.springframework.integration.support.json.JacksonJsonUtils;
5455
import org.springframework.kafka.core.ConsumerFactory;
5556
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5657
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
5758
import org.springframework.kafka.listener.ConsumerProperties;
5859
import org.springframework.kafka.listener.LoggingCommitCallback;
5960
import org.springframework.kafka.support.Acknowledgment;
61+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
62+
import org.springframework.kafka.support.JacksonPresent;
6063
import org.springframework.kafka.support.KafkaHeaders;
6164
import org.springframework.kafka.support.KafkaUtils;
6265
import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -233,6 +236,12 @@ public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
233236
this.assignTimeout =
234237
Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic
235238
this.commitTimeout = consumerProperties.getSyncCommitTimeout();
239+
240+
if (JacksonPresent.isJackson2Present()) {
241+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
242+
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
243+
((MessagingMessageConverter) this.messageConverter).setHeaderMapper(headerMapper);
244+
}
236245
}
237246

238247
/**

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.springframework.beans.factory.annotation.Autowired;
3030
import org.springframework.context.annotation.Bean;
3131
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.integration.channel.NullChannel;
33+
import org.springframework.integration.history.MessageHistory;
3234
import org.springframework.integration.kafka.channel.PollableKafkaChannel;
3335
import org.springframework.integration.kafka.channel.PublishSubscribeKafkaChannel;
3436
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
@@ -53,6 +55,7 @@
5355

5456
/**
5557
* @author Gary Russell
58+
* @author Artem Bilan
5659
*
5760
* @since 5.4
5861
*
@@ -70,10 +73,18 @@ void subscribablePtp(@Autowired SubscribableChannel ptp) throws InterruptedExcep
7073
latch.countDown();
7174
});
7275
Message<?> msg = new GenericMessage<>("foo");
76+
NullChannel component = new NullChannel();
77+
component.setBeanName("myNullChannel");
78+
msg = MessageHistory.write(msg, component);
7379
ptp.send(msg, 10_000L);
7480
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
75-
assertThat(message.get().getPayload()).isEqualTo("foo");
76-
assertThat(message.get().getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("channel.1");
81+
Message<?> received = message.get();
82+
assertThat(received.getPayload()).isEqualTo("foo");
83+
assertThat(received.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("channel.1");
84+
85+
MessageHistory messageHistory = MessageHistory.read(received);
86+
assertThat(messageHistory).isNotNull();
87+
assertThat(messageHistory.toString()).isEqualTo("myNullChannel");
7788
}
7889

7990
@Test

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java

-2
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,6 @@ void testKafkaMessageDrivenChannelAdapterOptions() {
156156

157157
messageListener = containerProps.getMessageListener();
158158
assertThat(messageListener.getClass().getName()).contains("$IntegrationRecordMessageListener");
159-
160-
assertThat(adapter).extracting("doFilterInRetry").isEqualTo(Boolean.TRUE);
161159
}
162160

163161
}

0 commit comments

Comments
 (0)