Skip to content

Commit 71a6873

Browse files
authored
Greedily advance safe commit on new global checkpoint (#48559)
Today we won't advance the safe commit on a new global checkpoint unless the last commit can become safe. This is not great if we have more than two commits as we can have a new safe commit earlier. Closes #4853
1 parent e171e48 commit 71a6873

File tree

2 files changed

+23
-17
lines changed

2 files changed

+23
-17
lines changed

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
5151
private final LongSupplier globalCheckpointSupplier;
5252
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
5353
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
54+
private volatile long maxSeqNoOfNextSafeCommit;
5455
private volatile IndexCommit lastCommit; // the most recent commit point
5556
private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY;
5657

@@ -83,6 +84,11 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
8384
this.safeCommitInfo = SafeCommitInfo.EMPTY;
8485
this.lastCommit = commits.get(commits.size() - 1);
8586
this.safeCommit = commits.get(keptPosition);
87+
if (keptPosition == commits.size() - 1) {
88+
this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
89+
} else {
90+
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
91+
}
8692
for (int i = 0; i < keptPosition; i++) {
8793
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
8894
deleteCommit(commits.get(i));
@@ -217,16 +223,10 @@ synchronized boolean hasSnapshottedCommits() {
217223
}
218224

219225
/**
220-
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
226+
* Checks if the deletion policy can delete some index commits with the latest global checkpoint.
221227
*/
222-
boolean hasUnreferencedCommits() throws IOException {
223-
final IndexCommit lastCommit = this.lastCommit;
224-
if (safeCommit != lastCommit) { // Race condition can happen but harmless
225-
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
226-
// We can clean up the current safe commit if the last commit is safe
227-
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
228-
}
229-
return false;
228+
boolean hasUnreferencedCommits() {
229+
return maxSeqNoOfNextSafeCommit <= globalCheckpointSupplier.getAsLong();
230230
}
231231

232232
/**

server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -229,22 +229,28 @@ public void testCheckUnreferencedCommits() throws Exception {
229229
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
230230
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen));
231231
}
232-
IndexCommit safeCommit = randomFrom(commitList);
233-
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
232+
int safeCommitIndex = randomIntBetween(0, commitList.size() - 1);
233+
globalCheckpoint.set(Long.parseLong(commitList.get(safeCommitIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
234234
commitList.forEach(this::resetDeletion);
235235
indexPolicy.onCommit(commitList);
236-
if (safeCommit == commitList.get(commitList.size() - 1)) {
236+
237+
if (safeCommitIndex == commitList.size() - 1) {
237238
// Safe commit is the last commit - no need to clean up
238239
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
239240
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
240241
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
241242
} else {
242-
// Advanced but not enough
243-
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), lastMaxSeqNo - 1));
244-
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
245-
// Advanced enough
243+
// Advanced but not enough for any commit after the safe commit becomes safe
244+
IndexCommit nextSafeCommit = commitList.get(safeCommitIndex + 1);
245+
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(),
246+
Long.parseLong(nextSafeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)) - 1));
247+
assertFalse(indexPolicy.hasUnreferencedCommits());
248+
// Advanced enough for some index commit becomes safe
249+
globalCheckpoint.set(randomLongBetween(
250+
Long.parseLong(nextSafeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), lastMaxSeqNo));
251+
assertTrue(indexPolicy.hasUnreferencedCommits());
252+
// Advanced enough for the last commit becomes safe
246253
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
247-
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
248254
commitList.forEach(this::resetDeletion);
249255
indexPolicy.onCommit(commitList);
250256
// Safe commit is the last commit - no need to clean up

0 commit comments

Comments
 (0)