You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
for more complex or particular cases, the `KafkaConsumer`, and therefore `KafkaProducer`, provides overloaded
1558
1558
constructors to accept `(De)Serializer` instances for `keys` and/or `values`, respectively.
1559
1559
1560
-
To meet this API, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` also provide properties to allow
1561
-
to inject a custom `(De)Serializer` to target `Producer`/`Consumer`.
1560
+
Using this API, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` also provide properties (via constructors or setter methods) to inject custom `(De)Serializer` s to the target `Producer`/`Consumer`.
1562
1561
1563
-
For this purpose, Spring for Apache Kafka also provides `JsonSerializer`/`JsonDeserializer` implementations based on the
1562
+
Spring for Apache Kafka also provides `JsonSerializer`/`JsonDeserializer` implementations based on the
1564
1563
Jackson JSON object mapper.
1565
1564
The `JsonSerializer` is quite simple and just allows writing any Java object as a JSON `byte[]`, the `JsonDeserializer`
1566
1565
requires an additional `Class<?> targetType` argument to allow the deserialization of a consumed `byte[]` to the proper target
@@ -1583,6 +1582,31 @@ In addition, the serializer/deserializer can be configured using Kafka propertie
1583
1582
- `JsonDeserializer.VALUE_DEFAULT_TYPE`; fallback type for deserialization of values if no header information is present.
1584
1583
- `JsonDeserializer.TRUSTED_PACKAGES` (default `java.util`, `java.lang`); comma-delimited list of package patterns allowed for deserialization; `*` means deserialize all.
1585
1584
1585
+
IMPORTANT: Only simple configuration can be performed with properties; for more advanced configuration (such as using a custom `ObjectMapper` in the serializer/deserializer), you should use the producer/consumer factory constructors that accept a pre-built serializer and deserializer. For example, with Spring Boot, to override the default factories:
1586
+
1587
+
[source, java]
1588
+
----
1589
+
@Bean
1590
+
public ConsumerFactory<Foo, Bar> kafkaConsumerFactory(KafkaProperties properties,
1591
+
JsonDeserializer customDeserializer) {
1592
+
1593
+
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
1594
+
customDeserializer, customDeserializer);
1595
+
}
1596
+
1597
+
@Bean
1598
+
public ProducererFactory<Foo, Bar> kafkaProducerFactory(KafkaProperties properties,
1599
+
JsonSserializer customSerializer) {
1600
+
1601
+
return new DefaultKafkaConsumerFactory<>(properties.buildProducerProperties(),
1602
+
customSerializer, customSerializer);
1603
+
}
1604
+
----
1605
+
1606
+
Setters are also provided, as an alternative to using these constructors.
1607
+
1608
+
===== Spring Messaging Message Conversion
1609
+
1586
1610
Although the `Serializer`/`Deserializer` API is quite simple and flexible from the low-level Kafka `Consumer` and
1587
1611
`Producer` perspective, you might need more flexibility at the Spring Messaging level, either when using `@KafkaListener` or <<si-kafka,Spring Integration>>.
1588
1612
To easily convert to/from `org.springframework.messaging.Message`, Spring for Apache Kafka provides a `MessageConverter`
@@ -1617,7 +1641,7 @@ With a class-level `@KafkaListener`, the payload type is used to select which `@
1617
1641
====
1618
1642
1619
1643
NOTE: When using the `StringJsonMessageConverter`, you should use a `StringDeserializer` in the kafka consumer configuration and `StringSerializer` in the kafka producer configuration, when using Spring Integration or the `KafkaTemplate.send(Message<?> message)` method.
1620
-
When using the `BytesJsonMessageConverter`, you should use a `BytesDeserializer` in the kafka consumer configuration and `BytesSerializer` in the kafka producer configuration, when using Spring Integration or the `KafkaTemplate.send(Message<?> message)` method.
1644
+
When using the `BytesJsonMessageConverter`, you should use a `BytesDeserializer` in the kafka consumer configuration and `BytesSerializer` in the kafka producer configuration, when using Spring Integration or the `KafkaTemplate.send(Message<?> message)` method (see <<kafka-template>>).
1621
1645
Generally, the `BytesJsonMessageConverter` is more efficient because it avoids a `String` to/from `byte[]` conversion.
0 commit comments