Skip to content

Commit 1f3d7cd

Browse files
garyrussellartembilan
authored andcommitted
GH-652: JsonSerializer add null check for Headers
Fixes: #652 Kafka Streams `ChangedSerializer` calls the `Serde` with `null` in `headers`. **cherry-pick to 2.1.x, 2.0.x, 1.3.x** (cherry picked from commit ad28b91)
1 parent 19ea7f1 commit 1f3d7cd

File tree

3 files changed

+4
-2
lines changed

3 files changed

+4
-2
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ else if (config instanceof String) {
130130

131131
@Override
132132
public byte[] serialize(String topic, Headers headers, T data) {
133-
if (this.addTypeInfo) {
133+
if (this.addTypeInfo && headers != null) {
134134
this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), headers);
135135
}
136136
return serialize(topic, data);

Diff for: src/reference/asciidoc/kafka.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1433,7 +1433,7 @@ You can also extend them to implement some particular configuration logic in the
14331433
Starting with _version 2.1_, type information can be conveyed in record `Headers`, allowing the handling of multiple types.
14341434
In addition, the serializer/deserializer can be configured using Kafka properties.
14351435

1436-
- `JsonSerializer.ADD_TYPE_INFO_HEADERS` (default `true`); set to `false` to disable this feature.
1436+
- `JsonSerializer.ADD_TYPE_INFO_HEADERS` (default `true`); set to `false` to disable this feature on the `JsonSerializer` (sets the `addTypeInfo` property).
14371437
- `JsonDeserializer.KEY_DEFAULT_TYPE`; fallback type for deserialization of keys if no header information is present.
14381438
- `JsonDeserializer.VALUE_DEFAULT_TYPE`; fallback type for deserialization of values if no header information is present.
14391439
- `JsonDeserializer.TRUSTED_PACKAGES` (default `java.util`, `java.lang`); comma-delimited list of package patterns allowed for deserialization; `*` means deserialize all.

Diff for: src/reference/asciidoc/streams.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ In the following example we use the `JsonSerde` to serialize and deserialize the
111111
stream.through(Serdes.Integer(), new JsonSerde<>(Foo.class), "foos");
112112
----
113113

114+
IMPORTANT: Since Kafka Streams do not support headers, the `addTypeInfo` property on the `JsonSerializer` is ignored.
115+
114116
==== Configuration
115117

116118
To configure the Kafka Streams environment, the `StreamsBuilderFactoryBean` requires a `Map` of particular properties or a `StreamsConfig` instance.

0 commit comments

Comments
 (0)