Skip to content

Commit 852c447

Browse files
authored
GH-1879: Factories Now Configure (De)Serializers
Resolves #1879 It was not intuitive that configuration properties were not applied when constructing (De)Serializers programmatically. The producer and consumer factories now call the `configure()` method. However, the JSON implementations cannot be configured with a mixture of setters and configuration properties. * Assert no mixed config; synchronize.
1 parent 523fa05 commit 852c447

File tree

12 files changed

+299
-29
lines changed

12 files changed

+299
-29
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+10
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,8 @@ void removeConfig(String configKey);
529529
----
530530
====
531531

532+
Starting with version 2.8, if you provide serializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.
533+
532534
[[replying-template]]
533535
===== Using `ReplyingKafkaTemplate`
534536

@@ -1133,6 +1135,8 @@ Defining `authorizationExceptionRetryInterval` should help the application to re
11331135

11341136
NOTE: By default, no interval is configured - authorization errors are considered fatal, which causes the container to stop.
11351137

1138+
Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.
1139+
11361140
[[using-ConcurrentMessageListenerContainer]]
11371141
====== Using `ConcurrentMessageListenerContainer`
11381142

@@ -3943,6 +3947,9 @@ You can revert to the previous behavior by setting the `removeTypeHeaders` prope
39433947

39443948
See also <<tip-json>>.
39453949

3950+
IMPORTANT: Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in <<prog-json>>, the above properties will be applied by the factories, as long as you have not set any properties explicitly (using `set*()` methods or using the fluent API).
3951+
Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.
3952+
39463953
[[serdes-mapping-types]]
39473954
====== Mapping Types
39483955

@@ -4073,6 +4080,7 @@ public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, H
40734080
----
40744081
====
40754082

4083+
[[prog-json]]
40764084
====== Programmatic Construction
40774085

40784086
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
@@ -4122,6 +4130,8 @@ JsonDeserializer<Object> deser = new JsonDeserializer<>()
41224130
----
41234131
====
41244132

4133+
Alternatively, as long as you don't use the fluent API to configure properties, or set them using `set*()` methods, the factories will configure the serializer/deserializer using the configuration properties; see <<serdes-json-config>>.
4134+
41254135
[[delegating-serialization]]
41264136
===== Delegating Serializer and Deserializer
41274137

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

+51-5
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs) {
100100

101101
/**
102102
* Construct a factory with the provided configuration and deserializers.
103+
* The deserializers' {@code configure()} methods will be called with the
104+
* configuration map.
103105
* @param configs the configuration.
104106
* @param keyDeserializer the key {@link Deserializer}.
105107
* @param valueDeserializer the value {@link Deserializer}.
@@ -113,8 +115,10 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
113115

114116
/**
115117
* Construct a factory with the provided configuration and deserializer suppliers.
118+
* When the suppliers are invoked to get an instance, the deserializers'
119+
* {@code configure()} methods will be called with the configuration map.
116120
* @param configs the configuration.
117-
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function.
121+
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function.
118122
* @param valueDeserializerSupplier the value {@link Deserializer} supplier function.
119123
* @since 2.3
120124
*/
@@ -123,8 +127,32 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
123127
@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {
124128

125129
this.configs = new ConcurrentHashMap<>(configs);
126-
this.keyDeserializerSupplier = keyDeserializerSupplier == null ? () -> null : keyDeserializerSupplier;
127-
this.valueDeserializerSupplier = valueDeserializerSupplier == null ? () -> null : valueDeserializerSupplier;
130+
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
131+
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
132+
}
133+
134+
private Supplier<Deserializer<K>> keyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializerSupplier) {
135+
return keyDeserializerSupplier == null
136+
? () -> null
137+
: () -> {
138+
Deserializer<K> deserializer = keyDeserializerSupplier.get();
139+
if (deserializer != null) {
140+
deserializer.configure(this.configs, true);
141+
}
142+
return deserializer;
143+
};
144+
}
145+
146+
private Supplier<Deserializer<V>> valueDeserializerSupplier(Supplier<Deserializer<V>> valueDeserializerSupplier) {
147+
return valueDeserializerSupplier == null
148+
? () -> null
149+
: () -> {
150+
Deserializer<V> deserializer = valueDeserializerSupplier.get();
151+
if (deserializer != null) {
152+
deserializer.configure(this.configs, false);
153+
}
154+
return deserializer;
155+
};
128156
}
129157

