Skip to content

Associate translog with Lucene index commit for searchable snapshots shards #53459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,17 @@ public void onStoreClosed(ShardId shardId) {
}
}
}

@Override
public void beforeIndexShardRecovery(final IndexShard indexShard, final IndexSettings indexSettings) {
for (IndexEventListener listener : listeners) {
try {
listener.beforeIndexShardRecovery(indexShard, indexSettings);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to invoke the listener before the shard recovery starts for {}",
indexShard.shardId()), e);
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,15 @@ default void onStoreCreated(ShardId shardId) {}
* @param shardId the shard ID the store belongs to
*/
default void onStoreClosed(ShardId shardId) {}

/**
* Called before the index shard starts to recover.
* Note: unlike all other methods in this class, this method is not called using the cluster state update thread. When this method is
* called the shard already transitioned to the RECOVERING state.
*
* @param indexShard the shard that is about to recover
* @param indexSettings the shard's index settings
*/
default void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,11 @@ public void close(String reason, boolean flushEngine) throws IOException {
}
}

public void preRecovery() {
assert state == IndexShardState.RECOVERING : "expected a recovering shard " + shardId + " but got " + state;
indexEventListener.beforeIndexShardRecovery(this, indexSettings);
}

public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (postRecoveryMutex) {
// we need to refresh again to expose all operations that were index until now. Otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
* Recovers the state of the shard from the store.
*/
private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRecoveryException {
indexShard.preRecovery();
final RecoveryState recoveryState = indexShard.recoveryState();
final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
indexShard.prepareForIndexRecovery();
Expand Down Expand Up @@ -449,6 +450,7 @@ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState
private void restore(IndexShard indexShard, Repository repository, SnapshotRecoverySource restoreSource,
ActionListener<Boolean> listener) {
logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource());
indexShard.preRecovery();
final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
if (restoreSource == null) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "empty restore source"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -1838,20 +1840,58 @@ public static String createEmptyTranslog(final Path location, final long initial

static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId,
ChannelFactory channelFactory, long primaryTerm) throws IOException {
return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, null, channelFactory);
}

/**
* Creates a new empty translog within the specified {@code location} that contains the given {@code initialGlobalCheckpoint},
* {@code primaryTerm} and {@code translogUUID}.
*
* This method should be used directly under specific circumstances like for shards that will see no indexing. Specifying a non-unique
* translog UUID could cause a lot of issues and that's why in all (but one) cases the method
* {@link #createEmptyTranslog(Path, long, ShardId, long)} should be used instead.
*
* @param location a {@link Path} to the directory that will contains the translog files (translog + translog checkpoint)
* @param shardId the {@link ShardId}
* @param initialGlobalCheckpoint the global checkpoint to initialize the translog with
* @param primaryTerm the shard's primary term to initialize the translog with
* @param translogUUID the unique identifier to initialize the translog with
* @param factory a {@link ChannelFactory} used to open translog files
* @return the translog's unique identifier
* @throws IOException if something went wrong during translog creation
*/
public static String createEmptyTranslog(final Path location,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest adding a big warning in a comment here saying how dangerous it is to specify the translog UUID and that it should only be used for shards that will see no indexing.

I'm also idly wondering about how hard it would be to make this Translog read-only when it's not been created with a fresh UUID. Probably too hard to be worth doing, but thought I'd mention it anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest adding a big warning in a comment here saying how dangerous it is to specify the translog UUID and that it should only be used for shards that will see no indexing.

Sure, I added some doc in 557d924

I'm also idly wondering about how hard it would be to make this Translog read-only when it's not been created with a fresh UUID. Probably too hard to be worth doing, but thought I'd mention it anyway.

I find the idea interesting but I'm not sure if it worths it; I'd prefer to not create translogs at all if they were not to be used.

final ShardId shardId,
final long initialGlobalCheckpoint,
final long primaryTerm,
@Nullable final String translogUUID,
@Nullable final ChannelFactory factory) throws IOException {
IOUtils.rm(location);
Files.createDirectories(location);
final Checkpoint checkpoint =
Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1);

final long generation = 1L;
final long minTranslogGeneration = 1L;
final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open;
final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID();
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
final Path translogFile = location.resolve(getFilename(generation));
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration);

Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
IOUtils.fsync(checkpointFile, false);
final String translogUUID = UUIDs.randomBase64UUID();
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1,
location.resolve(getFilename(1)), channelFactory,
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm,
new TragicExceptionHolder(), seqNo -> { throw new UnsupportedOperationException(); });
final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory,
new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint,
() -> {
throw new UnsupportedOperationException();
}, () -> {
throw new UnsupportedOperationException();
},
primaryTerm,
new TragicExceptionHolder(),
seqNo -> {
throw new UnsupportedOperationException();
});
writer.close();
return translogUUID;
return uuid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,12 @@ private void doRecovery(final long recoveryId) {
timer = recoveryTarget.state().getTimer();
cancellableThreads = recoveryTarget.cancellableThreads();
try {
final IndexShard indexShard = recoveryTarget.indexShard();
indexShard.preRecovery();
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
recoveryTarget.indexShard().prepareForIndexRecovery();
final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
indexShard.prepareForIndexRecovery();
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
*/
package org.elasticsearch.index.store;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory;
Expand All @@ -16,13 +13,10 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
Expand All @@ -35,8 +29,6 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -172,26 +164,6 @@ public static Directory create(RepositoriesService repositories,
directory = new CacheDirectory(directory, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(),
currentTimeNanosSupplier);
}
directory = new InMemoryNoOpCommitDirectory(directory);

final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(null)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setMergePolicy(NoMergePolicy.INSTANCE);

try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) {
final Map<String, String> userData = new HashMap<>();
indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));

final String translogUUID = Translog.createEmptyTranslog(shardPath.resolveTranslog(),
Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
shardPath.getShardId(), 0L);

userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
indexWriter.setLiveCommitData(userData.entrySet());
indexWriter.commit();
}

return directory;
return new InMemoryNoOpCommitDirectory(directory);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.threadpool.ThreadPool;

import java.nio.file.Path;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore;

public class SearchableSnapshotIndexEventListener implements IndexEventListener {

@Override
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
associateNewEmptyTranslogWithIndex(indexShard);
}

private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId;
try {
final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo();
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long primaryTerm = indexShard.getPendingPrimaryTerm();
final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
final Path translogLocation = indexShard.shardPath().resolveTranslog();
Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null);
} catch (Exception e) {
throw new TranslogException(shardId, "failed to associate a new translog", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
Expand All @@ -40,8 +41,8 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction;
Expand Down Expand Up @@ -125,6 +126,13 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) {
repositoriesService.set(repositoriesModule.getRepositoryService());
}

@Override
public void onIndexModule(IndexModule indexModule) {
if (isSearchableSnapshotStore(indexModule.getSettings())) {
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener());
}
}

@Override
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Map.of(SNAPSHOT_DIRECTORY_FACTORY_KEY, (indexSettings, shardPath) -> {
Expand All @@ -138,7 +146,7 @@ public Map<String, DirectoryFactory> getDirectoryFactories() {

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings.getSettings()))
if (isSearchableSnapshotStore(indexSettings.getSettings())
&& indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) {
return Optional.of(engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity()));
}
Expand Down Expand Up @@ -169,5 +177,9 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
public Map<String, ExistingShardsAllocator> getExistingShardsAllocators() {
return Collections.singletonMap(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator());
}

static boolean isSearchableSnapshotStore(Settings indexSettings) {
return SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
}
}