Skip to content

Commit 36665e9

Browse files
authored
Related to spring-projects#2195 - check `isPartitionPauseRequested()` instead of `isPartitionPaused()` - add diagnostic debug logging - prevent re-pausing the entire consumer each time a poll exits but the first partition is still paused. **cherry-pick to 2.9.x** * Checkstyle fix.
1 parent e9b5e69 commit 36665e9

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
782782

783783
private ConsumerRecords<K, V> pendingRecordsAfterError;
784784

785+
private boolean pauseForPending;
786+
785787
private volatile boolean consumerPaused;
786788

787789
private volatile Thread consumerThread;
@@ -1566,7 +1568,10 @@ private ConsumerRecords<K, V> doPoll() {
15661568
+ "after an error; emergency stop invoked to avoid message loss", howManyRecords));
15671569
KafkaMessageListenerContainer.this.emergencyStop.run();
15681570
}
1569-
if (!isPartitionPaused(this.pendingRecordsAfterError.partitions().iterator().next())) {
1571+
TopicPartition firstPart = this.pendingRecordsAfterError.partitions().iterator().next();
1572+
boolean isPaused = isPartitionPauseRequested(firstPart);
1573+
this.logger.debug(() -> "First pending after error: " + firstPart + "; paused: " + isPaused);
1574+
if (!isPaused) {
15701575
records = this.pendingRecordsAfterError;
15711576
this.pendingRecordsAfterError = null;
15721577
}
@@ -1682,10 +1687,11 @@ private void doPauseConsumerIfNecessary() {
16821687
this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch);
16831688
}
16841689
if (!this.consumerPaused && (isPaused() || this.pausedForAsyncAcks)
1685-
|| this.pendingRecordsAfterError != null) {
1690+
|| this.pauseForPending) {
16861691

16871692
this.consumer.pause(this.consumer.assignment());
16881693
this.consumerPaused = true;
1694+
this.pauseForPending = false;
16891695
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
16901696
publishConsumerPausedEvent(this.consumer.assignment());
16911697
}
@@ -2385,6 +2391,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
23852391
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
23862392
if (!afterHandling.isEmpty()) {
23872393
this.pendingRecordsAfterError = afterHandling;
2394+
this.pauseForPending = true;
23882395
}
23892396
}
23902397
}
@@ -2778,6 +2785,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
27782785
}
27792786
if (records.size() > 0) {
27802787
this.pendingRecordsAfterError = new ConsumerRecords<>(records);
2788+
this.pauseForPending = true;
27812789
}
27822790
}
27832791
}

0 commit comments

Comments
 (0)