From bf1e5fb09137f2dbdb794e4f0a87af07d16b56f2 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 17 Feb 2022 16:34:42 -0500 Subject: [PATCH 1/5] GH-2049: Spring Managed Producer Interceptors Allow producer interceptors to be managed by Spring so that they can be used in KafkaTemplate instead of providing the classname of the interceptor to Kafka configuraiton. Adding tests and docs. Resolves https://github.com/spring-projects/spring-kafka/issues/2049 --- .../src/main/asciidoc/index.adoc | 4 +- .../src/main/asciidoc/kafka.adoc | 61 +++++++++++++++++++ .../kafka/core/KafkaTemplate.java | 28 +++++++++ .../kafka/core/KafkaTemplateTests.java | 40 +++++++++++- 4 files changed, 130 insertions(+), 3 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/index.adoc b/spring-kafka-docs/src/main/asciidoc/index.adoc index 15f4d282f3..4a90993ecd 100644 --- a/spring-kafka-docs/src/main/asciidoc/index.adoc +++ b/spring-kafka-docs/src/main/asciidoc/index.adoc @@ -5,7 +5,7 @@ :numbered: :icons: font :hide-uri-scheme: -Gary Russell; Artem Bilan; Biju Kunjummen; Jay Bryant +Gary Russell; Artem Bilan; Biju Kunjummen; Jay Bryant; Soby Chacko ifdef::backend-html5[] *{project-version}* @@ -17,7 +17,7 @@ ifdef::backend-pdf[] NOTE: This documentation is also available as https://docs.spring.io/spring-kafka/docs/{project-version}/reference/html/index.html[HTML]. endif::[] -(C) 2016 - 2021 VMware, Inc. +(C) 2016 - 2022 VMware, Inc. Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically. diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index fa544e8016..8cc35af232 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -3734,6 +3734,67 @@ Received test ---- ==== +==== Producer Interceptor Managed in Spring + +Staring with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Kafka producer configuration. +If you go with this approach, then you need to set this producer interceptor on `KafkaTemplate`. +Following is an example using the same `MyProducerInterceptor` from above, but changed to not use the internal config property. + +==== +[source] +---- +public class MyProducerInterceptor implements ProducerInterceptor { + + private final SomeBean bean; + + public MyProducerInterceptor(SomeBean bean) { + this.bean = bean; + } + + @Override + public void configure(Map configs) { + + } + + @Override + public ProducerRecord onSend(ProducerRecord record) { + this.bean.someMethod("producer interceptor"); + return record; + } + + @Override + public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + } + + @Override + public void close() { + } + +} +---- +==== + +==== +[source] +---- + +@Bean +public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) { + return new MyProducerInterceptor(someBean); +} + +@Bean +public KafkaTemplate kafkaTemplate(ProducerFactory pf, MyProducerInterceptor myProducerInterceptor) { + KafkaTemplate kafkaTemplate = new KafkaTemplate(pf); + kafkaTemplate.setProducerInterceptor(myProducerInterceptor); +} +---- +==== + +Right before the records are sent, the `onSend` method of the producer interceptor is invoked. +Once the server sends an acknowledgement on publishing the data, then the `onAcknowledgement` method is invoked. +The `onAcknowledgement` is called right before the producer invokes any user callbacks. + [[pause-resume]] ==== Pausing and Resuming Listener Containers diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index c8960dc084..7c33c2b78f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; @@ -87,6 +88,7 @@ * @author Biju Kunjummen * @author Endika Gutierrez * @author Thomas Strauß + * @author Soby Chacko */ public class KafkaTemplate implements KafkaOperations, ApplicationContextAware, BeanNameAware, ApplicationListener, DisposableBean { @@ -129,6 +131,8 @@ public class KafkaTemplate implements KafkaOperations, ApplicationCo private volatile MicrometerHolder micrometerHolder; + private ProducerInterceptor producerInterceptor; + /** * Create an instance using the supplied producer factory and autoFlush false. * @param producerFactory the producer factory. @@ -370,6 +374,24 @@ public void setConsumerFactory(ConsumerFactory consumerFactory) { this.consumerFactory = consumerFactory; } + /** + * Returns the producer interceptor associated with this template. + * @return {@link ProducerInterceptor} + * @since 3.0.x + */ + public ProducerInterceptor getProducerInterceptor() { + return this.producerInterceptor; + } + + /** + * Set a producer interceptor or this template. + * @param producerInterceptor the producer interceptor + * @since 3.0.x + */ + public void setProducerInterceptor(ProducerInterceptor producerInterceptor) { + this.producerInterceptor = producerInterceptor; + } + @Override public void onApplicationEvent(ContextStoppedEvent event) { if (this.customProducerFactory) { @@ -631,6 +653,9 @@ protected ListenableFuture> doSend(final ProducerRecord p if (this.micrometerHolder != null) { sample = this.micrometerHolder.start(); } + if (this.producerInterceptor != null) { + this.producerInterceptor.onSend(producerRecord); + } Future sendFuture = producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)); // May be an immediate failure @@ -657,6 +682,9 @@ private Callback buildCallback(final ProducerRecord producerRecord, final final SettableListenableFuture> future, @Nullable Object sample) { return (metadata, exception) -> { + if (this.producerInterceptor != null) { + this.producerInterceptor.onAcknowledgement(metadata, exception); + } try { if (exception == null) { if (sample != null) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index bf21fa0f2b..fb3bd4b147 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,9 @@ import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.springframework.kafka.test.assertj.KafkaConditions.key; import static org.springframework.kafka.test.assertj.KafkaConditions.keyValue; import static org.springframework.kafka.test.assertj.KafkaConditions.partition; @@ -50,6 +53,7 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; @@ -65,6 +69,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.springframework.aop.framework.ProxyFactory; import org.springframework.kafka.KafkaException; @@ -94,6 +99,7 @@ * @author Biju Kunjummen * @author Endika Gutierrez * @author Thomas Strauß + * @author Soby Chacko */ @EmbeddedKafka(topics = { KafkaTemplateTests.INT_KEY_TOPIC, KafkaTemplateTests.STRING_KEY_TOPIC }) public class KafkaTemplateTests { @@ -554,4 +560,36 @@ void testFutureFailureOnSend() { pf.destroy(); } + @SuppressWarnings("unchecked") + @Test + void testProducerInterceptorManagedOnKafkaTemplate() { + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf, true); + ProducerInterceptor producerInterceptor = Mockito.mock(ProducerInterceptor.class); + template.setProducerInterceptor(producerInterceptor); + + template.setDefaultTopic("prod-interceptor-test-1"); + template.sendDefault("foo"); + + verify(producerInterceptor, times(1)).onSend(any(ProducerRecord.class)); + verify(producerInterceptor, times(1)).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull()); + } + + @SuppressWarnings("unchecked") + @Test + void testProducerInterceptorNotSetOnKafkaTemplateNotInvoked() { + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf, true); + ProducerInterceptor producerInterceptor = Mockito.mock(ProducerInterceptor.class); + + template.setDefaultTopic("prod-interceptor-test-2"); + template.sendDefault("foo"); + + verifyNoInteractions(producerInterceptor); + } + } From f049979fbfd112fbbf6b99c064e7d721df00140b Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 17 Feb 2022 17:42:38 -0500 Subject: [PATCH 2/5] Addressing PR review comments --- spring-kafka-docs/src/main/asciidoc/kafka.adoc | 2 +- .../springframework/kafka/core/KafkaTemplate.java | 13 ++----------- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 8cc35af232..02bee0f1f9 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -3736,7 +3736,7 @@ Received test ==== Producer Interceptor Managed in Spring -Staring with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Kafka producer configuration. +Staring with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration. If you go with this approach, then you need to set this producer interceptor on `KafkaTemplate`. Following is an example using the same `MyProducerInterceptor` from above, but changed to not use the internal config property. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 7c33c2b78f..7319a95ed0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -375,18 +375,9 @@ public void setConsumerFactory(ConsumerFactory consumerFactory) { } /** - * Returns the producer interceptor associated with this template. - * @return {@link ProducerInterceptor} - * @since 3.0.x - */ - public ProducerInterceptor getProducerInterceptor() { - return this.producerInterceptor; - } - - /** - * Set a producer interceptor or this template. + * Set a producer interceptor on this template. * @param producerInterceptor the producer interceptor - * @since 3.0.x + * @since 3.0 */ public void setProducerInterceptor(ProducerInterceptor producerInterceptor) { this.producerInterceptor = producerInterceptor; From 45bcb95a21d5cf00214083496d926ad92dcc7709 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 18 Feb 2022 10:49:07 -0500 Subject: [PATCH 3/5] Fixing typo in the docs --- spring-kafka-docs/src/main/asciidoc/kafka.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 02bee0f1f9..0351a4e09e 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -3736,7 +3736,7 @@ Received test ==== Producer Interceptor Managed in Spring -Staring with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration. +Starting with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration. If you go with this approach, then you need to set this producer interceptor on `KafkaTemplate`. Following is an example using the same `MyProducerInterceptor` from above, but changed to not use the internal config property. From 6f0b4248b1a359a609df423b63b2a63c9513c185 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 18 Feb 2022 19:03:33 -0500 Subject: [PATCH 4/5] Addressing PR review comments. Adding a CompositeProducerInterceptor to allow multiple producer interceptors on KafkaTemplate. --- .../src/main/asciidoc/kafka.adoc | 4 + .../kafka/core/KafkaTemplate.java | 12 +- .../support/CompositeProducerInterceptor.java | 113 ++++++++++++++++++ .../kafka/core/KafkaTemplateTests.java | 31 +++++ 4 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerInterceptor.java diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 0351a4e09e..2d52424853 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -3795,6 +3795,10 @@ Right before the records are sent, the `onSend` method of the producer intercept Once the server sends an acknowledgement on publishing the data, then the `onAcknowledgement` method is invoked. The `onAcknowledgement` is called right before the producer invokes any user callbacks. +If you have multiple such producer interceptors managed through Spring that need to be applied on the `KafkaTemplate`, you need to use `CompositeProducerInterceptor` instead. +`CompositeProducerInterceptor` allows individual producer interceptors to be added in order. +The methods from the underlying `ProducerInterceptor` implementations are invoked in the order as they were added to the `CompositeProducerInterceptor`. + [[pause-resume]] ==== Pausing and Resuming Listener Containers diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 7319a95ed0..67669cae43 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -673,8 +673,13 @@ private Callback buildCallback(final ProducerRecord producerRecord, final final SettableListenableFuture> future, @Nullable Object sample) { return (metadata, exception) -> { - if (this.producerInterceptor != null) { - this.producerInterceptor.onAcknowledgement(metadata, exception); + try { + if (this.producerInterceptor != null) { + this.producerInterceptor.onAcknowledgement(metadata, exception); + } + } + catch (Exception e) { + KafkaTemplate.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); } try { if (exception == null) { @@ -782,6 +787,9 @@ public void destroy() { if (this.customProducerFactory) { ((DefaultKafkaProducerFactory) this.producerFactory).destroy(); } + if (this.producerInterceptor != null) { + this.producerInterceptor.close(); + } } @SuppressWarnings("serial") diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerInterceptor.java new file mode 100644 index 0000000000..e23ed0b842 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerInterceptor.java @@ -0,0 +1,113 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import org.springframework.core.log.LogAccessor; +import org.springframework.util.Assert; + +/** + * A {@link ProducerInterceptor} that delegates to a collection of interceptors. + * + * @param the key type. + * @param the value type. + * + * @author Soby Chacko + * + * @since 3.0 + * + */ +public class CompositeProducerInterceptor implements ProducerInterceptor, Closeable { + + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); //NOSONAR + + private final List> delegates = new ArrayList<>(); + + /** + * Construct an instance with the provided delegates to {@link ProducerInterceptor}s. + * @param delegates the delegates. + */ + @SafeVarargs + @SuppressWarnings("varargs") + public CompositeProducerInterceptor(ProducerInterceptor... delegates) { + Assert.notNull(delegates, "'delegates' cannot be null"); + Assert.noNullElements(delegates, "'delegates' cannot have null entries"); + this.delegates.addAll(Arrays.asList(delegates)); + } + + @Override + public ProducerRecord onSend(ProducerRecord record) { + ProducerRecord interceptRecord = record; + for (ProducerInterceptor interceptor : this.delegates) { + try { + interceptRecord = interceptor.onSend(interceptRecord); + } + catch (Exception e) { + // if exception thrown, log and continue calling other interceptors. + if (record != null) { + CompositeProducerInterceptor.this.logger.warn(e, () -> + String.format("Error executing interceptor onSend callback for topic: %s, partition: %d", + record.topic(), record.partition())); + } + else { + CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onSend callback"); + } + } + } + return interceptRecord; + } + + @Override + public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + for (ProducerInterceptor interceptor : this.delegates) { + try { + interceptor.onAcknowledgement(metadata, exception); + } + catch (Exception e) { + // do not propagate interceptor exceptions, just log + CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); + } + } + } + + @Override + public void close() { + for (ProducerInterceptor interceptor : this.delegates) { + try { + interceptor.close(); + } + catch (Exception e) { + CompositeProducerInterceptor.this.logger.warn(e, () -> "Failed to close producer interceptor"); + } + } + } + + @Override + public void configure(Map configs) { + this.delegates.forEach(delegate -> delegate.configure(configs)); + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index fb3bd4b147..7b0b72e3f0 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -22,6 +22,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -69,11 +71,13 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.InOrder; import org.mockito.Mockito; import org.springframework.aop.framework.ProxyFactory; import org.springframework.kafka.KafkaException; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.CompositeProducerInterceptor; import org.springframework.kafka.support.CompositeProducerListener; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; import org.springframework.kafka.support.KafkaHeaders; @@ -592,4 +596,31 @@ void testProducerInterceptorNotSetOnKafkaTemplateNotInvoked() { verifyNoInteractions(producerInterceptor); } + @SuppressWarnings("unchecked") + @Test + void testCompositeProducerInterceptor() { + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf, true); + ProducerInterceptor producerInterceptor1 = Mockito.mock(ProducerInterceptor.class); + ProducerInterceptor producerInterceptor2 = Mockito.mock(ProducerInterceptor.class); + CompositeProducerInterceptor compositeProducerInterceptor = + new CompositeProducerInterceptor<>(producerInterceptor1, producerInterceptor2); + template.setProducerInterceptor(compositeProducerInterceptor); + + ProducerRecord mockProducerRecord = Mockito.mock(ProducerRecord.class); + doReturn(mockProducerRecord).when(producerInterceptor1).onSend(any(ProducerRecord.class)); + + template.setDefaultTopic("prod-interceptor-test-3"); + template.sendDefault("foo"); + + InOrder inOrder = inOrder(producerInterceptor1, producerInterceptor2); + + inOrder.verify(producerInterceptor1).onSend(any(ProducerRecord.class)); + inOrder.verify(producerInterceptor2).onSend(any(ProducerRecord.class)); + inOrder.verify(producerInterceptor1).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull()); + inOrder.verify(producerInterceptor2).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull()); + } + } From 86534dce375af352cbdac7488cd0a3a4de95ada9 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 22 Feb 2022 18:16:02 -0500 Subject: [PATCH 5/5] Addressing PR review --- .../kafka/support/CompositeProducerInterceptor.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerInterceptor.java index e23ed0b842..2fb971fd52 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerInterceptor.java @@ -74,7 +74,8 @@ public ProducerRecord onSend(ProducerRecord record) { record.topic(), record.partition())); } else { - CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onSend callback"); + CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onSend callback: " + + interceptor.toString()); } } } @@ -89,7 +90,8 @@ public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } catch (Exception e) { // do not propagate interceptor exceptions, just log - CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); + CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback: " + + interceptor.toString()); } } } @@ -101,7 +103,8 @@ public void close() { interceptor.close(); } catch (Exception e) { - CompositeProducerInterceptor.this.logger.warn(e, () -> "Failed to close producer interceptor"); + CompositeProducerInterceptor.this.logger.warn(e, () -> "Failed to close producer interceptor: " + + interceptor.toString()); } } }