Skip to content

Commit 0e0f615

Browse files
author
Karol Dowbecki
authored
GH-1704 Broader Batch/RecordInterceptor (#1912)
* GH-1704: Broader Batch/RecordInterceptor * GH-1704: More pessimistic finally * Precise logger message * Renaming methods and improving Javadoc. Still need to work on race condition in the tests because beforePoll() and afterRecordsProcessed() is called while the test is setting up data. * Fixing tests * Fixing tests * Moving beforePoll() and adding InOrder tests * Reverting integration tests * Extracting BeforeAfterPollProcessor * Renaming afterPoll to clearThreadState * Fixing compiler warnings in tests
1 parent 3fa2428 commit 0e0f615

File tree

7 files changed

+405
-4
lines changed

7 files changed

+405
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
*
3333
*/
3434
@FunctionalInterface
35-
public interface BatchInterceptor<K, V> {
35+
public interface BatchInterceptor<K, V> extends BeforeAfterPollProcessor<K, V> {
3636

3737
/**
3838
* Perform some action on the records or return a different one. If null is returned
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2016-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
21+
/**
22+
* An interceptor for consumer poll operation.
23+
*
24+
* @param <K> the key type.
25+
* @param <V> the value type.
26+
*
27+
* @author Gary Russell
28+
* @author Karol Dowbecki
29+
* @author Artem Bilan
30+
* @since 2.8
31+
*
32+
*/
33+
public interface BeforeAfterPollProcessor<K, V> {
34+
35+
/**
36+
* Called before consumer is polled.
37+
* <p>
38+
* It can be used to set up thread-bound resources which will be available for the
39+
* entire duration of the consumer poll operation e.g. logging with MDC.
40+
* </p>
41+
*
42+
* @param consumer the consumer.
43+
*/
44+
default void beforePoll(Consumer<K, V> consumer) {
45+
}
46+
47+
/**
48+
* Called after records were processed by listener and error handler.
49+
* <p>
50+
* It can be used to clear thread-bound resources which were set up in {@link #beforePoll(Consumer)}.
51+
* This is the last method called by the {@link MessageListenerContainer} before the next record
52+
* processing cycle starts.
53+
* </p>
54+
*
55+
* @param consumer the consumer.
56+
*/
57+
default void clearThreadState(Consumer<K, V> consumer) {
58+
}
59+
60+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java

+10
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,14 @@ public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer
7474
this.delegates.forEach(del -> del.failure(records, exception, consumer));
7575
}
7676

77+
@Override
78+
public void beforePoll(Consumer<K, V> consumer) {
79+
this.delegates.forEach(del -> del.beforePoll(consumer));
80+
}
81+
82+
@Override
83+
public void clearThreadState(Consumer<K, V> consumer) {
84+
this.delegates.forEach(del -> del.clearThreadState(consumer));
85+
}
86+
7787
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java

+9
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,13 @@ public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K
7777
this.delegates.forEach(del -> del.failure(record, exception, consumer));
7878
}
7979

80+
@Override
81+
public void beforePoll(Consumer<K, V> consumer) {
82+
this.delegates.forEach(del -> del.beforePoll(consumer));
83+
}
84+
85+
@Override
86+
public void clearThreadState(Consumer<K, V> consumer) {
87+
this.delegates.forEach(del -> del.clearThreadState(consumer));
88+
}
8089
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+46-2
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,9 @@ public void run() {
12361236
catch (Exception e) {
12371237
handleConsumerException(e);
12381238
}
1239+
finally {
1240+
clearThreadState();
1241+
}
12391242
}
12401243
wrapUp(exitThrowable);
12411244
}
@@ -1310,6 +1313,26 @@ private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
13101313
}
13111314
}
13121315

1316+
private void clearThreadState() {
1317+
if (this.isBatchListener) {
1318+
interceptClearThreadState(this.commonBatchInterceptor);
1319+
}
1320+
else {
1321+
interceptClearThreadState(this.commonRecordInterceptor);
1322+
}
1323+
}
1324+
1325+
private void interceptClearThreadState(BeforeAfterPollProcessor<K, V> processor) {
1326+
if (processor != null) {
1327+
try {
1328+
processor.clearThreadState(this.consumer);
1329+
}
1330+
catch (Exception e) {
1331+
this.logger.error(e, "BeforeAfterPollProcessor.clearThreadState threw an exception");
1332+
}
1333+
}
1334+
}
1335+
13131336
private void checkIdlePartitions() {
13141337
Set<TopicPartition> partitions = this.consumer.assignment();
13151338
partitions.forEach(this::checkIdlePartition);
@@ -1447,6 +1470,7 @@ private ConsumerRecords<K, V> doPoll() {
14471470
}
14481471

14491472
private ConsumerRecords<K, V> pollConsumer() {
1473+
beforePoll();
14501474
try {
14511475
return this.consumer.poll(this.pollTimeout);
14521476
}
@@ -1455,6 +1479,26 @@ private ConsumerRecords<K, V> pollConsumer() {
14551479
}
14561480
}
14571481

1482+
private void beforePoll() {
1483+
if (this.isBatchListener) {
1484+
interceptBeforePoll(this.commonBatchInterceptor);
1485+
}
1486+
else {
1487+
interceptBeforePoll(this.commonRecordInterceptor);
1488+
}
1489+
}
1490+
1491+
private void interceptBeforePoll(BeforeAfterPollProcessor<K, V> processor) {
1492+
if (processor != null) {
1493+
try {
1494+
processor.beforePoll(this.consumer);
1495+
}
1496+
catch (Exception e) {
1497+
this.logger.error(e, "BeforeAfterPollProcessor.beforePoll threw an exception");
1498+
}
1499+
}
1500+
}
1501+
14581502
private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
14591503
if (this.offsetsInThisBatch != null && records.count() > 0) {
14601504
this.offsetsInThisBatch.clear();
@@ -2082,7 +2126,7 @@ private void batchInterceptAfter(ConsumerRecords<K, V> records, @Nullable Except
20822126
}
20832127
}
20842128
catch (Exception e) {
2085-
this.logger.error(e, "BatchInterceptor threw an exception");
2129+
this.logger.error(e, "BatchInterceptor.success/failure threw an exception");
20862130
}
20872131
}
20882132
}
@@ -2463,7 +2507,7 @@ private void recordInterceptAfter(ConsumerRecord<K, V> records, @Nullable Except
24632507
}
24642508
}
24652509
catch (Exception e) {
2466-
this.logger.error(e, "RecordInterceptor threw an exception");
2510+
this.logger.error(e, "RecordInterceptor.success/failure threw an exception");
24672511
}
24682512
}
24692513
}

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
*
3434
*/
3535
@FunctionalInterface
36-
public interface RecordInterceptor<K, V> {
36+
public interface RecordInterceptor<K, V> extends BeforeAfterPollProcessor<K, V> {
3737

3838
/**
3939
* Perform some action on the record or return a different one. If null is returned

0 commit comments

Comments
 (0)