Skip to content

Commit f0ad7b0

Browse files
garyrussellartembilan
authored andcommitted
GH-2153: Switch to ConsumerRecord Formatter
Resolves #2153
1 parent bbf26bc commit f0ad7b0

15 files changed

+44
-105
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.clients.consumer.ConsumerRecords;
2323

2424
import org.springframework.core.log.LogAccessor;
25+
import org.springframework.kafka.support.KafkaUtils;
2526

2627
/**
2728
* The {@link CommonErrorHandler} implementation for logging exceptions.
@@ -50,7 +51,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
5051
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
5152
MessageListenerContainer container) {
5253

53-
LOGGER.error(thrownException, () -> "Error occured while processing: " + ListenerUtils.recordToString(record));
54+
LOGGER.error(thrownException, () -> "Error occured while processing: " + KafkaUtils.format(record));
5455
}
5556

5657
@Override
@@ -59,7 +60,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
5960

6061
StringBuilder message = new StringBuilder("Error occurred while processing:\n");
6162
for (ConsumerRecord<?, ?> record : data) {
62-
message.append(ListenerUtils.recordToString(record)).append('\n');
63+
message.append(KafkaUtils.format(record)).append('\n');
6364
}
6465
LOGGER.error(thrownException, () -> message.substring(0, message.length() - 1));
6566
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -332,14 +332,14 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
332332
TopicPartition tp = this.destinationResolver.apply(record, exception);
333333
if (tp == null) {
334334
maybeThrow(record, exception);
335-
this.logger.debug(() -> "Recovery of " + ListenerUtils.recordToString(record, true)
335+
this.logger.debug(() -> "Recovery of " + KafkaUtils.format(record)
336336
+ " skipped because destination resolver returned null");
337337
return;
338338
}
339339
if (this.skipSameTopicFatalExceptions
340340
&& tp.topic().equals(record.topic())
341341
&& !getClassifier().classify(exception)) {
342-
this.logger.error("Recovery of " + ListenerUtils.recordToString(record, true)
342+
this.logger.error("Recovery of " + KafkaUtils.format(record)
343343
+ " skipped because not retryable exception " + exception.toString()
344344
+ " and the destination resolver routed back to the same topic");
345345
return;
@@ -394,7 +394,7 @@ private void sendOrThrow(ProducerRecord<Object, Object> outRecord,
394394

395395
private void maybeThrow(ConsumerRecord<?, ?> record, Exception exception) {
396396
String message = String.format("No destination returned for record %s and exception %s. " +
397-
"failIfNoDestinationReturned: %s", ListenerUtils.recordToString(record), exception,
397+
"failIfNoDestinationReturned: %s", KafkaUtils.format(record), exception,
398398
this.throwIfNoDestinationReturned);
399399
this.logger.warn(message);
400400
if (this.throwIfNoDestinationReturned) {
@@ -513,7 +513,7 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
513513
sendResult = kafkaTemplate.send(outRecord);
514514
sendResult.addCallback(result -> {
515515
this.logger.debug(() -> "Successful dead-letter publication: "
516-
+ ListenerUtils.recordToString(inRecord, true) + " to " + result.getRecordMetadata());
516+
+ KafkaUtils.format(inRecord) + " to " + result.getRecordMetadata());
517517
}, ex -> {
518518
this.logger.error(ex, () -> pubFailMessage(outRecord, inRecord));
519519
});
@@ -548,7 +548,7 @@ private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
548548

549549
private String pubFailMessage(ProducerRecord<Object, Object> outRecord, ConsumerRecord<?, ?> inRecord) {
550550
return "Dead-letter publication to "
551-
+ outRecord.topic() + "failed for: " + ListenerUtils.recordToString(inRecord, true);
551+
+ outRecord.topic() + "failed for: " + KafkaUtils.format(inRecord);
552552
}
553553

554554
private Duration determineSendTimeout(KafkaOperations<?, ?> template) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424

2525
import org.springframework.core.log.LogAccessor;
2626
import org.springframework.kafka.KafkaException;
27+
import org.springframework.kafka.support.KafkaUtils;
2728
import org.springframework.util.backoff.BackOff;
2829
import org.springframework.util.backoff.BackOffExecution;
2930

@@ -108,7 +109,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
108109
public static String recordsToString(ConsumerRecords<?, ?> records) {
109110
StringBuffer sb = new StringBuffer();
110111
records.spliterator().forEachRemaining(rec -> sb
111-
.append(ListenerUtils.recordToString(rec, true))
112+
.append(KafkaUtils.format(rec))
112113
.append(','));
113114
sb.deleteCharAt(sb.length() - 1);
114115
return sb.toString();

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.clients.consumer.ConsumerRecord;
2727

2828
import org.springframework.core.log.LogAccessor;
29+
import org.springframework.kafka.support.KafkaUtils;
2930
import org.springframework.kafka.support.TopicPartitionOffset;
3031
import org.springframework.lang.Nullable;
3132
import org.springframework.util.backoff.BackOff;
@@ -152,7 +153,7 @@ protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> record
152153
catch (Exception ex) {
153154
if (records.size() > 0) {
154155
this.logger.error(ex, () -> "Recovery of record ("
155-
+ ListenerUtils.recordToString(records.get(0)) + ") failed");
156+
+ KafkaUtils.format(records.get(0)) + ") failed");
156157
this.failureTracker.getRetryListeners().forEach(rl ->
157158
rl.recoveryFailed(records.get(0), thrownException, ex));
158159
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.kafka.common.TopicPartition;
3131

3232
import org.springframework.core.log.LogAccessor;
33+
import org.springframework.kafka.support.KafkaUtils;
3334
import org.springframework.kafka.support.TopicPartitionOffset;
3435
import org.springframework.lang.Nullable;
3536
import org.springframework.util.Assert;
@@ -76,7 +77,7 @@ class FailedRecordTracker implements RecoveryStrategy {
7677
+ (failedRecord == null
7778
? "none"
7879
: failedRecord.getBackOffExecution())
79-
+ " exhausted for " + ListenerUtils.recordToString(rec));
80+
+ " exhausted for " + KafkaUtils.format(rec));
8081
};
8182
}
8283
else {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -1228,7 +1228,6 @@ public boolean isLongLived() {
12281228

12291229
@Override // NOSONAR complexity
12301230
public void run() {
1231-
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
12321231
publishConsumerStartingEvent();
12331232
this.consumerThread = Thread.currentThread();
12341233
setupSeeks();
@@ -1809,7 +1808,7 @@ record = this.acks.poll();
18091808
}
18101809

18111810
private void traceAck(ConsumerRecord<K, V> record) {
1812-
this.logger.trace(() -> "Ack: " + ListenerUtils.recordToString(record, true));
1811+
this.logger.trace(() -> "Ack: " + KafkaUtils.format(record));
18131812
}
18141813

18151814
private void doAck(ConsumerRecord<K, V> record) {
@@ -1905,14 +1904,14 @@ private synchronized void ackInOrder(ConsumerRecord<K, V> record) {
19051904
}
19061905
else if (record.offset() < offs.get(0)) {
19071906
throw new IllegalStateException("First remaining offset for this batch is " + offs.get(0)
1908-
+ "; you are acknowledging a stale record: " + ListenerUtils.recordToString(record));
1907+
+ "; you are acknowledging a stale record: " + KafkaUtils.format(record));
19091908
}
19101909
else {
19111910
deferred.add(record);
19121911
}
19131912
}
19141913
else {
1915-
throw new IllegalStateException("Unexpected ack for " + ListenerUtils.recordToString(record)
1914+
throw new IllegalStateException("Unexpected ack for " + KafkaUtils.format(record)
19161915
+ "; offsets list is empty");
19171916
}
19181917
}
@@ -2311,7 +2310,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
23112310
if (record == null) {
23122311
continue;
23132312
}
2314-
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
2313+
this.logger.trace(() -> "Processing " + KafkaUtils.format(record));
23152314
try {
23162315
invokeInTransaction(iterator, record);
23172316
}
@@ -2412,7 +2411,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
24122411
if (record == null) {
24132412
continue;
24142413
}
2415-
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
2414+
this.logger.trace(() -> "Processing " + KafkaUtils.format(record));
24162415
doInvokeRecordListener(record, iterator);
24172416
if (this.commonRecordInterceptor != null) {
24182417
this.commonRecordInterceptor.afterRecord(record, this.consumer);
@@ -2445,7 +2444,7 @@ private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg)
24452444
record = this.earlyRecordInterceptor.intercept(record, this.consumer);
24462445
if (record == null) {
24472446
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
2448-
+ ListenerUtils.recordToString(recordArg));
2447+
+ KafkaUtils.format(recordArg));
24492448
}
24502449
}
24512450
return record;
@@ -2604,7 +2603,7 @@ record = this.recordInterceptor.intercept(record, this.consumer);
26042603
}
26052604
if (record == null) {
26062605
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
2607-
+ ListenerUtils.recordToString(recordArg));
2606+
+ KafkaUtils.format(recordArg));
26082607
}
26092608
else {
26102609
try {
@@ -3158,7 +3157,7 @@ public void nack(long sleep) {
31583157

31593158
@Override
31603159
public String toString() {
3161-
return "Acknowledgment for " + ListenerUtils.recordToString(this.record, true);
3160+
return "Acknowledgment for " + KafkaUtils.format(this.record);
31623161
}
31633162

31643163
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

-35
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.common.header.internals.RecordHeaders;
2828

2929
import org.springframework.core.log.LogAccessor;
30-
import org.springframework.kafka.support.KafkaUtils;
3130
import org.springframework.kafka.support.serializer.DeserializationException;
3231
import org.springframework.lang.Nullable;
3332
import org.springframework.util.Assert;
@@ -143,40 +142,6 @@ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, Clas
143142
}
144143
}
145144

146-
/**
147-
* Set to true to only log record metadata.
148-
* @param onlyMeta true to only log record metadata.
149-
* @since 2.2.14
150-
* @see #recordToString(ConsumerRecord)
151-
*/
152-
public static void setLogOnlyMetadata(boolean onlyMeta) {
153-
KafkaUtils.setLogOnlyMetadata(onlyMeta);
154-
}
155-
156-
/**
157-
* Return the {@link ConsumerRecord} as a String; either {@code toString()} or
158-
* {@code topic-partition@offset}.
159-
* @param record the record.
160-
* @return the rendered record.
161-
* @since 2.2.14
162-
* @see #setLogOnlyMetadata(boolean)
163-
*/
164-
public static String recordToString(ConsumerRecord<?, ?> record) {
165-
return KafkaUtils.recordToString(record);
166-
}
167-
168-
/**
169-
* Return the {@link ConsumerRecord} as a String; either {@code toString()} or
170-
* {@code topic-partition@offset}.
171-
* @param record the record.
172-
* @param meta true to log just the metadata.
173-
* @return the rendered record.
174-
* @since 2.5.4
175-
*/
176-
public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
177-
return KafkaUtils.recordToString(record, meta);
178-
}
179-
180145
/**
181146
* Sleep according to the {@link BackOff}; when the {@link BackOffExecution} returns
182147
* {@link BackOffExecution#STOP} sleep for the previous backOff.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.springframework.kafka.KafkaException;
3636
import org.springframework.kafka.KafkaException.Level;
3737
import org.springframework.kafka.listener.ContainerProperties.AckMode;
38+
import org.springframework.kafka.support.KafkaUtils;
3839
import org.springframework.lang.Nullable;
3940
import org.springframework.util.ObjectUtils;
4041
import org.springframework.util.backoff.FixedBackOff;
@@ -107,18 +108,18 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
107108
}
108109
catch (Exception ex) {
109110
if (isBackoffException(ex)) {
110-
logger.debug(ex, () -> ListenerUtils.recordToString(record)
111+
logger.debug(ex, () -> KafkaUtils.format(record)
111112
+ " included in seeks due to retry back off");
112113
}
113114
else {
114115
logger.error(ex, () -> "Failed to determine if this record ("
115-
+ ListenerUtils.recordToString(record)
116+
+ KafkaUtils.format(record)
116117
+ ") should be recovererd, including in seeks");
117118
}
118119
skipped.set(false);
119120
}
120121
if (skipped.get()) {
121-
logger.debug(() -> "Skipping seek of: " + ListenerUtils.recordToString(record));
122+
logger.debug(() -> "Skipping seek of: " + KafkaUtils.format(record));
122123
}
123124
}
124125
if (!recoverable || !first.get() || !skipped.get()) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
127127
data.forEach(record -> {
128128
Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
129129
if (correlation == null) {
130-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record)
130+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
131131
+ " - to use request/reply semantics, the responding server must return the correlation id "
132132
+ " in the '" + KafkaHeaders.CORRELATION_ID + "' header");
133133
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
457457
correlationId = new CorrelationKey(correlationHeader.value());
458458
}
459459
if (correlationId == null) {
460-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record)
460+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
461461
+ " - to use request/reply semantics, the responding server must return the correlation id "
462462
+ " in the '" + this.correlationHeaderName + "' header");
463463
}
@@ -475,7 +475,7 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
475475
future.setException(exception);
476476
}
477477
if (ok) {
478-
this.logger.debug(() -> "Received: " + KafkaUtils.recordToString(record)
478+
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
479479
+ WITH_CORRELATION_ID + correlationKey);
480480
future.set(record);
481481
}
@@ -543,7 +543,7 @@ protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correl
543543
}
544544

545545
private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
546-
return "No pending reply: " + KafkaUtils.recordToString(record) + WITH_CORRELATION_ID
546+
return "No pending reply: " + KafkaUtils.format(record) + WITH_CORRELATION_ID
547547
+ correlationId + ", perhaps timed out, or using a shared reply topic";
548548
}
549549

Diff for: spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
3434
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
3535
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
36-
import org.springframework.kafka.listener.ListenerUtils;
3736
import org.springframework.kafka.support.EndpointHandlerMethod;
37+
import org.springframework.kafka.support.KafkaUtils;
3838
import org.springframework.lang.Nullable;
3939

4040

@@ -432,7 +432,7 @@ static class LoggingDltListenerHandlerMethod {
432432
public void logMessage(Object message) {
433433
if (message instanceof ConsumerRecord) {
434434
LOGGER.info(() -> "Received message in dlt listener: "
435-
+ ListenerUtils.recordToString((ConsumerRecord<?, ?>) message));
435+
+ KafkaUtils.format((ConsumerRecord<?, ?>) message));
436436
}
437437
else {
438438
LOGGER.info(() -> "Received message in dlt listener.");

0 commit comments

Comments
 (0)