diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 03f15d7a85d8e..334ab4a944e3a 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -257,4 +257,17 @@ public void onStoreClosed(ShardId shardId) { } } } + + @Override + public void beforeIndexShardRecovery(final IndexShard indexShard, final IndexSettings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.beforeIndexShardRecovery(indexShard, indexSettings); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to invoke the listener before the shard recovery starts for {}", + indexShard.shardId()), e); + throw e; + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index 9cbd9ce4df253..1e664d656b9d0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -167,4 +167,15 @@ default void onStoreCreated(ShardId shardId) {} * @param shardId the shard ID the store belongs to */ default void onStoreClosed(ShardId shardId) {} + + /** + * Called before the index shard starts to recover. + * Note: unlike all other methods in this class, this method is not called using the cluster state update thread. When this method is + * called the shard already transitioned to the RECOVERING state. + * + * @param indexShard the shard that is about to recover + * @param indexSettings the shard's index settings + */ + default void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 56707bc198beb..e70837f5eb62f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1313,6 +1313,11 @@ public void close(String reason, boolean flushEngine) throws IOException { } } + public void preRecovery() { + assert state == IndexShardState.RECOVERING : "expected a recovering shard " + shardId + " but got " + state; + indexEventListener.beforeIndexShardRecovery(this, indexSettings); + } + public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { synchronized (postRecoveryMutex) { // we need to refresh again to expose all operations that were index until now. Otherwise diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index e6301371520a5..e98859975fbae 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -359,6 +359,7 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi * Recovers the state of the shard from the store. */ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRecoveryException { + indexShard.preRecovery(); final RecoveryState recoveryState = indexShard.recoveryState(); final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE; indexShard.prepareForIndexRecovery(); @@ -449,6 +450,7 @@ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState private void restore(IndexShard indexShard, Repository repository, SnapshotRecoverySource restoreSource, ActionListener listener) { logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); + indexShard.preRecovery(); final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog(); if (restoreSource == null) { listener.onFailure(new IndexShardRestoreFailedException(shardId, "empty restore source")); diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index f0e2b04673a71..07cc7b1ffdb2c 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -23,6 +23,8 @@ import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -1838,20 +1840,58 @@ public static String createEmptyTranslog(final Path location, final long initial static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId, ChannelFactory channelFactory, long primaryTerm) throws IOException { + return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, null, channelFactory); + } + + /** + * Creates a new empty translog within the specified {@code location} that contains the given {@code initialGlobalCheckpoint}, + * {@code primaryTerm} and {@code translogUUID}. + * + * This method should be used directly under specific circumstances like for shards that will see no indexing. Specifying a non-unique + * translog UUID could cause a lot of issues and that's why in all (but one) cases the method + * {@link #createEmptyTranslog(Path, long, ShardId, long)} should be used instead. + * + * @param location a {@link Path} to the directory that will contains the translog files (translog + translog checkpoint) + * @param shardId the {@link ShardId} + * @param initialGlobalCheckpoint the global checkpoint to initialize the translog with + * @param primaryTerm the shard's primary term to initialize the translog with + * @param translogUUID the unique identifier to initialize the translog with + * @param factory a {@link ChannelFactory} used to open translog files + * @return the translog's unique identifier + * @throws IOException if something went wrong during translog creation + */ + public static String createEmptyTranslog(final Path location, + final ShardId shardId, + final long initialGlobalCheckpoint, + final long primaryTerm, + @Nullable final String translogUUID, + @Nullable final ChannelFactory factory) throws IOException { IOUtils.rm(location); Files.createDirectories(location); - final Checkpoint checkpoint = - Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1); + + final long generation = 1L; + final long minTranslogGeneration = 1L; + final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open; + final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID(); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); + final Path translogFile = location.resolve(getFilename(generation)); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration); + Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); IOUtils.fsync(checkpointFile, false); - final String translogUUID = UUIDs.randomBase64UUID(); - TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, - location.resolve(getFilename(1)), channelFactory, - new ByteSizeValue(10), 1, initialGlobalCheckpoint, - () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm, - new TragicExceptionHolder(), seqNo -> { throw new UnsupportedOperationException(); }); + final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory, + new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint, + () -> { + throw new UnsupportedOperationException(); + }, () -> { + throw new UnsupportedOperationException(); + }, + primaryTerm, + new TragicExceptionHolder(), + seqNo -> { + throw new UnsupportedOperationException(); + }); writer.close(); - return translogUUID; + return uuid; } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 08ec24573c25a..44dce3e70e21c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -172,10 +172,12 @@ private void doRecovery(final long recoveryId) { timer = recoveryTarget.state().getTimer(); cancellableThreads = recoveryTarget.cancellableThreads(); try { + final IndexShard indexShard = recoveryTarget.indexShard(); + indexShard.preRecovery(); assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); - recoveryTarget.indexShard().prepareForIndexRecovery(); - final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); + indexShard.prepareForIndexRecovery(); + final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 591417f32c833..db7f5597da197 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -5,9 +5,6 @@ */ package org.elasticsearch.index.store; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.Directory; @@ -16,13 +13,10 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.SingleInstanceLockFactory; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -35,8 +29,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.LongSupplier; @@ -172,26 +164,6 @@ public static Directory create(RepositoriesService repositories, directory = new CacheDirectory(directory, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(), currentTimeNanosSupplier); } - directory = new InMemoryNoOpCommitDirectory(directory); - - final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(null) - .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) - .setMergePolicy(NoMergePolicy.INSTANCE); - - try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { - final Map userData = new HashMap<>(); - indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); - - final String translogUUID = Translog.createEmptyTranslog(shardPath.resolveTranslog(), - Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), - shardPath.getShardId(), 0L); - - userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - indexWriter.setLiveCommitData(userData.entrySet()); - indexWriter.commit(); - } - - return directory; + return new InMemoryNoOpCommitDirectory(directory); } - } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java new file mode 100644 index 0000000000000..f692b8079b360 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.index.SegmentInfos; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.threadpool.ThreadPool; + +import java.nio.file.Path; + +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore; + +public class SearchableSnapshotIndexEventListener implements IndexEventListener { + + @Override + public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { + assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); + associateNewEmptyTranslogWithIndex(indexShard); + } + + private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) { + final ShardId shardId = indexShard.shardId(); + assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId; + try { + final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo(); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long primaryTerm = indexShard.getPendingPrimaryTerm(); + final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY); + final Path translogLocation = indexShard.shardPath().resolveTranslog(); + Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null); + } catch (Exception e) { + throw new TranslogException(shardId, "failed to associate a new translog", e); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index d7aeedcee78b8..ea95fad971dc8 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; @@ -40,8 +41,8 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; -import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; +import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.TransportClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction; @@ -125,6 +126,13 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) { repositoriesService.set(repositoriesModule.getRepositoryService()); } + @Override + public void onIndexModule(IndexModule indexModule) { + if (isSearchableSnapshotStore(indexModule.getSettings())) { + indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener()); + } + } + @Override public Map getDirectoryFactories() { return Map.of(SNAPSHOT_DIRECTORY_FACTORY_KEY, (indexSettings, shardPath) -> { @@ -138,7 +146,7 @@ public Map getDirectoryFactories() { @Override public Optional getEngineFactory(IndexSettings indexSettings) { - if (SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings.getSettings())) + if (isSearchableSnapshotStore(indexSettings.getSettings()) && indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) { return Optional.of(engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity())); } @@ -169,5 +177,9 @@ public List getRestHandlers(Settings settings, RestController restC public Map getExistingShardsAllocators() { return Collections.singletonMap(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator()); } + + static boolean isSearchableSnapshotStore(Settings indexSettings) { + return SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings)); + } }