Add Acknowledgment.nack() variants accepting Duration #2281

Merged 2 commits on Jun 7, 2022
@@ -3174,11 +3174,11 @@ public void acknowledge() {
}

@Override
- public void nack(long sleepMillis) {
+ public void nack(Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
- Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
- ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
+ Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
+ ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
synchronized (ListenerConsumer.this) {
if (ListenerConsumer.this.offsetsInThisBatch != null) {
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
@@ -3221,13 +3221,13 @@ public void acknowledge() {
}

@Override
- public void nack(int index, long sleepMillis) {
+ public void nack(int index, Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
- Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
+ Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
ListenerConsumer.this.nackIndex = index;
- ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
+ ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
synchronized (ListenerConsumer.this) {
if (ListenerConsumer.this.offsetsInThisBatch != null) {
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
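
The validation and conversion introduced in the two hunks above can be tried in isolation. This is a minimal sketch, not the container's code: the class `NackSleepSketch` and the helper `toNackMillis` are hypothetical names, and a plain `IllegalArgumentException` stands in for Spring's `Assert.isTrue`.

```java
import java.time.Duration;

public class NackSleepSketch {

    // Hypothetical helper mirroring the check and conversion shown in the diff.
    static long toNackMillis(Duration sleep) {
        if (sleep.isNegative()) {
            throw new IllegalArgumentException("sleep cannot be negative");
        }
        // Duration.toMillis() drops any sub-millisecond remainder.
        return sleep.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toNackMillis(Duration.ofMillis(50)));  // 50
        System.out.println(toNackMillis(Duration.ZERO));          // 0
        System.out.println(toNackMillis(Duration.ofSeconds(3)));  // 3000
        try {
            toNackMillis(Duration.ofMillis(-1));
        }
        catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the internal field stays a `long` of milliseconds, a `Duration` finer than one millisecond is truncated rather than rounded.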

@@ -16,6 +16,8 @@

package org.springframework.kafka.support;

+ import java.time.Duration;
+
/**
* Handle for acknowledging the processing of a
* {@link org.apache.kafka.clients.consumer.ConsumerRecord}. Recipients can store the
@@ -37,28 +39,28 @@ public interface Acknowledgment {
/**
* Negatively acknowledge the current record - discard remaining records from the poll
* and re-seek all partitions so that this record will be redelivered after the sleep
- * time (in milliseconds). Must be called on the consumer thread.
+ * duration. Must be called on the consumer thread.
* <p>
- * @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
- * of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
- * @since 2.3
+ * @param sleep the duration to sleep; the actual sleep time will be larger of this value
+ * and the container's {@code maxPollInterval}, which defaults to 5 seconds.

szpak (Contributor Author) commented on May 19, 2022:

This is unrelated to the PR, but yesterday I was puzzled why, in my integration tests, nack(3000) still checks every 5 seconds even though I set maxPollInterval to 1 second (and now also idleBetweenPolls to 0, which should be the default). Today, while checking whether it is worth also changing the internal fields from long to Instant, I found:

this.maxPollInterval - (System.currentTimeMillis() - this.lastPoll)
- 5000); // NOSONAR - less by five seconds to avoid race condition with rebalance

I wonder whether that could have some impact, or do you see any other reason why I cannot enforce polling every 1 or 2 seconds (as maxPollInterval is set)?
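
For reference, the quoted expression can be evaluated on its own. The sketch below only does the arithmetic with made-up inputs; `IdleMarginSketch` and `remaining` are hypothetical names, and how the container uses the result is not shown in this thread.

```java
public class IdleMarginSketch {

    // Evaluates the quoted expression with explicit inputs (all values in milliseconds).
    static long remaining(long maxPollInterval, long now, long lastPoll) {
        return maxPollInterval - (now - lastPoll) - 5000;
    }

    public static void main(String[] args) {
        // Hypothetical inputs: the last poll completed 100 ms ago.
        System.out.println(remaining(750, 10_100, 10_000));     // 750 - 100 - 5000 = -4350
        System.out.println(remaining(300_000, 10_100, 10_000)); // 300000 - 100 - 5000 = 294900
    }
}
```

With a maxPollInterval below 5 seconds, the fixed 5000 ms margin makes the result negative.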

Here is an example with nack(1991) (precisely, PT1.991779718S converted to millis) and max.poll.interval.ms = 750; it is nevertheless resumed after 5 seconds:

2022-05-20 00:08:13.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Pausing for nack sleep: [pusher_retry_1-0]
2022-05-20 00:08:13.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:13.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2022-05-20 00:08:18.142 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-20 00:08:18.143 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:18.146 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-20 00:08:18.146 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:18.227 DEBUG foobar --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-20 00:08:18.227 DEBUG foobar --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:18.234 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-20 00:08:18.234 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:18.927 DEBUG foobar --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-20 00:08:18.927 DEBUG foobar --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:18.975 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-20 00:08:18.975 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:18.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-20 00:08:18.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Resumed after nack sleep: [pusher_retry_1-0]
2022-05-20 00:08:18.993 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:18.993 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-20 00:08:18.993 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-20 00:08:18.997 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 1 records
2022-05-20 00:08:18.998 DEBUG foobar --- [Container-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=MyMessage.....]

I might try to create a reproducer, if needed.
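
The gap described in this comment can be checked directly from the log timestamps. The following sketch compares the requested sleep with the time between the "Pausing for nack sleep" and "Resumed after nack sleep" lines; `NackLogGap` and `between` are hypothetical names, and the timestamp pattern is assumed from the log format shown above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class NackLogGap {

    // Pattern assumed from the log lines in the comment above.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    static Duration between(String start, String end) {
        return Duration.between(LocalDateTime.parse(start, FMT), LocalDateTime.parse(end, FMT));
    }

    public static void main(String[] args) {
        // Requested sleep: the Duration from the comment, converted as nack(Duration) does.
        long requested = Duration.parse("PT1.991779718S").toMillis();
        // Observed: "Pausing for nack sleep" to "Resumed after nack sleep".
        Duration observed = between("2022-05-20 00:08:13.992", "2022-05-20 00:08:18.992");
        System.out.println("requested ms: " + requested);           // 1991
        System.out.println("observed ms: " + observed.toMillis());  // 5000
    }
}
```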

Contributor Author:

@garyrussell What do you think about that case (and the PR in general ;-) )?

garyrussell (Contributor) commented on Jun 6, 2022:

It looks like you are using non-blocking retries; the precision there is also affected by the idlePartitionEventInterval.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#back-off-delay-precision

Contributor:

Hmmm - but that shouldn't be the case here; will investigate...

Contributor:

Works as expected for me:

2022-06-06T13:45:04.375-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Pausing for nack sleep: [kgh2281-retry-0-0]
2022-06-06T13:45:04.376-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-06-06T13:45:04.376-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2022-06-06T13:45:05.378-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-06-06T13:45:05.378-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-06-06T13:45:05.378-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2022-06-06T13:45:06.379-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-06-06T13:45:06.380-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Resumed after nack sleep: [kgh2281-retry-0-0]

spring.kafka.listener.poll-timeout=1000
spring.kafka.listener.ack-mode=MANUAL
spring.kafka.consumer.auto-offset-reset=earliest

logging.level.org.springframework.kafka.listener.KafkaMessageListenerContainer=debug

@SpringBootApplication
public class Kgh2281Application {


	private static final Logger log = LoggerFactory.getLogger(Kgh2281Application.class);


	public static void main(String[] args) {
		SpringApplication.run(Kgh2281Application.class, args);
	}

	@KafkaListener(id = "kgh2281", topics = "kgh2281")
	@RetryableTopic
	void listen(String in, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
		log.info("{} from {}", in, topic);
		if (topic.contains("retry")) {
			ack.nack(Duration.ofMillis(1900));
		}
		else {
			throw new RuntimeException("force retry");
		}
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("kgh2281").partitions(1).replicas(1).build();
	}

	@Bean
	ApplicationRunner runner(KafkaTemplate<String, String> template) {
		return args -> {
			template.send("kgh2281", "foo");
		};
	}

}
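
The log and settings in this comment (poll-timeout of 1000 ms, a nack of 1900 ms, resume about 2 seconds after the pause) fit a simple model. It is offered here as an assumption, not something stated in the thread: the pause is only re-checked when a poll returns, so the resume lands on the first poll boundary at or after the requested sleep. `PollBoundarySketch` and `expectedResumeMillis` are hypothetical names; this is not the container's code.

```java
import java.time.Duration;
import java.time.OffsetDateTime;

public class PollBoundarySketch {

    // Assumed model: the resume happens on the first poll boundary at or after
    // the requested sleep, and never before one full poll cycle has elapsed.
    static long expectedResumeMillis(long sleepMillis, long pollTimeoutMillis) {
        long cycles = (sleepMillis + pollTimeoutMillis - 1) / pollTimeoutMillis; // ceiling division
        return Math.max(1, cycles) * pollTimeoutMillis;
    }

    public static void main(String[] args) {
        // Timestamps of the "Pausing" and "Resumed" lines in the log above.
        Duration observed = Duration.between(
                OffsetDateTime.parse("2022-06-06T13:45:04.375-04:00"),
                OffsetDateTime.parse("2022-06-06T13:45:06.380-04:00"));
        System.out.println("observed ms: " + observed.toMillis());               // 2005
        System.out.println("modelled ms: " + expectedResumeMillis(1900, 1000));  // 2000
    }
}
```

Whether the same model explains the 5-second resume reported earlier in the thread is not established here.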

szpak (Contributor Author) commented on Jun 6, 2022:

Interesting 😕. I will try to re-check my code with the latest version of spring-kafka.

Thanks for checking.

+ * @since 2.8.7
*/
- default void nack(long sleepMillis) {
+ default void nack(Duration sleep) {
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
}

/**
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
* records before the index and re-seek the partitions so that the record at the index
- * and subsequent records will be redelivered after the sleep time (in milliseconds).
+ * and subsequent records will be redelivered after the sleep duration.
* Must be called on the consumer thread.
* <p>
* @param index the index of the failed record in the batch.
- * @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
- * of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
- * @since 2.3
+ * @param sleep the duration to sleep; the actual sleep time will be larger of this value
+ * and the container's {@code maxPollInterval}, which defaults to 5 seconds.
+ * @since 2.8.7
*/
- default void nack(int index, long sleepMillis) {
+ default void nack(int index, Duration sleep) {
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
}
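
The effect of declaring `nack` as a default method that throws can be seen with a small stand-in. In the sketch below, `Ack` is a hypothetical interface that copies only the shape of the methods in this diff; it is not the real `org.springframework.kafka.support.Acknowledgment`, and `rejectsNack` is a helper invented for the example.

```java
import java.time.Duration;

public class DefaultNackSketch {

    // Hypothetical stand-in for the interface in the diff; not the real Spring type.
    interface Ack {

        void acknowledge();

        default void nack(Duration sleep) {
            throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
        }
    }

    // Returns true when the given handle rejects nack(), false when it accepts it.
    static boolean rejectsNack(Ack ack) {
        try {
            ack.nack(Duration.ofMillis(50));
            return false;
        }
        catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Implements acknowledge() only, so it inherits the throwing default.
        Ack ackOnly = () -> { };
        // Overrides nack(); a real implementation would arrange redelivery here.
        Ack withNack = new Ack() {
            @Override
            public void acknowledge() {
            }

            @Override
            public void nack(Duration sleep) {
            }
        };
        System.out.println(rejectsNack(ackOnly));  // true
        System.out.println(rejectsNack(withNack)); // false
    }
}
```

Existing implementations that never supported nack keep compiling and fail only when `nack` is actually called.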

@@ -146,7 +146,7 @@ public void foo(List<String> in, Acknowledgment ack) {
this.replayTime = System.currentTimeMillis() - this.replayTime;
this.deliveryLatch.countDown();
if (this.fail.getAndSet(false)) {
- ack.nack(3, 50);
+ ack.nack(3, Duration.ofMillis(50));
}
else {
ack.acknowledge();

@@ -150,7 +150,7 @@ public void foo(List<String> in, Acknowledgment ack) {
this.replayTime = System.currentTimeMillis() - this.replayTime;
this.deliveryLatch.countDown();
if (++this.count == 1) { // part 1, offset 1, first time
- ack.nack(3, 50);
+ ack.nack(3, Duration.ofMillis(50));
}
else {
ack.acknowledge();

@@ -125,7 +125,7 @@ public void foo(String in, Acknowledgment ack) {
}
this.deliveryLatch.countDown();
if (++this.count == 4) { // part 1, offset 1, first time
- ack.nack(50);
+ ack.nack(Duration.ofMillis(50));
}
else {
ack.acknowledge();

@@ -142,7 +142,7 @@ public void foo(String in, Acknowledgment ack) {
}
this.deliveryLatch.countDown();
if (++this.count == 4) { // part 1, offset 1, first time
- ack.nack(50);
+ ack.nack(Duration.ofMillis(50));
}
else {
ack.acknowledge();

@@ -136,7 +136,7 @@ public void foo(String in, Acknowledgment ack) {
++this.count;
if (this.contents.size() == 1 || this.count == 5 || this.count == 8) {
// first, last record or part 1, offset 1, first time
- ack.nack(0);
+ ack.nack(Duration.ofMillis(0));
}
else {
ack.acknowledge();