Skip to content

Commit bcd6843

Browse files
garyrussellartembilan
authored andcommitted
GH-693: Add BytesJsonMessageConverter
Resolves #693 **cherry-pick to 2.1.x**
1 parent 34172b6 commit bcd6843

File tree

4 files changed

+91
-10
lines changed

4 files changed

+91
-10
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.converter;
18+
19+
import org.apache.kafka.common.utils.Bytes;
20+
21+
import org.springframework.messaging.Message;
22+
23+
import com.fasterxml.jackson.core.JsonProcessingException;
24+
import com.fasterxml.jackson.databind.ObjectMapper;
25+
26+
/**
27+
* JSON Message converter - {@code Bytes} on output, String, Bytes, or byte[] on input.
28+
* Used in conjunction with Kafka {@code BytesSerializer/BytesDeserializer}. More
29+
* efficient than {@link StringJsonMessageConverter} because the {@code String/byte[]} is
30+
* avoided.
31+
*
32+
* @author Gary Russell
33+
* @since 2.1.7
34+
*
35+
*/
36+
public class BytesJsonMessageConverter extends StringJsonMessageConverter {
37+
38+
public BytesJsonMessageConverter() {
39+
super();
40+
}
41+
42+
public BytesJsonMessageConverter(ObjectMapper objectMapper) {
43+
super(objectMapper);
44+
}
45+
46+
@Override
47+
protected Object convertPayload(Message<?> message) {
48+
try {
49+
return Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload()));
50+
}
51+
catch (JsonProcessingException e) {
52+
throw new ConversionException("Failed to convert to JSON", e);
53+
}
54+
}
55+
56+
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323
import org.apache.kafka.common.header.Headers;
2424
import org.apache.kafka.common.header.internals.RecordHeaders;
25+
import org.apache.kafka.common.utils.Bytes;
2526

2627
import org.springframework.kafka.support.KafkaNull;
2728
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence;
@@ -36,7 +37,10 @@
3637
import com.fasterxml.jackson.databind.type.TypeFactory;
3738

