Skip to content

GH-1879: Factories Now Configure (De)Serializers #1907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ void removeConfig(String configKey);
----
====

Starting with version 2.8, if you provide serializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.

[[replying-template]]
===== Using `ReplyingKafkaTemplate`

Expand Down Expand Up @@ -1133,6 +1135,8 @@ Defining `authorizationExceptionRetryInterval` should help the application to re

NOTE: By default, no interval is configured - authorization errors are considered fatal, which causes the container to stop.

Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.

[[using-ConcurrentMessageListenerContainer]]
====== Using `ConcurrentMessageListenerContainer`

Expand Down Expand Up @@ -3943,6 +3947,9 @@ You can revert to the previous behavior by setting the `removeTypeHeaders` prope

See also <<tip-json>>.

IMPORTANT: Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in <<prog-json>>, the above properties will be applied by the factories, as long as you have not set any properties explicitly (using `set*()` methods or using the fluent API).
Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.

[[serdes-mapping-types]]
====== Mapping Types

Expand Down Expand Up @@ -4073,6 +4080,7 @@ public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, H
----
====

[[prog-json]]
====== Programmatic Construction

When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
Expand Down Expand Up @@ -4122,6 +4130,8 @@ JsonDeserializer<Object> deser = new JsonDeserializer<>()
----
====

Alternatively, as long as you don't use the fluent API to configure properties, or set them using `set*()` methods, the factories will configure the serializer/deserializer using the configuration properties; see <<serdes-json-config>>.

[[delegating-serialization]]
===== Delegating Serializer and Deserializer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs) {

/**
* Construct a factory with the provided configuration and deserializers.
* The deserializers' {@code configure()} methods will be called with the
* configuration map.
* @param configs the configuration.
* @param keyDeserializer the key {@link Deserializer}.
* @param valueDeserializer the value {@link Deserializer}.
Expand All @@ -113,8 +115,10 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,

/**
* Construct a factory with the provided configuration and deserializer suppliers.
* When the suppliers are invoked to get an instance, the deserializers'
* {@code configure()} methods will be called with the configuration map.
* @param configs the configuration.
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function.
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function.
* @param valueDeserializerSupplier the value {@link Deserializer} supplier function.
* @since 2.3
*/
Expand All @@ -123,8 +127,32 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {

this.configs = new ConcurrentHashMap<>(configs);
this.keyDeserializerSupplier = keyDeserializerSupplier == null ? () -> null : keyDeserializerSupplier;
this.valueDeserializerSupplier = valueDeserializerSupplier == null ? () -> null : valueDeserializerSupplier;
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
}

private Supplier<Deserializer<K>> keyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializerSupplier) {
return keyDeserializerSupplier == null
? () -> null
: () -> {
Deserializer<K> deserializer = keyDeserializerSupplier.get();
if (deserializer != null) {
deserializer.configure(this.configs, true);
}
return deserializer;
};
}

private Supplier<Deserializer<V>> valueDeserializerSupplier(Supplier<Deserializer<V>> valueDeserializerSupplier) {
return valueDeserializerSupplier == null
? () -> null
: () -> {
Deserializer<V> deserializer = valueDeserializerSupplier.get();
if (deserializer != null) {
deserializer.configure(this.configs, false);
}
return deserializer;
};
}

@Override
Expand All @@ -137,15 +165,33 @@ public void setBeanName(String name) {
* @param keyDeserializer the deserializer.
*/
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
this.keyDeserializerSupplier = () -> keyDeserializer;
this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer);
}

/**
* Set the value deserializer.
* @param valueDeserializer the valuee deserializer.
*/
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
this.valueDeserializerSupplier = () -> valueDeserializer;
this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer);
}

/**
* Set a supplier to supply instances of the key deserializer.
* @param keyDeserializerSupplier the supplier.
* @since 2.8
*/
public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializerSupplier) {
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
}

