Skip to content

Commit 00c6e66

Browse files
committed
Clean up commits when global checkpoint advanced (#28140)
Today we keep multiple index commits based on the current global checkpoint, but we only clean up unneeded index commits when a new index commit is made. However, we can release the old index commits earlier, as soon as the global checkpoint has advanced far enough. This commit makes the engine revisit the index deletion policy whenever a new global checkpoint value has been persisted and has advanced far enough. Relates #10708
1 parent a76a9bb commit 00c6e66

File tree

10 files changed

+119
-14
lines changed

10 files changed

+119
-14
lines changed

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,7 @@ private void maybeFSyncTranslogs() {
678678
try {
679679
Translog translog = shard.getTranslog();
680680
if (translog.syncNeeded()) {
681-
translog.sync();
681+
shard.sync();
682682
}
683683
} catch (AlreadyClosedException ex) {
684684
// fine - continue;

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
4747
private final LongSupplier globalCheckpointSupplier;
4848
private final IndexCommit startingCommit;
4949
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
50-
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
51-
private IndexCommit lastCommit; // the most recent commit point
50+
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
51+
private volatile IndexCommit lastCommit; // the most recent commit point
5252

5353
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
5454
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
@@ -224,6 +224,21 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
224224
return 0;
225225
}
226226

227+
/**
228+
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
229+
*/
230+
boolean hasUnreferencedCommits() throws IOException {
231+
final IndexCommit lastCommit = this.lastCommit;
232+
if (safeCommit != lastCommit) { // Race condition can happen but harmless
233+
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
234+
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
235+
// We can clean up the current safe commit if the last commit is safe
236+
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
237+
}
238+
}
239+
return false;
240+
}
241+
227242
/**
228243
* A wrapper of an index commit that prevents it from being deleted.
229244
*/

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import java.util.concurrent.locks.ReentrantLock;
9292
import java.util.concurrent.locks.ReentrantReadWriteLock;
9393
import java.util.function.BiFunction;
94+
import java.util.stream.Stream;
9495

9596
public abstract class Engine implements Closeable {
9697

@@ -549,6 +550,13 @@ public enum SearcherScope {
549550
/** returns the translog for this engine */
550551
public abstract Translog getTranslog();
551552

553+
/**
554+
* Ensures that all locations in the given stream have been written to the underlying storage.
555+
*/
556+
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;
557+
558+
public abstract void syncTranslog() throws IOException;
559+
552560
protected void ensureOpen() {
553561
if (isClosed.get()) {
554562
throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.lucene.index.MergePolicy;
3232
import org.apache.lucene.index.SegmentCommitInfo;
3333
import org.apache.lucene.index.SegmentInfos;
34-
import org.apache.lucene.index.SnapshotDeletionPolicy;
3534
import org.apache.lucene.index.Term;
3635
import org.apache.lucene.search.IndexSearcher;
3736
import org.apache.lucene.search.ReferenceManager;
@@ -95,6 +94,7 @@
9594
import java.util.concurrent.locks.ReentrantLock;
9695
import java.util.function.BiFunction;
9796
import java.util.function.LongSupplier;
97+
import java.util.stream.Stream;
9898

9999
public class InternalEngine extends Engine {
100100

@@ -558,6 +558,27 @@ public Translog getTranslog() {
558558
return translog;
559559
}
560560

561+
@Override
562+
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
563+
final boolean synced = translog.ensureSynced(locations);
564+
if (synced) {
565+
revisitIndexDeletionPolicyOnTranslogSynced();
566+
}
567+
return synced;
568+
}
569+
570+
@Override
571+
public void syncTranslog() throws IOException {
572+
translog.sync();
573+
revisitIndexDeletionPolicyOnTranslogSynced();
574+
}
575+
576+
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
577+
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
578+
indexWriter.deleteUnusedFiles();
579+
}
580+
}
581+
561582
@Override
562583
public String getHistoryUUID() {
563584
return historyUUID;

server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,9 @@ protected ReplicaResult shardOperationOnReplica(final Request request, final Ind
130130
}
131131

132132
private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
133-
final Translog translog = indexShard.getTranslog();
134133
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
135-
translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
136-
indexShard.getTranslog().sync();
134+
indexShard.getTranslog().getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
135+
indexShard.sync();
137136
}
138137
}
139138

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2330,8 +2330,7 @@ public int getActiveOperationsCount() {
23302330
@Override
23312331
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
23322332
try {
2333-
final Engine engine = getEngine();
2334-
engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1));
2333+
getEngine().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
23352334
} catch (AlreadyClosedException ex) {
23362335
// that's fine since we already synced everything on engine close - this also is conform with the methods
23372336
// documentation
@@ -2356,9 +2355,9 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe
23562355
translogSyncProcessor.put(location, syncListener);
23572356
}
23582357

2359-
public final void sync() throws IOException {
2358+
public void sync() throws IOException {
23602359
verifyNotClosed();
2361-
getEngine().getTranslog().sync();
2360+
getEngine().syncTranslog();
23622361
}
23632362

23642363
/**

server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,44 @@ public void testKeepOnlyStartingCommitOnInit() throws Exception {
294294
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
295295
}
296296

297+
public void testCheckUnreferencedCommits() throws Exception {
298+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
299+
final UUID translogUUID = UUID.randomUUID();
300+
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
301+
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
302+
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
303+
final List<IndexCommit> commitList = new ArrayList<>();
304+
int totalCommits = between(2, 20);
305+
long lastMaxSeqNo = between(1, 1000);
306+
long lastTranslogGen = between(1, 50);
307+
for (int i = 0; i < totalCommits; i++) {
308+
lastMaxSeqNo += between(1, 10000);
309+
lastTranslogGen += between(1, 100);
310+
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
311+
}
312+
IndexCommit safeCommit = randomFrom(commitList);
313+
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
314+
indexPolicy.onCommit(commitList);
315+
if (safeCommit == commitList.get(commitList.size() - 1)) {
316+
// Safe commit is the last commit - no need to clean up
317+
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
318+
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
319+
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
320+
} else {
321+
// Advanced but not enough
322+
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), lastMaxSeqNo - 1));
323+
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
324+
// Advanced enough
325+
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
326+
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
327+
indexPolicy.onCommit(commitList);
328+
// Safe commit is the last commit - no need to clean up
329+
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
330+
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
331+
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
332+
}
333+
}
334+
297335
IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
298336
final Map<String, String> userData = new HashMap<>();
299337
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4432,4 +4432,29 @@ public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
44324432
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
44334433
}
44344434
}
4435+
4436+
public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
4437+
IOUtils.close(engine, store);
4438+
store = createStore();
4439+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
4440+
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
4441+
final int numDocs = scaledRandomIntBetween(10, 100);
4442+
for (int docId = 0; docId < numDocs; docId++) {
4443+
index(engine, docId);
4444+
if (frequently()) {
4445+
engine.flush(randomBoolean(), randomBoolean());
4446+
}
4447+
}
4448+
engine.flush(false, randomBoolean());
4449+
List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
4450+
// Global checkpoint advanced but not enough - all commits are kept.
4451+
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint() - 1));
4452+
engine.syncTranslog();
4453+
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
4454+
// Global checkpoint advanced enough - only the last commit is kept.
4455+
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE));
4456+
engine.syncTranslog();
4457+
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
4458+
}
4459+
}
44354460
}

server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ protected PrimaryResult performOnPrimary(
657657

658658
@Override
659659
protected void performOnReplica(final GlobalCheckpointSyncAction.Request request, final IndexShard replica) throws IOException {
660-
replica.getTranslog().sync();
660+
replica.sync();
661661
}
662662
}
663663

server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {
123123
}
124124

125125
if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
126-
verify(translog, never()).sync();
126+
verify(indexShard, never()).sync();
127127
} else {
128-
verify(translog).sync();
128+
verify(indexShard).sync();
129129
}
130130
}
131131

0 commit comments

Comments
 (0)