Skip to content

Commit ef48cfa

Browse files
authored
Add Acknowledgment.nack() variants accepting Duration (#2281)
* Add Acknowledgment.nack() variants accepting Duration To prevent confusion what unit should be used for "long sleep" arguments. As the old and new methods in the Acknowledgment interface have default implementations, the change itself is backward compatible. The old methods are marked as deprecated and intended to be removed in the future. * Remove deprecated Acknowledgment.nack(long) method nack(Duration) should be used instead.
1 parent 4d4a1a6 commit ef48cfa

File tree

7 files changed

+23
-21
lines changed

7 files changed

+23
-21
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -3182,11 +3182,11 @@ public void acknowledge() {
31823182
}
31833183

31843184
@Override
3185-
public void nack(long sleepMillis) {
3185+
public void nack(Duration sleep) {
31863186
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
31873187
"nack() can only be called on the consumer thread");
3188-
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
3189-
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
3188+
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
3189+
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
31903190
synchronized (ListenerConsumer.this) {
31913191
if (ListenerConsumer.this.offsetsInThisBatch != null) {
31923192
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
@@ -3229,13 +3229,13 @@ public void acknowledge() {
32293229
}
32303230

32313231
@Override
3232-
public void nack(int index, long sleepMillis) {
3232+
public void nack(int index, Duration sleep) {
32333233
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
32343234
"nack() can only be called on the consumer thread");
3235-
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
3235+
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
32363236
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
32373237
ListenerConsumer.this.nackIndex = index;
3238-
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
3238+
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
32393239
synchronized (ListenerConsumer.this) {
32403240
if (ListenerConsumer.this.offsetsInThisBatch != null) {
32413241
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.support;
1818

19+
import java.time.Duration;
20+
1921
/**
2022
* Handle for acknowledging the processing of a
2123
* {@link org.apache.kafka.clients.consumer.ConsumerRecord}. Recipients can store the
@@ -37,28 +39,28 @@ public interface Acknowledgment {
3739
/**
3840
* Negatively acknowledge the current record - discard remaining records from the poll
3941
* and re-seek all partitions so that this record will be redelivered after the sleep
40-
* time (in milliseconds). Must be called on the consumer thread.
42+
* duration. Must be called on the consumer thread.
4143
* <p>
42-
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
43-
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
44-
* @since 2.3
44+
* @param sleep the duration to sleep; the actual sleep time will be larger of this value
45+
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
46+
* @since 2.8.7
4547
*/
46-
default void nack(long sleepMillis) {
48+
default void nack(Duration sleep) {
4749
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
4850
}
4951

5052
/**
5153
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
5254
* records before the index and re-seek the partitions so that the record at the index
53-
* and subsequent records will be redelivered after the sleep time (in milliseconds).
55+
* and subsequent records will be redelivered after the sleep duration.
5456
* Must be called on the consumer thread.
5557
* <p>
5658
* @param index the index of the failed record in the batch.
57-
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
58-
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
59-
* @since 2.3
59+
* @param sleep the duration to sleep; the actual sleep time will be larger of this value
60+
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
61+
* @since 2.8.7
6062
*/
61-
default void nack(int index, long sleepMillis) {
63+
default void nack(int index, Duration sleep) {
6264
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
6365
}
6466

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void foo(List<String> in, Acknowledgment ack) {
146146
this.replayTime = System.currentTimeMillis() - this.replayTime;
147147
this.deliveryLatch.countDown();
148148
if (this.fail.getAndSet(false)) {
149-
ack.nack(3, 50);
149+
ack.nack(3, Duration.ofMillis(50));
150150
}
151151
else {
152152
ack.acknowledge();

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTxTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void foo(List<String> in, Acknowledgment ack) {
150150
this.replayTime = System.currentTimeMillis() - this.replayTime;
151151
this.deliveryLatch.countDown();
152152
if (++this.count == 1) { // part 1, offset 1, first time
153-
ack.nack(3, 50);
153+
ack.nack(3, Duration.ofMillis(50));
154154
}
155155
else {
156156
ack.acknowledge();

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void foo(String in, Acknowledgment ack) {
125125
}
126126
this.deliveryLatch.countDown();
127127
if (++this.count == 4) { // part 1, offset 1, first time
128-
ack.nack(50);
128+
ack.nack(Duration.ofMillis(50));
129129
}
130130
else {
131131
ack.acknowledge();

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void foo(String in, Acknowledgment ack) {
142142
}
143143
this.deliveryLatch.countDown();
144144
if (++this.count == 4) { // part 1, offset 1, first time
145-
ack.nack(50);
145+
ack.nack(Duration.ofMillis(50));
146146
}
147147
else {
148148
ack.acknowledge();

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordZeroSleepTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void foo(String in, Acknowledgment ack) {
136136
++this.count;
137137
if (this.contents.size() == 1 || this.count == 5 || this.count == 8) {
138138
// first, last record or part 1, offset 1, first time
139-
ack.nack(0);
139+
ack.nack(Duration.ofMillis(0));
140140
}
141141
else {
142142
ack.acknowledge();

0 commit comments

Comments
 (0)