/**
* Set a supplier to supply instances of the value deserializer.
* @param valueDeserializerSupplier the supplier.
* @since 2.8
*/
public void setValueDeserializerSupplier(Supplier<Deserializer<V>> valueDeserializerSupplier) {
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private Supplier<Serializer<V>> valueSerializerSupplier;

private Supplier<Serializer<K>> rawKeySerializerSupplier;

private Supplier<Serializer<V>> rawValueSerializerSupplier;

private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -168,6 +172,8 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
* Also configures a {@link #transactionIdPrefix} as a value from the
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
* This config is going to be overridden with a suffix for target {@link Producer} instance.
* The serializers' {@code configure()} methods will be called with the
* configuration map.
* @param configs the configuration.
* @param keySerializer the key {@link Serializer}.
* @param valueSerializer the value {@link Serializer}.
Expand All @@ -184,6 +190,8 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
* Also configures a {@link #transactionIdPrefix} as a value from the
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
* This config is going to be overridden with a suffix for target {@link Producer} instance.
* When the suppliers are invoked to get an instance, the serializers'
* {@code configure()} methods will be called with the configuration map.
* @param configs the configuration.
* @param keySerializerSupplier the key {@link Serializer} supplier function.
* @param valueSerializerSupplier the value {@link Serializer} supplier function.
Expand All @@ -194,12 +202,11 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {

this.configs = new ConcurrentHashMap<>(configs);
this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;
this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;
this.keySerializerSupplier = keySerializerSupplier(keySerializerSupplier);
this.valueSerializerSupplier = valueSerializerSupplier(valueSerializerSupplier);
if (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {
this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
}

String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
if (StringUtils.hasText(txId)) {
setTransactionIdPrefix(txId);
Expand All @@ -208,6 +215,32 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
this.configs.put("internal.auto.downgrade.txn.commit", true);
}

private Supplier<Serializer<K>> keySerializerSupplier(Supplier<Serializer<K>> keySerializerSupplier) {
this.rawKeySerializerSupplier = keySerializerSupplier;
return keySerializerSupplier == null
? () -> null
: () -> {
Serializer<K> serializer = keySerializerSupplier.get();
if (serializer != null) {
serializer.configure(this.configs, true);
}
return serializer;
};
}

private Supplier<Serializer<V>> valueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSupplier) {
this.rawValueSerializerSupplier = valueSerializerSupplier;
return valueSerializerSupplier == null
? () -> null
: () -> {
Serializer<V> serializer = valueSerializerSupplier.get();
if (serializer != null) {
serializer.configure(this.configs, false);
}
return serializer;
};
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
Expand All @@ -223,15 +256,33 @@ public void setBeanName(String name) {
* @param keySerializer the key serializer.
*/
public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
this.keySerializerSupplier = () -> keySerializer;
this.keySerializerSupplier = keySerializerSupplier(() -> keySerializer);
}

/**
* Set a value serializer.
* @param valueSerializer the value serializer.
*/
public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
this.valueSerializerSupplier = () -> valueSerializer;
this.valueSerializerSupplier = valueSerializerSupplier(() -> valueSerializer);
}

/**
* Set a supplier to supply instances of the key serializer.
* @param keySerializerSupplier the supplier.
* @since 2.8
*/
public void setKeySerializerSupplier(Supplier<Serializer<K>> keySerializerSupplier) {
this.keySerializerSupplier = keySerializerSupplier;
}

/**
* Set a supplier to supply instances of the value serializer.
* @param valueSerializerSupplier the supplier.
* @since 2.8
*/
public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSupplier) {
this.valueSerializerSupplier = valueSerializerSupplier;
}

/**
Expand Down Expand Up @@ -313,14 +364,27 @@ public boolean isProducerPerConsumerPartition() {
return this.producerPerConsumerPartition;
}


@Override
@Nullable
public Serializer<K> getKeySerializer() {
return this.keySerializerSupplier.get();
}

@Override
@Nullable
public Serializer<V> getValueSerializer() {
return this.valueSerializerSupplier.get();
}

@Override
public Supplier<Serializer<K>> getKeySerializerSupplier() {
return this.keySerializerSupplier;
return this.rawKeySerializerSupplier;
}

@Override
public Supplier<Serializer<V>> getValueSerializerSupplier() {
return this.valueSerializerSupplier;
return this.rawValueSerializerSupplier;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,29 @@ default void updateConfigs(Map<String, Object> updates) {
default void removeConfig(String configKey) {
}

/**
* Return the configured key serializer (if provided as an object instead
* of a class name in the properties).
* @return the serializer.
* @since 2.8
*/
@Nullable
default Serializer<K> getKeySerializer() {
return null;
}

/**
* Return the configured value serializer (if provided as an object instead
* of a class name in the properties).
* @return the serializer.
* @since 2.8
*/
@Nullable
default Serializer<V> getValueSerializer() {
return null;
}


/**
* Called whenever a producer is added or removed.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,8 @@

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
Expand Down Expand Up @@ -72,9 +72,9 @@ public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware {
*/
public static final String KEY_DEFAULT_KEY_CLASSID_FIELD_NAME = "__Key_KeyTypeId__";

private final Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
private final Map<String, Class<?>> idClassMapping = new ConcurrentHashMap<String, Class<?>>();

private final Map<Class<?>, byte[]> classIdMapping = new HashMap<Class<?>, byte[]>();
private final Map<Class<?>, byte[]> classIdMapping = new ConcurrentHashMap<Class<?>, byte[]>();

private String classIdFieldName = DEFAULT_CLASSID_FIELD_NAME;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {
((Map<String, Object>) value).forEach((selector, deser) -> {
if (deser instanceof Deserializer) {
this.delegates.put(selector, (Deserializer<?>) deser);
((Deserializer<?>) deser).configure(configs, isKey);
}
else if (deser instanceof Class) {
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) deser);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,7 @@ else if (value instanceof Map) {
((Map<String, Object>) value).forEach((selector, serializer) -> {
if (serializer instanceof Serializer) {
this.delegates.put(selector, (Serializer<?>) serializer);
((Serializer<?>) serializer).configure(configs, isKey);
}
else if (serializer instanceof Class) {
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) serializer);
Expand Down
Loading