Skip to content

Commit 4bde03a

Browse files
authored
Associate translog with Lucene index commit for searchable snapshots shards (#53459)
Today searchable snapshot shards can be restored from a snapshot, recovered from a peer or force-allocated as a stale primary on a data node without copying any files to disk. This works because SearchableSnapshotDirectory does not rely on local files on disk and because every time such a Directory is instantiated an empty translog is associated with it through a new Lucene commit. The translog/Lucene commit association is done within a cluster state update when the IndexShard's directory is created, and it requires opening an IndexWriter and creating the translog and translog checkpoint files on disk. Opening the IndexWriter causes multiple files to be accessed (to verify checksums and to load field infos) and for some shards this can take a lot of time, causing cluster state application timeouts. Creating the translog files triggers multiple accesses to disk in order to create or delete directories and fsync files. This commit moves the translog/Lucene commit association out of the Directory instantiation - and therefore out of the cluster state update thread - so that it happens in a pre-recovery phase at the beginning of the recovery process. It introduces a new hook method named preRecovery() that in turn executes the registered IndexEventListeners. This allows the searchable snapshot module to register a specific IndexEventListener that will create a new empty translog with a given translog UUID so that it will be associated with the last Lucene commit. This translog creation will happen when restoring the shard from a snapshot; right before recovering a shard from a peer; and when recovering the shard from the existing store after a node restart or a forced allocation. Associating a new translog with the Lucene index (and not the other way around, as is usually done during recoveries) prevents further Lucene commits from happening (as they require an IndexWriter, which triggers many file accesses). Relates #50999
1 parent 906c8a2 commit 4bde03a

File tree

9 files changed

+143
-42
lines changed

9 files changed

+143
-42
lines changed

server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,4 +257,17 @@ public void onStoreClosed(ShardId shardId) {
257257
}
258258
}
259259
}
260+
261+
@Override
262+
public void beforeIndexShardRecovery(final IndexShard indexShard, final IndexSettings indexSettings) {
263+
for (IndexEventListener listener : listeners) {
264+
try {
265+
listener.beforeIndexShardRecovery(indexShard, indexSettings);
266+
} catch (Exception e) {
267+
logger.warn(() -> new ParameterizedMessage("failed to invoke the listener before the shard recovery starts for {}",
268+
indexShard.shardId()), e);
269+
throw e;
270+
}
271+
}
272+
}
260273
}

server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,4 +167,15 @@ default void onStoreCreated(ShardId shardId) {}
167167
* @param shardId the shard ID the store belongs to
168168
*/
169169
default void onStoreClosed(ShardId shardId) {}
170+
171+
/**
172+
* Called before the index shard starts to recover.
173+
* Note: unlike all other methods in this class, this method is not called using the cluster state update thread. When this method is
174+
* called the shard has already transitioned to the RECOVERING state.
175+
*
176+
* @param indexShard the shard that is about to recover
177+
* @param indexSettings the shard's index settings
178+
*/
179+
default void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
180+
}
170181
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,11 @@ public void close(String reason, boolean flushEngine) throws IOException {
13131313
}
13141314
}
13151315

