Skip to content

Commit ffa5ff3

Browse files
author
Karol Dowbecki
committed
spring-projectsGH-1704: Broader Batch/RecordInterceptor
1 parent b8ce3f0 commit ffa5ff3

File tree

6 files changed

+233
-9
lines changed

6 files changed

+233
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java

+17
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,21 @@ default void success(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
6161
default void failure(ConsumerRecords<K, V> records, Exception exception, Consumer<K, V> consumer) {
6262
}
6363

64+
/**
65+
* Called before consumer is polled.
66+
* @param consumer the consumer.
67+
* @since 2.8
68+
*/
69+
default void beforePoll(Consumer<K, V> consumer) {
70+
}
71+
72+
/**
73+
* Called after listener and error handler were invoked. Last action before the
74+
* next consumer polling.
75+
* @param consumer the consumer.
76+
* @since 2.8
77+
*/
78+
default void finishInvoke(Consumer<K, V> consumer) {
79+
}
80+
6481
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java

+10
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,14 @@ public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer
7474
this.delegates.forEach(del -> del.failure(records, exception, consumer));
7575
}
7676

77+
@Override
78+
public void beforePoll(Consumer<K, V> consumer) {
79+
this.delegates.forEach(del -> del.beforePoll(consumer));
80+
}
81+
82+
@Override
83+
public void finishInvoke(Consumer<K, V> consumer) {
84+
this.delegates.forEach(del -> del.finishInvoke(consumer));
85+
}
86+
7787
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java

+9
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,13 @@ public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K
7777
this.delegates.forEach(del -> del.failure(record, exception, consumer));
7878
}
7979

80+
@Override
81+
public void beforePoll(Consumer<K, V> consumer) {
82+
this.delegates.forEach(del -> del.beforePoll(consumer));
83+
}
84+
85+
@Override
86+
public void finishInvoke(Consumer<K, V> consumer) {
87+
this.delegates.forEach(del -> del.finishInvoke(consumer));
88+
}
8089
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+65
Original file line numberDiff line numberDiff line change
@@ -1291,6 +1291,7 @@ protected void pollAndInvoke() {
12911291
}
12921292

12931293
invokeIfHaveRecords(records);
1294+
finishInvoke();
12941295
}
12951296

12961297
private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
@@ -1310,6 +1311,37 @@ private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
13101311
}
13111312
}
13121313

