Skip to content

GH-2049: Spring Managed Producer Interceptors #2112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions spring-kafka-docs/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
:numbered:
:icons: font
:hide-uri-scheme:
Gary Russell; Artem Bilan; Biju Kunjummen; Jay Bryant
Gary Russell; Artem Bilan; Biju Kunjummen; Jay Bryant; Soby Chacko

ifdef::backend-html5[]
*{project-version}*
Expand All @@ -17,7 +17,7 @@ ifdef::backend-pdf[]
NOTE: This documentation is also available as https://docs.spring.io/spring-kafka/docs/{project-version}/reference/html/index.html[HTML].
endif::[]

(C) 2016 - 2021 VMware, Inc.
(C) 2016 - 2022 VMware, Inc.

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.

Expand Down
65 changes: 65 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3734,6 +3734,71 @@ Received test
----
====

==== Producer Interceptor Managed in Spring

Starting with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration.
If you go with this approach, then you need to set this producer interceptor on `KafkaTemplate`.
The following example uses the same `MyProducerInterceptor` from above, but changed so that it does not rely on the internal configuration property.

====
[source]
----
// Example interceptor that calls into a Spring-managed bean instead of reading
// configuration from the producer config map.
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

	private final SomeBean bean;

	// The collaborating bean is injected via the constructor (no config lookup needed).
	public MyProducerInterceptor(SomeBean bean) {
		this.bean = bean;
	}

	// Intentionally empty: configuration comes from the constructor, not from configs.
	@Override
	public void configure(Map<String, ?> configs) {

	}

	// Invoked for each record before it is sent; delegates to the bean and
	// returns the record unchanged.
	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		this.bean.someMethod("producer interceptor");
		return record;
	}

	// No-op: this example does not act on broker acknowledgements.
	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
	}

	// No-op: nothing to release.
	@Override
	public void close() {
	}

}
----
====

====
[source]
----

@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
	KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(pf);
	kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
	return kafkaTemplate;
}
----
====

Right before the records are sent, the `onSend` method of the producer interceptor is invoked.
Once the server sends an acknowledgement on publishing the data, then the `onAcknowledgement` method is invoked.
The `onAcknowledgement` is called right before the producer invokes any user callbacks.

If you have multiple such producer interceptors managed through Spring that need to be applied on the `KafkaTemplate`, you need to use `CompositeProducerInterceptor` instead.
`CompositeProducerInterceptor` allows individual producer interceptors to be added in order.
The methods from the underlying `ProducerInterceptor` implementations are invoked in the order as they were added to the `CompositeProducerInterceptor`.

