Skip to content

Commit 1f5ae61

Browse files
garyrussellartembilan
authored andcommitted
GH-2332: Fix Container.pause() with Manual Assign.
Resolves #2332 Use manual assignments, if present, for pause. Also, in the rebalance listener, use `isPaused()` rather than `consumerPaused` to determine whether the partitions should be paused. Add logs and events for pauses in the rebal listener. Revert errant log4j commit. **cherry-pick to 2.9.x, 2.8.x** * Use `CollectionUtils.isEmpty()`
1 parent a3d1f42 commit 1f5ae61

File tree

4 files changed

+251
-9
lines changed

4 files changed

+251
-9
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@
121121
import org.springframework.transaction.support.TransactionTemplate;
122122
import org.springframework.util.Assert;
123123
import org.springframework.util.ClassUtils;
124+
import org.springframework.util.CollectionUtils;
124125
import org.springframework.util.StringUtils;
125126
import org.springframework.util.concurrent.ListenableFuture;
126127
import org.springframework.util.concurrent.ListenableFutureCallback;
@@ -1674,11 +1675,14 @@ private void doPauseConsumerIfNecessary() {
16741675
if (!this.consumerPaused && (isPaused() || this.pausedForAsyncAcks)
16751676
|| this.pauseForPending) {
16761677

1677-
this.consumer.pause(this.consumer.assignment());
1678-
this.consumerPaused = true;
1679-
this.pauseForPending = false;
1680-
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
1681-
publishConsumerPausedEvent(this.consumer.assignment());
1678+
Collection<TopicPartition> assigned = getAssignedPartitions();
1679+
if (!CollectionUtils.isEmpty(assigned)) {
1680+
this.consumer.pause(assigned);
1681+
this.consumerPaused = true;
1682+
this.pauseForPending = false;
1683+
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
1684+
publishConsumerPausedEvent(this.consumer.assignment());
1685+
}
16821686
}
16831687
}
16841688

@@ -3494,10 +3498,13 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
34943498
}
34953499