1314+
private void finishInvoke() {
1315+
if (this.isBatchListener) {
1316+
batchInterceptFinishInvoke();
1317+
}
1318+
else {
1319+
recordInterceptFinishInvoke();
1320+
}
1321+
}
1322+
1323+
private void batchInterceptFinishInvoke() {
1324+
if (this.commonBatchInterceptor != null) {
1325+
try {
1326+
this.commonBatchInterceptor.finishInvoke(this.consumer);
1327+
}
1328+
catch (Exception e) {
1329+
this.logger.error(e, "BatchInterceptor threw an exception");
1330+
}
1331+
}
1332+
}
1333+
1334+
private void recordInterceptFinishInvoke() {
1335+
if (this.commonRecordInterceptor != null) {
1336+
try {
1337+
this.commonRecordInterceptor.finishInvoke(this.consumer);
1338+
}
1339+
catch (Exception e) {
1340+
this.logger.error(e, "RecordInterceptor threw an exception");
1341+
}
1342+
}
1343+
}
1344+
13131345
private void checkIdlePartitions() {
13141346
Set<TopicPartition> partitions = this.consumer.assignment();
13151347
partitions.forEach(this::checkIdlePartition);
@@ -1422,6 +1454,7 @@ private ConsumerRecords<K, V> doPoll() {
14221454
ConsumerRecords<K, V> records;
14231455
if (this.isBatchListener && this.subBatchPerPartition) {
14241456
if (this.batchIterator == null) {
1457+
batchInterceptBeforePoll();
14251458
this.lastBatch = pollConsumer();
14261459
captureOffsets(this.lastBatch);
14271460
if (this.lastBatch.count() == 0) {
@@ -1439,6 +1472,7 @@ private ConsumerRecords<K, V> doPoll() {
14391472
}
14401473
}
14411474
else {
1475+
beforePoll();
14421476
records = pollConsumer();
14431477
captureOffsets(records);
14441478
checkRebalanceCommits();
@@ -1455,6 +1489,37 @@ private ConsumerRecords<K, V> pollConsumer() {
14551489
}
14561490
}
14571491

1492+
private void beforePoll() {
1493+
if (this.isBatchListener) {
1494+
batchInterceptBeforePoll();
1495+
}
1496+
else {
1497+
recordInterceptBeforePoll();
1498+
}
1499+
}
1500+
1501+
private void batchInterceptBeforePoll() {
1502+
if (this.commonBatchInterceptor != null) {
1503+
try {
1504+
this.commonBatchInterceptor.beforePoll(this.consumer);
1505+
}
1506+
catch (Exception e) {
1507+
this.logger.error(e, "BatchInterceptor threw an exception");
1508+
}
1509+
}
1510+
}
1511+
1512+
private void recordInterceptBeforePoll() {
1513+
if (this.commonRecordInterceptor != null) {
1514+
try {
1515+
this.commonRecordInterceptor.beforePoll(this.consumer);
1516+
}
1517+
catch (Exception e) {
1518+
this.logger.error(e, "RecordInterceptor threw an exception");
1519+
}
1520+
}
1521+
}
1522+
14581523
private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
14591524
if (this.offsetsInThisBatch != null && records.count() > 0) {
14601525
this.offsetsInThisBatch.clear();

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

+17
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,21 @@ default void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
8181
default void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
8282
}
8383

84+
/**
85+
* Called before consumer is polled.
86+
* @param consumer the consumer.
87+
* @since 2.8
88+
*/
89+
default void beforePoll(Consumer<K, V> consumer) {
90+
}
91+
92+
/**
93+
* Called after listener and error handler were invoked. Last action before the
94+
* next consumer polling.
95+
* @param consumer the consumer.
96+
* @since 2.8
97+
*/
98+
default void finishInvoke(Consumer<K, V> consumer) {
99+
}
100+
84101
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

+115-9
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import org.springframework.kafka.event.ListenerContainerIdleEvent;
9999
import org.springframework.kafka.event.ListenerContainerNoLongerIdleEvent;
100100
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
101+
import org.springframework.kafka.listener.BatchInterceptor;
101102
import org.springframework.kafka.listener.CommonLoggingErrorHandler;
102103
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
103104
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
@@ -111,6 +112,7 @@
111112
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
112113
import org.springframework.kafka.listener.ListenerExecutionFailedException;
113114
import org.springframework.kafka.listener.MessageListenerContainer;
115+
import org.springframework.kafka.listener.RecordInterceptor;
114116
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
115117
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
116118
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -180,7 +182,8 @@
180182
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
181183
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
182184
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
183-
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41" })
185+
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42",
186+
"annotated43" })
184187
@TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10")
185188
public class EnableKafkaIntegrationTests {
186189

@@ -600,7 +603,23 @@ public void testBatchRecords() throws Exception {
600603
assertThat(this.config.listen12Message.getPayload()).isInstanceOf(List.class);
601604
List<?> errorPayload = (List<?>) this.config.listen12Message.getPayload();
602605
assertThat(errorPayload.size()).isGreaterThanOrEqualTo(1);
606+
607+
}
608+
609+
@Test
610+
public void testBatchInterceptor() throws Exception {
611+
template.send("annotated43", null, "foo");
612+
template.send("annotated43", null, "bar");
613+
assertThat(this.listener.latch43.await(30, TimeUnit.SECONDS)).isTrue();
614+
assertThat(this.listener.payload).isInstanceOf(List.class);
615+
List<?> list = (List<?>) this.listener.payload;
616+
assertThat(list.size()).isGreaterThan(0);
617+
assertThat(list.get(0)).isInstanceOf(ConsumerRecord.class);
618+
assertThat(this.config.batchBeforePoll).isTrue();
603619
assertThat(this.config.batchIntercepted).isTrue();
620+
assertThat(this.config.batchSuccess).isTrue();
621+
assertThat(this.config.batchFailure).isFalse();
622+
assertThat(this.config.batchFinishInvoke).isTrue();
604623
}
605624

606625
@Test
@@ -865,7 +884,6 @@ public void testKeyConversion() throws Exception {
865884
this.bytesKeyTemplate.send("annotated36", "foo".getBytes(), "bar");
866885
assertThat(this.listener.keyLatch.await(30, TimeUnit.SECONDS)).isTrue();
867886
assertThat(this.listener.convertedKey).isEqualTo("foo");
868-
assertThat(this.config.intercepted).isTrue();
869887
try {
870888
assertThat(this.meterRegistry.get("kafka.consumer.coordinator.join.total")
871889
.tag("consumerTag", "bytesString")
@@ -893,6 +911,17 @@ public void testKeyConversion() throws Exception {
893911
.isInstanceOf(CommonLoggingErrorHandler.class);
894912
}
895913

914+
@Test
915+
public void testRecordInterceptor() throws Exception {
916+
this.bytesKeyTemplate.send("annotated42", "fooRecordInterceptor".getBytes(), "barRecordInterceptor");
917+
assertThat(this.listener.latch42.await(30, TimeUnit.SECONDS)).isTrue();
918+
assertThat(this.config.recordBeforePoll).isTrue();
919+
assertThat(this.config.recordIntercepted).isTrue();
920+
assertThat(this.config.recordSuccess).isTrue();
921+
assertThat(this.config.recordFailure).isFalse();
922+
assertThat(this.config.recordFinishInvoke).isTrue();
923+
}
924+
896925
@Test
897926
public void testProjection() throws InterruptedException {
898927
template.send("annotated37", 0, "{ \"username\" : \"SomeUsername\", \"user\" : { \"name\" : \"SomeName\"}}");
@@ -984,10 +1013,26 @@ public static class Config implements KafkaListenerConfigurer {
9841013

9851014
volatile Throwable globalErrorThrowable;
9861015

987-
volatile boolean intercepted;
1016+
volatile boolean recordBeforePoll;
1017+
1018+
volatile boolean recordIntercepted;
1019+
1020+
volatile boolean recordSuccess;
1021+
1022+
volatile boolean recordFailure;
1023+
1024+
volatile boolean recordFinishInvoke;
1025+
1026+
volatile boolean batchBeforePoll;
9881027

9891028
volatile boolean batchIntercepted;
9901029

1030+
volatile boolean batchSuccess;
1031+
1032+
volatile boolean batchFailure;
1033+
1034+
volatile boolean batchFinishInvoke;
1035+
9911036
@Autowired
9921037
private EmbeddedKafkaBroker embeddedKafka;
9931038

@@ -1132,9 +1177,32 @@ public KafkaListenerContainerFactory<?> bytesStringListenerContainerFactory() {
11321177
ConcurrentKafkaListenerContainerFactory<byte[], String> factory =
11331178
new ConcurrentKafkaListenerContainerFactory<>();
11341179
factory.setConsumerFactory(bytesStringConsumerFactory());
1135-
factory.setRecordInterceptor(record -> {
1136-
this.intercepted = true;
1137-
return record;
1180+
factory.setRecordInterceptor(new RecordInterceptor<>() {
1181+
@Override
1182+
public ConsumerRecord<byte[], String> intercept(ConsumerRecord<byte[], String> record) {
1183+
Config.this.recordIntercepted = true;
1184+
return record;
1185+
}
1186+
1187+
@Override
1188+
public void success(ConsumerRecord<byte[], String> record, Consumer<byte[], String> consumer) {
1189+
Config.this.recordSuccess = true;
1190+
}
1191+
1192+
@Override
1193+
public void failure(ConsumerRecord<byte[], String> record, Exception exception, Consumer<byte[], String> consumer) {
1194+
Config.this.recordFailure = true;
1195+
}
1196+
1197+
@Override
1198+
public void beforePoll(Consumer<byte[], String> consumer) {
1199+
Config.this.recordBeforePoll = true;
1200+
}
1201+
1202+
@Override
1203+
public void finishInvoke(Consumer<byte[], String> consumer) {
1204+
Config.this.recordFinishInvoke = true;
1205+
}
11381206
});
11391207
factory.setCommonErrorHandler(new CommonLoggingErrorHandler());
11401208
return factory;
@@ -1149,9 +1217,32 @@ public KafkaListenerContainerFactory<?> batchFactory() {
11491217
factory.setRecordFilterStrategy(recordFilter());
11501218
// always send to the same partition so the replies are in order for the test
11511219
factory.setReplyTemplate(partitionZeroReplyTemplate());
1152-
factory.setBatchInterceptor((records, consumer) -> {
1153-
this.batchIntercepted = true;
1154-
return records;
1220+
factory.setBatchInterceptor(new BatchInterceptor<>() {
1221+
@Override
1222+
public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, String> records, Consumer<Integer, String> consumer) {
1223+
Config.this.batchIntercepted = true;
1224+
return records;
1225+
}
1226+
1227+
@Override
1228+
public void success(ConsumerRecords<Integer, String> records, Consumer<Integer, String> consumer) {
1229+
Config.this.batchSuccess = true;
1230+
}
1231+
1232+
@Override
1233+
public void failure(ConsumerRecords<Integer, String> records, Exception exception, Consumer<Integer, String> consumer) {
1234+
Config.this.batchFailure = true;
1235+
}
1236+
1237+
@Override
1238+
public void beforePoll(Consumer<Integer, String> consumer) {
1239+
Config.this.batchBeforePoll = true;
1240+
}
1241+
1242+
@Override
1243+
public void finishInvoke(Consumer<Integer, String> consumer) {
1244+
Config.this.batchFinishInvoke = true;
1245+
}
11551246
});
11561247
return factory;
11571248
}
@@ -1734,6 +1825,10 @@ static class Listener implements ConsumerSeekAware {
17341825

17351826
final CountDownLatch latch21 = new CountDownLatch(1);
17361827

1828+
final CountDownLatch latch42 = new CountDownLatch(1);
1829+
1830+
final CountDownLatch latch43 = new CountDownLatch(1);
1831+
17371832
final CountDownLatch validationLatch = new CountDownLatch(1);
17381833

17391834
final CountDownLatch eventLatch = new CountDownLatch(1);
@@ -2090,6 +2185,17 @@ public void bytesKey(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Strin
20902185
this.keyLatch.countDown();
20912186
}
20922187

2188+
@KafkaListener(topics = "annotated42", containerFactory = "bytesStringListenerContainerFactory")
2189+
public void recordInterceptorListener(String in) {
2190+
this.latch42.countDown();
2191+
}
2192+
2193+
@KafkaListener(topics = "annotated43", containerFactory = "batchFactory")
2194+
public void batchInterceptorListener(List<ConsumerRecord<?, ?>> records) {
2195+
this.payload = records;
2196+
this.latch43.countDown();
2197+
}
2198+
20932199
@KafkaListener(topics = "annotated29")
20942200
public void anonymousListener(String in) {
20952201
}

0 commit comments

Comments
 (0)