diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 8166a0d37d429..70507fd18e7a3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -51,6 +51,7 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final LongSupplier globalCheckpointSupplier; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. + private volatile long maxSeqNoOfNextSafeCommit; private volatile IndexCommit lastCommit; // the most recent commit point private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY; @@ -83,6 +84,11 @@ public void onCommit(List commits) throws IOException { this.safeCommitInfo = SafeCommitInfo.EMPTY; this.lastCommit = commits.get(commits.size() - 1); this.safeCommit = commits.get(keptPosition); + if (keptPosition == commits.size() - 1) { + this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE; + } else { + this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); + } for (int i = 0; i < keptPosition; i++) { if (snapshottedCommits.containsKey(commits.get(i)) == false) { deleteCommit(commits.get(i)); @@ -217,16 +223,10 @@ synchronized boolean hasSnapshottedCommits() { } /** - * Checks if the deletion policy can release some index commits with the latest global checkpoint. + * Checks if the deletion policy can delete some index commits with the latest global checkpoint. */ - boolean hasUnreferencedCommits() throws IOException { - final IndexCommit lastCommit = this.lastCommit; - if (safeCommit != lastCommit) { // Race condition can happen but harmless - final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)); - // We can clean up the current safe commit if the last commit is safe - return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit; - } - return false; + boolean hasUnreferencedCommits() { + return maxSeqNoOfNextSafeCommit <= globalCheckpointSupplier.getAsLong(); } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 4e82a77ce43a7..deb224f5b1895 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -229,22 +229,28 @@ public void testCheckUnreferencedCommits() throws Exception { lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); } - IndexCommit safeCommit = randomFrom(commitList); - globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO))); + int safeCommitIndex = randomIntBetween(0, commitList.size() - 1); + globalCheckpoint.set(Long.parseLong(commitList.get(safeCommitIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO))); commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); - if (safeCommit == commitList.get(commitList.size() - 1)) { + + if (safeCommitIndex == commitList.size() - 1) { // Safe commit is the last commit - no need to clean up assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } else { - // Advanced but not enough - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), lastMaxSeqNo - 1)); - assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); - // Advanced enough + // Advanced but not enough for any commit after the safe commit becomes safe + IndexCommit nextSafeCommit = commitList.get(safeCommitIndex + 1); + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), + Long.parseLong(nextSafeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)) - 1)); + assertFalse(indexPolicy.hasUnreferencedCommits()); + // Advanced enough for some index commit becomes safe + globalCheckpoint.set(randomLongBetween( + Long.parseLong(nextSafeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), lastMaxSeqNo)); + assertTrue(indexPolicy.hasUnreferencedCommits()); + // Advanced enough for the last commit becomes safe globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); - assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true)); commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); // Safe commit is the last commit - no need to clean up