Skip to content

Commit 406d4de

Browse files
committed
Associate translog with Lucene index commit
1 parent 215e94d commit 406d4de

File tree

7 files changed

+95
-45
lines changed

7 files changed

+95
-45
lines changed

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
166166
IndexSettings.FINAL_PIPELINE,
167167
MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
168168
IndexSettings.ON_HEAP_ID_TERMS_INDEX,
169+
IndexSettings.SKIP_FILES_RECOVERY_SETTING,
169170
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
170171

171172
// validate that built-in similarities don't get redefined

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,13 @@ public final class IndexSettings {
325325
public static final Setting<Double> FILE_BASED_RECOVERY_THRESHOLD_SETTING
326326
= Setting.doubleSetting("index.recovery.file_based_threshold", 0.1d, 0.0d, Setting.Property.IndexScope);
327327

328+
/**
329+
* A badly named setting indicating that for some specific shards we skip the files recovery and assume that the
330+
* files are available.
331+
*/
332+
public static final Setting<Boolean> SKIP_FILES_RECOVERY_SETTING =
333+
Setting.boolSetting("index.recovery.skip_files", false, Setting.Property.IndexScope, Property.PrivateIndex);
334+
328335
private final Index index;
329336
private final Version version;
330337
private final Logger logger;

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.common.unit.ByteSizeValue;
4242
import org.elasticsearch.common.unit.TimeValue;
4343
import org.elasticsearch.index.Index;
44+
import org.elasticsearch.index.IndexSettings;
4445
import org.elasticsearch.index.engine.Engine;
4546
import org.elasticsearch.index.engine.EngineException;
4647
import org.elasticsearch.index.mapper.MapperService;
@@ -398,7 +399,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
398399
writeEmptyRetentionLeasesFile(indexShard);
399400
} else if (indexShouldExists) {
400401
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
401-
store.bootstrapNewHistory();
402+
if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) {
403+
associateTranslogWithIndex(indexShard, store);
404+
} else {
405+
store.bootstrapNewHistory();
406+
}
402407
writeEmptyRetentionLeasesFile(indexShard);
403408
}
404409
// since we recover from local, just fill the files and size
@@ -460,7 +465,14 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov
460465
final ActionListener<Void> restoreListener = ActionListener.wrap(
461466
v -> {
462467
final Store store = indexShard.store();
463-
bootstrap(indexShard, store);
468+
if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) {
469+
// shard is restored from a snapshot and we expect the store to already contain the files,
470+
// hence we can skip bootstrapping a new history uuid with a new translog, and simply
471+
// associate an empty translog with the existing lucene commit.
472+
associateTranslogWithIndex(indexShard, store);
473+
} else {
474+
bootstrap(indexShard, store);
475+
}
464476
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
465477
writeEmptyRetentionLeasesFile(indexShard);
466478
indexShard.openEngineAndRecoverFromTranslog();
@@ -491,9 +503,18 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov
491503
indexIdListener.onResponse(indexId);
492504
}
493505
assert indexShard.getEngineOrNull() == null;
494-
indexIdListener.whenComplete(idx -> repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(),
495-
idx, snapshotShardId, indexShard.recoveryState(), restoreListener), restoreListener::onFailure);
496-
} catch (Exception e) {
506+
indexIdListener.whenComplete(
507+
idx -> {
508+
if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) {
509+
logger.debug("[{}] skipping full restore from [{}] [{}]",
510+
shardId, restoreSource.snapshot().getRepository(), restoreSource.snapshot().getSnapshotId());
511+
restoreListener.onResponse(null);
512+
} else {
513+
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(),
514+
idx, snapshotShardId, indexShard.recoveryState(), restoreListener);
515+
}
516+
}, restoreListener::onFailure);
517+
} catch (Exception e) {
497518
restoreListener.onFailure(e);
498519
}
499520
}
@@ -506,4 +527,13 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO
506527
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
507528
store.associateIndexWithNewTranslog(translogUUID);
508529
}
530+
531+
private void associateTranslogWithIndex(final IndexShard indexShard, final Store store) throws IOException {
532+
assert IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings());
533+
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
534+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
535+
final long primaryTerm = indexShard.getPendingPrimaryTerm();
536+
final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
537+
Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), shardId, localCheckpoint, primaryTerm, translogUUID, null);
538+
}
509539
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.lucene.index.Term;
2424
import org.apache.lucene.store.AlreadyClosedException;
2525
import org.elasticsearch.Version;
26+
import org.elasticsearch.common.Nullable;
27+
import org.elasticsearch.common.Strings;
2628
import org.elasticsearch.common.UUIDs;
2729
import org.elasticsearch.common.bytes.BytesArray;
2830
import org.elasticsearch.common.bytes.BytesReference;
@@ -1838,20 +1840,41 @@ public static String createEmptyTranslog(final Path location, final long initial
18381840

18391841
static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId,
18401842
ChannelFactory channelFactory, long primaryTerm) throws IOException {
1843+
return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, null, channelFactory);
1844+
}
1845+
1846+
public static String createEmptyTranslog(final Path location,
1847+
final ShardId shardId,
1848+
final long initialGlobalCheckpoint,
1849+
final long primaryTerm,
1850+
@Nullable final String translogUUID,
1851+
@Nullable final ChannelFactory factory) throws IOException {
18411852
IOUtils.rm(location);
18421853
Files.createDirectories(location);
1843-
final Checkpoint checkpoint =
1844-
Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1);
1854+
1855+
final long generation = 1L;
1856+
final long minTranslogGeneration = 1L;
1857+
final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open;
1858+
final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID();
18451859
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
1860+
final Path translogFile = location.resolve(getFilename(generation));
1861+
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration);
1862+
18461863
Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
18471864
IOUtils.fsync(checkpointFile, false);
1848-
final String translogUUID = UUIDs.randomBase64UUID();
1849-
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1,
1850-
location.resolve(getFilename(1)), channelFactory,
1851-
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
1852-
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm,
1853-
new TragicExceptionHolder(), seqNo -> { throw new UnsupportedOperationException(); });
1865+
final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory,
1866+
new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint,
1867+
() -> {
1868+
throw new UnsupportedOperationException();
1869+
}, () -> {
1870+
throw new UnsupportedOperationException();
1871+
},
1872+
primaryTerm,
1873+
new TragicExceptionHolder(),
1874+
seqNo -> {
1875+
throw new UnsupportedOperationException();
1876+
});
18541877
writer.close();
1855-
return translogUUID;
1878+
return uuid;
18561879
}
18571880
}

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
25+
import org.apache.lucene.index.SegmentInfos;
2526
import org.apache.lucene.store.AlreadyClosedException;
2627
import org.apache.lucene.store.RateLimiter;
2728
import org.elasticsearch.ElasticsearchException;
@@ -42,8 +43,10 @@
4243
import org.elasticsearch.common.util.CancellableThreads;
4344
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4445
import org.elasticsearch.index.IndexNotFoundException;
46+
import org.elasticsearch.index.IndexSettings;
4547
import org.elasticsearch.index.engine.RecoveryEngineException;
4648
import org.elasticsearch.index.mapper.MapperException;
49+
import org.elasticsearch.index.seqno.SequenceNumbers;
4750
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
4851
import org.elasticsearch.index.shard.IndexEventListener;
4952
import org.elasticsearch.index.shard.IndexShard;
@@ -64,6 +67,7 @@
6467
import org.elasticsearch.transport.TransportService;
6568

