Commit 4b8e8a4

GH-3555: Improve docs for KafkaHeaders.CONVERSION_FAILURES
Fixes: spring-projects#3555
Issue link: spring-projects#3555

The batch might be processed in the `@KafkaListener` silently, without looking into the conversion failures header, which can give the impression that nothing is wrong with the batch.

* Mention `KafkaHeaders.CONVERSION_FAILURES` in the docs
* Log a `warn` for a failed conversion in the `BatchMessagingMessageConverter`
* Some other code optimization and clean-up in the `BatchMessagingMessageConverter`
1 parent 07cff76 commit 4b8e8a4

File tree: 2 files changed, +57 −25 lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc

Lines changed: 18 additions & 0 deletions

@@ -724,6 +724,24 @@ public void listen(List<Message<Foo>> fooMessages) {
 }
 ----

+If a record in the batch cannot be converted, its payload is set to `null` in the target `payloads` list.
+The conversion exception is logged as a warning for that record and is also stored in the `KafkaHeaders.CONVERSION_FAILURES` header as an item of a `List<ConversionException>`.
+The target `@KafkaListener` method can use the Java `Stream` API to filter those `null` values out of the payload list, or act on the conversion exceptions header:
+
+[source, java]
+----
+@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
+public void listen(List<Foo> list,
+		@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {
+
+	for (int i = 0; i < list.size(); i++) {
+		if (conversionFailures.get(i) != null) {
+			throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
+		}
+	}
+}
+----
+
 [[conversionservice-customization]]
 == `ConversionService` Customization
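The new documentation text mentions a `Stream`-based alternative, but the snippet in the diff only shows failing the whole batch with `BatchListenerFailedException`. Below is a minimal sketch of the filtering variant; the listener class, the `Foo` record, and the `process` method are illustrative names, not part of this commit:

[source, java]
----
import java.util.List;
import java.util.Objects;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.messaging.handler.annotation.Header;

public class FilteringBatchListener {

	@KafkaListener(id = "fooFiltering", topics = "foo", autoStartup = "false")
	public void listen(List<Foo> list,
			@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {

		// Records that failed conversion arrive with a null payload; the exception for
		// the record at index i is at conversionFailures.get(i). Here we simply skip them.
		list.stream()
				.filter(Objects::nonNull)
				.forEach(this::process);
	}

	private void process(Foo foo) {
		// application-specific handling of successfully converted records
	}

	record Foo(String value) {
	}

}
----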

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 39 additions & 25 deletions

@@ -29,9 +29,11 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Bytes;

 import org.springframework.core.log.LogAccessor;
+import org.springframework.core.log.LogMessage;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
 import org.springframework.kafka.support.JacksonPresent;
@@ -54,7 +56,7 @@
  * <p>
  * If a {@link RecordMessageConverter} is provided, and the batch type is a {@link ParameterizedType}
  * with a single generic type parameter, each record will be passed to the converter, thus supporting
- * a method signature {@code List<Foo> foos}.
+ * a method signature {@code List<MyType> myObjects}.
  *
  * @author Marius Bogoevici
  * @author Gary Russell
@@ -63,11 +65,13 @@
  * @author Sanghyeok An
  * @author Hope Kim
  * @author Borahm Lee
+ * @author Artem Bilan
+ *
  * @since 1.1
  */
 public class BatchMessagingMessageConverter implements BatchMessageConverter {

-	protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
+	protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR

 	@Nullable
 	private final RecordMessageConverter recordConverter;
@@ -102,7 +106,7 @@ public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordConverter) {

 	/**
 	 * Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
-	 * will try to use a default value. By default set to {@code false}.
+	 * will try to use a default value. By default, set to {@code false}.
 	 * @param generateMessageId true if a message id should be generated
 	 */
 	public void setGenerateMessageId(boolean generateMessageId) {
@@ -111,7 +115,7 @@ public void setGenerateMessageId(boolean generateMessageId) {

 	/**
 	 * Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
-	 * used instead. By default set to {@code false}.
+	 * used instead. By default, set to {@code false}.
 	 * @param generateTimestamp true if a timestamp should be generated
 	 */
 	public void setGenerateTimestamp(boolean generateTimestamp) {
@@ -147,8 +151,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
 	public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
 			Consumer<?, ?> consumer, Type type) {

-		KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
-				this.generateTimestamp);
+		KafkaMessageHeaders kafkaMessageHeaders =
+				new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);

 		Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
 		List<Object> payloads = new ArrayList<>();
@@ -169,16 +173,18 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,

 		String listenerInfo = null;
 		for (ConsumerRecord<?, ?> record : records) {
-			addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
-			if (this.headerMapper != null && record.headers() != null) {
-				Map<String, Object> converted = convertHeaders(record.headers(), convertedHeaders);
+			addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps,
+					conversionFailures);
+			Headers recordHeaders = record.headers();
+			if (this.headerMapper != null && recordHeaders != null) {
+				Map<String, Object> converted = convertHeaders(recordHeaders, convertedHeaders);
 				Object obj = converted.get(KafkaHeaders.LISTENER_INFO);
-				if (obj instanceof String) {
-					listenerInfo = (String) obj;
+				if (obj instanceof String info) {
+					listenerInfo = info;
 				}
 			}
 			else {
-				natives.add(record.headers());
+				natives.add(recordHeaders);
 			}
 			if (this.rawRecordHeader) {
 				raws.add(record);
@@ -198,6 +204,7 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,

 	private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
 			List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {
+
 		if (this.headerMapper != null) {
 			rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
 		}
@@ -211,16 +218,18 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
 	}

 	private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys,
-			List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
-			List<Long> timestamps, List<ConversionException> conversionFailures) {
+			List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
+			List<Long> timestamps, List<ConversionException> conversionFailures) {
+
 		payloads.add(obtainPayload(type, record, conversionFailures));
 		keys.add(record.key());
 		topics.add(record.topic());
 		partitions.add(record.partition());
 		offsets.add(record.offset());
 		timestamps.add(record.timestamp());
-		if (record.timestampType() != null) {
-			timestampTypes.add(record.timestampType().name());
+		TimestampType timestampType = record.timestampType();
+		if (timestampType != null) {
+			timestampTypes.add(timestampType.name());
 		}
 	}

@@ -264,24 +273,29 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
 	protected Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
 		try {
 			Object payload = this.recordConverter
-				.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
+					.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
 			conversionFailures.add(null);
 			return payload;
 		}
 		catch (ConversionException ex) {
 			byte[] original = null;
-			if (record.value() instanceof byte[]) {
-				original = (byte[]) record.value();
+			if (record.value() instanceof byte[] bytes) {
+				original = bytes;
 			}
-			else if (record.value() instanceof Bytes) {
-				original = ((Bytes) record.value()).get();
+			else if (record.value() instanceof Bytes bytes) {
+				original = bytes.get();
 			}
-			else if (record.value() instanceof String) {
-				original = ((String) record.value()).getBytes(StandardCharsets.UTF_8);
+			else if (record.value() instanceof String string) {
+				original = string.getBytes(StandardCharsets.UTF_8);
 			}
 			if (original != null) {
 				SerializationUtils.deserializationException(record.headers(), original, ex, false);
 				conversionFailures.add(ex);
+				logger.warn(ex,
+						LogMessage.format("Could not convert message for topic=%s, partition=%d, offset=%d",
+								record.topic(),
+								record.partition(),
+								record.offset()));
 				return null;
 			}
 			throw new ConversionException("The batch converter can only report conversion failures to the listener "
@@ -296,8 +310,8 @@ else if (record.value() instanceof String) {
 	 * @return true if the conditions are met.
 	 */
 	private boolean containerType(Type type) {
-		return type instanceof ParameterizedType
-				&& ((ParameterizedType) type).getActualTypeArguments().length == 1;
+		return type instanceof ParameterizedType parameterizedType
+				&& parameterizedType.getActualTypeArguments().length == 1;
 	}

 }
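For context, the per-record conversion path that populates `KafkaHeaders.CONVERSION_FAILURES` only runs when the batch converter is built with a `RecordMessageConverter`, as the updated class-level Javadoc describes. The following is a minimal wiring sketch under that assumption (recent spring-kafka versions where the container factory exposes `setBatchMessageConverter`); the configuration class, bean name, and generic types are illustrative and not taken from this commit:

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.JsonMessageConverter;

@Configuration
public class BatchConversionConfig {

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
			ConsumerFactory<String, String> consumerFactory) {

		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		factory.setBatchListener(true);
		// The record-level converter enables per-record conversion for List<MyType> batch
		// signatures; records that fail conversion get a null payload and an entry in the
		// KafkaHeaders.CONVERSION_FAILURES header.
		factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
		return factory;
	}

}
----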
