Skip to content

Commit 93f5ae5

Browse files
szpakgaryrussell
authored andcommitted
Precise unit for sleep duration and wake time
Prior to, it was required to dig a few levels into to realize they are in millis.
1 parent ed0a492 commit 93f5ae5

File tree

2 files changed

+28
-35
lines changed

2 files changed

+28
-35
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+19-19
Original file line numberDiff line numberDiff line change
@@ -730,9 +730,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
730730

731731
private long lastAlertAt = this.lastReceive;
732732

733-
private long nackSleep = -1;
733+
private long nackSleepDurationMillis = -1;
734734

735-
private long nackWake;
735+
private long nackWakeTimeMillis;
736736

737737
private int nackIndex;
738738

@@ -1622,9 +1622,9 @@ private void doPauseConsumerIfNecessary() {
16221622
}
16231623

16241624
private void resumeConsumerIfNeccessary() {
1625-
if (this.nackWake > 0) {
1626-
if (System.currentTimeMillis() > this.nackWake) {
1627-
this.nackWake = 0;
1625+
if (this.nackWakeTimeMillis > 0) {
1626+
if (System.currentTimeMillis() > this.nackWakeTimeMillis) {
1627+
this.nackWakeTimeMillis = 0;
16281628
this.consumer.resume(this.pausedForNack);
16291629
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
16301630
this.pausedForNack.clear();
@@ -2207,7 +2207,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
22072207

22082208
invokeBatchOnMessageWithRecordsOrList(records, recordList);
22092209
List<ConsumerRecord<?, ?>> toSeek = null;
2210-
if (this.nackSleep >= 0) {
2210+
if (this.nackSleepDurationMillis >= 0) {
22112211
int index = 0;
22122212
toSeek = new ArrayList<>();
22132213
for (ConsumerRecord<K, V> record : records) {
@@ -2217,7 +2217,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
22172217
}
22182218
}
22192219
if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
2220-
if (this.nackSleep < 0) {
2220+
if (this.nackSleepDurationMillis < 0) {
22212221
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
22222222
this.acks.put(record);
22232223
}
@@ -2356,7 +2356,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
23562356
if (this.commonRecordInterceptor != null) {
23572357
this.commonRecordInterceptor.afterRecord(record, this.consumer);
23582358
}
2359-
if (this.nackSleep >= 0) {
2359+
if (this.nackSleepDurationMillis >= 0) {
23602360
handleNack(records, record);
23612361
break;
23622362
}
@@ -2435,7 +2435,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
24352435
if (this.commonRecordInterceptor != null) {
24362436
this.commonRecordInterceptor.afterRecord(record, this.consumer);
24372437
}
2438-
if (this.nackSleep >= 0) {
2438+
if (this.nackSleepDurationMillis >= 0) {
24392439
handleNack(records, record);
24402440
break;
24412441
}
@@ -2510,8 +2510,8 @@ private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec
25102510
}
25112511

25122512
private void pauseForNackSleep() {
2513-
if (this.nackSleep > 0) {
2514-
this.nackWake = System.currentTimeMillis() + this.nackSleep;
2513+
if (this.nackSleepDurationMillis > 0) {
2514+
this.nackWakeTimeMillis = System.currentTimeMillis() + this.nackSleepDurationMillis;
25152515
Set<TopicPartition> alreadyPaused = this.consumer.paused();
25162516
Collection<TopicPartition> assigned = getAssignedPartitions();
25172517
if (assigned != null) {
@@ -2531,7 +2531,7 @@ private void pauseForNackSleep() {
25312531
this.consumer.resume(nowPaused);
25322532
}
25332533
}
2534-
this.nackSleep = -1;
2534+
this.nackSleepDurationMillis = -1;
25352535
}
25362536

25372537
/**
@@ -2626,7 +2626,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
26262626
checkDeser(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
26272627
}
26282628
doInvokeOnMessage(record);
2629-
if (this.nackSleep < 0 && !this.isManualImmediateAck) {
2629+
if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
26302630
ackCurrent(record);
26312631
}
26322632
}
@@ -3174,11 +3174,11 @@ public void acknowledge() {
31743174
}
31753175

31763176
@Override
3177-
public void nack(long sleep) {
3177+
public void nack(long sleepMillis) {
31783178
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
31793179
"nack() can only be called on the consumer thread");
3180-
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
3181-
ListenerConsumer.this.nackSleep = sleep;
3180+
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
3181+
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
31823182
synchronized (ListenerConsumer.this) {
31833183
if (ListenerConsumer.this.offsetsInThisBatch != null) {
31843184
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
@@ -3221,13 +3221,13 @@ public void acknowledge() {
32213221
}
32223222

32233223
@Override
3224-
public void nack(int index, long sleep) {
3224+
public void nack(int index, long sleepMillis) {
32253225
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
32263226
"nack() can only be called on the consumer thread");
3227-
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
3227+
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
32283228
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
32293229
ListenerConsumer.this.nackIndex = index;
3230-
ListenerConsumer.this.nackSleep = sleep;
3230+
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
32313231
synchronized (ListenerConsumer.this) {
32323232
if (ListenerConsumer.this.offsetsInThisBatch != null) {
32333233
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

+9-16
Original file line numberDiff line numberDiff line change
@@ -37,35 +37,28 @@ public interface Acknowledgment {
3737
/**
3838
* Negatively acknowledge the current record - discard remaining records from the poll
3939
* and re-seek all partitions so that this record will be redelivered after the sleep
40-
* time. Must be called on the consumer thread.
40+
* time (in milliseconds). Must be called on the consumer thread.
4141
* <p>
42-
* <b>When using group management,
43-
* {@code sleep + time spent processing the previous messages from the poll} must be
44-
* less than the consumer {@code max.poll.interval.ms} property, to avoid a
45-
* rebalance.</b>
46-
* @param sleep the time to sleep; the actual sleep time will be larger of this value
47-
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
42+
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
43+
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
4844
* @since 2.3
4945
*/
50-
default void nack(long sleep) {
46+
default void nack(long sleepMillis) {
5147
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
5248
}
5349

5450
/**
5551
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
5652
* records before the index and re-seek the partitions so that the record at the index
57-
* and subsequent records will be redelivered after the sleep time. Must be called on
58-
* the consumer thread.
53+
* and subsequent records will be redelivered after the sleep time (in milliseconds).
54+
* Must be called on the consumer thread.
5955
* <p>
60-
* <b>When using group management,
61-
* {@code sleep + time spent processing the records before the index} must be less
62-
* than the consumer {@code max.poll.interval.ms} property, to avoid a rebalance.</b>
6356
* @param index the index of the failed record in the batch.
64-
* @param sleep the time to sleep; the actual sleep time will be larger of this value
65-
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
57+
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
58+
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
6659
* @since 2.3
6760
*/
68-
default void nack(int index, long sleep) {
61+
default void nack(int index, long sleepMillis) {
6962
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
7063
}
7164

0 commit comments

Comments
 (0)