
Commit a1a5d48

garyrussell authored and artembilan committed
GH-2128 Do Not Sleep Consumer Thread for Nack
Resolves #2128

Suspending polling delays rebalancing; instead, pause the consumer and continue polling. Check whether partitions are already paused, pause only the currently active partitions, and resume them after the sleep interval has passed. Re-pause as necessary after a rebalance. Also tested with the reporter's reproducer.

**cherry-pick to 2.8.x**
1 parent 1fa80aa commit a1a5d48
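
The gist of the change, as a minimal sketch against the plain kafka-clients Consumer API (the helper class and its names, NackPauseSketch and nackDeadline, are illustrative only and are not the container's actual internals):

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of spring-kafka; shows the pause-instead-of-sleep idea.
class NackPauseSketch<K, V> {

    private final Consumer<K, V> consumer;

    private final Set<TopicPartition> pausedForNack = new HashSet<>();

    private long nackDeadline; // 0 means "not currently paused for a nack"

    NackPauseSketch(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    // After seeking back on a nack, pause the partitions we are allowed to pause
    // (leaving any already-paused partitions alone) instead of sleeping the thread.
    void pauseForNack(long sleepMillis) {
        this.nackDeadline = System.currentTimeMillis() + sleepMillis;
        Set<TopicPartition> alreadyPaused = this.consumer.paused();
        this.pausedForNack.addAll(this.consumer.assignment());
        this.pausedForNack.removeAll(alreadyPaused);
        this.consumer.pause(this.pausedForNack);
    }

    // Called on every iteration of the poll loop; polling never stops, so a
    // rebalance can complete while the nack "sleep" is in progress.
    void pollLoopIteration() {
        if (this.nackDeadline > 0 && System.currentTimeMillis() > this.nackDeadline) {
            this.consumer.resume(this.pausedForNack); // sleep interval elapsed
            this.pausedForNack.clear();
            this.nackDeadline = 0;
        }
        ConsumerRecords<K, V> records = this.consumer.poll(Duration.ofMillis(100));
        // paused partitions return no records; dispatch any others to the listener (omitted)
    }

}

Because poll() keeps being called, the consumer can take part in a rebalance during the nack back-off instead of delaying it, which is what suspending polling in the previous implementation prevented.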

File tree

6 files changed: 363 additions, 34 deletions


Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

44 additions, 14 deletions

@@ -712,6 +712,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
     private final Header infoHeader = new RecordHeader(KafkaHeaders.LISTENER_INFO, this.listenerinfo);
 
+    private final Set<TopicPartition> pausedForNack = new HashSet<>();
+
     private Map<TopicPartition, OffsetMetadata> definedPartitions;
 
     private int count;
@@ -728,6 +730,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
     private long nackSleep = -1;
 
+    private long nackWake;
+
     private int nackIndex;
 
     private Iterator<TopicPartition> batchIterator;
@@ -1594,6 +1598,10 @@ private void pauseConsumerIfNecessary() {
     }
 
     private void doPauseConsumerIfNecessary() {
+        if (this.pausedForNack.size() > 0) {
+            this.logger.debug("Still paused for nack sleep");
+            return;
+        }
         if (this.offsetsInThisBatch != null && this.offsetsInThisBatch.size() > 0 && !this.pausedForAsyncAcks) {
             this.pausedForAsyncAcks = true;
             this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch);
@@ -1607,7 +1615,15 @@
     }
 
     private void resumeConsumerIfNeccessary() {
-        if (this.offsetsInThisBatch != null) {
+        if (this.nackWake > 0) {
+            if (System.currentTimeMillis() > this.nackWake) {
+                this.nackWake = 0;
+                this.consumer.resume(this.pausedForNack);
+                this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
+                this.pausedForNack.clear();
+            }
+        }
+        else if (this.offsetsInThisBatch != null) {
             synchronized (this) {
                 doResumeConsumerIfNeccessary();
             }
@@ -1651,12 +1667,10 @@ private void pausePartitionsIfNecessary() {
     }
 
     private void resumePartitionsIfNecessary() {
-        Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
-        List<TopicPartition> partitionsToResume = this
-                .assignedPartitions
+        List<TopicPartition> partitionsToResume = getAssignedPartitions()
                 .stream()
                 .filter(tp -> !isPartitionPauseRequested(tp)
-                        && pausedConsumerPartitions.contains(tp))
+                        && this.pausedPartitions.contains(tp))
                 .collect(Collectors.toList());
         if (partitionsToResume.size() > 0) {
             this.consumer.resume(partitionsToResume);
@@ -2203,7 +2217,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
                 processCommits();
             }
             SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR
-            nackSleepAndReset();
+            pauseForNackSleep();
         }
     }
 
@@ -2464,17 +2478,29 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
            }
        }
        SeekUtils.doSeeks(list, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR
-       nackSleepAndReset();
+       pauseForNackSleep();
    }
 
-   private void nackSleepAndReset() {
-       try {
-           ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this.thisOrParentContainer, this.nackSleep);
-       }
-       catch (@SuppressWarnings(UNUSED) InterruptedException e) {
-           Thread.currentThread().interrupt();
+   private void pauseForNackSleep() {
+       if (this.nackSleep > 0) {
+           this.nackWake = System.currentTimeMillis() + this.nackSleep;
+           this.nackSleep = -1;
+           Set<TopicPartition> alreadyPaused = this.consumer.paused();
+           this.pausedForNack.addAll(getAssignedPartitions());
+           this.pausedForNack.removeAll(alreadyPaused);
+           this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
+           try {
+               this.consumer.pause(this.pausedForNack);
+           }
+           catch (IllegalStateException ex) {
+               // this should never happen; defensive, just in case...
+               this.logger.warn(() -> "Could not pause for nack, possible rebalance in process: "
+                       + ex.getMessage());
+               Set<TopicPartition> nowPaused = new HashSet<>(this.consumer.paused());
+               nowPaused.removeAll(alreadyPaused);
+               this.consumer.resume(nowPaused);
+           }
        }
-       this.nackSleep = -1;
    }
 
    /**
@@ -3243,6 +3269,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        if (ListenerConsumer.this.assignedPartitions != null) {
            ListenerConsumer.this.assignedPartitions.removeAll(partitions);
        }
+       ListenerConsumer.this.pausedForNack.removeAll(partitions);
        partitions.forEach(tp -> ListenerConsumer.this.lastCommits.remove(tp));
        synchronized (ListenerConsumer.this) {
            if (ListenerConsumer.this.offsetsInThisBatch != null) {
@@ -3267,6 +3294,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
                    + "consumer paused again, so the initial poll() will never return any records");
        }
+       if (ListenerConsumer.this.pausedForNack.size() > 0) {
+           ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack);
+       }
        ListenerConsumer.this.assignedPartitions.addAll(partitions);
        if (ListenerConsumer.this.commitCurrentOnAssignment
                && !collectAndCommitIfNecessary(partitions)) {
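
For context, this is roughly how a manual-acknowledgment batch listener reaches the new pauseForNackSleep() path; a sketch only, assuming a batch listener container factory with AckMode.MANUAL, and with made-up topic, group, and class names:

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

public class RetryingBatchListener {

    // Illustrative only; requires a batch listener container configured for manual acks.
    @KafkaListener(topics = "some-topic", groupId = "some-group")
    public void listen(List<String> batch, Acknowledgment ack) {
        try {
            process(batch);
            ack.acknowledge();
        }
        catch (RuntimeException ex) {
            // Discard from index 3 onward and redeliver after ~50 ms; with this commit
            // the 50 ms back-off is served by pausing the consumer, not by sleeping its thread.
            ack.nack(3, 50);
        }
    }

    private void process(List<String> batch) {
        // application logic
    }

}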

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java

18 additions, 2 deletions

@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2021 the original author or authors.
+ * Copyright 2017-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -197,7 +197,12 @@ public Consumer consumer() {
                new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
                        new RecordHeaders(), Optional.empty())));
        final AtomicInteger which = new AtomicInteger();
+       final AtomicBoolean paused = new AtomicBoolean();
        willAnswer(i -> {
+           if (paused.get()) {
+               Thread.sleep(10);
+               return ConsumerRecords.empty();
+           }
            this.pollLatch.countDown();
            switch (which.getAndIncrement()) {
                case 0:
@@ -211,9 +216,20 @@ public Consumer consumer() {
                    catch (@SuppressWarnings("unused") InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
-                   return new ConsumerRecords(Collections.emptyMap());
+                   return ConsumerRecords.empty();
            }
        }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+       willAnswer(i -> {
+           return Collections.emptySet();
+       }).given(consumer).paused();
+       willAnswer(i -> {
+           paused.set(true);
+           return null;
+       }).given(consumer).pause(any());
+       willAnswer(i -> {
+           paused.set(false);
+           return null;
+       }).given(consumer).resume(any());
        willAnswer(i -> {
            this.commitLatch.countDown();
            return null;
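
The stubbing added to the test, pulled out into a self-contained sketch (the helper class name is made up): a mocked Consumer whose pause() and resume() calls flip a flag so that poll() behaves like a genuinely paused consumer.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public final class PausableConsumerMock {

    @SuppressWarnings("unchecked")
    public static Consumer<String, String> create() {
        Consumer<String, String> consumer = mock(Consumer.class);
        AtomicBoolean paused = new AtomicBoolean();
        // While "paused", poll() returns nothing, just like a real paused consumer.
        willAnswer(inv -> {
            if (paused.get()) {
                Thread.sleep(10);
                return ConsumerRecords.empty();
            }
            return ConsumerRecords.empty(); // the real test returns canned records here
        }).given(consumer).poll(any(Duration.class));
        // The container checks paused() before pausing for a nack.
        given(consumer.paused()).willReturn(Collections.emptySet());
        willAnswer(inv -> {
            paused.set(true);
            return null;
        }).given(consumer).pause(any());
        willAnswer(inv -> {
            paused.set(false);
            return null;
        }).given(consumer).resume(any());
        return consumer;
    }

    private PausableConsumerMock() {
    }

}

The willAnswer(...).given(mock).method() form is used for pause() and resume() because they are void methods; plain given(...).willReturn(...) works for paused().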

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java renamed to spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTxTests.java

31 additions, 10 deletions

@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2021 the original author or authors.
+ * Copyright 2017-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.consumer.Consumer;
@@ -77,7 +78,7 @@
 @SpringJUnitConfig
 @DirtiesContext
 @SuppressWarnings("deprecation")
-public class ManualNackRecordTxTests {
+public class ManualNackBatchTxTests {
 
    @SuppressWarnings("rawtypes")
    @Autowired
@@ -102,6 +103,7 @@ public class ManualNackRecordTxTests {
    @Test
    public void discardRemainingRecordsFromPollAndSeek() throws Exception {
        assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
+       assertThat(this.config.replayTime).isBetween(50L, 30_000L);
        assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
        this.registry.stop();
@@ -128,24 +130,27 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
    @EnableKafka
    public static class Config {
 
-       private final List<List<String>> contents = new ArrayList<>();
+       final List<List<String>> contents = new ArrayList<>();
 
-       private final CountDownLatch pollLatch = new CountDownLatch(3);
+       final CountDownLatch pollLatch = new CountDownLatch(3);
 
-       private final CountDownLatch deliveryLatch = new CountDownLatch(2);
+       final CountDownLatch deliveryLatch = new CountDownLatch(2);
 
-       private final CountDownLatch closeLatch = new CountDownLatch(1);
+       final CountDownLatch closeLatch = new CountDownLatch(1);
 
-       private final CountDownLatch commitLatch = new CountDownLatch(2);
+       final CountDownLatch commitLatch = new CountDownLatch(2);
 
-       private int count;
+       volatile int count;
+
+       volatile long replayTime;
 
        @KafkaListener(topics = "foo", groupId = "grp")
        public void foo(List<String> in, Acknowledgment ack) {
            this.contents.add(in);
+           this.replayTime = System.currentTimeMillis() - this.replayTime;
            this.deliveryLatch.countDown();
            if (++this.count == 1) { // part 1, offset 1, first time
-               ack.nack(3, 0);
+               ack.nack(3, 50);
            }
            else {
                ack.acknowledge();
@@ -196,7 +201,12 @@ public Consumer consumer() {
                new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
                        new RecordHeaders(), Optional.empty())));
        final AtomicInteger which = new AtomicInteger();
+       final AtomicBoolean paused = new AtomicBoolean();
        willAnswer(i -> {
+           if (paused.get()) {
+               Thread.sleep(10);
+               return ConsumerRecords.empty();
+           }
            this.pollLatch.countDown();
            switch (which.getAndIncrement()) {
                case 0:
@@ -210,9 +220,20 @@
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
-                   return new ConsumerRecords(Collections.emptyMap());
+                   return ConsumerRecords.empty();
            }
        }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+       willAnswer(i -> {
+           return Collections.emptySet();
+       }).given(consumer).paused();
+       willAnswer(i -> {
+           paused.set(true);
+           return null;
+       }).given(consumer).pause(any());
+       willAnswer(i -> {
+           paused.set(false);
+           return null;
+       }).given(consumer).resume(any());
        willAnswer(i -> {
            this.commitLatch.countDown();
            return null;