3839
/**
39-
* JSON Message converter - String on output, String or byte[] on input.
40+
* JSON Message converter - String on output, String, Bytes, or byte[] on input. Used in
41+
* conjunction with Kafka
42+
* {@code StringSerializer/StringDeserializer or BytesDeserializer}. Consider using the
43+
* BytesJsonMessageConverter instead.
4044
*
4145
* @author Gary Russell
4246
* @author Artem Bilan
@@ -73,6 +77,15 @@ public void setTypeMapper(Jackson2JavaTypeMapper typeMapper) {
7377
this.typeMapper = typeMapper;
7478
}
7579

80+
/**
81+
* Return the object mapper.
82+
* @return the mapper.
83+
* @since 2.1.7
84+
*/
85+
protected ObjectMapper getObjectMapper() {
86+
return this.objectMapper;
87+
}
88+
7689
@Override
7790
protected Headers initialRecordHeaders(Message<?> message) {
7891
RecordHeaders headers = new RecordHeaders();
@@ -104,6 +117,9 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
104117
if (javaType == null) { // no headers
105118
javaType = TypeFactory.defaultInstance().constructType(type);
106119
}
120+
if (value instanceof Bytes) {
121+
value = ((Bytes) value).get();
122+
}
107123
if (value instanceof String) {
108124
try {
109125
return this.objectMapper.readValue((String) value, javaType);

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import org.apache.kafka.clients.consumer.ConsumerConfig;
28+
import org.apache.kafka.clients.producer.ProducerConfig;
29+
import org.apache.kafka.common.serialization.BytesDeserializer;
30+
import org.apache.kafka.common.serialization.BytesSerializer;
2831
import org.junit.ClassRule;
2932
import org.junit.Test;
3033
import org.junit.runner.RunWith;
@@ -40,7 +43,7 @@
4043
import org.springframework.kafka.core.ProducerFactory;
4144
import org.springframework.kafka.support.KafkaHeaders;
4245
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
43-
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
46+
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;
4447
import org.springframework.kafka.test.rule.KafkaEmbedded;
4548
import org.springframework.kafka.test.utils.KafkaTestUtils;
4649
import org.springframework.messaging.Message;
@@ -128,6 +131,7 @@ public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
128131
public Map<String, Object> consumerConfigs() {
129132
Map<String, Object> consumerProps =
130133
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
134+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
131135
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
132136
return consumerProps;
133137
}
@@ -140,8 +144,8 @@ public KafkaTemplate<Integer, Foo> template() {
140144
}
141145

142146
@Bean
143-
public StringJsonMessageConverter converter() {
144-
return new StringJsonMessageConverter();
147+
public BytesJsonMessageConverter converter() {
148+
return new BytesJsonMessageConverter();
145149
}
146150

147151
@Bean
@@ -151,7 +155,9 @@ public ProducerFactory<Integer, Foo> producerFactory() {
151155

152156
@Bean
153157
public Map<String, Object> producerConfigs() {
154-
return KafkaTestUtils.producerProps(embeddedKafka);
158+
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
159+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class);
160+
return props;
155161
}
156162

157163
@Bean

Diff for: src/reference/asciidoc/kafka.adoc

+7-4
Original file line numberDiff line numberDiff line change
@@ -994,7 +994,7 @@ static class MultiListenerBean {
994994
Starting with _version 2.1.3_, a `@KafkaHandler` method can be designated as the default method which is invoked if there is no match on other methods.
995995
At most one method can be so designated.
996996
When using `@KafkaHandler` methods, the payload must have already been converted to the domain object (so the match can be performed).
997-
Use a custom deserializer, the `JsonDeserializer` or the `StringJsonMessageConverter` with its `TypePrecedence` set to `TYPE_ID` - see <<serdes>> for more information.
997+
Use a custom deserializer, the `JsonDeserializer` or the `(String|Bytes)JsonMessageConverter` with its `TypePrecedence` set to `TYPE_ID` - see <<serdes>> for more information.
998998

999999
[[kafkalistener-lifecycle]]
10001000
===== @KafkaListener Lifecycle Management
@@ -1471,7 +1471,7 @@ In addition, the serializer/deserializer can be configured using Kafka propertie
14711471
Although the `Serializer`/`Deserializer` API is quite simple and flexible from the low-level Kafka `Consumer` and
14721472
`Producer` perspective, you might need more flexibility at the Spring Messaging level, either when using `@KafkaListener` or <<si-kafka,Spring Integration>>.
14731473
To easily convert to/from `org.springframework.messaging.Message`, Spring for Apache Kafka provides a `MessageConverter`
1474-
abstraction with the `MessagingMessageConverter` implementation and its `StringJsonMessageConverter` customization.
1474+
abstraction with the `MessagingMessageConverter` implementation and its `StringJsonMessageConverter` and `BytesJsonMessageConverter` customization.
14751475
The `MessageConverter` can be injected into `KafkaTemplate` instance directly and via
14761476
`AbstractKafkaListenerContainerFactory` bean definition for the `@KafkaListener.containerFactory()` property:
14771477

@@ -1502,14 +1502,17 @@ With a class-level `@KafkaListener`, the payload type is used to select which `@
15021502
====
15031503

15041504
NOTE: When using the `StringJsonMessageConverter`, you should use a `StringDeserializer` in the kafka consumer configuration and `StringSerializer` in the kafka producer configuration, when using Spring Integration or the `KafkaTemplate.send(Message<?> message)` method.
1505+
When using the `BytesJsonMessageConverter`, you should use a `BytesDeserializer` in the kafka consumer configuration and `BytesSerializer` in the kafka producer configuration, when using Spring Integration or the `KafkaTemplate.send(Message<?> message)` method.
1506+
Generally, the `BytesJsonMessageConverter` is more efficient because it avoids a `String` to/from `byte[]` conversion.
15051507

15061508
[[payload-conversion-with-batch]]
15071509
===== Payload Conversion with Batch Listeners
15081510

1509-
Starting with _version 1.3.2_, you can also use a `StringJsonMessageConverter` within a `BatchMessagingMessageConverter` for converting batch messages, when using a batch listener container factory.
1511+
Starting with _version 1.3.2_, you can also use a `StringJsonMessageConverter` or `BytesJsonMessageConverter` within a `BatchMessagingMessageConverter` for converting batch messages, when using a batch listener container factory.
1512+
See <<serdes>> for more information.
15101513

15111514
By default, the type for the conversion is inferred from the listener argument.
1512-
If you configure the `StringJsonMessageConverter` with a `DefaultJackson2TypeMapper` that has its `TypePrecedence` set to `TYPE_ID` (instead of the default `INFERRED`), then the converter will use type information in headers (if present) instead.
1515+
If you configure the `(Bytes|String)JsonMessageConverter` with a `DefaultJackson2TypeMapper` that has its `TypePrecedence` set to `TYPE_ID` (instead of the default `INFERRED`), then the converter will use type information in headers (if present) instead.
15131516
This allows, for example, listener methods to be declared with interfaces instead of concrete classes.
15141517
Also, the type converter supports mapping so the deserialization can be to a different type than the source (as long as the data is compatible).
15151518
This is also useful when using <<class-level-kafkalistener,class-level `@KafkaListener` s>> where the payload must have already been converted, to determine which method to invoke.

0 commit comments

Comments
 (0)