[[pause-resume]]
==== Pausing and Resuming Listener Containers

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
Expand Down Expand Up @@ -87,6 +88,7 @@
* @author Biju Kunjummen
* @author Endika Gutierrez
* @author Thomas Strauß
* @author Soby Chacko
*/
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
ApplicationListener<ContextStoppedEvent>, DisposableBean {
Expand Down Expand Up @@ -129,6 +131,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo

private volatile MicrometerHolder micrometerHolder;

private ProducerInterceptor<K, V> producerInterceptor;

/**
* Create an instance using the supplied producer factory and autoFlush false.
* @param producerFactory the producer factory.
Expand Down Expand Up @@ -370,6 +374,15 @@ public void setConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
this.consumerFactory = consumerFactory;
}

/**
 * Set a producer interceptor on this template; its {@code onSend} is invoked for each
 * record just before the record is handed to the producer, and its
 * {@code onAcknowledgement} is invoked from the send callback before any user
 * callbacks run. The interceptor is closed when this template is destroyed. To apply
 * several interceptors in order, use a {@code CompositeProducerInterceptor}.
 * @param producerInterceptor the producer interceptor
 * @since 3.0
 */
public void setProducerInterceptor(ProducerInterceptor<K, V> producerInterceptor) {
	this.producerInterceptor = producerInterceptor;
}

@Override
public void onApplicationEvent(ContextStoppedEvent event) {
if (this.customProducerFactory) {
Expand Down Expand Up @@ -631,6 +644,9 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
if (this.micrometerHolder != null) {
sample = this.micrometerHolder.start();
}
if (this.producerInterceptor != null) {
this.producerInterceptor.onSend(producerRecord);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't need to catch an exception here; a user might want to use this to cancel a send for some reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like Kafka's ProducerInterceptors simply log the exception from individual interceptors and continue. See here. The one set on the KafkaTemplate can be used to cancel if need be. I was thinking maybe with a CompositeProducerInterceptor we can follow the same approach used in Kafka's ProducerInterceptors.

}
Future<RecordMetadata> sendFuture =
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
// May be an immediate failure
Expand All @@ -657,6 +673,14 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
final SettableListenableFuture<SendResult<K, V>> future, @Nullable Object sample) {

return (metadata, exception) -> {
try {
if (this.producerInterceptor != null) {
this.producerInterceptor.onAcknowledgement(metadata, exception);
}
}
catch (Exception e) {
KafkaTemplate.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback");
}
try {
if (exception == null) {
if (sample != null) {
Expand Down Expand Up @@ -763,6 +787,9 @@ public void destroy() {
if (this.customProducerFactory) {
((DefaultKafkaProducerFactory<K, V>) this.producerFactory).destroy();
}
if (this.producerInterceptor != null) {
this.producerInterceptor.close();
}
}

@SuppressWarnings("serial")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import org.springframework.core.log.LogAccessor;
import org.springframework.util.Assert;

/**
 * A {@link ProducerInterceptor} that delegates to an ordered collection of
 * interceptors. {@link #onSend(ProducerRecord)} chains the (possibly replaced)
 * record through each delegate in the order they were supplied;
 * {@link #onAcknowledgement(RecordMetadata, Exception)}, {@link #close()} and
 * {@link #configure(Map)} are fanned out to every delegate. An exception thrown
 * by one delegate is logged and does not prevent the remaining delegates from
 * being invoked.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Soby Chacko
 *
 * @since 3.0
 *
 */
public class CompositeProducerInterceptor<K, V> implements ProducerInterceptor<K, V>, Closeable {

	private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); //NOSONAR

	private final List<ProducerInterceptor<K, V>> delegates = new ArrayList<>();

	/**
	 * Construct an instance with the provided delegates to {@link ProducerInterceptor}s.
	 * @param delegates the delegates; must not be {@code null} or contain {@code null} entries.
	 */
	@SafeVarargs
	@SuppressWarnings("varargs")
	public CompositeProducerInterceptor(ProducerInterceptor<K, V>... delegates) {
		Assert.notNull(delegates, "'delegates' cannot be null");
		Assert.noNullElements(delegates, "'delegates' cannot have null entries");
		this.delegates.addAll(Arrays.asList(delegates));
	}

	@Override
	public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
		ProducerRecord<K, V> interceptRecord = record;
		for (ProducerInterceptor<K, V> interceptor : this.delegates) {
			try {
				// Each delegate may replace the record; feed its result to the next one.
				interceptRecord = interceptor.onSend(interceptRecord);
			}
			catch (Exception e) {
				// If an exception is thrown, log it and continue calling the other interceptors.
				if (record != null) {
					this.logger.warn(e, () ->
							String.format("Error executing interceptor onSend callback for topic: %s, partition: %d",
									record.topic(), record.partition()));
				}
				else {
					this.logger.warn(e, () -> "Error executing interceptor onSend callback: "
							+ interceptor.toString());
				}
			}
		}
		return interceptRecord;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
		for (ProducerInterceptor<K, V> interceptor : this.delegates) {
			try {
				interceptor.onAcknowledgement(metadata, exception);
			}
			catch (Exception e) {
				// Do not propagate interceptor exceptions, just log and continue.
				this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback: "
						+ interceptor.toString());
			}
		}
	}

	@Override
	public void close() {
		for (ProducerInterceptor<K, V> interceptor : this.delegates) {
			try {
				interceptor.close();
			}
			catch (Exception e) {
				// Close every delegate even if an earlier one fails.
				this.logger.warn(e, () -> "Failed to close producer interceptor: "
						+ interceptor.toString());
			}
		}
	}

	@Override
	public void configure(Map<String, ?> configs) {
		this.delegates.forEach(delegate -> delegate.configure(configs));
	}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,12 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
import static org.springframework.kafka.test.assertj.KafkaConditions.keyValue;
import static org.springframework.kafka.test.assertj.KafkaConditions.partition;
Expand Down Expand Up @@ -50,6 +55,7 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
Expand All @@ -65,10 +71,13 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.CompositeProducerInterceptor;
import org.springframework.kafka.support.CompositeProducerListener;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
Expand All @@ -94,6 +103,7 @@
* @author Biju Kunjummen
* @author Endika Gutierrez
* @author Thomas Strauß
* @author Soby Chacko
*/
@EmbeddedKafka(topics = { KafkaTemplateTests.INT_KEY_TOPIC, KafkaTemplateTests.STRING_KEY_TOPIC })
public class KafkaTemplateTests {
Expand Down Expand Up @@ -554,4 +564,63 @@ void testFutureFailureOnSend() {
pf.destroy();
}

@SuppressWarnings("unchecked")
@Test
void testProducerInterceptorManagedOnKafkaTemplate() {

	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
	// autoFlush=true so the send (and its callback) completes before verification.
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
	ProducerInterceptor<Integer, String> producerInterceptor = Mockito.mock(ProducerInterceptor.class);
	template.setProducerInterceptor(producerInterceptor);

	template.setDefaultTopic("prod-interceptor-test-1");
	template.sendDefault("foo");

	// onSend runs before the record is handed to the producer; onAcknowledgement fires
	// from the send callback with a null exception on success.
	verify(producerInterceptor, times(1)).onSend(any(ProducerRecord.class));
	verify(producerInterceptor, times(1)).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
	pf.destroy();
}

@SuppressWarnings("unchecked")
@Test
void testProducerInterceptorNotSetOnKafkaTemplateNotInvoked() {

	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
	// The interceptor is deliberately NOT set on the template.
	ProducerInterceptor<Integer, String> producerInterceptor = Mockito.mock(ProducerInterceptor.class);

	template.setDefaultTopic("prod-interceptor-test-2");
	template.sendDefault("foo");

	// Since it was never registered, the interceptor must never be called.
	verifyNoInteractions(producerInterceptor);
	pf.destroy();
}

@SuppressWarnings("unchecked")
@Test
void testCompositeProducerInterceptor() {

	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
	ProducerInterceptor<Integer, String> producerInterceptor1 = Mockito.mock(ProducerInterceptor.class);
	ProducerInterceptor<Integer, String> producerInterceptor2 = Mockito.mock(ProducerInterceptor.class);
	CompositeProducerInterceptor<Integer, String> compositeProducerInterceptor =
			new CompositeProducerInterceptor<>(producerInterceptor1, producerInterceptor2);
	template.setProducerInterceptor(compositeProducerInterceptor);

	// The first delegate returns a replacement record, which the composite must
	// pass to the second delegate.
	ProducerRecord<Integer, String> mockProducerRecord = Mockito.mock(ProducerRecord.class);
	doReturn(mockProducerRecord).when(producerInterceptor1).onSend(any(ProducerRecord.class));

	template.setDefaultTopic("prod-interceptor-test-3");
	template.sendDefault("foo");

	// Delegates must be invoked in registration order for both callbacks.
	InOrder inOrder = inOrder(producerInterceptor1, producerInterceptor2);

	inOrder.verify(producerInterceptor1).onSend(any(ProducerRecord.class));
	inOrder.verify(producerInterceptor2).onSend(any(ProducerRecord.class));
	inOrder.verify(producerInterceptor1).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
	inOrder.verify(producerInterceptor2).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
	pf.destroy();
}

}