-
Notifications
You must be signed in to change notification settings - Fork 1.6k
GH-656: Fix seek on rollback [Backport] #659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Fixes spring-projects#656 Fixes spring-projects#657 Previously, after a rollback, we only performed a `seek` on the failed record. We need to seek for all unprocessed records. Also, when no error handler was provided, and using a batch listener, the offsets were added to `acks` and incorrectly committed. (spring-projects#657). Also, if a `ContainerAwareErrorHandler` "handles" the error, the offsets weren't committed. Enhance the tests to verify full seeks. Add a new test to verify the batch listener doesn't commit after a roll back. **cherry-pick to 2.1.x, 2.0.x** I will backport to 1.3.x after review. * Some simple polishing
* @since 1.3.5 | ||
* | ||
*/ | ||
@FunctionalInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not going to work on Java 7, I guess.
Or will be ignored at runtime...
seekOffsets.put(topicPartition, record.offset()); | ||
} | ||
} | ||
for (Entry<TopicPartition, Long> entry : seekOffsets.entrySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this extra loop if we can just perform seek
during the first one skipping already processed partitions ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems much more complicated to me to keep track of which ones we've already seeked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I mean something like this:
Set<TopicPartition> seekOffsets = new HashSet<>();
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<K, V> record = iterator.next();
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
if (seekOffsets.add(topicPartition)) {
try {
consumer.seek(topicPartition, record.offset());
}
catch (Exception e) {
logger.error("Failed to seek " + entry.getKey() + " to " + entry.getValue());
}
}
}
No? What am I missing?
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a big difference and I doubt the performance will be any better - it has to calculate hash codes instead performing the second loop - but feel free to change it if you insist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it has to calculate hash codes
???
Don't you do that already with the current HashMap
and seekOffsets.containsKey()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doh, of course; ok. Feel free to change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Merging with agreed polishing...
Merged as e826a58 |
Fixes #656
Fixes #657
Previously, after a rollback, we only performed a
seek
on the failed record.We need to seek for all unprocessed records.
Also, when no error handler was provided, and using a batch listener, the
offsets were added to
acks
and incorrectly committed. (#657).Also, if a
ContainerAwareErrorHandler
"handles" the error, the offsets weren'tcommitted.
Enhance the tests to verify full seeks.
Add a new test to verify the batch listener doesn't commit after a roll back.
cherry-pick to 2.1.x, 2.0.x I will backport to 1.3.x after review.