34963500
private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3497-
if (ListenerConsumer.this.consumerPaused) {
3501+
if (isPaused()) {
34983502
ListenerConsumer.this.consumer.pause(partitions);
3503+
ListenerConsumer.this.consumerPaused = true;
34993504
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
35003505
+ "consumer paused again, so the initial poll() will never return any records");
3506+
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + partitions);
3507+
publishConsumerPausedEvent(partitions);
35013508
}
35023509
Collection<TopicPartition> toRepause = new LinkedList<>();
35033510
partitions.forEach(tp -> {
@@ -3507,6 +3514,8 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
35073514
});
35083515
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
35093516
ListenerConsumer.this.consumer.pause(toRepause);
3517+
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
3518+
publishConsumerPausedEvent(toRepause);
35103519
}
35113520
this.revoked.removeAll(toRepause);
35123521
this.revoked.forEach(tp -> resumePartition(tp));

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ private void testInOrderAckPauseUntilAcked(AckMode ackMode, boolean batch) throw
765765
final CountDownLatch pauseLatch = new CountDownLatch(1);
766766
willAnswer(inv -> {
767767
paused.set(true);
768-
pausedParts.set(inv.getArgument(0));
768+
pausedParts.set(new HashSet<>(inv.getArgument(0)));
769769
pauseLatch.countDown();
770770
return null;
771771
}).given(consumer).pause(any());
@@ -2581,7 +2581,7 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
25812581
pauseLatch1.countDown();
25822582
pauseLatch2.countDown();
25832583
return null;
2584-
}).given(consumer).pause(records.keySet());
2584+
}).given(consumer).pause(any());
25852585
given(consumer.paused()).willReturn(pausedParts);
25862586
CountDownLatch pollWhilePausedLatch = new CountDownLatch(2);
25872587
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyLong;
22+
import static org.mockito.ArgumentMatchers.anyMap;
23+
import static org.mockito.BDDMockito.given;
24+
import static org.mockito.BDDMockito.willAnswer;
25+
import static org.mockito.Mockito.inOrder;
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.never;
28+
import static org.mockito.Mockito.verify;
29+
30+
import java.time.Duration;
31+
import java.util.ArrayList;
32+
import java.util.Arrays;
33+
import java.util.Collection;
34+
import java.util.Collections;
35+
import java.util.HashSet;
36+
import java.util.LinkedHashMap;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.Optional;
40+
import java.util.concurrent.CountDownLatch;
41+
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicInteger;
43+
import java.util.stream.Collectors;
44+
45+
import org.apache.kafka.clients.consumer.Consumer;
46+
import org.apache.kafka.clients.consumer.ConsumerRecord;
47+
import org.apache.kafka.clients.consumer.ConsumerRecords;
48+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
49+
import org.apache.kafka.common.TopicPartition;
50+
import org.apache.kafka.common.header.internals.RecordHeaders;
51+
import org.apache.kafka.common.record.TimestampType;
52+
import org.junit.jupiter.api.Test;
53+
import org.mockito.ArgumentCaptor;
54+
import org.mockito.InOrder;
55+
56+
import org.springframework.beans.factory.annotation.Autowired;
57+
import org.springframework.context.annotation.Bean;
58+
import org.springframework.context.annotation.Configuration;
59+
import org.springframework.kafka.annotation.EnableKafka;
60+
import org.springframework.kafka.annotation.KafkaListener;
61+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
62+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
63+
import org.springframework.kafka.core.ConsumerFactory;
64+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
65+
import org.springframework.kafka.test.utils.KafkaTestUtils;
66+
import org.springframework.test.annotation.DirtiesContext;
67+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
68+
69+
/**
70+
* @author Gary Russell
71+
* @since 2.9
72+
*
73+
*/
74+
@SpringJUnitConfig
75+
@DirtiesContext
76+
public class PauseContainerManualAssignmentTests {
77+
78+
@SuppressWarnings("rawtypes")
79+
@Autowired
80+
private Consumer consumer;
81+
82+
@Autowired
83+
private Config config;
84+
85+
@Autowired
86+
private KafkaListenerEndpointRegistry registry;
87+
88+
@SuppressWarnings("unchecked")
89+
@Test
90+
public void pausesWithManualAssignment() throws Exception {
91+
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
92+
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
93+
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
94+
this.registry.stop();
95+
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
96+
InOrder inOrder = inOrder(this.consumer);
97+
inOrder.verify(this.consumer).assign(any(Collection.class));
98+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
99+
inOrder.verify(this.consumer).commitSync(
100+
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
101+
Duration.ofSeconds(60));
102+
inOrder.verify(this.consumer).commitSync(
103+
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)),
104+
Duration.ofSeconds(60));
105+
inOrder.verify(this.consumer).commitSync(
106+
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)),
107+
Duration.ofSeconds(60));
108+
ArgumentCaptor<Collection<TopicPartition>> pauses = ArgumentCaptor.forClass(Collection.class);
109+
inOrder.verify(this.consumer).pause(pauses.capture());
110+
assertThat(pauses.getValue().stream().collect(Collectors.toList())).contains(new TopicPartition("foo", 0),
111+
new TopicPartition("foo", 1), new TopicPartition("foo", 2));
112+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
113+
verify(this.consumer, never()).resume(any());
114+
assertThat(this.config.count).isEqualTo(4);
115+
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
116+
verify(this.consumer, never()).seek(any(), anyLong());
117+
}
118+
119+
@Configuration
120+
@EnableKafka
121+
public static class Config {
122+
123+
final List<String> contents = new ArrayList<>();
124+
125+
final CountDownLatch pollLatch = new CountDownLatch(4);
126+
127+
final CountDownLatch deliveryLatch = new CountDownLatch(4);
128+
129+
final CountDownLatch closeLatch = new CountDownLatch(1);
130+
131+
final CountDownLatch commitLatch = new CountDownLatch(3);
132+
133+
int count;
134+
135+
@KafkaListener(id = "id", groupId = "grp",
136+
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
137+
partitions = "#{'0,1,2'.split(',')}"))
138+
public void foo(String in) {
139+
this.contents.add(in);
140+
this.deliveryLatch.countDown();
141+
if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
142+
throw new RuntimeException("foo");
143+
}
144+
}
145+
146+
@SuppressWarnings({ "rawtypes" })
147+
@Bean
148+
public ConsumerFactory consumerFactory(KafkaListenerEndpointRegistry registry) {
149+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
150+
final Consumer consumer = consumer(registry);
151+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
152+
.willReturn(consumer);
153+
return consumerFactory;
154+
}
155+
156+
@SuppressWarnings({ "rawtypes", "unchecked" })
157+
@Bean
158+
public Consumer consumer(KafkaListenerEndpointRegistry registry) {
159+
final Consumer consumer = mock(Consumer.class);
160+
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
161+
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
162+
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
163+
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
164+
records1.put(topicPartition0, Arrays.asList(
165+
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
166+
new RecordHeaders(), Optional.empty()),
167+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
168+
new RecordHeaders(), Optional.empty())));
169+
records1.put(topicPartition1, Arrays.asList(
170+
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
171+
new RecordHeaders(), Optional.empty()),
172+
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
173+
new RecordHeaders(), Optional.empty())));
174+
records1.put(topicPartition2, Arrays.asList(
175+
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
176+
new RecordHeaders(), Optional.empty()),
177+
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
178+
new RecordHeaders(), Optional.empty())));
179+
final AtomicInteger which = new AtomicInteger();
180+
willAnswer(i -> {
181+
this.pollLatch.countDown();
182+
switch (which.getAndIncrement()) {
183+
case 0:
184+
return new ConsumerRecords(records1);
185+
default:
186+
try {
187+
Thread.sleep(50);
188+
}
189+
catch (InterruptedException e) {
190+
Thread.currentThread().interrupt();
191+
}
192+
return new ConsumerRecords(Collections.emptyMap());
193+
}
194+
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
195+
List<TopicPartition> paused = new ArrayList<>();
196+
willAnswer(i -> {
197+
this.commitLatch.countDown();
198+
registry.getListenerContainer("id").pause();
199+
return null;
200+
}).given(consumer).commitSync(anyMap(), any());
201+
willAnswer(i -> {
202+
this.closeLatch.countDown();
203+
return null;
204+
}).given(consumer).close();
205+
willAnswer(i -> {
206+
paused.addAll(i.getArgument(0));
207+
return null;
208+
}).given(consumer).pause(any());
209+
willAnswer(i -> {
210+
return new HashSet<>(paused);
211+
}).given(consumer).paused();
212+
willAnswer(i -> {
213+
paused.removeAll(i.getArgument(0));
214+
return null;
215+
}).given(consumer).resume(any());
216+
return consumer;
217+
}
218+
219+
@SuppressWarnings({ "rawtypes", "unchecked" })
220+
@Bean
221+
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListenerEndpointRegistry registry) {
222+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
223+
factory.setConsumerFactory(consumerFactory(registry));
224+
factory.getContainerProperties().setAckMode(AckMode.RECORD);
225+
DefaultErrorHandler eh = new DefaultErrorHandler();
226+
eh.setSeekAfterError(false);
227+
factory.setCommonErrorHandler(eh);
228+
return factory;
229+
}
230+
231+
}
232+
233+
}

Diff for: spring-kafka/src/test/resources/log4j2-test.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
</Console>
77
</Appenders>
88
<Loggers>
9-
<Logger name="org.springframework.kafka" level="debug"/>
9+
<Logger name="org.springframework.kafka" level="warn"/>
1010
<Logger name="org.springframework.kafka.ReplyingKafkaTemplate" level="warn"/>
1111
<Logger name="org.springframework.kafka.retrytopic" level="warn"/>
1212
<Logger name="org.apache.kafka.clients" level="warn"/>

0 commit comments

Comments
 (0)