
Commit 74c4041

Add SearchableSnapshotRepository (#50239)
This commit adds a repository type that allows the creation of indices backed by searchable snapshots. These indices are allocated and recovered just like normal indices, but the underlying `SearchableSnapshotDirectory` ensures that no recovery needs to take place, since the correct files all appear to exist on the target node already. There are a number of limitations in this implementation:

- Like normal indices, after the initial allocation the primary is always allocated to a node that previously held an in-sync copy. If the cluster loses all copies of a snapshot-backed index then it does not attempt to recover.
- Peer recoveries of indices containing deletes do not currently work.
- When performing disk-based shard allocation we make no attempt to quantify the disk usage of these shards any differently.
1 parent 287019a commit 74c4041
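
For orientation, here is a minimal sketch (not part of the commit) of how a repository of the new `searchable` type could be described in terms of the settings read by `SearchableSnapshotRepository.getRepositoryFactory()` further down. The repository name, the `fs` delegate type, and its `location` are illustrative assumptions.

import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.settings.Settings;

public class SearchableRepositoryRegistrationSketch {
    public static void main(String[] args) {
        // Hypothetical values: the repository name, the "fs" delegate and its "location" are made up.
        RepositoryMetaData searchableRepo = new RepositoryMetaData(
            "my-searchable-repo",                    // repository name (illustrative)
            "searchable",                            // SearchableSnapshotRepository.TYPE
            Settings.builder()
                .put("delegate_type", "fs")          // DELEGATE_TYPE: the blob-store repository type to wrap
                .put("location", "/mnt/backups/fs")  // settings are passed through to the delegate repository
                .build());
        System.out.println(searchableRepo.name() + " -> " + searchableRepo.type());
    }
}

The factory added in this commit looks up the delegate's factory via `delegate_type`, creates the delegate repository with the same settings, and wraps it in a `SearchableSnapshotRepository`.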

File tree

9 files changed: +694 -98 lines changed


server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 7 additions & 2 deletions
@@ -458,8 +458,13 @@ public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnviron
         try (ShardLock lock = shardLocker.lock(shardId, "open index", TimeUnit.SECONDS.toMillis(5));
              Directory dir = new SimpleFSDirectory(indexLocation)) {
             failIfCorrupted(dir);
-            SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
-            logger.trace("{} loaded segment info [{}]", shardId, segInfo);
+            // Previously we called Lucene#readSegmentInfos which verifies that some Lucene metadata is readable and makes sense, but if it
+            // weren't then we would mark this shard as corrupt when allocated, so it seems that this is unnecessary (and it breaks when
+            // the shard's directory is virtual since we use SimpleFSDirectory above).
+            // TODO NORELEASE is this ok? Need to check that we definitely add a corruption marker if the metadata is corrupt.
+            // SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
+            // logger.trace("{} loaded segment info [{}]", shardId, segInfo);
+            logger.trace("{} tryOpenIndex succeeded", shardId);
         }
     }

server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java

Lines changed: 2 additions & 0 deletions
@@ -73,6 +73,8 @@ public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, T
         Map<String, Repository.Factory> internalRepositoryTypes = Collections.unmodifiableMap(internalFactories);
         repositoriesService = new RepositoriesService(settings, clusterService, transportService, repositoryTypes,
             internalRepositoryTypes, threadPool);
+
+        repoPlugins.forEach(rp -> rp.onRepositoriesModule(this));
     }

     public RepositoriesService getRepositoryService() {
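
The added call suggests that `RepositoryPlugin` gains an `onRepositoriesModule(RepositoriesModule)` callback; the interface change itself is not shown in this hunk. Below is a hedged sketch of a plugin implementing the inferred hook; the class name and the empty body are placeholders, not part of the commit, and the exact methods `RepositoriesModule` exposes to plugins are not visible here.

import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.RepositoriesModule;

// Assumption: the callback signature is inferred from the call site rp.onRepositoriesModule(this) above.
public class ExampleRepositoryPlugin extends Plugin implements RepositoryPlugin {
    @Override
    public void onRepositoriesModule(RepositoriesModule repositoriesModule) {
        // e.g. register or wrap repository types here, using whatever API the module offers to plugins
    }
}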

server/src/test/java/org/elasticsearch/index/store/StoreTests.java

Lines changed: 1 addition & 1 deletion
@@ -936,7 +936,7 @@ public void testCanOpenIndex() throws IOException {
         IndexWriterConfig iwc = newIndexWriterConfig();
         Path tempDir = createTempDir();
         final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
-        assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id)));
+        // assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id))); TODO NORELEASE
         IndexWriter writer = new IndexWriter(dir, iwc);
         Document doc = new Document();
         doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
.../org/elasticsearch/xpack/searchablesnapshots/InMemoryNoOpCommitDirectory.java

Lines changed: 122 additions & 0 deletions
@@ -0,0 +1,122 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NoLockFactory;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Set;

/**
 * A {@link Directory} which wraps a read-only "real" directory with a wrapper that allows no-op (in-memory) commits, and peer recoveries
 * of the same, so that we can start a shard on a completely readonly data set.
 */
public class InMemoryNoOpCommitDirectory extends FilterDirectory {
    private final Directory realDirectory;

    InMemoryNoOpCommitDirectory(Directory realDirectory) {
        super(new ByteBuffersDirectory(NoLockFactory.INSTANCE));
        this.realDirectory = realDirectory;
    }

    @Override
    public String[] listAll() throws IOException {
        final String[] ephemeralFiles = in.listAll();
        final String[] realFiles = realDirectory.listAll();
        final String[] allFiles = new String[ephemeralFiles.length + realFiles.length];
        System.arraycopy(ephemeralFiles, 0, allFiles, 0, ephemeralFiles.length);
        System.arraycopy(realFiles, 0, allFiles, ephemeralFiles.length, realFiles.length);
        return allFiles;
    }

    @Override
    public void deleteFile(String name) throws IOException {
        ensureMutable(name);
        try {
            in.deleteFile(name);
        } catch (NoSuchFileException | FileNotFoundException e) {
            // cannot delete the segments_N file in the read-only directory, but that's ok, just ignore this
        }
    }

    @Override
    public long fileLength(String name) throws IOException {
        try {
            return in.fileLength(name);
        } catch (NoSuchFileException | FileNotFoundException e) {
            return realDirectory.fileLength(name);
        }
    }

    @Override
    public void sync(Collection<String> names) {
    }

    @Override
    public void syncMetaData() {
    }

    @Override
    public IndexOutput createOutput(String name, IOContext context) throws IOException {
        ensureMutable(name);
        return super.createOutput(name, context);
    }

    @Override
    public void rename(String source, String dest) throws IOException {
        ensureMutable(source);
        ensureMutable(dest);
        super.rename(source, dest);
    }

    @Override
    public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void copyFrom(Directory from, String src, String dest, IOContext context) {
        throw new UnsupportedOperationException();
    }

    @Override
    public IndexInput openInput(String name, IOContext context) throws IOException {
        try {
            return in.openInput(name, context);
        } catch (NoSuchFileException | FileNotFoundException e) {
            return realDirectory.openInput(name, context);
        }
    }

    @Override
    public void close() throws IOException {
        IOUtils.close(in, realDirectory);
    }

    @Override
    public Set<String> getPendingDeletions() throws IOException {
        return super.getPendingDeletions(); // read-only realDirectory has no pending deletions
    }

    private static void ensureMutable(String name) {
        if ((name.startsWith("segments_")
            || name.startsWith("pending_segments_")
            || name.matches("^recovery\\..*\\.segments_.*$")) == false) {

            throw new IllegalArgumentException("file [" + name + "] is not mutable");
        }
    }
}
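
A minimal usage sketch (not part of the commit) of the wrapper above, assuming it is compiled into the same package because the constructor is package-private; a `ByteBuffersDirectory` stands in for the read-only snapshot contents:

package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.Map;

public class InMemoryNoOpCommitDirectorySketch {
    public static void main(String[] args) throws IOException {
        // Build a small index that plays the role of the read-only snapshot contents.
        Directory snapshotContents = new ByteBuffersDirectory();
        try (IndexWriter writer = new IndexWriter(snapshotContents, new IndexWriterConfig())) {
            Document doc = new Document();
            doc.add(new StringField("id", "1", Field.Store.YES));
            writer.addDocument(doc);
            writer.commit();
        }

        // Wrap it: new commits only ever touch the in-memory layer, never the "real" directory.
        try (Directory dir = new InMemoryNoOpCommitDirectory(snapshotContents);
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            writer.setLiveCommitData(Map.of("example_key", "example_value").entrySet()); // force a new commit
            writer.commit(); // writes a new segments_N into the ephemeral ByteBuffersDirectory only
            try (DirectoryReader reader = DirectoryReader.open(writer)) {
                System.out.println("docs visible through the wrapper: " + reader.numDocs()); // 1
            }
        }
    }
}

As in `SearchableSnapshotRepository#makeDirectory` below, only the commit metadata changes, so the commit writes nothing but a new `segments_N`, which is exactly the class of files that `ensureMutable` permits.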
.../org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRepository.java

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.repositories.FilterRepository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;

/**
 * A repository that wraps a {@link BlobStoreRepository} to add settings to the index metadata during a restore to identify the source
 * snapshot and index in order to create a {@link SearchableSnapshotDirectory} (and corresponding empty translog) to search these shards
 * without needing to fully restore them.
 */
public class SearchableSnapshotRepository extends FilterRepository {

    public static final String TYPE = "searchable";

    public static final Setting<String> SNAPSHOT_REPOSITORY_SETTING =
        Setting.simpleString("index.store.snapshot.repository_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex);
    public static final Setting<String> SNAPSHOT_SNAPSHOT_NAME_SETTING =
        Setting.simpleString("index.store.snapshot.snapshot_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex);
    public static final Setting<String> SNAPSHOT_SNAPSHOT_ID_SETTING =
        Setting.simpleString("index.store.snapshot.snapshot_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex);
    public static final Setting<String> SNAPSHOT_INDEX_ID_SETTING =
        Setting.simpleString("index.store.snapshot.index_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex);

    public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot";

    private static final Setting<String> DELEGATE_TYPE
        = new Setting<>("delegate_type", "", Function.identity(), Setting.Property.NodeScope);

    private final BlobStoreRepository blobStoreRepository;

    public SearchableSnapshotRepository(Repository in) {
        super(in);
        if (in instanceof BlobStoreRepository == false) {
            throw new IllegalArgumentException("Repository [" + in + "] does not support searchable snapshots");
        }
        blobStoreRepository = (BlobStoreRepository) in;
    }

    private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {

        IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()));
        BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id());

        SnapshotId snapshotId = new SnapshotId(SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
            SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()));
        BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);

        final SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory(snapshot, blobContainer);
        final InMemoryNoOpCommitDirectory inMemoryNoOpCommitDirectory = new InMemoryNoOpCommitDirectory(searchableSnapshotDirectory);

        try (IndexWriter indexWriter = new IndexWriter(inMemoryNoOpCommitDirectory, new IndexWriterConfig())) {
            final Map<String, String> userData = new HashMap<>();
            indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));

            final String translogUUID = Translog.createEmptyTranslog(shardPath.resolveTranslog(),
                Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
                shardPath.getShardId(), 0L);

            userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
            indexWriter.setLiveCommitData(userData.entrySet());
            indexWriter.commit();
        }

        return inMemoryNoOpCommitDirectory;
    }

    @Override
    public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
        final IndexMetaData indexMetaData = super.getSnapshotIndexMetaData(snapshotId, index);
        final IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
        builder.settings(Settings.builder().put(indexMetaData.getSettings()).put(getIndexSettings(blobStoreRepository, snapshotId, index)));
        return builder.build();
    }

    public static Settings getIndexSettings(Repository repository, SnapshotId snapshotId, IndexId indexId) {
        return Settings.builder()
            .put(SNAPSHOT_REPOSITORY_SETTING.getKey(), repository.getMetadata().name())
            .put(SNAPSHOT_SNAPSHOT_NAME_SETTING.getKey(), snapshotId.getName())
            .put(SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID())
            .put(SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId())
            .put(INDEX_STORE_TYPE_SETTING.getKey(), SNAPSHOT_DIRECTORY_FACTORY_KEY)
            .put(IndexMetaData.SETTING_BLOCKS_WRITE, true)
            .build();
    }

    static Factory getRepositoryFactory() {
        return new Repository.Factory() {
            @Override
            public Repository create(RepositoryMetaData metadata) {
                throw new UnsupportedOperationException();
            }

            @Override
            public Repository create(RepositoryMetaData metaData, Function<String, Factory> typeLookup) throws Exception {
                String delegateType = DELEGATE_TYPE.get(metaData.settings());
                if (Strings.hasLength(delegateType) == false) {
                    throw new IllegalArgumentException(DELEGATE_TYPE.getKey() + " must be set");
                }
                Repository.Factory factory = typeLookup.apply(delegateType);
                return new SearchableSnapshotRepository(factory.create(new RepositoryMetaData(metaData.name(),
                    delegateType, metaData.settings()), typeLookup));
            }
        };
    }

    public static IndexStorePlugin.DirectoryFactory newDirectoryFactory(final Supplier<RepositoriesService> repositoriesService) {
        return (indexSettings, shardPath) -> {
            final RepositoriesService repositories = repositoriesService.get();
            assert repositories != null;

            final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()));
            if (repository instanceof SearchableSnapshotRepository == false) {
                throw new IllegalArgumentException("Repository [" + repository + "] is not searchable");
            }

            return ((SearchableSnapshotRepository) repository).makeDirectory(indexSettings, shardPath);
        };
    }
}
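
To make the effect of `getIndexSettings` concrete, here is a sketch (not part of the commit) of the settings it attaches to a restored index, using a fabricated repository name, snapshot name, and UUIDs purely for illustration:

import org.elasticsearch.common.settings.Settings;

public class SearchableSnapshotIndexSettingsSketch {
    public static void main(String[] args) {
        // Equivalent to SearchableSnapshotRepository.getIndexSettings(repository, snapshotId, indexId)
        // for an illustrative repository "my-searchable-repo" and snapshot "snap-1"; the UUIDs are made up.
        Settings settings = Settings.builder()
            .put("index.store.snapshot.repository_name", "my-searchable-repo")   // SNAPSHOT_REPOSITORY_SETTING
            .put("index.store.snapshot.snapshot_name", "snap-1")                 // SNAPSHOT_SNAPSHOT_NAME_SETTING
            .put("index.store.snapshot.snapshot_uuid", "n6cHqyLbTDeQD9AjsbSCew") // SNAPSHOT_SNAPSHOT_ID_SETTING
            .put("index.store.snapshot.index_uuid", "G2QnVPcuQXyLCLXI7vf0qA")    // SNAPSHOT_INDEX_ID_SETTING
            .put("index.store.type", "snapshot")  // INDEX_STORE_TYPE_SETTING -> SNAPSHOT_DIRECTORY_FACTORY_KEY
            .put("index.blocks.write", true)      // IndexMetaData.SETTING_BLOCKS_WRITE
            .build();
        System.out.println(settings);
    }
}

These settings select the `snapshot` directory factory registered under `SNAPSHOT_DIRECTORY_FACTORY_KEY` (see `newDirectoryFactory` above) and block writes on the resulting index.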
