Skip to content

Commit 5ee82ab

Browse files
author
Karol Dowbecki
committed
spring-projectsGH-1704: More pessimistic finally
1 parent ffa5ff3 commit 5ee82ab

File tree

3 files changed

+22
-5
lines changed

3 files changed

+22
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,22 @@ default void failure(ConsumerRecords<K, V> records, Exception exception, Consume
6363

6464
/**
6565
* Called before consumer is polled.
66+
* <p>
67+
* It can be used to set up thread-bound resources which will be available for the
68+
* entire duration of the consumer poll operation e.g. logging with MDC.
69+
* </p>
6670
* @param consumer the consumer.
6771
* @since 2.8
6872
*/
6973
default void beforePoll(Consumer<K, V> consumer) {
7074
}
7175

7276
/**
73-
* Called after listener and error handler were invoked. Last action before the
74-
* next consumer polling.
77+
* Called after listener and error handler were invoked.
78+
* <p>
79+
* It can be used to clear thread-bound resources as this is the last method called after
80+
* listener was invoked.
81+
* </p>
7582
* @param consumer the consumer.
7683
* @since 2.8
7784
*/

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,9 @@ public void run() {
12361236
catch (Exception e) {
12371237
handleConsumerException(e);
12381238
}
1239+
finally {
1240+
finishInvoke();
1241+
}
12391242
}
12401243
wrapUp(exitThrowable);
12411244
}
@@ -1273,6 +1276,7 @@ protected void pollAndInvoke() {
12731276
return;
12741277
}
12751278
this.polling.set(true);
1279+
beforePoll();
12761280
ConsumerRecords<K, V> records = doPoll();
12771281
if (!this.polling.compareAndSet(true, false) && records != null) {
12781282
/*
@@ -1291,7 +1295,6 @@ protected void pollAndInvoke() {
12911295
}
12921296

12931297
invokeIfHaveRecords(records);
1294-
finishInvoke();
12951298
}
12961299

12971300
private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,22 @@ default void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<
8383

8484
/**
8585
* Called before consumer is polled.
86+
* <p>
87+
* It can be used to set up thread-bound resources which will be available for the
88+
* entire duration of the consumer poll operation e.g. logging with MDC.
89+
* </p>
8690
* @param consumer the consumer.
8791
* @since 2.8
8892
*/
8993
default void beforePoll(Consumer<K, V> consumer) {
9094
}
9195

9296
/**
93-
* Called after listener and error handler were invoked. Last action before the
94-
* next consumer polling.
97+
* Called after listener and error handler were invoked.
98+
* <p>
99+
* It can be used to clear thread-bound resources as this is the last method called after
100+
* listener was invoked.
101+
* </p>
95102
* @param consumer the consumer.
96103
* @since 2.8
97104
*/

0 commit comments

Comments
 (0)