Skip to content

Commit 38ed222

Browse files
authored
GH-2049: Spring Managed Producer Interceptors (#2112)
* GH-2049: Spring Managed Producer Interceptors Allow producer interceptors to be managed by Spring so that they can be used in KafkaTemplate instead of providing the classname of the interceptor to Kafka configuraiton. Adding tests and docs. Resolves #2049 * Addressing PR review comments * Fixing typo in the docs * Addressing PR review comments. Adding a CompositeProducerInterceptor to allow multiple producer interceptors on KafkaTemplate. * Addressing PR review
1 parent c27de92 commit 38ed222

File tree

5 files changed

+280
-3
lines changed

5 files changed

+280
-3
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/index.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
:numbered:
66
:icons: font
77
:hide-uri-scheme:
8-
Gary Russell; Artem Bilan; Biju Kunjummen; Jay Bryant
8+
Gary Russell; Artem Bilan; Biju Kunjummen; Jay Bryant; Soby Chacko
99

1010
ifdef::backend-html5[]
1111
*{project-version}*
@@ -17,7 +17,7 @@ ifdef::backend-pdf[]
1717
NOTE: This documentation is also available as https://docs.spring.io/spring-kafka/docs/{project-version}/reference/html/index.html[HTML].
1818
endif::[]
1919

20-
(C) 2016 - 2021 VMware, Inc.
20+
(C) 2016 - 2022 VMware, Inc.
2121

2222
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.
2323

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+65
Original file line numberDiff line numberDiff line change
@@ -3734,6 +3734,71 @@ Received test
37343734
----
37353735
====
37363736

3737+
==== Producer Interceptor Managed in Spring
3738+
3739+
Starting with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration.
3740+
If you go with this approach, then you need to set this producer interceptor on `KafkaTemplate`.
3741+
Following is an example using the same `MyProducerInterceptor` from above, but changed to not use the internal config property.
3742+
3743+
====
3744+
[source]
3745+
----
3746+
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
3747+
3748+
private final SomeBean bean;
3749+
3750+
public MyProducerInterceptor(SomeBean bean) {
3751+
this.bean = bean;
3752+
}
3753+
3754+
@Override
3755+
public void configure(Map<String, ?> configs) {
3756+
3757+
}
3758+
3759+
@Override
3760+
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
3761+
this.bean.someMethod("producer interceptor");
3762+
return record;
3763+
}
3764+
3765+
@Override
3766+
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
3767+
}
3768+
3769+
@Override
3770+
public void close() {
3771+
}
3772+
3773+
}
3774+
----
3775+
====
3776+
3777+
====
3778+
[source]
3779+
----
3780+
3781+
@Bean
3782+
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
3783+
return new MyProducerInterceptor(someBean);
3784+
}
3785+
3786+
@Bean
3787+
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
3788+
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(pf);
3789+
kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
3790+
}
3791+
----
3792+
====
3793+
3794+
Right before the records are sent, the `onSend` method of the producer interceptor is invoked.
3795+
Once the server sends an acknowledgement on publishing the data, then the `onAcknowledgement` method is invoked.
3796+
The `onAcknowledgement` is called right before the producer invokes any user callbacks.
3797+
3798+
If you have multiple such producer interceptors managed through Spring that need to be applied on the `KafkaTemplate`, you need to use `CompositeProducerInterceptor` instead.
3799+
`CompositeProducerInterceptor` allows individual producer interceptors to be added in order.
3800+
The methods from the underlying `ProducerInterceptor` implementations are invoked in the order as they were added to the `CompositeProducerInterceptor`.
3801+
37373802
[[pause-resume]]
37383803
==== Pausing and Resuming Listener Containers
37393804

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+27
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3838
import org.apache.kafka.clients.producer.Callback;
3939
import org.apache.kafka.clients.producer.Producer;
40+
import org.apache.kafka.clients.producer.ProducerInterceptor;
4041
import org.apache.kafka.clients.producer.ProducerRecord;
4142
import org.apache.kafka.clients.producer.RecordMetadata;
4243
import org.apache.kafka.common.Metric;
@@ -87,6 +88,7 @@
8788
* @author Biju Kunjummen
8889
* @author Endika Gutierrez
8990
* @author Thomas Strauß
91+
* @author Soby Chacko
9092
*/
9193
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
9294
ApplicationListener<ContextStoppedEvent>, DisposableBean {
@@ -129,6 +131,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
129131

130132
private volatile MicrometerHolder micrometerHolder;
131133

134+
private ProducerInterceptor<K, V> producerInterceptor;
135+
132136
/**
133137
* Create an instance using the supplied producer factory and autoFlush false.
134138
* @param producerFactory the producer factory.
@@ -370,6 +374,15 @@ public void setConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
370374
this.consumerFactory = consumerFactory;
371375
}
372376

377+
/**
378+
* Set a producer interceptor on this template.
379+
* @param producerInterceptor the producer interceptor
380+
* @since 3.0
381+
*/
382+
public void setProducerInterceptor(ProducerInterceptor<K, V> producerInterceptor) {
383+
this.producerInterceptor = producerInterceptor;
384+
}
385+
373386
@Override
374387
public void onApplicationEvent(ContextStoppedEvent event) {
375388
if (this.customProducerFactory) {
@@ -631,6 +644,9 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
631644
if (this.micrometerHolder != null) {
632645
sample = this.micrometerHolder.start();
633646
}
647+
if (this.producerInterceptor != null) {
648+
this.producerInterceptor.onSend(producerRecord);
649+
}
634650
Future<RecordMetadata> sendFuture =
635651
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
636652
// May be an immediate failure
@@ -657,6 +673,14 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
657673
final SettableListenableFuture<SendResult<K, V>> future, @Nullable Object sample) {
658674

659675
return (metadata, exception) -> {
676+
try {
677+
if (this.producerInterceptor != null) {
678+
this.producerInterceptor.onAcknowledgement(metadata, exception);
679+
}
680+
}
681+
catch (Exception e) {
682+
KafkaTemplate.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback");
683+
}
660684
try {
661685
if (exception == null) {
662686
if (sample != null) {
@@ -763,6 +787,9 @@ public void destroy() {
763787
if (this.customProducerFactory) {
764788
((DefaultKafkaProducerFactory<K, V>) this.producerFactory).destroy();
765789
}
790+
if (this.producerInterceptor != null) {
791+
this.producerInterceptor.close();
792+
}
766793
}
767794

768795
@SuppressWarnings("serial")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2022-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import java.io.Closeable;
20+
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
import org.apache.commons.logging.LogFactory;
26+
import org.apache.kafka.clients.producer.ProducerInterceptor;
27+
import org.apache.kafka.clients.producer.ProducerRecord;
28+
import org.apache.kafka.clients.producer.RecordMetadata;
29+
30+
import org.springframework.core.log.LogAccessor;
31+
import org.springframework.util.Assert;
32+
33+
/**
34+
* A {@link ProducerInterceptor} that delegates to a collection of interceptors.
35+
*
36+
* @param <K> the key type.
37+
* @param <V> the value type.
38+
*
39+
* @author Soby Chacko
40+
*
41+
* @since 3.0
42+
*
43+
*/
44+
public class CompositeProducerInterceptor<K, V> implements ProducerInterceptor<K, V>, Closeable {
45+
46+
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); //NOSONAR
47+
48+
private final List<ProducerInterceptor<K, V>> delegates = new ArrayList<>();
49+
50+
/**
51+
* Construct an instance with the provided delegates to {@link ProducerInterceptor}s.
52+
* @param delegates the delegates.
53+
*/
54+
@SafeVarargs
55+
@SuppressWarnings("varargs")
56+
public CompositeProducerInterceptor(ProducerInterceptor<K, V>... delegates) {
57+
Assert.notNull(delegates, "'delegates' cannot be null");
58+
Assert.noNullElements(delegates, "'delegates' cannot have null entries");
59+
this.delegates.addAll(Arrays.asList(delegates));
60+
}
61+
62+
@Override
63+
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
64+
ProducerRecord<K, V> interceptRecord = record;
65+
for (ProducerInterceptor<K, V> interceptor : this.delegates) {
66+
try {
67+
interceptRecord = interceptor.onSend(interceptRecord);
68+
}
69+
catch (Exception e) {
70+
// if exception thrown, log and continue calling other interceptors.
71+
if (record != null) {
72+
CompositeProducerInterceptor.this.logger.warn(e, () ->
73+
String.format("Error executing interceptor onSend callback for topic: %s, partition: %d",
74+
record.topic(), record.partition()));
75+
}
76+
else {
77+
CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onSend callback: "
78+
+ interceptor.toString());
79+
}
80+
}
81+
}
82+
return interceptRecord;
83+
}
84+
85+
@Override
86+
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
87+
for (ProducerInterceptor<K, V> interceptor : this.delegates) {
88+
try {
89+
interceptor.onAcknowledgement(metadata, exception);
90+
}
91+
catch (Exception e) {
92+
// do not propagate interceptor exceptions, just log
93+
CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback: "
94+
+ interceptor.toString());
95+
}
96+
}
97+
}
98+
99+
@Override
100+
public void close() {
101+
for (ProducerInterceptor<K, V> interceptor : this.delegates) {
102+
try {
103+
interceptor.close();
104+
}
105+
catch (Exception e) {
106+
CompositeProducerInterceptor.this.logger.warn(e, () -> "Failed to close producer interceptor: "
107+
+ interceptor.toString());
108+
}
109+
}
110+
}
111+
112+
@Override
113+
public void configure(Map<String, ?> configs) {
114+
this.delegates.forEach(delegate -> delegate.configure(configs));
115+
}
116+
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

+70-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,7 +22,12 @@
2222
import static org.mockito.ArgumentMatchers.any;
2323
import static org.mockito.BDDMockito.given;
2424
import static org.mockito.BDDMockito.willAnswer;
25+
import static org.mockito.Mockito.doReturn;
26+
import static org.mockito.Mockito.inOrder;
2527
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.times;
29+
import static org.mockito.Mockito.verify;
30+
import static org.mockito.Mockito.verifyNoInteractions;
2631
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
2732
import static org.springframework.kafka.test.assertj.KafkaConditions.keyValue;
2833
import static org.springframework.kafka.test.assertj.KafkaConditions.partition;
@@ -50,6 +55,7 @@
5055
import org.apache.kafka.clients.producer.Callback;
5156
import org.apache.kafka.clients.producer.Producer;
5257
import org.apache.kafka.clients.producer.ProducerConfig;
58+
import org.apache.kafka.clients.producer.ProducerInterceptor;
5359
import org.apache.kafka.clients.producer.ProducerRecord;
5460
import org.apache.kafka.clients.producer.RecordMetadata;
5561
import org.apache.kafka.common.Metric;
@@ -65,10 +71,13 @@
6571
import org.junit.jupiter.api.AfterAll;
6672
import org.junit.jupiter.api.BeforeAll;
6773
import org.junit.jupiter.api.Test;
74+
import org.mockito.InOrder;
75+
import org.mockito.Mockito;
6876

6977
import org.springframework.aop.framework.ProxyFactory;
7078
import org.springframework.kafka.KafkaException;
7179
import org.springframework.kafka.support.Acknowledgment;
80+
import org.springframework.kafka.support.CompositeProducerInterceptor;
7281
import org.springframework.kafka.support.CompositeProducerListener;
7382
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
7483
import org.springframework.kafka.support.KafkaHeaders;
@@ -94,6 +103,7 @@
94103
* @author Biju Kunjummen
95104
* @author Endika Gutierrez
96105
* @author Thomas Strauß
106+
* @author Soby Chacko
97107
*/
98108
@EmbeddedKafka(topics = { KafkaTemplateTests.INT_KEY_TOPIC, KafkaTemplateTests.STRING_KEY_TOPIC })
99109
public class KafkaTemplateTests {
@@ -554,4 +564,63 @@ void testFutureFailureOnSend() {
554564
pf.destroy();
555565
}
556566

567+
@SuppressWarnings("unchecked")
568+
@Test
569+
void testProducerInterceptorManagedOnKafkaTemplate() {
570+
571+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
572+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
573+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
574+
ProducerInterceptor<Integer, String> producerInterceptor = Mockito.mock(ProducerInterceptor.class);
575+
template.setProducerInterceptor(producerInterceptor);
576+
577+
template.setDefaultTopic("prod-interceptor-test-1");
578+
template.sendDefault("foo");
579+
580+
verify(producerInterceptor, times(1)).onSend(any(ProducerRecord.class));
581+
verify(producerInterceptor, times(1)).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
582+
}
583+
584+
@SuppressWarnings("unchecked")
585+
@Test
586+
void testProducerInterceptorNotSetOnKafkaTemplateNotInvoked() {
587+
588+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
589+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
590+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
591+
ProducerInterceptor<Integer, String> producerInterceptor = Mockito.mock(ProducerInterceptor.class);
592+
593+
template.setDefaultTopic("prod-interceptor-test-2");
594+
template.sendDefault("foo");
595+
596+
verifyNoInteractions(producerInterceptor);
597+
}
598+
599+
@SuppressWarnings("unchecked")
600+
@Test
601+
void testCompositeProducerInterceptor() {
602+
603+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
604+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
605+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
606+
ProducerInterceptor<Integer, String> producerInterceptor1 = Mockito.mock(ProducerInterceptor.class);
607+
ProducerInterceptor<Integer, String> producerInterceptor2 = Mockito.mock(ProducerInterceptor.class);
608+
CompositeProducerInterceptor<Integer, String> compositeProducerInterceptor =
609+
new CompositeProducerInterceptor<>(producerInterceptor1, producerInterceptor2);
610+
template.setProducerInterceptor(compositeProducerInterceptor);
611+
612+
ProducerRecord<Integer, String> mockProducerRecord = Mockito.mock(ProducerRecord.class);
613+
doReturn(mockProducerRecord).when(producerInterceptor1).onSend(any(ProducerRecord.class));
614+
615+
template.setDefaultTopic("prod-interceptor-test-3");
616+
template.sendDefault("foo");
617+
618+
InOrder inOrder = inOrder(producerInterceptor1, producerInterceptor2);
619+
620+
inOrder.verify(producerInterceptor1).onSend(any(ProducerRecord.class));
621+
inOrder.verify(producerInterceptor2).onSend(any(ProducerRecord.class));
622+
inOrder.verify(producerInterceptor1).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
623+
inOrder.verify(producerInterceptor2).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
624+
}
625+
557626
}

0 commit comments

Comments
 (0)