Skip to content

Commit a19dd47

Browse files
committed
spring-projectsGH-2050: Configure ErrHandlDeser if no delegate
Fixes spring-projects#2050 The application can have several consumer factories when one fully relies on the configuration properties for its deserializers and other configures them programmatically. The consumer factory now calls `configure()` on the `Deserializer` independently of its origins. See spring-projects#1879 In this case the `ErrorHandlingDeserializer` consults `spring.deserializer.key.delegate.class` or `spring.deserializer.value.delegate.class` for its delegate overriding provided explicitly programmatically * Fix `ErrorHandlingDeserializer` to check it `delegate` and `failedDeserializationFunction` for null before taking their values from the respective configuration properties * Add `spring.deserializer.value.delegate.class` property to `testJsonSerDeIgnoreTypeHeadersInbound()` configuration to ensure that it does not override an explicit `JsonDeserializer` delegate * Fix warning in the `EmbeddedKafkaBroker` about `this.` prefix for a `static logger` property
1 parent 1fd454a commit a19dd47

File tree

3 files changed

+9
-4
lines changed

3 files changed

+9
-4
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ private boolean brokerRunning(KafkaServer kafkaServer) {
595595
}
596596
}
597597
else {
598-
this.logger.debug("Could not determine broker state during shutdown");
598+
logger.debug("Could not determine broker state during shutdown");
599599
return true;
600600
}
601601
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,15 @@ public ErrorHandlingDeserializer<T> keyDeserializer(boolean isKey) {
132132

133133
@Override
134134
public void configure(Map<String, ?> configs, boolean isKey) {
135-
setupDelegate(configs, isKey ? KEY_DESERIALIZER_CLASS : VALUE_DESERIALIZER_CLASS);
135+
if (this.delegate == null) {
136+
setupDelegate(configs, isKey ? KEY_DESERIALIZER_CLASS : VALUE_DESERIALIZER_CLASS);
137+
}
136138
Assert.state(this.delegate != null, "No delegate deserializer configured");
137139
this.delegate.configure(configs, isKey);
138140
this.isForKey = isKey;
139-
setupFunction(configs, isKey ? KEY_FUNCTION : VALUE_FUNCTION);
141+
if (this.failedDeserializationFunction == null) {
142+
setupFunction(configs, isKey ? KEY_FUNCTION : VALUE_FUNCTION);
143+
}
140144
}
141145

142146
public void setupDelegate(Map<String, ?> configs, String configKey) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -2257,7 +2257,8 @@ public void testJsonSerDeTypeMappings() throws Exception {
22572257
public void testJsonSerDeIgnoreTypeHeadersInbound() throws Exception {
22582258
this.logger.info("Start JSON4");
22592259
Map<String, Object> props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka);
2260-
2260+
props.put("spring.deserializer.value.delegate.class",
2261+
"org.apache.kafka.common.serialization.StringDeserializer");
22612262
ErrorHandlingDeserializer<Foo1> errorHandlingDeserializer =
22622263
new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Foo1.class, false));
22632264

0 commit comments

Comments
 (0)