Skip to content

Call (De)Serializer.configure() in the respective setters of DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory #1879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
artembilan opened this issue Jul 21, 2021 · 0 comments · Fixed by #1907
Assignees
Milestone

Comments

@artembilan
Copy link
Member

artembilan commented Jul 21, 2021

Expected Behavior

End-user should be free from finding the way to propagate configuration properties to their custom (De)Serializier when they configure it programmatically.
The general idea for the target solution is to call (De)Serializer.configure() directly in the DefaultKafkaProducerFactory or DefaultKafkaConsumerFactory when respective setter is used for external (De)Serializier .
Such a behavior of those setters must be mentioned in their JavaDocs.
Out-of-the-box (De)Serializier (e.g. JsonSerializer) must be guarded with some state like initialized to not call a configure() if that has been called externally, e.g. in projects which don't have this fix yet.
Another consistency matter is about to not call configure() if some setters of that (De)Serializer has been called: or setters configuration, or via properties, but not both.

Current Behavior

We have to call configure() explicitly in the factory customizer before setting (de)serializer into that factory back:

    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        JsonSerializer<Object> jsonSerializer = new JsonSerializer<>(objectMapper());
        jsonSerializer.configure(producerFactory.getConfigurationProperties(), false);
        producerFactory.setValueSerializer((JsonSerializer) jsonSerializer);
    }

Context

More info in this SO thread: https://stackoverflow.com/questions/68472258/spring-json-type-mapping-property-ignored-when-specifying-a-custom-objectmapper

@artembilan artembilan transferred this issue from spring-projects/spring-integration Jul 21, 2021
@garyrussell garyrussell added this to the 2.8.0-M2 milestone Aug 11, 2021
@garyrussell garyrussell self-assigned this Aug 11, 2021
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Aug 11, 2021
Resolves spring-projects#1879

It was not intuitive that configuration properties were not applied when
construcing (De)Serializers programmatically.

The producer and consumer factories now call the `configure()` method.
However, the JSON implementations cannot be configured with a mixture
of setters and configuration properties.
artembilan pushed a commit that referenced this issue Aug 12, 2021
Resolves #1879

It was not intuitive that configuration properties were not applied when
constructing (De)Serializers programmatically.

The producer and consumer factories now call the `configure()` method.
However, the JSON implementations cannot be configured with a mixture
of setters and configuration properties.

* Assert no mixed config; synchronize.
artembilan added a commit to artembilan/spring-kafka that referenced this issue Dec 17, 2021
Fixes spring-projects#2050

The application can have several consumer factories when one fully relies
on the configuration properties for its deserializers and other
configures them programmatically.
The consumer factory now calls `configure()` on the `Deserializer`
independently of its origins.
See spring-projects#1879
In this case the `ErrorHandlingDeserializer` consults
`spring.deserializer.key.delegate.class` or `spring.deserializer.value.delegate.class`
for its delegate overriding provided explicitly programmatically

* Fix `ErrorHandlingDeserializer` to check it `delegate` and `failedDeserializationFunction`
for null before taking their values from the respective configuration properties
* Add `spring.deserializer.value.delegate.class` property to `testJsonSerDeIgnoreTypeHeadersInbound()`
configuration to ensure that it does not override an explicit `JsonDeserializer` delegate
* Fix warning in the `EmbeddedKafkaBroker` about `this.` prefix for a `static logger` property
garyrussell pushed a commit that referenced this issue Dec 20, 2021
Fixes #2050

The application can have several consumer factories when one fully relies
on the configuration properties for its deserializers and other
configures them programmatically.
The consumer factory now calls `configure()` on the `Deserializer`
independently of its origins.
See #1879
In this case the `ErrorHandlingDeserializer` consults
`spring.deserializer.key.delegate.class` or `spring.deserializer.value.delegate.class`
for its delegate overriding provided explicitly programmatically

* Fix `ErrorHandlingDeserializer` to check it `delegate` and `failedDeserializationFunction`
for null before taking their values from the respective configuration properties
* Add `spring.deserializer.value.delegate.class` property to `testJsonSerDeIgnoreTypeHeadersInbound()`
configuration to ensure that it does not override an explicit `JsonDeserializer` delegate
* Fix warning in the `EmbeddedKafkaBroker` about `this.` prefix for a `static logger` property
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants