Skip to content

GH-2049: Spring Managed Producer Interceptors #2112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 23, 2022
Merged

GH-2049: Spring Managed Producer Interceptors #2112

merged 5 commits into from
Feb 23, 2022

Conversation

sobychacko
Copy link
Contributor

Allow producer interceptors to be managed by Spring so that
they can be used in KafkaTemplate instead of providing the classname
of the interceptor to Kafka configuraiton.

Adding tests and docs.

Resolves #2049

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one nit-pick.

👍

/**
* Returns the producer interceptor associated with this template.
* @return {@link ProducerInterceptor}
* @since 3.0.x
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just 3.0
And Why do we need a getter?
If we do, perhaps you mean protected for extenders...

@@ -3734,6 +3734,67 @@ Received test
----
====

==== Producer Interceptor Managed in Spring

Staring with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small typo: "Starting"

Copy link
Contributor

@garyrussell garyrussell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need to call close() in destroy(); see Utils.closeQuietly() used by the KafkaProducer.

BTW, the KP supports multiple interceptors; I guess we could satisfy that need by implementing a composite interceptor.

@@ -631,6 +644,9 @@ protected void closeProducer(Producer<K, V> producer, boolean inTx) {
if (this.micrometerHolder != null) {
sample = this.micrometerHolder.start();
}
if (this.producerInterceptor != null) {
this.producerInterceptor.onSend(producerRecord);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't need to catch an exception here; a user might want to use this to cancel a send for some reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like Kafka's ProducerInterceptors simply log the exception from individual interceptors and continue. See here. The one set on the KafkaTemplate can be used to cancel if need be. I was thinking maybe with a CompositeProducerInterceptor we can follow the same approach used in Kafka's ProducerInterceptors.

@@ -657,6 +673,9 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
final SettableListenableFuture<SendResult<K, V>> future, @Nullable Object sample) {

return (metadata, exception) -> {
if (this.producerInterceptor != null) {
this.producerInterceptor.onAcknowledgement(metadata, exception);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to catch and log any exceptions here, then proceed; otherwise the future won't be completed.

@garyrussell
Copy link
Contributor

It is not important if this misses Tuesday's M2 release.

Copy link
Contributor

@garyrussell garyrussell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also needs rebase. Thanks.

record.topic(), record.partition()));
}
else {
CompositeProducerInterceptor.this.logger.warn(e, () -> "Error executing interceptor onSend callback");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add interceptor.toString() to these log messages?

Allow producer interceptors to be managed by Spring so that
they can be used in KafkaTemplate instead of providing the classname
of the interceptor to Kafka configuraiton.

Adding tests and docs.

Resolves #2049
Adding a CompositeProducerInterceptor to allow multiple
producer interceptors on KafkaTemplate.
@garyrussell garyrussell merged commit 38ed222 into spring-projects:main Feb 23, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Missing interceptors for producer
4 participants