130158
@Override
@@ -137,15 +165,33 @@ public void setBeanName(String name) {
137165
* @param keyDeserializer the deserializer.
138166
*/
139167
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
140-
this.keyDeserializerSupplier = () -> keyDeserializer;
168+
this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer);
141169
}
142170

143171
/**
144172
* Set the value deserializer.
145173
* @param valueDeserializer the valuee deserializer.
146174
*/
147175
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
148-
this.valueDeserializerSupplier = () -> valueDeserializer;
176+
this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer);
177+
}
178+
179+
/**
180+
* Set a supplier to supply instances of the key deserializer.
181+
* @param keyDeserializerSupplier the supplier.
182+
* @since 2.8
183+
*/
184+
public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializerSupplier) {
185+
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
186+
}
187+
188+
/**
189+
* Set a supplier to supply instances of the value deserializer.
190+
* @param valueDeserializerSupplier the supplier.
191+
* @since 2.8
192+
*/
193+
public void setValueDeserializerSupplier(Supplier<Deserializer<V>> valueDeserializerSupplier) {
194+
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
149195
}
150196

151197
@Override

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

+71-7
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
137137

138138
private Supplier<Serializer<V>> valueSerializerSupplier;
139139

140+
private Supplier<Serializer<K>> rawKeySerializerSupplier;
141+
142+
private Supplier<Serializer<V>> rawValueSerializerSupplier;
143+
140144
private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
141145

