Skip to content

Commit bb26e05

Browse files
garyrussellartembilan
authored andcommitted
GH-2280: Add ContainerProperties.pauseImmediate
Resolves #2280
1 parent ad48491 commit bb26e05

File tree

4 files changed

+287
-16
lines changed

4 files changed

+287
-16
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+8
Original file line numberDiff line numberDiff line change
@@ -2578,6 +2578,10 @@ See `monitorInterval`.
25782578
|`false`
25792579
|Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`.
25802580

2581+
|[[pauseImmediate]]<<pauseImmediate,`pauseImmediate`>>
2582+
|`false`
2583+
|When the container is paused, stop processing after the current record instead of after processing all the records from the previous poll; the remaining records are retained in memory and will be passed to the listener when the container is resumed.
2584+
25812585
|[[pollTimeout]]<<pollTimeout,`pollTimeout`>>
25822586
|5000
25832587
|The timeout passed into `Consumer.poll()`.
@@ -3825,6 +3829,10 @@ However, the consumers might not have actually paused yet.
38253829

38263830
In addition (also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property.
38273831

3832+
Starting with version 2.9, a new container property `pauseImmediate`, when set to true, causes the pause to take effect after the current record is processed.
3833+
By default, the pause takes effect when all of the records from the previous poll have been processed.
3834+
See <<pauseImmediate>>.
3835+
38283836
The following simple Spring Boot application demonstrates by using the container registry to get a reference to a `@KafkaListener` method's container and pausing or resuming its consumers as well as receiving the corresponding events:
38293837

38303838
====

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

+23
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,8 @@ public enum EOSMode {
280280

281281
private boolean asyncAcks;
282282

283+
private boolean pauseImmediate;
284+
283285
/**
284286
* Create properties for a container that will subscribe to the specified topics.
285287
* @param topics the topics.
@@ -873,6 +875,27 @@ public void setAsyncAcks(boolean asyncAcks) {
873875
this.asyncAcks = asyncAcks;
874876
}
875877

878+
/**
879+
* When pausing the container with a record listener, whether the pause takes effect
880+
* immediately, when the current record has been processed, or after all records from
881+
* the previous poll have been processed. Default false.
882+
* @return whether to pause immediately.
883+
* @since 2.9
884+
*/
885+
public boolean isPauseImmediate() {
886+
return this.pauseImmediate;
887+
}
888+
889+
/**
890+
* Set to true to pause the container after the current record has been processed, rather
891+
* than after all the records from the previous poll have been processed.
892+
* @param pauseImmediate true to pause immediately.
893+
* @since 2.9
894+
*/
895+
public void setPauseImmediate(boolean pauseImmediate) {
896+
this.pauseImmediate = pauseImmediate;
897+
}
898+
876899
private void adviseListenerIfNeeded() {
877900
if (!CollectionUtils.isEmpty(this.adviceChain)) {
878901
if (AopUtils.isAopProxy(this.messageListener)) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+39-16
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
746746

747747
private final Set<TopicPartition> pausedForNack = new HashSet<>();
748748

749+
private final boolean pauseImmediate = this.containerProperties.isPauseImmediate();
750+
749751
private Map<TopicPartition, OffsetMetadata> definedPartitions;
750752

751753
private int count;
@@ -782,7 +784,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
782784

783785
private boolean receivedSome;
784786

785-
private ConsumerRecords<K, V> pendingRecordsAfterError;
787+
private ConsumerRecords<K, V> remainingRecords;
786788

787789
private boolean pauseForPending;
788790

@@ -1381,7 +1383,7 @@ protected void pollAndInvoke() {
13811383
debugRecords(records);
13821384

13831385
invokeIfHaveRecords(records);
1384-
if (this.pendingRecordsAfterError == null) {
1386+
if (this.remainingRecords == null) {
13851387
resumeConsumerIfNeccessary();
13861388
if (!this.consumerPaused) {
13871389
resumePartitionsIfNecessary();
@@ -1395,9 +1397,9 @@ private void doProcessCommits() {
13951397
processCommits();
13961398
}
13971399
catch (CommitFailedException cfe) {
1398-
if (this.pendingRecordsAfterError != null && !this.isBatchListener) {
1399-
ConsumerRecords<K, V> pending = this.pendingRecordsAfterError;
1400-
this.pendingRecordsAfterError = null;
1400+
if (this.remainingRecords != null && !this.isBatchListener) {
1401+
ConsumerRecords<K, V> pending = this.remainingRecords;
1402+
this.remainingRecords = null;
14011403
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
14021404
Iterator<ConsumerRecord<K, V>> iterator = pending.iterator();
14031405
while (iterator.hasNext()) {
@@ -1563,19 +1565,19 @@ private ConsumerRecords<K, V> doPoll() {
15631565
}
15641566
else {
15651567
records = pollConsumer();
1566-
if (this.pendingRecordsAfterError != null) {
1568+
if (this.remainingRecords != null) {
15671569
int howManyRecords = records.count();
15681570
if (howManyRecords > 0) {
15691571
this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused "
15701572
+ "after an error; emergency stop invoked to avoid message loss", howManyRecords));
15711573
KafkaMessageListenerContainer.this.emergencyStop.run();
15721574
}
1573-
TopicPartition firstPart = this.pendingRecordsAfterError.partitions().iterator().next();
1575+
TopicPartition firstPart = this.remainingRecords.partitions().iterator().next();
15741576
boolean isPaused = isPaused() || isPartitionPauseRequested(firstPart);
15751577
this.logger.debug(() -> "First pending after error: " + firstPart + "; paused: " + isPaused);
15761578
if (!isPaused) {
1577-
records = this.pendingRecordsAfterError;
1578-
this.pendingRecordsAfterError = null;
1579+
records = this.remainingRecords;
1580+
this.remainingRecords = null;
15791581
}
15801582
}
15811583
captureOffsets(records);
@@ -2225,8 +2227,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22252227
private void commitOffsetsIfNeeded(final ConsumerRecords<K, V> records) {
22262228
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
22272229
|| this.producer != null) {
2228-
if (this.pendingRecordsAfterError != null) {
2229-
ConsumerRecord<K, V> firstUncommitted = this.pendingRecordsAfterError.iterator().next();
2230+
if (this.remainingRecords != null) {
2231+
ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
22302232
Iterator<ConsumerRecord<K, V>> it = records.iterator();
22312233
while (it.hasNext()) {
22322234
ConsumerRecord<K, V> next = it.next();
@@ -2392,7 +2394,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
23922394
records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer,
23932395
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
23942396
if (!afterHandling.isEmpty()) {
2395-
this.pendingRecordsAfterError = afterHandling;
2397+
this.remainingRecords = afterHandling;
23962398
this.pauseForPending = true;
23972399
}
23982400
}
@@ -2444,7 +2446,9 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
24442446
handleNack(records, record);
24452447
break;
24462448
}
2447-
2449+
if (checkImmediatePause(iterator)) {
2450+
break;
2451+
}
24482452
}
24492453
}
24502454

@@ -2523,9 +2527,28 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
25232527
handleNack(records, record);
25242528
break;
25252529
}
2530+
if (checkImmediatePause(iterator)) {
2531+
break;
2532+
}
25262533
}
25272534
}
25282535

2536+
private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
2537+
if (isPaused() && this.pauseImmediate) {
2538+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new HashMap<>();
2539+
while (iterator.hasNext()) {
2540+
ConsumerRecord<K, V> next = iterator.next();
2541+
remaining.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
2542+
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
2543+
}
2544+
if (remaining.size() > 0) {
2545+
this.remainingRecords = new ConsumerRecords<>(remaining);
2546+
return true;
2547+
}
2548+
}
2549+
return false;
2550+
}
2551+
25292552
@Nullable
25302553
private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg) {
25312554
ConsumerRecords<K, V> next = nextArg;
@@ -2669,8 +2692,8 @@ private void commitOffsetsIfNeeded(final ConsumerRecord<K, V> record) {
26692692
if (this.isManualAck) {
26702693
this.commitRecovered = true;
26712694
}
2672-
if (this.pendingRecordsAfterError == null
2673-
|| !record.equals(this.pendingRecordsAfterError.iterator().next())) {
2695+
if (this.remainingRecords == null
2696+
|| !record.equals(this.remainingRecords.iterator().next())) {
26742697
ackCurrent(record);
26752698
}
26762699
if (this.isManualAck) {
@@ -2787,7 +2810,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
27872810
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
27882811
}
27892812
if (records.size() > 0) {
2790-
this.pendingRecordsAfterError = new ConsumerRecords<>(records);
2813+
this.remainingRecords = new ConsumerRecords<>(records);
27912814
this.pauseForPending = true;
27922815
}
27932816
}

0 commit comments

Comments
 (0)