1316+
public void preRecovery() {
1317+
assert state == IndexShardState.RECOVERING : "expected a recovering shard " + shardId + " but got " + state;
1318+
indexEventListener.beforeIndexShardRecovery(this, indexSettings);
1319+
}
1320+
13161321
public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
13171322
synchronized (postRecoveryMutex) {
13181323
// we need to refresh again to expose all operations that were index until now. Otherwise

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
359359
* Recovers the state of the shard from the store.
360360
*/
361361
private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRecoveryException {
362+
indexShard.preRecovery();
362363
final RecoveryState recoveryState = indexShard.recoveryState();
363364
final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
364365
indexShard.prepareForIndexRecovery();
@@ -449,6 +450,7 @@ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState
449450
private void restore(IndexShard indexShard, Repository repository, SnapshotRecoverySource restoreSource,
450451
ActionListener<Boolean> listener) {
451452
logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource());
453+
indexShard.preRecovery();
452454
final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
453455
if (restoreSource == null) {
454456
listener.onFailure(new IndexShardRestoreFailedException(shardId, "empty restore source"));

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.lucene.index.Term;
2424
import org.apache.lucene.store.AlreadyClosedException;
2525
import org.elasticsearch.Version;
26+
import org.elasticsearch.common.Nullable;
27+
import org.elasticsearch.common.Strings;
2628
import org.elasticsearch.common.UUIDs;
2729
import org.elasticsearch.common.bytes.BytesArray;
2830
import org.elasticsearch.common.bytes.BytesReference;
@@ -1838,20 +1840,58 @@ public static String createEmptyTranslog(final Path location, final long initial
18381840

18391841
static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId,
18401842
ChannelFactory channelFactory, long primaryTerm) throws IOException {
1843+
return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, null, channelFactory);
1844+
}
1845+
1846+
/**
1847+
* Creates a new empty translog within the specified {@code location} that contains the given {@code initialGlobalCheckpoint},
1848+
* {@code primaryTerm} and {@code translogUUID}.
1849+
*
1850+
* This method should only be used directly under specific circumstances, such as for shards that will see no indexing. Specifying a non-unique
1851+
* translog UUID could cause a lot of issues and that's why in all (but one) cases the method
1852+
* {@link #createEmptyTranslog(Path, long, ShardId, long)} should be used instead.
1853+
*
1854+
* @param location a {@link Path} to the directory that will contain the translog files (translog + translog checkpoint)
1855+
* @param shardId the {@link ShardId}
1856+
* @param initialGlobalCheckpoint the global checkpoint to initialize the translog with
1857+
* @param primaryTerm the shard's primary term to initialize the translog with
1858+
* @param translogUUID the unique identifier to initialize the translog with
1859+
* @param factory a {@link ChannelFactory} used to open translog files
1860+
* @return the translog's unique identifier
1861+
* @throws IOException if something went wrong during translog creation
1862+
*/
1863+
public static String createEmptyTranslog(final Path location,
1864+
final ShardId shardId,
1865+
final long initialGlobalCheckpoint,
1866+
final long primaryTerm,
1867+
@Nullable final String translogUUID,
1868+
@Nullable final ChannelFactory factory) throws IOException {
18411869
IOUtils.rm(location);
18421870
Files.createDirectories(location);
1843-
final Checkpoint checkpoint =
1844-
Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1);
1871+
1872+
final long generation = 1L;
1873+
final long minTranslogGeneration = 1L;
1874+
final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open;
1875+
final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID();
18451876
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
1877+
final Path translogFile = location.resolve(getFilename(generation));
1878+
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration);
1879+
18461880
Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
18471881
IOUtils.fsync(checkpointFile, false);
1848-
final String translogUUID = UUIDs.randomBase64UUID();
1849-
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1,
1850-
location.resolve(getFilename(1)), channelFactory,
1851-
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
1852-
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm,
1853-
new TragicExceptionHolder(), seqNo -> { throw new UnsupportedOperationException(); });
1882+
final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory,
1883+
new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint,
1884+
() -> {
1885+
throw new UnsupportedOperationException();
1886+
}, () -> {
1887+
throw new UnsupportedOperationException();
1888+
},
1889+
primaryTerm,
1890+
new TragicExceptionHolder(),
1891+
seqNo -> {
1892+
throw new UnsupportedOperationException();
1893+
});
18541894
writer.close();
1855-
return translogUUID;
1895+
return uuid;
18561896
}
18571897
}

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,12 @@ private void doRecovery(final long recoveryId) {
172172
timer = recoveryTarget.state().getTimer();
173173
cancellableThreads = recoveryTarget.cancellableThreads();
174174
try {
175+
final IndexShard indexShard = recoveryTarget.indexShard();
176+
indexShard.preRecovery();
175177
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
176178
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
177-
recoveryTarget.indexShard().prepareForIndexRecovery();
178-
final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
179+
indexShard.prepareForIndexRecovery();
180+
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
179181
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
180182
"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
181183
request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
*/
66
package org.elasticsearch.index.store;
77

8-
import org.apache.lucene.index.IndexWriter;
9-
import org.apache.lucene.index.IndexWriterConfig;
10-
import org.apache.lucene.index.NoMergePolicy;
118
import org.apache.lucene.store.BaseDirectory;
129
import org.apache.lucene.store.BufferedIndexInput;
1310
import org.apache.lucene.store.Directory;
@@ -16,13 +13,10 @@
1613
import org.apache.lucene.store.IndexOutput;
1714
import org.apache.lucene.store.SingleInstanceLockFactory;
1815
import org.elasticsearch.common.blobstore.BlobContainer;
19-
import org.elasticsearch.common.lucene.Lucene;
2016
import org.elasticsearch.index.IndexSettings;
21-
import org.elasticsearch.index.seqno.SequenceNumbers;
2217
import org.elasticsearch.index.shard.ShardPath;
2318
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
2419
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
25-
import org.elasticsearch.index.translog.Translog;
2620
import org.elasticsearch.repositories.IndexId;
2721
import org.elasticsearch.repositories.RepositoriesService;
2822
import org.elasticsearch.repositories.Repository;
@@ -35,8 +29,6 @@
3529
import java.io.IOException;
3630
import java.nio.file.Path;
3731
import java.util.Collection;
38-
import java.util.HashMap;
39-
import java.util.Map;
4032
import java.util.Objects;
4133
import java.util.Set;
4234
import java.util.function.LongSupplier;
@@ -172,26 +164,6 @@ public static Directory create(RepositoriesService repositories,
172164
directory = new CacheDirectory(directory, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(),
173165
currentTimeNanosSupplier);
174166
}
175-
directory = new InMemoryNoOpCommitDirectory(directory);
176-
177-
final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(null)
178-
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
179-
.setMergePolicy(NoMergePolicy.INSTANCE);
180-
181-
try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) {
182-
final Map<String, String> userData = new HashMap<>();
183-
indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
184-
185-
final String translogUUID = Translog.createEmptyTranslog(shardPath.resolveTranslog(),
186-
Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
187-
shardPath.getShardId(), 0L);
188-
189-
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
190-
indexWriter.setLiveCommitData(userData.entrySet());
191-
indexWriter.commit();
192-
}
193-
194-
return directory;
167+
return new InMemoryNoOpCommitDirectory(directory);
195168
}
196-
197169
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.searchablesnapshots;
7+
8+
import org.apache.lucene.index.SegmentInfos;
9+
import org.elasticsearch.index.IndexSettings;
10+
import org.elasticsearch.index.seqno.SequenceNumbers;
11+
import org.elasticsearch.index.shard.IndexEventListener;
12+
import org.elasticsearch.index.shard.IndexShard;
13+
import org.elasticsearch.index.shard.ShardId;
14+
import org.elasticsearch.index.translog.Translog;
15+
import org.elasticsearch.index.translog.TranslogException;
16+
import org.elasticsearch.threadpool.ThreadPool;
17+
18+
import java.nio.file.Path;
19+
20+
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore;
21+
22+
public class SearchableSnapshotIndexEventListener implements IndexEventListener {
23+
24+
@Override
25+
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
26+
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
27+
associateNewEmptyTranslogWithIndex(indexShard);
28+
}
29+
30+
private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {
31+
final ShardId shardId = indexShard.shardId();
32+
assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId;
33+
try {
34+
final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo();
35+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
36+
final long primaryTerm = indexShard.getPendingPrimaryTerm();
37+
final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
38+
final Path translogLocation = indexShard.shardPath().resolveTranslog();
39+
Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null);
40+
} catch (Exception e) {
41+
throw new TranslogException(shardId, "failed to associate a new translog", e);
42+
}
43+
}
44+
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2323
import org.elasticsearch.env.Environment;
2424
import org.elasticsearch.env.NodeEnvironment;
25+
import org.elasticsearch.index.IndexModule;
2526
import org.elasticsearch.index.IndexSettings;
2627
import org.elasticsearch.index.engine.EngineFactory;
2728
import org.elasticsearch.index.engine.ReadOnlyEngine;
@@ -40,8 +41,8 @@
4041
import org.elasticsearch.script.ScriptService;
4142
import org.elasticsearch.threadpool.ThreadPool;
4243
import org.elasticsearch.watcher.ResourceWatcherService;
43-
import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction;
4444
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
45+
import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction;
4546
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction;
4647
import org.elasticsearch.xpack.searchablesnapshots.action.TransportClearSearchableSnapshotsCacheAction;
4748
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction;
@@ -125,6 +126,13 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) {
125126
repositoriesService.set(repositoriesModule.getRepositoryService());
126127
}
127128

129+
@Override
130+
public void onIndexModule(IndexModule indexModule) {
131+
if (isSearchableSnapshotStore(indexModule.getSettings())) {
132+
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener());
133+
}
134+
}
135+
128136
@Override
129137
public Map<String, DirectoryFactory> getDirectoryFactories() {
130138
return Map.of(SNAPSHOT_DIRECTORY_FACTORY_KEY, (indexSettings, shardPath) -> {
@@ -138,7 +146,7 @@ public Map<String, DirectoryFactory> getDirectoryFactories() {
138146

139147
@Override
140148
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
141-
if (SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings.getSettings()))
149+
if (isSearchableSnapshotStore(indexSettings.getSettings())
142150
&& indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) {
143151
return Optional.of(engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity()));
144152
}
@@ -169,5 +177,9 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
169177
public Map<String, ExistingShardsAllocator> getExistingShardsAllocators() {
170178
return Collections.singletonMap(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator());
171179
}
180+
181+
static boolean isSearchableSnapshotStore(Settings indexSettings) {
182+
return SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
183+
}
172184
}
173185

0 commit comments

Comments
 (0)