Skip to content

Commit 4aa5c2a

Browse files
garyrussellartembilan
authored andcommitted
GH-2350: Fix Paused Partitions After Rebalance
Resolves #2350 Do not maintain the `pausePartitionsRequested` field in the concurrent MLC, it is not used and can cause confusion. Also fixes some synchronization around `CMLC.containers`. Also resolves #2222 The previous fix removed revoked partitions from `pausePartitionsRequested`; this was incorrect - consider a rebalance where we lose `topic-0` and another rebalance where we are re-assigned `topic-0`. According to the contract, this partition should remain paused. **cherry-pick to 2.9.x, 2.8.x**
1 parent 6d4cba5 commit 4aa5c2a

File tree

3 files changed

+17
-11
lines changed

3 files changed

+17
-11
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,6 @@ public void resume() {
347347
@Override
348348
public void pausePartition(TopicPartition topicPartition) {
349349
synchronized (this.lifecycleMonitor) {
350-
super.pausePartition(topicPartition);
351350
this.containers
352351
.stream()
353352
.filter(container -> containsPartition(topicPartition, container))
@@ -358,7 +357,6 @@ public void pausePartition(TopicPartition topicPartition) {
358357
@Override
359358
public void resumePartition(TopicPartition topicPartition) {
360359
synchronized (this.lifecycleMonitor) {
361-
super.resumePartition(topicPartition);
362360
this.containers
363361
.stream()
364362
.filter(container -> containsPartition(topicPartition, container))
@@ -368,18 +366,22 @@ public void resumePartition(TopicPartition topicPartition) {
368366

369367
@Override
370368
public boolean isPartitionPaused(TopicPartition topicPartition) {
371-
return this
372-
.containers
373-
.stream()
374-
.anyMatch(container -> container.isPartitionPaused(topicPartition));
369+
synchronized (this.lifecycleMonitor) {
370+
return this
371+
.containers
372+
.stream()
373+
.anyMatch(container -> container.isPartitionPaused(topicPartition));
374+
}
375375
}
376376

377377
@Override
378378
public boolean isInExpectedState() {
379-
return (isRunning() || isStoppedNormally()) && this.containers
380-
.stream()
381-
.map(container -> container.isInExpectedState())
382-
.allMatch(bool -> Boolean.TRUE.equals(bool));
379+
synchronized (this.lifecycleMonitor) {
380+
return (isRunning() || isStoppedNormally()) && this.containers
381+
.stream()
382+
.map(container -> container.isInExpectedState())
383+
.allMatch(bool -> Boolean.TRUE.equals(bool));
384+
}
383385
}
384386

385387
private boolean containsPartition(TopicPartition topicPartition, KafkaMessageListenerContainer<K, V> container) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

-1
Original file line numberDiff line numberDiff line change
@@ -3491,7 +3491,6 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
34913491
publishConsumerPausedEvent(toRepause);
34923492
}
34933493
this.revoked.removeAll(toRepause);
3494-
this.revoked.forEach(tp -> resumePartition(tp));
34953494
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);
34963495
this.revoked.clear();
34973496
if (ListenerConsumer.this.pausedForNack.size() > 0) {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -2803,6 +2803,11 @@ public void rePausePartitionAfterRebalance() throws Exception {
28032803
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
28042804
.hasSize(1)
28052805
.contains(tp0);
2806+
assertThat(container)
2807+
.extracting("pauseRequestedPartitions")
2808+
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
2809+
.hasSize(2)
2810+
.contains(tp0, tp1);
28062811
suspendConsumerThread.countDown();
28072812
container.stop();
28082813
}

0 commit comments

Comments
 (0)