@@ -707,6 +707,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
707
707
708
708
private final Map <TopicPartition , Boolean > wasIdlePartition ;
709
709
710
+ private final Set <TopicPartition > pausedForNack = new HashSet <>();
711
+
710
712
private Map <TopicPartition , OffsetMetadata > definedPartitions ;
711
713
712
714
private int count ;
@@ -723,6 +725,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
723
725
724
726
private long nackSleep = -1 ;
725
727
728
+ private long nackWake ;
729
+
726
730
private int nackIndex ;
727
731
728
732
private Iterator <TopicPartition > batchIterator ;
@@ -1589,6 +1593,10 @@ private void pauseConsumerIfNecessary() {
1589
1593
}
1590
1594
1591
1595
private void doPauseConsumerIfNecessary () {
1596
+ if (this .pausedForNack .size () > 0 ) {
1597
+ this .logger .debug ("Still paused for nack sleep" );
1598
+ return ;
1599
+ }
1592
1600
if (this .offsetsInThisBatch != null && this .offsetsInThisBatch .size () > 0 && !this .pausedForAsyncAcks ) {
1593
1601
this .pausedForAsyncAcks = true ;
1594
1602
this .logger .debug (() -> "Pausing for incomplete async acks: " + this .offsetsInThisBatch );
@@ -1602,7 +1610,15 @@ private void doPauseConsumerIfNecessary() {
1602
1610
}
1603
1611
1604
1612
private void resumeConsumerIfNeccessary () {
1605
- if (this .offsetsInThisBatch != null ) {
1613
+ if (this .nackWake > 0 ) {
1614
+ if (System .currentTimeMillis () > this .nackWake ) {
1615
+ this .nackWake = 0 ;
1616
+ this .consumer .resume (this .pausedForNack );
1617
+ this .logger .debug (() -> "Resumed after nack sleep: " + this .pausedForNack );
1618
+ this .pausedForNack .clear ();
1619
+ }
1620
+ }
1621
+ else if (this .offsetsInThisBatch != null ) {
1606
1622
synchronized (this ) {
1607
1623
doResumeConsumerIfNeccessary ();
1608
1624
}
@@ -1646,12 +1662,10 @@ private void pausePartitionsIfNecessary() {
1646
1662
}
1647
1663
1648
1664
private void resumePartitionsIfNecessary () {
1649
- Set <TopicPartition > pausedConsumerPartitions = this .consumer .paused ();
1650
- List <TopicPartition > partitionsToResume = this
1651
- .assignedPartitions
1665
+ List <TopicPartition > partitionsToResume = getAssignedPartitions ()
1652
1666
.stream ()
1653
1667
.filter (tp -> !isPartitionPauseRequested (tp )
1654
- && pausedConsumerPartitions .contains (tp ))
1668
+ && this . pausedPartitions .contains (tp ))
1655
1669
.collect (Collectors .toList ());
1656
1670
if (partitionsToResume .size () > 0 ) {
1657
1671
this .consumer .resume (partitionsToResume );
@@ -2200,7 +2214,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
2200
2214
processCommits ();
2201
2215
}
2202
2216
SeekUtils .doSeeks (toSeek , this .consumer , null , true , (rec , ex ) -> false , this .logger ); // NOSONAR
2203
- nackSleepAndReset ();
2217
+ pauseForNackSleep ();
2204
2218
}
2205
2219
}
2206
2220
@@ -2458,17 +2472,29 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
2458
2472
}
2459
2473
}
2460
2474
SeekUtils .doSeeks (list , this .consumer , null , true , (rec , ex ) -> false , this .logger ); // NOSONAR
2461
- nackSleepAndReset ();
2475
+ pauseForNackSleep ();
2462
2476
}
2463
2477
2464
- private void nackSleepAndReset () {
2465
- try {
2466
- ListenerUtils .stoppableSleep (KafkaMessageListenerContainer .this .thisOrParentContainer , this .nackSleep );
2467
- }
2468
- catch (@ SuppressWarnings (UNUSED ) InterruptedException e ) {
2469
- Thread .currentThread ().interrupt ();
2478
+ private void pauseForNackSleep () {
2479
+ if (this .nackSleep > 0 ) {
2480
+ this .nackWake = System .currentTimeMillis () + this .nackSleep ;
2481
+ this .nackSleep = -1 ;
2482
+ Set <TopicPartition > alreadyPaused = this .consumer .paused ();
2483
+ this .pausedForNack .addAll (getAssignedPartitions ());
2484
+ this .pausedForNack .removeAll (alreadyPaused );
2485
+ this .logger .debug (() -> "Pausing for nack sleep: " + ListenerConsumer .this .pausedForNack );
2486
+ try {
2487
+ this .consumer .pause (this .pausedForNack );
2488
+ }
2489
+ catch (IllegalStateException ex ) {
2490
+ // this should never happen; defensive, just in case...
2491
+ this .logger .warn (() -> "Could not pause for nack, possible rebalance in process: "
2492
+ + ex .getMessage ());
2493
+ Set <TopicPartition > nowPaused = new HashSet <>(this .consumer .paused ());
2494
+ nowPaused .removeAll (alreadyPaused );
2495
+ this .consumer .resume (nowPaused );
2496
+ }
2470
2497
}
2471
- this .nackSleep = -1 ;
2472
2498
}
2473
2499
2474
2500
/**
@@ -3237,6 +3263,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
3237
3263
if (ListenerConsumer .this .assignedPartitions != null ) {
3238
3264
ListenerConsumer .this .assignedPartitions .removeAll (partitions );
3239
3265
}
3266
+ ListenerConsumer .this .pausedForNack .removeAll (partitions );
3240
3267
partitions .forEach (tp -> ListenerConsumer .this .lastCommits .remove (tp ));
3241
3268
synchronized (ListenerConsumer .this ) {
3242
3269
if (ListenerConsumer .this .offsetsInThisBatch != null ) {
@@ -3261,6 +3288,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
3261
3288
ListenerConsumer .this .logger .warn ("Paused consumer resumed by Kafka due to rebalance; "
3262
3289
+ "consumer paused again, so the initial poll() will never return any records" );
3263
3290
}
3291
+ if (ListenerConsumer .this .pausedForNack .size () > 0 ) {
3292
+ ListenerConsumer .this .consumer .pause (ListenerConsumer .this .pausedForNack );
3293
+ }
3264
3294
ListenerConsumer .this .assignedPartitions .addAll (partitions );
3265
3295
if (ListenerConsumer .this .commitCurrentOnAssignment
3266
3296
&& !collectAndCommitIfNecessary (partitions )) {
0 commit comments