Skip to content

Commit f02cbe9

Browse files
committed
Trim translog for closed indices (#43156)
Today, when an index is closed, all its shards are force-flushed but the translog files are left around. As explained in #42445, we'd like to trim the translog for closed indices in order to consume less disk space. This commit reuses the existing AsyncTrimTranslogTask task and re-enables it for closed indices. At the time the task is executed, we should have the guarantee that nothing holds the translog files that are going to be removed. It also leaves a short period of time (10 min) during which translog files of a recently closed index are still present on disk. This could also help in some cases where the closed index is reopened shortly after being closed (in order to update an index setting, for example). Relates to #42445
1 parent 7ca69db commit f02cbe9

File tree

4 files changed

+153
-5
lines changed

4 files changed

+153
-5
lines changed

server/src/main/java/org/elasticsearch/index/IndexService.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,11 @@ final class AsyncTrimTranslogTask extends BaseAsyncTask {
944944
.getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10)));
945945
}
946946

947+
@Override
948+
protected boolean mustReschedule() {
949+
return indexService.closed.get() == false;
950+
}
951+
947952
@Override
948953
protected void runInternal() {
949954
indexService.maybeTrimTranslog();
@@ -1035,8 +1040,8 @@ AsyncTranslogFSync getFsyncTask() { // for tests
10351040
return fsyncTask;
10361041
}
10371042

1038-
AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
1039-
return globalCheckpointTask;
1043+
AsyncTrimTranslogTask getTrimTranslogTask() { // for tests
1044+
return trimTranslogTask;
10401045
}
10411046

10421047
/**

server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java

+57-1
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,24 @@
2727
import org.apache.lucene.index.SegmentReader;
2828
import org.apache.lucene.store.Directory;
2929
import org.elasticsearch.common.lucene.Lucene;
30+
import org.elasticsearch.common.util.concurrent.ReleasableLock;
31+
import org.elasticsearch.index.store.Store;
32+
import org.elasticsearch.index.translog.Translog;
33+
import org.elasticsearch.index.translog.TranslogConfig;
34+
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
3035

3136
import java.io.IOException;
3237
import java.io.UncheckedIOException;
3338
import java.util.List;
39+
import java.util.Map;
3440
import java.util.function.Function;
3541

3642
/**
3743
* NoOpEngine is an engine implementation that does nothing but the bare minimum
3844
* required in order to have an engine. All attempts to do something (search,
39-
* index, get), throw {@link UnsupportedOperationException}.
45+
* index, get), throw {@link UnsupportedOperationException}. However, NoOpEngine
46+
* allows trimming any existing translog files through the usage of the
47+
* {@link #trimUnreferencedTranslogFiles()} method.
4048
*/
4149
public final class NoOpEngine extends ReadOnlyEngine {
4250

@@ -116,4 +124,52 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
116124
return super.segmentsStats(includeSegmentFileSizes, includeUnloadedSegments);
117125
}
118126
}
127+
128+
/**
129+
* This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
130+
* that retains nothing but the last translog generation from the safe commit.
131+
*/
132+
@Override
133+
public void trimUnreferencedTranslogFiles() {
134+
final Store store = this.engineConfig.getStore();
135+
store.incRef();
136+
try (ReleasableLock lock = readLock.acquire()) {
137+
ensureOpen();
138+
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
139+
if (commits.size() == 1) {
140+
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
141+
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
142+
if (translogUuid == null) {
143+
throw new IllegalStateException("commit doesn't contain translog unique id");
144+
}
145+
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
146+
throw new IllegalStateException("commit doesn't contain translog generation id");
147+
}
148+
final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
149+
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
150+
final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);
151+
152+
if (minTranslogGeneration < lastCommitGeneration) {
153+
// a translog deletion policy that retains nothing but the last translog generation from safe commit
154+
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
155+
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
156+
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);
157+
158+
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
159+
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
160+
translog.trimUnreferencedReaders();
161+
}
162+
}
163+
}
164+
} catch (final Exception e) {
165+
try {
166+
failEngine("translog trimming failed", e);
167+
} catch (Exception inner) {
168+
e.addSuppressed(inner);
169+
}
170+
throw new EngineException(shardId, "failed to trim translog", e);
171+
} finally {
172+
store.decRef();
173+
}
174+
}
119175
}

server/src/test/java/org/elasticsearch/index/IndexServiceTests.java

+47-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.lucene.search.MatchAllDocsQuery;
2323
import org.apache.lucene.search.TopDocs;
24+
import org.elasticsearch.action.support.ActiveShardCount;
2425
import org.elasticsearch.cluster.metadata.IndexMetaData;
2526
import org.elasticsearch.common.Strings;
2627
import org.elasticsearch.common.compress.CompressedXContent;
@@ -42,12 +43,15 @@
4243
import org.elasticsearch.threadpool.ThreadPool;
4344

