
Commit 891664b

garyrussell authored and artembilan committed
GH-2132: Producer Record Log Formatting
Partial solution for #2132. Allow the user to provide a function for formatting producer records. Also move consumer record formatting to `KafkaUtils`; the `ListenerUtils` versions will be deprecated later. There will be more polishing in this area for 3.0 only (separate PR). **cherry-pick to 2.8.x, 2.7.x**
1 parent 17b738e commit 891664b
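
As a rough illustration of the new hook (not part of this commit's diff; the topic, key, and formatter body below are made up), an application can register a producer-record formatter once, and every log statement that goes through `KafkaUtils.format(ProducerRecord)` in `KafkaTemplate` and `ReplyingKafkaTemplate` will use it:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

import org.springframework.kafka.support.KafkaUtils;

public class ProducerRecordLoggingSketch {

    public static void main(String[] args) {
        // Log only topic-partition and key, never the (possibly large or sensitive) value.
        KafkaUtils.setProducerRecordFormatter(rec ->
                rec.topic() + "-" + rec.partition() + " key=" + rec.key());

        ProducerRecord<String, String> record =
                new ProducerRecord<>("orders", 0, "order-42", "big payload...");

        // This is what KafkaTemplate's "Sending: ..." trace output would now contain.
        System.out.println(KafkaUtils.format(record)); // orders-0 key=order-42
    }

}
```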

File tree

5 files changed: +112 −28 lines


spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+6-4
@@ -635,7 +635,7 @@ protected void closeProducer(Producer<K, V> producer, boolean inTx) {
 	 */
 	protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
 		final Producer<K, V> producer = getTheProducer(producerRecord.topic());
-		this.logger.trace(() -> "Sending: " + producerRecord);
+		this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord));
 		final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
 		Object sample = null;
 		if (this.micrometerEnabled && this.micrometerHolder == null) {
@@ -665,7 +665,7 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
 		if (this.autoFlush) {
 			flush();
 		}
-		this.logger.trace(() -> "Sent: " + producerRecord);
+		this.logger.trace(() -> "Sent: " + KafkaUtils.format(producerRecord));
 		return future;
 	}

@@ -690,7 +690,8 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
 				if (KafkaTemplate.this.producerListener != null) {
 					KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
 				}
-				KafkaTemplate.this.logger.trace(() -> "Sent ok: " + producerRecord + ", metadata: " + metadata);
+				KafkaTemplate.this.logger.trace(() -> "Sent ok: " + KafkaUtils.format(producerRecord)
+						+ ", metadata: " + metadata);
 			}
 			else {
 				if (sample != null) {
@@ -700,7 +701,8 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
 				if (KafkaTemplate.this.producerListener != null) {
 					KafkaTemplate.this.producerListener.onError(producerRecord, metadata, exception);
 				}
-				KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " + producerRecord);
+				KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: "
+						+ KafkaUtils.format(producerRecord));
 			}
 		}
 		finally {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

+4-15
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.header.internals.RecordHeaders;

 import org.springframework.core.log.LogAccessor;
+import org.springframework.kafka.support.KafkaUtils;
 import org.springframework.kafka.support.serializer.DeserializationException;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
@@ -45,8 +46,6 @@ public final class ListenerUtils {
 	private ListenerUtils() {
 	}

-	private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
-
 	private static final int DEFAULT_SLEEP_INTERVAL = 100;

 	private static final int SMALL_SLEEP_INTERVAL = 10;
@@ -151,7 +150,7 @@ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, Clas
 	 * @see #recordToString(ConsumerRecord)
 	 */
 	public static void setLogOnlyMetadata(boolean onlyMeta) {
-		LOG_METADATA_ONLY.set(onlyMeta);
+		KafkaUtils.setLogOnlyMetadata(onlyMeta);
 	}

 	/**
@@ -163,12 +162,7 @@ public static void setLogOnlyMetadata(boolean onlyMeta) {
 	 * @see #setLogOnlyMetadata(boolean)
 	 */
 	public static String recordToString(ConsumerRecord<?, ?> record) {
-		if (Boolean.TRUE.equals(LOG_METADATA_ONLY.get())) {
-			return record.topic() + "-" + record.partition() + "@" + record.offset();
-		}
-		else {
-			return record.toString();
-		}
+		return KafkaUtils.recordToString(record);
 	}

 	/**
@@ -180,12 +174,7 @@ public static String recordToString(ConsumerRecord<?, ?> record) {
 	 * @since 2.5.4
 	 */
 	public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
-		if (meta) {
-			return record.topic() + "-" + record.partition() + "@" + record.offset();
-		}
-		else {
-			return record.toString();
-		}
+		return KafkaUtils.recordToString(record, meta);
 	}

 	/**

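Since `ListenerUtils` now simply delegates, existing callers see the same rendering as before; a minimal sketch of that unchanged behaviour (the topic, offset, and values are made up):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.KafkaUtils;

public class MetadataOnlyLoggingSketch {

    public static void main(String[] args) {
        ConsumerRecord<String, String> record = new ConsumerRecord<>("demo", 0, 42L, "key", "value");

        // The flag now lives in KafkaUtils, but the old entry point still works.
        ListenerUtils.setLogOnlyMetadata(true);

        // Both print the metadata-only form, e.g. "demo-0@42".
        System.out.println(ListenerUtils.recordToString(record));
        System.out.println(KafkaUtils.recordToString(record));
    }

}
```
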
spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java

+3-2
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2020 the original author or authors.
+ * Copyright 2019-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
 import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.listener.GenericMessageListenerContainer;
 import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.KafkaUtils;
 import org.springframework.util.Assert;

 /**
@@ -126,7 +127,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
 		data.forEach(record -> {
 			Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
 			if (correlation == null) {
-				this.logger.error(() -> "No correlationId found in reply: " + record
+				this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record)
 						+ " - to use request/reply semantics, the responding server must return the correlation id "
 						+ " in the '" + KafkaHeaders.CORRELATION_ID + "' header");
 			}

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+9-6
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -47,6 +47,7 @@
 import org.springframework.kafka.listener.GenericMessageListenerContainer;
 import org.springframework.kafka.listener.ListenerUtils;
 import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.KafkaUtils;
 import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.support.serializer.DeserializationException;
 import org.springframework.kafka.support.serializer.SerializationUtils;
@@ -378,7 +379,7 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @
 			}
 		}
 		headers.add(new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));
-		this.logger.debug(() -> "Sending: " + record + WITH_CORRELATION_ID + correlationId);
+		this.logger.debug(() -> "Sending: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + correlationId);
 		RequestReplyFuture<K, V, R> future = new RequestReplyFuture<>();
 		this.futures.put(correlationId, future);
 		try {
@@ -396,7 +397,8 @@ private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correla
 		this.scheduler.schedule(() -> {
 			RequestReplyFuture<K, V, R> removed = this.futures.remove(correlationId);
 			if (removed != null) {
-				this.logger.warn(() -> "Reply timed out for: " + record + WITH_CORRELATION_ID + correlationId);
+				this.logger.warn(() -> "Reply timed out for: " + KafkaUtils.format(record)
+						+ WITH_CORRELATION_ID + correlationId);
 				if (!handleTimeout(correlationId, removed)) {
 					removed.setException(new KafkaReplyTimeoutException("Reply timed out"));
 				}
@@ -455,7 +457,7 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
 				correlationId = new CorrelationKey(correlationHeader.value());
 			}
 			if (correlationId == null) {
-				this.logger.error(() -> "No correlationId found in reply: " + record
+				this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record)
 						+ " - to use request/reply semantics, the responding server must return the correlation id "
 						+ " in the '" + this.correlationHeaderName + "' header");
 			}
@@ -473,7 +475,8 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
 				future.setException(exception);
 			}
 			if (ok) {
-				this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
+				this.logger.debug(() -> "Received: " + KafkaUtils.recordToString(record)
+						+ WITH_CORRELATION_ID + correlationKey);
 				future.set(record);
 			}
 		}
@@ -540,7 +543,7 @@ protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correl
 	}

 	private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
-		return "No pending reply: " + record + WITH_CORRELATION_ID
+		return "No pending reply: " + KafkaUtils.recordToString(record) + WITH_CORRELATION_ID
 				+ correlationId + ", perhaps timed out, or using a shared reply topic";
 	}

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java

+90-1
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,10 +22,14 @@
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Map;
+import java.util.function.Function;

+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;

 import org.springframework.messaging.Message;
+import org.springframework.util.Assert;
 import org.springframework.util.ClassUtils;

 /**
@@ -38,6 +42,13 @@
 */
 public final class KafkaUtils {

+	private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
+
+	private static Function<ProducerRecord<?, ?>, String> prFormatter = rec -> rec.toString();
+
+	private static Function<ConsumerRecord<?, ?>, String> crFormatter =
+			rec -> rec.topic() + "-" + rec.partition() + "@" + rec.offset();
+
 	/**
 	 * True if micrometer is on the class path.
 	 */
@@ -134,6 +145,84 @@ else if (dt instanceof String) {
 				min));
 	}

+	/**
+	 * Set to true to only log record metadata.
+	 * @param onlyMeta true to only log record metadata.
+	 * @since 2.7.12
+	 * @see #recordToString(ConsumerRecord)
+	 */
+	public static void setLogOnlyMetadata(boolean onlyMeta) {
+		LOG_METADATA_ONLY.set(onlyMeta);
+	}
+
+	/**
+	 * Return the {@link ConsumerRecord} as a String; either {@code toString()} or
+	 * {@code topic-partition@offset}.
+	 * @param record the record.
+	 * @return the rendered record.
+	 * @since 2.7.12
+	 * @see #setLogOnlyMetadata(boolean)
+	 */
+	public static String recordToString(ConsumerRecord<?, ?> record) {
+		return recordToString(record, Boolean.TRUE.equals(LOG_METADATA_ONLY.get()));
+	}
+
+	/**
+	 * Return the {@link ConsumerRecord} as a String; either {@code toString()} or
+	 * {@code topic-partition@offset}.
+	 * @param record the record.
+	 * @param meta true to log just the metadata.
+	 * @return the rendered record.
+	 * @since 2.7.12
+	 */
+	public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
+		if (meta) {
+			return crFormatter.apply(record);
+		}
+		else {
+			return record.toString();
+		}
+	}
+
+	/**
+	 * Set a formatter for logging {@link ConsumerRecord}s.
+	 * @param formatter a function to format the record as a String
+	 * @since 2.7.11
+	 */
+	public static void setConsumerRecordFormatter(Function<ConsumerRecord<?, ?>, String> formatter) {
+		Assert.notNull(formatter, "'formatter' cannot be null");
+		crFormatter = formatter;
+	}
+
+	/**
+	 * Set a formatter for logging {@link ProducerRecord}s.
+	 * @param formatter a function to format the record as a String
+	 * @since 2.7.11
+	 */
+	public static void setProducerRecordFormatter(Function<ProducerRecord<?, ?>, String> formatter) {
+		Assert.notNull(formatter, "'formatter' cannot be null");
+		prFormatter = formatter;
+	}
+
+	/**
+	 * Format the {@link ConsumerRecord} for logging; default
+	 * {@code topic-partition@offset}.
+	 * @param record the record to format.
+	 * @return the formatted String.
+	 */
+	public static String format(ConsumerRecord<?, ?> record) {
+		return crFormatter.apply(record);
+	}
+
+	/**
+	 * Format the {@link ProducerRecord} for logging; default
+	 * {@link ProducerRecord}{@link #toString()}.
+	 * @param record the record to format.
+	 * @return the formatted String.
	 */
+	public static String format(ProducerRecord<?, ?> record) {
+		return prFormatter.apply(record);
+	}

 	private KafkaUtils() {
 	}

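For completeness, a hedged sketch of how the new consumer-side hook might be used (the topic name, key, and formatter below are illustrative, not part of the commit):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.support.KafkaUtils;

public class ConsumerRecordLoggingSketch {

    public static void main(String[] args) {
        // Include the key alongside the default topic-partition@offset rendering.
        KafkaUtils.setConsumerRecordFormatter(rec ->
                rec.topic() + "-" + rec.partition() + "@" + rec.offset() + " key=" + rec.key());

        ConsumerRecord<String, String> record = new ConsumerRecord<>("replies", 1, 7L, "corr-1", "payload");

        // format(...) always applies the formatter; recordToString(record, true)
        // applies it only on the metadata-only path.
        System.out.println(KafkaUtils.format(record));               // replies-1@7 key=corr-1
        System.out.println(KafkaUtils.recordToString(record, true)); // replies-1@7 key=corr-1
    }

}
```

Note that, as added in this commit, the formatters are static fields, so a custom formatter applies process-wide, while `setLogOnlyMetadata` is per-thread (it is backed by a `ThreadLocal`).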