142146
private ApplicationContext applicationContext;
@@ -168,6 +172,8 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
168172
* Also configures a {@link #transactionIdPrefix} as a value from the
169173
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
170174
* This config is going to be overridden with a suffix for target {@link Producer} instance.
175+
* The serializers' {@code configure()} methods will be called with the
176+
* configuration map.
171177
* @param configs the configuration.
172178
* @param keySerializer the key {@link Serializer}.
173179
* @param valueSerializer the value {@link Serializer}.
@@ -184,6 +190,8 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
184190
* Also configures a {@link #transactionIdPrefix} as a value from the
185191
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
186192
* This config is going to be overridden with a suffix for target {@link Producer} instance.
193+
* When the suppliers are invoked to get an instance, the serializers'
194+
* {@code configure()} methods will be called with the configuration map.
187195
* @param configs the configuration.
188196
* @param keySerializerSupplier the key {@link Serializer} supplier function.
189197
* @param valueSerializerSupplier the value {@link Serializer} supplier function.
@@ -194,12 +202,11 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
194202
@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {
195203

196204
this.configs = new ConcurrentHashMap<>(configs);
197-
this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;
198-
this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;
205+
this.keySerializerSupplier = keySerializerSupplier(keySerializerSupplier);
206+
this.valueSerializerSupplier = valueSerializerSupplier(valueSerializerSupplier);
199207
if (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {
200208
this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
201209
}
202-
203210
String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
204211
if (StringUtils.hasText(txId)) {
205212
setTransactionIdPrefix(txId);
@@ -208,6 +215,32 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
208215
this.configs.put("internal.auto.downgrade.txn.commit", true);
209216
}
210217

218+
private Supplier<Serializer<K>> keySerializerSupplier(Supplier<Serializer<K>> keySerializerSupplier) {
219+
this.rawKeySerializerSupplier = keySerializerSupplier;
220+
return keySerializerSupplier == null
221+
? () -> null
222+
: () -> {
223+
Serializer<K> serializer = keySerializerSupplier.get();
224+
if (serializer != null) {
225+
serializer.configure(this.configs, true);
226+
}
227+
return serializer;
228+
};
229+
}
230+
231+
private Supplier<Serializer<V>> valueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSupplier) {
232+
this.rawValueSerializerSupplier = valueSerializerSupplier;
233+
return valueSerializerSupplier == null
234+
? () -> null
235+
: () -> {
236+
Serializer<V> serializer = valueSerializerSupplier.get();
237+
if (serializer != null) {
238+
serializer.configure(this.configs, false);
239+
}
240+
return serializer;
241+
};
242+
}
243+
211244
@Override
212245
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
213246
this.applicationContext = applicationContext;
@@ -223,15 +256,33 @@ public void setBeanName(String name) {
223256
* @param keySerializer the key serializer.
224257
*/
225258
public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
226-
this.keySerializerSupplier = () -> keySerializer;
259+
this.keySerializerSupplier = keySerializerSupplier(() -> keySerializer);
227260
}
228261

229262
/**
230263
* Set a value serializer.
231264
* @param valueSerializer the value serializer.
232265
*/
233266
public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
234-
this.valueSerializerSupplier = () -> valueSerializer;
267+
this.valueSerializerSupplier = valueSerializerSupplier(() -> valueSerializer);
268+
}
269+
270+
/**
271+
* Set a supplier to supply instances of the key serializer.
272+
* @param keySerializerSupplier the supplier.
273+
* @since 2.8
274+
*/
275+
public void setKeySerializerSupplier(Supplier<Serializer<K>> keySerializerSupplier) {
276+
this.keySerializerSupplier = keySerializerSupplier;
277+
}
278+
279+
/**
280+
* Set a supplier to supply instances of the value serializer.
281+
* @param valueSerializerSupplier the supplier.
282+
* @since 2.8
283+
*/
284+
public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSupplier) {
285+
this.valueSerializerSupplier = valueSerializerSupplier;
235286
}
236287

237288
/**
@@ -313,14 +364,27 @@ public boolean isProducerPerConsumerPartition() {
313364
return this.producerPerConsumerPartition;
314365
}
315366

367+
368+
@Override
369+
@Nullable
370+
public Serializer<K> getKeySerializer() {
371+
return this.keySerializerSupplier.get();
372+
}
373+
374+
@Override
375+
@Nullable
376+
public Serializer<V> getValueSerializer() {
377+
return this.valueSerializerSupplier.get();
378+
}
379+
316380
@Override
317381
public Supplier<Serializer<K>> getKeySerializerSupplier() {
318-
return this.keySerializerSupplier;
382+
return this.rawKeySerializerSupplier;
319383
}
320384

321385
@Override
322386
public Supplier<Serializer<V>> getValueSerializerSupplier() {
323-
return this.valueSerializerSupplier;
387+
return this.rawValueSerializerSupplier;
324388
}
325389

326390
/**

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

+23
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,29 @@ default void updateConfigs(Map<String, Object> updates) {
250250
default void removeConfig(String configKey) {
251251
}
252252

253+
/**
254+
* Return the configured key serializer (if provided as an object instead
255+
* of a class name in the properties).
256+
* @return the serializer.
257+
* @since 2.8
258+
*/
259+
@Nullable
260+
default Serializer<K> getKeySerializer() {
261+
return null;
262+
}
263+
264+
/**
265+
* Return the configured value serializer (if provided as an object instead
266+
* of a class name in the properties).
267+
* @return the serializer.
268+
* @since 2.8
269+
*/
270+
@Nullable
271+
default Serializer<V> getValueSerializer() {
272+
return null;
273+
}
274+
275+
253276
/**
254277
* Called whenever a producer is added or removed.
255278
*

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/converter/AbstractJavaTypeMapper.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,8 +18,8 @@
1818

1919
import java.nio.charset.StandardCharsets;
2020
import java.util.Collections;
21-
import java.util.HashMap;
2221
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
2323

2424
import org.apache.kafka.common.header.Header;
2525
import org.apache.kafka.common.header.Headers;
@@ -72,9 +72,9 @@ public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware {
7272
*/
7373
public static final String KEY_DEFAULT_KEY_CLASSID_FIELD_NAME = "__Key_KeyTypeId__";
7474

75-
private final Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
75+
private final Map<String, Class<?>> idClassMapping = new ConcurrentHashMap<String, Class<?>>();
7676

77-
private final Map<Class<?>, byte[]> classIdMapping = new HashMap<Class<?>, byte[]>();
77+
private final Map<Class<?>, byte[]> classIdMapping = new ConcurrentHashMap<Class<?>, byte[]>();
7878

7979
private String classIdFieldName = DEFAULT_CLASSID_FIELD_NAME;
8080

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,6 +83,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {
8383
((Map<String, Object>) value).forEach((selector, deser) -> {
8484
if (deser instanceof Deserializer) {
8585
this.delegates.put(selector, (Deserializer<?>) deser);
86+
((Deserializer<?>) deser).configure(configs, isKey);
8687
}
8788
else if (deser instanceof Class) {
8889
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) deser);

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingSerializer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -108,6 +108,7 @@ else if (value instanceof Map) {
108108
((Map<String, Object>) value).forEach((selector, serializer) -> {
109109
if (serializer instanceof Serializer) {
110110
this.delegates.put(selector, (Serializer<?>) serializer);
111+
((Serializer<?>) serializer).configure(configs, isKey);
111112
}
112113
else if (serializer instanceof Class) {
113114
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) serializer);

0 commit comments

Comments
 (0)