4445
import java.io.IOException;
46+
import java.nio.file.Path;
4547
import java.util.Collection;
4648
import java.util.Collections;
49+
import java.util.Map;
4750
import java.util.concurrent.CountDownLatch;
4851
import java.util.concurrent.atomic.AtomicInteger;
4952
import java.util.concurrent.atomic.AtomicReference;
5053

54+
import static org.elasticsearch.index.shard.IndexShardTestCase.getEngine;
5155
import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
5256
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5357
import static org.hamcrest.core.IsEqual.equalTo;
@@ -370,7 +374,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception {
370374
.build();
371375
IndexService indexService = createIndex("test", settings);
372376
ensureGreen("test");
373-
assertTrue(indexService.getRefreshTask().mustReschedule());
377+
assertTrue(indexService.getTrimTranslogTask().mustReschedule());
374378
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
375379
client().admin().indices().prepareFlush("test").get();
376380
client().admin().indices().prepareUpdateSettings("test")
@@ -382,6 +386,48 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception {
382386
assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0)));
383387
}
384388

389+
public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception {
390+
final String indexName = "test";
391+
IndexService indexService = createIndex(indexName, Settings.builder()
392+
.put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms")
393+
.build());
394+
395+
Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
396+
final Path translogPath = translog.getConfig().getTranslogPath();
397+
final String translogUuid = translog.getTranslogUUID();
398+
399+
final int numDocs = scaledRandomIntBetween(10, 100);
400+
for (int i = 0; i < numDocs; i++) {
401+
client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
402+
if (randomBoolean()) {
403+
client().admin().indices().prepareFlush(indexName).get();
404+
}
405+
}
406+
assertThat(translog.totalOperations(), equalTo(numDocs));
407+
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(numDocs));
408+
assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));
409+
410+
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
411+
assertTrue(indexService.getTrimTranslogTask().mustReschedule());
412+
413+
final long lastCommitedTranslogGeneration;
414+
try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) {
415+
Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
416+
lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
417+
}
418+
assertBusy(() -> {
419+
long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid);
420+
assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration));
421+
});
422+
423+
assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));
424+
425+
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
426+
translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
427+
assertThat(translog.totalOperations(), equalTo(0));
428+
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0));
429+
}
430+
385431
public void testIllegalFsyncInterval() {
386432
Settings settings = Settings.builder()
387433
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable

server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java

+42-1
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@
3535
import org.elasticsearch.index.seqno.SequenceNumbers;
3636
import org.elasticsearch.index.shard.DocsStats;
3737
import org.elasticsearch.index.store.Store;
38+
import org.elasticsearch.index.translog.Translog;
3839
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
3940
import org.elasticsearch.test.IndexSettingsModule;
4041

4142
import java.io.IOException;
4243
import java.io.UncheckedIOException;
4344
import java.nio.file.Path;
4445
import java.util.Collections;
46+
import java.util.Map;
4547
import java.util.concurrent.atomic.AtomicLong;
4648

4749
import static org.hamcrest.Matchers.equalTo;
@@ -83,7 +85,7 @@ public void testNoopAfterRegularEngine() throws IOException {
8385
tracker.updateLocalCheckpoint(allocationId.getId(), i);
8486
}
8587

86-
flushAndTrimTranslog(engine);
88+
engine.flush(true, true);
8789

8890
long localCheckpoint = engine.getPersistedLocalCheckpoint();
8991
long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo();
@@ -159,6 +161,45 @@ public void testNoOpEngineStats() throws Exception {
159161
}
160162
}
161163

164+
public void testTrimUnreferencedTranslogFiles() throws Exception {
165+
final ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier();
166+
ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node",
167+
null, true, ShardRoutingState.STARTED, allocationId);
168+
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
169+
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
170+
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
171+
172+
final int numDocs = scaledRandomIntBetween(10, 3000);
173+
for (int i = 0; i < numDocs; i++) {
174+
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
175+
if (rarely()) {
176+
engine.flush();
177+
}
178+
tracker.updateLocalCheckpoint(allocationId.getId(), i);
179+
}
180+
engine.flush(true, true);
181+
182+
final String translogUuid = engine.getTranslog().getTranslogUUID();
183+
final long minFileGeneration = engine.getTranslog().getMinFileGeneration();
184+
final long currentFileGeneration = engine.getTranslog().currentFileGeneration();
185+
engine.close();
186+
187+
final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
188+
final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath();
189+
190+
final long lastCommitedTranslogGeneration;
191+
try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) {
192+
Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
193+
lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
194+
assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration));
195+
}
196+
197+
assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration));
198+
noOpEngine.trimUnreferencedTranslogFiles();
199+
assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration));
200+
noOpEngine.close();
201+
}
202+
162203
private void flushAndTrimTranslog(final InternalEngine engine) {
163204
engine.flush(true, true);
164205
final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();

0 commit comments

Comments
 (0)