-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Add Acknowledgment.nack() variants accepting Duration #2281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
To prevent confusion what unit should be used for "long sleep" arguments. As the old and new methods in the Acknowledgment interface have default implementations, the change itself is backward compatible. The old methods are marked as deprecated and intended to be removed in the future.
*/ | ||
@Deprecated(since = "2.8.7", forRemoval = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For backports, since
and forRemoval
will be a problem. On the other hand, IDE generated a warning without them in 3.x. They could be dropped during backporting or removed also in master. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make a separate PR for 2.8.x and actually remove the method in 3.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created #2298 which should be JDK 8+ compatible and remove the methods for 3.0.
(sorry for force-push, but I just amended the new commit as I forgot to remove the second method in the first try, so it's clear what has been added since the one your already reviewed).
* duration. Must be called on the consumer thread. | ||
* <p> | ||
* @param sleep the duration to sleep; the actual sleep time will be larger of this value | ||
* and the container's {@code maxPollInterval}, which defaults to 5 seconds. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unrelated to that PR, however, yesterday I was very curious why in my integration tests the nack(3000)
still checks every 5 seconds, even though, I set maxPollInterval
to 1 seconds (and now also idleBetweenPolls
to 0 which should be default). Today, checking if it is worth to change also the internal fields from long
to Instant
, I have found:
Lines 1722 to 1723 in 93f5ae5
this.maxPollInterval - (System.currentTimeMillis() - this.lastPoll) | |
- 5000); // NOSONAR - less by five seconds to avoid race condition with rebalance |
I wonder, if that could have some impact, or maybe you see any other reason why I cannot enforce polling every 1 or 2 seconds (as maxPollInterval
is set)?
Here an example with nack(1991)
(precisely with PT1.991779718S
to millis) and max.poll.interval.ms = 750
, but it is resumed after 5 seconds:
2022-05-20 00:08:13.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Pausing for nack sleep: [pusher_retry_1-0]
2022-05-20 00:08:13.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:13.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2022-05-20 00:08:18.142 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-05-20 00:08:18.143 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:18.146 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-05-20 00:08:18.146 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:18.227 DEBUG foobar --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-05-20 00:08:18.227 DEBUG foobar --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:18.234 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-05-20 00:08:18.234 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:18.927 DEBUG foobar --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-05-20 00:08:18.927 DEBUG foobar --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:18.975 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-05-20 00:08:18.975 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:18.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-05-20 00:08:18.992 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Resumed after nack sleep: [pusher_retry_1-0]
2022-05-20 00:08:18.993 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:18.993 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-05-20 00:08:18.993 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-05-20 00:08:18.997 DEBUG foobar --- [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 1 records
2022-05-20 00:08:18.998 DEBUG foobar --- [Container-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=MyMessage.....]
I might try to create a reproducer, if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@garyrussell What do you think about that case (and the PR in general ;-) )?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like you are using non-blocking retries; the precision there is also affected by the idlePartitionEventInterval
.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#back-off-delay-precision
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm - but that shouldn't be the case here; will investigate...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works as expected for me:
2022-06-06T13:45:04.375-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Pausing for nack sleep: [kgh2281-retry-0-0]
2022-06-06T13:45:04.376-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-06-06T13:45:04.376-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2022-06-06T13:45:05.378-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-06-06T13:45:05.378-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2022-06-06T13:45:05.378-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2022-06-06T13:45:06.379-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2022-06-06T13:45:06.380-04:00 DEBUG 72091 --- [1-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Resumed after nack sleep: [kgh2281-retry-0-0]
spring.kafka.listener.poll-timeout=1000
spring.kafka.listener.ack-mode=MANUAL
spring.kafka.consumer.auto-offset-reset=earliest
logging.level.org.springframework.kafka.listener.KafkaMessageListenerContainer=debug
@SpringBootApplication
public class Kgh2281Application {
private static final Logger log = LoggerFactory.getLogger(Kgh2281Application.class);
public static void main(String[] args) {
SpringApplication.run(Kgh2281Application.class, args);
}
@KafkaListener(id = "kgh2281", topics = "kgh2281")
@RetryableTopic
void listen(String in, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("{} from {}", in, topic);
if (topic.contains("retry")) {
ack.nack(Duration.ofMillis(1900));
}
else {
throw new RuntimeException("force retry");
}
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("kgh2281").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("kgh2281", "foo");
};
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting 😕 . I will try to re-check my code with the latest version of spring-kafka.
Thanks for checking.
nack(Duration) should be used instead.
Should this be backported into 2.9.x ? I had a whiplash when upgrading from 2.8.8 |
No; we can't make breaking changes in a patch release; if you want it back ported, we would have to reinstate the old variants (and deprecate them). |
Wasn't that done in 2.8.x? #2298 |
Oh, sorry; I forgot about the second PR; the confusion is because the second PR was against 2.8 instead of 2.9; it should have been against 2.9 and we would have back-ported to 2.8. Will fix in the next release. |
@wilerson Merged; 2.9.2 is due next week. Apologies for the inconvenience. |
To prevent confusion what unit should be used for "long sleep" arguments.
As the old and new methods in the Acknowledgment interface have default
implementations, the change itself is backward compatible.
The old methods are marked as deprecated and intended to be removed in the future.
I wonder, if we should also test old, deprecated methods. On one hand, they should be to "ensure" they still work fine. On the other hand the logic there is rather obvious and covering them would probably duplicate 2 tests. WDYT?
An extension to #2278.