6669
import java.io.IOException;
70+
import java.nio.file.Path;
6771
import java.util.concurrent.atomic.AtomicLong;
6872
import java.util.function.Consumer;
6973

@@ -174,8 +178,19 @@ private void doRecovery(final long recoveryId) {
174178
try {
175179
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
176180
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
177-
recoveryTarget.indexShard().prepareForIndexRecovery();
178-
final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
181+
final IndexShard indexShard = recoveryTarget.indexShard();
182+
indexShard.prepareForIndexRecovery();
183+
if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) {
184+
// associate a new empty translog with the last lucene commit, this way the next StartRecoveryRequest
185+
// will see shard files as if they were already on disk
186+
final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo();
187+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
188+
final long primaryTerm = indexShard.getPendingPrimaryTerm();
189+
final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
190+
final Path location = indexShard.shardPath().resolveTranslog();
191+
Translog.createEmptyTranslog(location, indexShard.shardId(), localCheckpoint, primaryTerm, translogUUID, null);
192+
}
193+
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
179194
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
180195
"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
181196
request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
*/
66
package org.elasticsearch.index.store;
77

8-
import org.apache.lucene.index.IndexWriter;
9-
import org.apache.lucene.index.IndexWriterConfig;
10-
import org.apache.lucene.index.NoMergePolicy;
118
import org.apache.lucene.store.BaseDirectory;
129
import org.apache.lucene.store.BufferedIndexInput;
1310
import org.apache.lucene.store.Directory;
@@ -16,13 +13,10 @@
1613
import org.apache.lucene.store.IndexOutput;
1714
import org.apache.lucene.store.SingleInstanceLockFactory;
1815
import org.elasticsearch.common.blobstore.BlobContainer;
19-
import org.elasticsearch.common.lucene.Lucene;
2016
import org.elasticsearch.index.IndexSettings;
21-
import org.elasticsearch.index.seqno.SequenceNumbers;
2217
import org.elasticsearch.index.shard.ShardPath;
2318
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
2419
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
25-
import org.elasticsearch.index.translog.Translog;
2620
import org.elasticsearch.repositories.IndexId;
2721
import org.elasticsearch.repositories.RepositoriesService;
2822
import org.elasticsearch.repositories.Repository;
@@ -35,8 +29,6 @@
3529
import java.io.IOException;
3630
import java.nio.file.Path;
3731
import java.util.Collection;
38-
import java.util.HashMap;
39-
import java.util.Map;
4032
import java.util.Objects;
4133
import java.util.Set;
4234
import java.util.function.LongSupplier;
@@ -172,26 +164,6 @@ public static Directory create(RepositoriesService repositories,
172164
directory = new CacheDirectory(directory, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(),
173165
currentTimeNanosSupplier);
174166
}
175-
directory = new InMemoryNoOpCommitDirectory(directory);
176-
177-
final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(null)
178-
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
179-
.setMergePolicy(NoMergePolicy.INSTANCE);
180-
181-
try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) {
182-
final Map<String, String> userData = new HashMap<>();
183-
indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
184-
185-
final String translogUUID = Translog.createEmptyTranslog(shardPath.resolveTranslog(),
186-
Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
187-
shardPath.getShardId(), 0L);
188-
189-
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
190-
indexWriter.setLiveCommitData(userData.entrySet());
191-
indexWriter.commit();
192-
}
193-
194-
return directory;
167+
return new InMemoryNoOpCommitDirectory(directory);
195168
}
196-
197169
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.index.IndexNotFoundException;
27+
import org.elasticsearch.index.IndexSettings;
2728
import org.elasticsearch.repositories.IndexId;
2829
import org.elasticsearch.repositories.RepositoriesService;
2930
import org.elasticsearch.repositories.Repository;
@@ -95,6 +96,7 @@ private static Settings buildIndexSettings(String repoName, SnapshotId snapshotI
9596
.put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID())
9697
.put(SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId())
9798
.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY)
99+
.put(IndexSettings.SKIP_FILES_RECOVERY_SETTING.getKey(), true)
98100
.put(IndexMetaData.SETTING_BLOCKS_WRITE, true)
99101
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), SearchableSnapshotAllocator.ALLOCATOR_NAME)
100102
.build();

0 commit comments

Comments
 (0)