Skip to content

Commit a63359c

Browse files
committed
GH-2332: Fix Partitions in Pause Event
1 parent a6cce8a commit a63359c

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1700,7 +1700,7 @@ private void doPauseConsumerIfNecessary() {
17001700
this.consumerPaused = true;
17011701
this.pauseForPending = false;
17021702
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
1703-
publishConsumerPausedEvent(this.consumer.assignment());
1703+
publishConsumerPausedEvent(assigned);
17041704
}
17051705
}
17061706
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/PauseContainerManualAssignmentTests.java

+15
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,13 @@
5656
import org.springframework.beans.factory.annotation.Autowired;
5757
import org.springframework.context.annotation.Bean;
5858
import org.springframework.context.annotation.Configuration;
59+
import org.springframework.context.event.EventListener;
5960
import org.springframework.kafka.annotation.EnableKafka;
6061
import org.springframework.kafka.annotation.KafkaListener;
6162
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
6263
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
6364
import org.springframework.kafka.core.ConsumerFactory;
65+
import org.springframework.kafka.event.ConsumerPausedEvent;
6466
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6567
import org.springframework.kafka.test.utils.KafkaTestUtils;
6668
import org.springframework.test.annotation.DirtiesContext;
@@ -114,6 +116,9 @@ public void pausesWithManualAssignment() throws Exception {
114116
assertThat(this.config.count).isEqualTo(4);
115117
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
116118
verify(this.consumer, never()).seek(any(), anyLong());
119+
assertThat(this.config.eventLatch.await(10, TimeUnit.SECONDS)).isTrue();
120+
assertThat(this.config.event.getPartitions()).contains(
121+
new TopicPartition("foo", 0), new TopicPartition("foo", 1), new TopicPartition("foo", 2));
117122
}
118123

119124
@Configuration
@@ -130,8 +135,12 @@ public static class Config {
130135

131136
final CountDownLatch commitLatch = new CountDownLatch(3);
132137

138+
final CountDownLatch eventLatch = new CountDownLatch(1);
139+
133140
int count;
134141

142+
volatile ConsumerPausedEvent event;
143+
135144
@KafkaListener(id = "id", groupId = "grp",
136145
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
137146
partitions = "#{'0,1,2'.split(',')}"))
@@ -228,6 +237,12 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListe
228237
return factory;
229238
}
230239

240+
@EventListener
241+
public void paused(ConsumerPausedEvent event) {
242+
this.event = event;
243+
this.eventLatch.countDown();
244+
}
245+
231246
}
232247

233248
}

0 commit comments

Comments
 (0)