Skip to content

Commit 7a1653b

Browse files
endSlygaryrussell
authored andcommitted
ProducerListener Improvements
Added methods receiving ProducerRecord to ProducerListener LoggingProducerListener implements ProducerListener directly Added test for ProducerListener with ProducerRecord Deprecate isInterestedInSuccess for ProducerListener Final Polishing
1 parent 4563e32 commit 7a1653b

File tree

5 files changed

+67
-33
lines changed

5 files changed

+67
-33
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
* @author Igor Stepanov
5757
* @author Artem Bilan
5858
* @author Biju Kunjummen
59+
* @author Endika Gutiérrez
5960
*/
6061
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
6162

@@ -352,10 +353,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
352353
try {
353354
if (exception == null) {
354355
future.set(new SendResult<>(producerRecord, metadata));
355-
if (KafkaTemplate.this.producerListener != null
356-
&& KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
357-
KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
358-
producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
356+
if (KafkaTemplate.this.producerListener != null) {
357+
KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
359358
}
360359
if (KafkaTemplate.this.logger.isTraceEnabled()) {
361360
KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata);
@@ -364,11 +363,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
364363
else {
365364
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
366365
if (KafkaTemplate.this.producerListener != null) {
367-
KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
368-
producerRecord.partition(),
369-
producerRecord.key(),
370-
producerRecord.value(),
371-
exception);
366+
KafkaTemplate.this.producerListener.onError(producerRecord, exception);
372367
}
373368
if (KafkaTemplate.this.logger.isDebugEnabled()) {
374369
KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception);

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* @author Marius Bogoevici
3131
* @author Gary Russell
3232
*/
33-
public class LoggingProducerListener<K, V> extends ProducerListenerAdapter<K, V> {
33+
public class LoggingProducerListener<K, V> implements ProducerListener<K, V> {
3434

3535
private static final Log log = LogFactory.getLog(LoggingProducerListener.class);
3636

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListener.java

+33-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.support;
1818

19+
import org.apache.kafka.clients.producer.ProducerRecord;
1920
import org.apache.kafka.clients.producer.RecordMetadata;
2021

2122
/**
@@ -30,35 +31,64 @@
3031
*
3132
* @author Marius Bogoevici
3233
* @author Gary Russell
34+
* @author Endika Gutiérrez
3335
*
3436
* @see org.apache.kafka.clients.producer.Callback
3537
*/
3638
public interface ProducerListener<K, V> {
3739

3840
/**
3941
* Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
42+
* @param producerRecord the actual sent record
43+
* @param recordMetadata the result of the successful send operation
44+
*/
45+
default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
46+
onSuccess(producerRecord.topic(), producerRecord.partition(),
47+
producerRecord.key(), producerRecord.value(), recordMetadata);
48+
}
49+
50+
/**
51+
* Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
52+
* If the method receiving the ProducerRecord is overridden, this method won't be called
4053
* @param topic the destination topic
4154
* @param partition the destination partition (could be null)
4255
* @param key the key of the outbound message
4356
* @param value the payload of the outbound message
4457
* @param recordMetadata the result of the successful send operation
4558
*/
46-
void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
59+
default void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata) {
60+
}
61+
62+
/**
63+
* Invoked after an attempt to send a message has failed.
64+
* @param producerRecord the failed record
65+
* @param exception the exception thrown
66+
*/
67+
default void onError(ProducerRecord<K, V> producerRecord, Exception exception) {
68+
onError(producerRecord.topic(), producerRecord.partition(),
69+
producerRecord.key(), producerRecord.value(), exception);
70+
}
4771

4872
/**
4973
* Invoked after an attempt to send a message has failed.
74+
* If the method receiving the ProducerRecord is overridden, this method won't be called
5075
* @param topic the destination topic
5176
* @param partition the destination partition (could be null)
5277
* @param key the key of the outbound message
5378
* @param value the payload of the outbound message
5479
* @param exception the exception thrown
5580
*/
56-
void onError(String topic, Integer partition, K key, V value, Exception exception);
81+
default void onError(String topic, Integer partition, K key, V value, Exception exception) {
82+
}
5783

5884
/**
5985
* Return true if this listener is interested in success as well as failure.
86+
* @deprecated the result of this method will be ignored.
6087
* @return true to express interest in successful sends.
6188
*/
62-
boolean isInterestedInSuccess();
89+
@Deprecated
90+
default boolean isInterestedInSuccess() {
91+
return false;
92+
}
6393

6494
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerAdapter.java

+4-16
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,19 @@
1616

1717
package org.springframework.kafka.support;
1818

19-
import org.apache.kafka.clients.producer.RecordMetadata;
20-
2119
/**
2220
* No-op implementation of {@link ProducerListener}, to be used as base class for other implementations.
2321
*
22+
* @deprecated as the {@link ProducerListener} has default methods and can be implemented directly without the need for this adapter
23+
*
2424
* @param <K> the key type.
2525
* @param <V> the value type.
2626
*
2727
* @author Marius Bogoevici
2828
* @author Gary Russell
2929
* @author Artem Bilan
30+
* @author Endika Gutiérrez
3031
*/
32+
@Deprecated
3133
public abstract class ProducerListenerAdapter<K, V> implements ProducerListener<K, V> {
32-
33-
@Override
34-
public void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata) {
35-
}
36-
37-
@Override
38-
public void onError(String topic, Integer partition, K key, V value, Exception exception) {
39-
}
40-
41-
@Override
42-
public boolean isInterestedInSuccess() {
43-
return false;
44-
}
45-
4634
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

+25-4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.kafka.clients.consumer.ConsumerConfig;
3535
import org.apache.kafka.clients.consumer.ConsumerRecord;
3636
import org.apache.kafka.clients.producer.Producer;
37+
import org.apache.kafka.clients.producer.ProducerRecord;
3738
import org.apache.kafka.clients.producer.RecordMetadata;
3839
import org.apache.kafka.common.Metric;
3940
import org.apache.kafka.common.MetricName;
@@ -50,7 +51,7 @@
5051
import org.springframework.kafka.support.Acknowledgment;
5152
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
5253
import org.springframework.kafka.support.KafkaHeaders;
53-
import org.springframework.kafka.support.ProducerListenerAdapter;
54+
import org.springframework.kafka.support.ProducerListener;
5455
import org.springframework.kafka.support.SendResult;
5556
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5657
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -65,6 +66,7 @@
6566
* @author Artem Bilan
6667
* @author Igor Stepanov
6768
* @author Biju Kunjummen
69+
* @author Endika Gutiérrez
6870
*/
6971
public class KafkaTemplateTests {
7072

@@ -241,17 +243,36 @@ public void withListener() throws Exception {
241243
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
242244
template.setDefaultTopic(INT_KEY_TOPIC);
243245
final CountDownLatch latch = new CountDownLatch(1);
244-
template.setProducerListener(new ProducerListenerAdapter<Integer, String>() {
246+
template.setProducerListener(new ProducerListener<Integer, String>() {
245247

246248
@Override
247249
public void onSuccess(String topic, Integer partition, Integer key, String value,
248250
RecordMetadata recordMetadata) {
249251
latch.countDown();
250252
}
251253

254+
});
255+
template.sendDefault("foo");
256+
template.flush();
257+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
258+
259+
//Drain the topic
260+
KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
261+
pf.destroy();
262+
}
263+
264+
@Test
265+
public void withProducerRecordListener() throws Exception {
266+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
267+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
268+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
269+
template.setDefaultTopic(INT_KEY_TOPIC);
270+
final CountDownLatch latch = new CountDownLatch(1);
271+
template.setProducerListener(new ProducerListener<Integer, String>() {
272+
252273
@Override
253-
public boolean isInterestedInSuccess() {
254-
return true;
274+
public void onSuccess(ProducerRecord<Integer, String> record, RecordMetadata recordMetadata) {
275+
latch.countDown();
255276
}
256277

257278
});

0 commit comments

Comments
 (0)