Skip to content

Commit 5879b41

Browse files
garyrussellartembilan
authored andcommitted
spring-projectsGH-1688: ConsumerAwareRebalListener Improvements
Resolves spring-projects#1668 Users might implement one of the not-consumer-aware methods so the default methods should delegate to those. Call in try/catch block, expecially `...RevokedBeforeCommit` to prevent the commits to be skipped. **cherry-pick to 2.6.x**
1 parent 2f93abe commit 5879b41

File tree

2 files changed

+148
-8
lines changed

2 files changed

+148
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java

+27-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,10 +18,13 @@
1818

1919
import java.util.Collection;
2020

21+
import org.apache.commons.logging.LogFactory;
2122
import org.apache.kafka.clients.consumer.Consumer;
2223
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2324
import org.apache.kafka.common.TopicPartition;
2425

26+
import org.springframework.core.log.LogAccessor;
27+
2528
/**
2629
* A rebalance listener that provides access to the consumer object. Starting with version
2730
* 2.1.5, as a convenience, default no-op implementations are provided for all methods,
@@ -33,14 +36,24 @@
3336
*/
3437
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
3538

39+
/**
40+
* {@link LogAccessor} for use in default methods.
41+
*/
42+
LogAccessor logger = new LogAccessor(LogFactory.getLog(ConsumerAwareRebalanceListener.class));
43+
3644
/**
3745
* The same as {@link #onPartitionsRevoked(Collection)} with the additional consumer
3846
* parameter. It is invoked by the container before any pending offsets are committed.
3947
* @param consumer the consumer.
4048
* @param partitions the partitions.
4149
*/
4250
default void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
43-
// do nothing
51+
try {
52+
onPartitionsRevoked(partitions);
53+
}
54+
catch (Exception e) { // NOSONAR
55+
logger.debug(e, "User method threw exception");
56+
}
4457
}
4558

4659
/**
@@ -50,7 +63,6 @@ default void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection
5063
* @param partitions the partitions.
5164
*/
5265
default void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
53-
// do nothing
5466
}
5567

5668
/**
@@ -60,7 +72,12 @@ default void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<
6072
* @since 2.4
6173
*/
6274
default void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
63-
// do nothing
75+
try {
76+
onPartitionsLost(partitions);
77+
}
78+
catch (Exception e) { // NOSONAR
79+
logger.debug(e, "User method threw exception");
80+
}
6481
}
6582

6683
/**
@@ -70,22 +87,24 @@ default void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition
7087
* @param partitions the partitions.
7188
*/
7289
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
73-
// do nothing
90+
try {
91+
onPartitionsAssigned(partitions);
92+
}
93+
catch (Exception e) { // NOSONAR
94+
logger.debug(e, "User method threw exception");
95+
}
7496
}
7597

7698
@Override
7799
default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
78-
throw new UnsupportedOperationException("Listener container should never call this");
79100
}
80101

81102
@Override
82103
default void onPartitionsAssigned(Collection<TopicPartition> partitions) {
83-
throw new UnsupportedOperationException("Listener container should never call this");
84104
}
85105

86106
@Override
87107
default void onPartitionsLost(Collection<TopicPartition> partitions) {
88-
throw new UnsupportedOperationException("Listener container should never call this");
89108
}
90109

91110
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.Collection;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.apache.kafka.common.TopicPartition;
25+
import org.junit.jupiter.api.Test;
26+
27+
/**
28+
* @author Gary Russell
29+
* @since 2.6.5
30+
*
31+
*/
32+
public class ConsumerAwareRebalanceListenerTests {
33+
34+
@Test
35+
void nonConsumerAwareTestAssigned() {
36+
AtomicBoolean called = new AtomicBoolean();
37+
new ConsumerAwareRebalanceListener() {
38+
39+
@Override
40+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
41+
called.set(true);
42+
}
43+
44+
}.onPartitionsAssigned(null, null);
45+
assertThat(called.get()).isTrue();
46+
}
47+
48+
@Test
49+
void nonConsumerAwareTestAssignedThrows() {
50+
AtomicBoolean called = new AtomicBoolean();
51+
new ConsumerAwareRebalanceListener() {
52+
53+
@Override
54+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
55+
called.set(true);
56+
throw new RuntimeException();
57+
}
58+
59+
}.onPartitionsAssigned(null, null);
60+
assertThat(called.get()).isTrue();
61+
}
62+
63+
@Test
64+
void nonConsumerAwareTestRevoked() {
65+
AtomicBoolean called = new AtomicBoolean();
66+
new ConsumerAwareRebalanceListener() {
67+
68+
@Override
69+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
70+
called.set(true);
71+
}
72+
73+
}.onPartitionsRevokedBeforeCommit(null, null);
74+
assertThat(called.get()).isTrue();
75+
}
76+
77+
@Test
78+
void nonConsumerAwareTestRevokedThrows() {
79+
AtomicBoolean called = new AtomicBoolean();
80+
new ConsumerAwareRebalanceListener() {
81+
82+
@Override
83+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
84+
called.set(true);
85+
throw new RuntimeException();
86+
}
87+
88+
}.onPartitionsRevokedBeforeCommit(null, null);
89+
assertThat(called.get()).isTrue();
90+
}
91+
92+
@Test
93+
void nonConsumerAwareTestLost() {
94+
AtomicBoolean called = new AtomicBoolean();
95+
new ConsumerAwareRebalanceListener() {
96+
97+
@Override
98+
public void onPartitionsLost(Collection<TopicPartition> partitions) {
99+
called.set(true);
100+
}
101+
102+
}.onPartitionsLost(null, null);
103+
assertThat(called.get()).isTrue();
104+
}
105+
106+
@Test
107+
void nonConsumerAwareTestLostThrows() {
108+
AtomicBoolean called = new AtomicBoolean();
109+
new ConsumerAwareRebalanceListener() {
110+
111+
@Override
112+
public void onPartitionsLost(Collection<TopicPartition> partitions) {
113+
called.set(true);
114+
throw new RuntimeException();
115+
}
116+
117+
}.onPartitionsLost(null, null);
118+
assertThat(called.get()).isTrue();
119+
}
120+
121+
}

0 commit comments

Comments
 (0)