Skip to content

Commit eec3e95

Browse files
artembilangaryrussell
authored andcommitted
GH-3734: Support MessageHistory on Kafka deser
Fixes #3734 The `MessageHistory` is not in the trusted packages of the `DefaultKafkaHeaderMapper` therefore it fails when `MessageHistory.read()` is performed. * Configure default `DefaultKafkaHeaderMapper` in the `SubscribableKafkaChannel`, `KafkaInboundGateway`, `KafkaMessageDrivenChannelAdapter` and `KafkaMessageSource` to also trust packages exposed via `JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES` which include a `MessageHistory`, too. * Verify in some integration Kafka tests to be sure that `MessageHistory` is deserialized properly in the transferred headers * Rework some tests to use `@EmbeddedKafka` instead of `@BeforeAll/@AfterAll` **Cherry-pick to `5.5.x`**
1 parent 675b3b4 commit eec3e95

File tree

9 files changed

+175
-107
lines changed

9 files changed

+175
-107
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/support/json/JacksonJsonUtils.java

+21-21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,20 @@
5151
*/
5252
public final class JacksonJsonUtils {
5353

54+
/**
55+
* The packages to trust on JSON deserialization by default.
56+
*/
57+
public static final List<String> DEFAULT_TRUSTED_PACKAGES =
58+
List.of(
59+
"java.util",
60+
"java.lang",
61+
"org.springframework.messaging.support",
62+
"org.springframework.integration.support",
63+
"org.springframework.integration.message",
64+
"org.springframework.integration.store",
65+
"org.springframework.integration.history"
66+
);
67+
5468
private JacksonJsonUtils() {
5569
}
5670

@@ -99,14 +113,12 @@ public static ObjectMapper messagingAwareMapper(String... trustedPackages) {
99113

100114
/**
101115
* An implementation of {@link ObjectMapper.DefaultTypeResolverBuilder}
102-
* that wraps a default {@link TypeIdResolver} to the {@link AllowlistTypeIdResolver}.
116+
* that wraps a default {@link TypeIdResolver} to the {@link AllowListTypeIdResolver}.
103117
*
104118
* @author Rob Winch
105119
* @author Artem Bilan
106120
* @author Filip Hanik
107121
* @author Gary Russell
108-
*
109-
* @since 4.3.11
110122
*/
111123
private static final class AllowListTypeResolverBuilder extends ObjectMapper.DefaultTypeResolverBuilder {
112124

@@ -133,8 +145,9 @@ protected TypeIdResolver idResolver(MapperConfig<?> config,
133145
JavaType baseType,
134146
PolymorphicTypeValidator subtypeValidator,
135147
Collection<NamedType> subtypes, boolean forSer, boolean forDeser) {
148+
136149
TypeIdResolver result = super.idResolver(config, baseType, subtypeValidator, subtypes, forSer, forDeser);
137-
return new AllowlistTypeIdResolver(result, this.trustedPackages);
150+
return new AllowListTypeIdResolver(result, this.trustedPackages);
138151
}
139152

140153
}
@@ -146,27 +159,14 @@ protected TypeIdResolver idResolver(MapperConfig<?> config,
146159
*
147160
* @author Rob Winch
148161
* @author Artem Bilan
149-
*
150-
* @since 4.3.11
151162
*/
152-
private static final class AllowlistTypeIdResolver implements TypeIdResolver {
153-
154-
private static final List<String> TRUSTED_PACKAGES =
155-
Arrays.asList(
156-
"java.util",
157-
"java.lang",
158-
"org.springframework.messaging.support",
159-
"org.springframework.integration.support",
160-
"org.springframework.integration.message",
161-
"org.springframework.integration.store",
162-
"org.springframework.integration.history"
163-
);
163+
private static final class AllowListTypeIdResolver implements TypeIdResolver {
164164

165165
private final TypeIdResolver delegate;
166166

167-
private final Set<String> trustedPackages = new LinkedHashSet<>(TRUSTED_PACKAGES);
167+
private final Set<String> trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES);
168168

169-
AllowlistTypeIdResolver(TypeIdResolver delegate, String... trustedPackages) {
169+
AllowListTypeIdResolver(TypeIdResolver delegate, String... trustedPackages) {
170170
this.delegate = delegate;
171171
if (trustedPackages != null) {
172172
for (String trustedPackage : trustedPackages) {

Diff for: spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,13 +22,18 @@
2222
import org.springframework.integration.dispatcher.MessageDispatcher;
2323
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2424
import org.springframework.integration.dispatcher.UnicastingDispatcher;
25+
import org.springframework.integration.support.json.JacksonJsonUtils;
2526
import org.springframework.integration.support.management.ManageableSmartLifecycle;
2627
import org.springframework.kafka.config.KafkaListenerContainerFactory;
2728
import org.springframework.kafka.core.KafkaOperations;
2829
import org.springframework.kafka.listener.ContainerProperties;
2930
import org.springframework.kafka.listener.MessageListenerContainer;
3031
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
3132
import org.springframework.kafka.support.Acknowledgment;
33+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
34+
import org.springframework.kafka.support.JacksonPresent;
35+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
36+
import org.springframework.kafka.support.converter.RecordMessageConverter;
3237
import org.springframework.messaging.MessageHandler;
3338
import org.springframework.messaging.SubscribableChannel;
3439
import org.springframework.util.Assert;
@@ -49,6 +54,8 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su
4954

5055
private final KafkaListenerContainerFactory<?> factory;
5156

57+
private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
58+
5259
private MessageDispatcher dispatcher;
5360

5461
private MessageListenerContainer container;
@@ -71,6 +78,24 @@ public SubscribableKafkaChannel(KafkaOperations<?, ?> template, KafkaListenerCon
7178
super(template, channelTopic);
7279
Assert.notNull(factory, "'factory' cannot be null");
7380
this.factory = factory;
81+
82+
if (JacksonPresent.isJackson2Present()) {
83+
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
84+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
85+
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
86+
messageConverter.setHeaderMapper(headerMapper);
87+
this.recordListener.setMessageConverter(messageConverter);
88+
}
89+
}
90+
91+
92+
/**
93+
* Set the {@link RecordMessageConverter} to the listener.
94+
* @param messageConverter the converter.
95+
* @since 6.0
96+
*/
97+
public void setMessageConverter(RecordMessageConverter messageConverter) {
98+
this.recordListener.setMessageConverter(messageConverter);
7499
}
75100

76101
@Override
@@ -113,7 +138,7 @@ protected void onInit() {
113138
String groupId = getGroupId();
114139
ContainerProperties containerProperties = this.container.getContainerProperties();
115140
containerProperties.setGroupId(groupId != null ? groupId : getBeanName());
116-
containerProperties.setMessageListener(new IntegrationRecordMessageListener());
141+
containerProperties.setMessageListener(this.recordListener);
117142
}
118143

119144
protected MessageDispatcher createDispatcher() {

Diff for: spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

+12
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,20 @@
3535
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3636
import org.springframework.integration.support.ErrorMessageUtils;
3737
import org.springframework.integration.support.MessageBuilder;
38+
import org.springframework.integration.support.json.JacksonJsonUtils;
3839
import org.springframework.kafka.core.KafkaTemplate;
3940
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
4041
import org.springframework.kafka.listener.ConsumerSeekAware;
4142
import org.springframework.kafka.listener.ContainerProperties;
4243
import org.springframework.kafka.listener.MessageListener;
4344
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
4445
import org.springframework.kafka.support.Acknowledgment;
46+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
47+
import org.springframework.kafka.support.JacksonPresent;
4548
import org.springframework.kafka.support.KafkaHeaders;
4649
import org.springframework.kafka.support.converter.ConversionException;
4750
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
51+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
4852
import org.springframework.kafka.support.converter.RecordMessageConverter;
4953
import org.springframework.messaging.Message;
5054
import org.springframework.messaging.MessageChannel;
@@ -106,6 +110,13 @@ public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> messageListene
106110
this.messageListenerContainer.setAutoStartup(false);
107111
this.kafkaTemplate = kafkaTemplate;
108112
setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
113+
if (JacksonPresent.isJackson2Present()) {
114+
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
115+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
116+
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
117+
messageConverter.setHeaderMapper(headerMapper);
118+
this.listener.setMessageConverter(messageConverter);
119+
}
109120
}
110121

111122
/**
@@ -158,6 +169,7 @@ public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
158169
*/
159170
public void setOnPartitionsAssignedSeekCallback(
160171
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
172+
161173
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
162174
}
163175

Diff for: spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package org.springframework.integration.kafka.inbound;
1818

1919
import java.nio.ByteBuffer;
20+
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.atomic.AtomicInteger;
2324
import java.util.function.BiConsumer;
24-
import java.util.stream.Collectors;
2525

2626
import org.apache.kafka.clients.consumer.Consumer;
2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -36,6 +36,7 @@
3636
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
3737
import org.springframework.integration.support.ErrorMessageUtils;
3838
import org.springframework.integration.support.MessageBuilder;
39+
import org.springframework.integration.support.json.JacksonJsonUtils;
3940
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
4041
import org.springframework.kafka.listener.BatchMessageListener;
4142
import org.springframework.kafka.listener.ConsumerSeekAware;
@@ -47,11 +48,14 @@
4748
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
4849
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
4950
import org.springframework.kafka.support.Acknowledgment;
51+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
52+
import org.springframework.kafka.support.JacksonPresent;
5053
import org.springframework.kafka.support.KafkaHeaders;
5154
import org.springframework.kafka.support.converter.BatchMessageConverter;
5255
import org.springframework.kafka.support.converter.ConversionException;
5356
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
5457
import org.springframework.kafka.support.converter.MessageConverter;
58+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5559
import org.springframework.kafka.support.converter.RecordMessageConverter;
5660
import org.springframework.messaging.Message;
5761
import org.springframework.messaging.MessageChannel;
@@ -128,6 +132,15 @@ public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> m
128132
this.messageListenerContainer.setAutoStartup(false);
129133
this.mode = mode;
130134
setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
135+
136+
if (JacksonPresent.isJackson2Present()) {
137+
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
138+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
139+
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
140+
messageConverter.setHeaderMapper(headerMapper);
141+
this.recordListener.setMessageConverter(messageConverter);
142+
this.batchListener.setMessageConverter(messageConverter);
143+
}
131144
}
132145

133146
/**
@@ -146,7 +159,6 @@ else if (messageConverter instanceof BatchMessageConverter) {
146159
throw new IllegalArgumentException(
147160
"Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'");
148161
}
149-
150162
}
151163

152164
/**
@@ -526,9 +538,9 @@ public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowl
526538
message = toMessagingMessage(records, acknowledgment, consumer);
527539
setAttributesIfNecessary(records, message);
528540
}
529-
catch (RuntimeException e) {
541+
catch (RuntimeException ex) {
530542
Exception exception = new ConversionException("Failed to convert to message",
531-
records.stream().collect(Collectors.toList()), e);
543+
new ArrayList<>(records), ex);
532544
MessageChannel errorChannel = getErrorChannel();
533545
if (errorChannel != null) {
534546
getMessagingTemplate().send(errorChannel, new ErrorMessage(exception));

Diff for: spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

+9
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,16 @@
5151
import org.springframework.integration.core.Pausable;
5252
import org.springframework.integration.endpoint.AbstractMessageSource;
5353
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
54+
import org.springframework.integration.support.json.JacksonJsonUtils;
5455
import org.springframework.kafka.core.ConsumerFactory;
5556
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5657
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
5758
import org.springframework.kafka.listener.ConsumerProperties;
5859
import org.springframework.kafka.listener.ListenerUtils;
5960
import org.springframework.kafka.listener.LoggingCommitCallback;
6061
import org.springframework.kafka.support.Acknowledgment;
62+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
63+
import org.springframework.kafka.support.JacksonPresent;
6164
import org.springframework.kafka.support.KafkaHeaders;
6265
import org.springframework.kafka.support.LogIfLevelEnabled;
6366
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -233,6 +236,12 @@ public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
233236
this.assignTimeout =
234237
Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic
235238
this.commitTimeout = consumerProperties.getSyncCommitTimeout();
239+
240+
if (JacksonPresent.isJackson2Present()) {
241+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
242+
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
243+
((MessagingMessageConverter) this.messageConverter).setHeaderMapper(headerMapper);
244+
}
236245
}
237246

238247
/**

Diff for: spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.springframework.beans.factory.annotation.Autowired;
3030
import org.springframework.context.annotation.Bean;
3131
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.integration.channel.NullChannel;
33+
import org.springframework.integration.history.MessageHistory;
3234
import org.springframework.integration.kafka.channel.PollableKafkaChannel;
3335
import org.springframework.integration.kafka.channel.PublishSubscribeKafkaChannel;
3436
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
@@ -53,6 +55,7 @@
5355

5456
/**
5557
* @author Gary Russell
58+
* @author Artem Bilan
5659
*
5760
* @since 5.4
5861
*
@@ -70,10 +73,18 @@ void subscribablePtp(@Autowired SubscribableChannel ptp) throws InterruptedExcep
7073
latch.countDown();
7174
});
7275
Message<?> msg = new GenericMessage<>("foo");
76+
NullChannel component = new NullChannel();
77+
component.setBeanName("myNullChannel");
78+
msg = MessageHistory.write(msg, component);
7379
ptp.send(msg, 10_000L);
7480
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
75-
assertThat(message.get().getPayload()).isEqualTo("foo");
76-
assertThat(message.get().getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("channel.1");
81+
Message<?> received = message.get();
82+
assertThat(received.getPayload()).isEqualTo("foo");
83+
assertThat(received.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("channel.1");
84+
85+
MessageHistory messageHistory = MessageHistory.read(received);
86+
assertThat(messageHistory).isNotNull();
87+
assertThat(messageHistory.toString()).isEqualTo("myNullChannel");
7788
}
7889

7990
@Test

0 commit comments

Comments
 (0)