Skip to content

Commit ea50386

Browse files
garyrussellartembilan
authored andcommitted
GH-2154: Fix Tombstones with Delegating Seriali...
Resolves #2154 Check for null and return. **cherry-pick to 2.8.x, 2.7.x (partial)** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java
1 parent 8ec7ed0 commit ea50386

File tree

3 files changed

+14
-2
lines changed

3 files changed

+14
-2
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerializer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,6 +74,9 @@ public byte[] serialize(String topic, Object data) {
7474
@SuppressWarnings("unchecked")
7575
@Override
7676
public byte[] serialize(String topic, Headers headers, Object data) {
77+
if (data == null) {
78+
return null;
79+
}
7780
return ((Serializer<Object>) findDelegate(topic)).serialize(topic, headers, data);
7881
}
7982

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java

+6
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,19 @@ public void configure(Map<String, ?> configs, boolean isKey) {
8989
@SuppressWarnings({ RAWTYPES, "unchecked" })
9090
@Override
9191
public byte[] serialize(String topic, Object data) {
92+
if (data == null) {
93+
return null;
94+
}
9295
Serializer delegate = findDelegate(data, this.delegates);
9396
return delegate.serialize(topic, data);
9497
}
9598

9699
@SuppressWarnings({ "unchecked", RAWTYPES })
97100
@Override
98101
public byte[] serialize(String topic, Headers headers, Object data) {
102+
if (data == null) {
103+
return null;
104+
}
99105
Serializer delegate = findDelegate(data, this.delegates);
100106
return delegate.serialize(topic, headers, data);
101107
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingSerializer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -190,6 +190,9 @@ public byte[] serialize(String topic, Object data) {
190190

191191
@Override
192192
public byte[] serialize(String topic, Headers headers, Object data) {
193+
if (data == null) {
194+
return null;
195+
}
193196
byte[] value = null;
194197
String selectorKey = selectorKey();
195198
Header header = headers.lastHeader(selectorKey);

0 commit comments

Comments
 (0)