Skip to content

Commit 0940bcd

Browse files
tlrx and DaveCTurner authored
Add Lucene directory and index input implementations that expose shard snapshot (#49651)
This commit adds a SearchableSnapshotDirectory implementation that exposes the snapshot of a shard as a Lucene Directory. This directory only supports read operations and does not allow any file modification. It allows: Directory#listAll() to list all the files stored in the shard snapshot; Directory#fileLength(String) to return the byte length of a file; and Directory#openInput(String, IOContext) to open an IndexInput. In order to work, the directory requires the list of the shard snapshot files and a way to read a specific range of bytes from a blob. The list of shard snapshot files must be provided as a BlobStoreIndexShardSnapshot object when the directory is created. This object contains the list of the shard files stored in the snapshot and can be used to map each Lucene file to its corresponding blob(s) stored in the repository (which can be more than one, as large Lucene files are split during snapshot). Blobs are read directly from the snapshot using a BlobContainer. SearchableSnapshotDirectory provides SearchableSnapshotIndexInput to read a file from the snapshot. This index input implementation maintains an internal buffer (it extends BufferedIndexInput) and takes care of tracking the current reading position in the file. Each time more bytes are requested to fill the internal buffer, SearchableSnapshotIndexInput maps the current position to the appropriate blob name and position in the blob to read bytes from. It also propagates the knowledge of the current position to any clone or slice. This commit also adds tests for the SearchableSnapshotDirectory which create a random directory, index documents into it, snapshot the files and create a SearchableSnapshotDirectory from this snapshot. They then run some tests against the normal directory and the searchable snapshot directory and compare the results. Co-Authored-By: David Turner <[email protected]>
1 parent b5ae5ba commit 0940bcd

File tree

10 files changed

+942
-6
lines changed

10 files changed

+942
-6
lines changed

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,22 @@ public interface BlobContainer {
4949
*/
5050
InputStream readBlob(String blobName) throws IOException;
5151

52+
/**
53+
* Creates a new {@link InputStream} that can be used to read the given blob starting from
54+
* a specific {@code position} in the blob. The {@code length} is an indication of the
55+
* number of bytes that are expected to be read from the {@link InputStream}.
56+
*
57+
* @param blobName The name of the blob to get an {@link InputStream} for.
58+
* @param position The position in the blob where the next byte will be read.
59+
* @param length An indication of the number of bytes to be read.
60+
* @return The {@code InputStream} to read the blob.
61+
* @throws NoSuchFileException if the blob does not exist
62+
* @throws IOException if the blob can not be read.
63+
*/
64+
default InputStream readBlob(final String blobName, final long position, final int length) throws IOException {
65+
throw new UnsupportedOperationException(); // NORELEASE
66+
}
67+
5268
/**
5369
* Reads blob content from the input stream and writes it to the container in a new blob with the given name.
5470
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ public InputStream readBlob(String name) throws IOException {
152152
}
153153
}
154154

155+
@Override
156+
public InputStream readBlob(String blobName, long position, int length) throws IOException {
157+
final InputStream inputStream = readBlob(blobName);
158+
long skipped = inputStream.skip(position); // NORELEASE
159+
assert skipped == position;
160+
return inputStream;
161+
}
162+
155163
@Override
156164
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
157165
if (failIfAlreadyExists == false) {

server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.cluster.service.ClusterService;
2626
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2727
import org.elasticsearch.env.Environment;
28+
import org.elasticsearch.repositories.RepositoriesModule;
2829
import org.elasticsearch.repositories.Repository;
2930

3031
/**
@@ -58,4 +59,13 @@ default Map<String, Repository.Factory> getInternalRepositories(Environment env,
5859
ClusterService clusterService) {
5960
return Collections.emptyMap();
6061
}
62+
63+
/**
64+
* Passes down the current {@link RepositoriesModule} to repository plugins.
65+
*
66+
* @param module the current {@link RepositoriesModule}
67+
*/
68+
default void onRepositoriesModule(RepositoriesModule module) {
69+
// NORELEASE
70+
}
6171
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,7 @@ private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
910910
return shardContainer(indexId, shardId.getId());
911911
}
912912

// Resolves the blob container of a single shard: indicesPath() extended with the index id and then the shard number.
// This hunk widens the method from private to public; both the removed and the added signature line are shown.
913-
private BlobContainer shardContainer(IndexId indexId, int shardId) {
913+
public BlobContainer shardContainer(IndexId indexId, int shardId) {
914914
return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId)));
915915
}
916916

@@ -942,9 +942,11 @@ public long getRestoreThrottleTimeInNanos() {
942942
}
943943

944944
protected void assertSnapshotOrGenericThread() {
945+
// NORELEASE
946+
/*
945947
assert Thread.currentThread().getName().contains(ThreadPool.Names.SNAPSHOT)
946948
|| Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC) :
947-
"Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";
949+
"Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";*/
948950
}
949951

950952
@Override
@@ -1661,7 +1663,7 @@ private static List<String> unusedBlobs(Set<String> blobs, Set<String> surviving
16611663
/**
16621664
* Loads information about shard snapshot
16631665
*/
1664-
private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
1666+
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
16651667
try {
16661668
return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID());
16671669
} catch (NoSuchFileException ex) {

x-pack/plugin/searchable-snapshots/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,3 @@ dependencies {
1818
// installing them as individual plugins for integ tests doesn't make sense,
1919
// so we disable integ tests
2020
integTest.enabled = false
21-
22-
test.enabled = false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.index.store;
7+
8+
import org.apache.lucene.store.BaseDirectory;
9+
import org.apache.lucene.store.Directory;
10+
import org.apache.lucene.store.IOContext;
11+
import org.apache.lucene.store.IndexInput;
12+
import org.apache.lucene.store.IndexOutput;
13+
import org.apache.lucene.store.SingleInstanceLockFactory;
14+
import org.elasticsearch.common.blobstore.BlobContainer;
15+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
16+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
17+
18+
import java.io.FileNotFoundException;
19+
import java.io.IOException;
20+
import java.util.Collection;
21+
import java.util.Objects;
22+
import java.util.Set;
23+
24+
/**
25+
* Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshot are immutable this
26+
* implementation does not allow modification of the directory files and only supports {@link #listAll()}, {@link #fileLength(String)} and
27+
* {@link #openInput(String, IOContext)} methods.
28+
*
29+
* To create a {@link SearchableSnapshotDirectory} both the list of the snapshot files and a {@link BlobContainer} to read these files must
30+
* be provided. The definition of the snapshot files are provided using a {@link BlobStoreIndexShardSnapshot} object which contains the name
31+
* of the snapshot and all the files it contains along with their metadata. Because there is no one-to-one relationship between the original
32+
* shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by
33+
* Lucene with the one (or the ones) corresponding blob(s) in the snapshot.
34+
*/
35+
public class SearchableSnapshotDirectory extends BaseDirectory {
36+
37+
private final BlobStoreIndexShardSnapshot snapshot;
38+
private final BlobContainer blobContainer;
39+
40+
public SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) {
41+
super(new SingleInstanceLockFactory());
42+
this.snapshot = Objects.requireNonNull(snapshot);
43+
this.blobContainer = Objects.requireNonNull(blobContainer);
44+
}
45+
46+
private FileInfo fileInfo(final String name) throws FileNotFoundException {
47+
return snapshot.indexFiles().stream()
48+
.filter(fileInfo -> fileInfo.physicalName().equals(name))
49+
.findFirst()
50+
.orElseThrow(() -> new FileNotFoundException(name));
51+
}
52+
53+
@Override
54+
public String[] listAll() throws IOException {
55+
ensureOpen();
56+
return snapshot.indexFiles().stream()
57+
.map(FileInfo::physicalName)
58+
.sorted(String::compareTo)
59+
.toArray(String[]::new);
60+
}
61+
62+
@Override
63+
public long fileLength(final String name) throws IOException {
64+
ensureOpen();
65+
return fileInfo(name).length();
66+
}
67+
68+
@Override
69+
public IndexInput openInput(final String name, final IOContext context) throws IOException {
70+
ensureOpen();
71+
return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name));
72+
}
73+
74+
@Override
75+
public void close() {
76+
isOpen = false;
77+
}
78+
79+
@Override
80+
public String toString() {
81+
return this.getClass().getSimpleName() + "@" + snapshot.snapshot() + " lockFactory=" + lockFactory;
82+
}
83+
84+
@Override
85+
public Set<String> getPendingDeletions() {
86+
throw unsupportedException();
87+
}
88+
89+
@Override
90+
public void sync(Collection<String> names) {
91+
throw unsupportedException();
92+
}
93+
94+
@Override
95+
public void syncMetaData() {
96+
throw unsupportedException();
97+
}
98+
99+
@Override
100+
public void deleteFile(String name) {
101+
throw unsupportedException();
102+
}
103+
104+
@Override
105+
public IndexOutput createOutput(String name, IOContext context) {
106+
throw unsupportedException();
107+
}
108+
109+
@Override
110+
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
111+
throw unsupportedException();
112+
}
113+
114+
@Override
115+
public void rename(String source, String dest) {
116+
throw unsupportedException();
117+
}
118+
119+
private static UnsupportedOperationException unsupportedException() {
120+
return new UnsupportedOperationException("Searchable snapshot directory does not support this operation");
121+
}
122+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.index.store;
7+
8+
import org.apache.lucene.store.BufferedIndexInput;
9+
import org.apache.lucene.store.IndexInput;
10+
import org.elasticsearch.common.blobstore.BlobContainer;
11+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
12+
13+
import java.io.EOFException;
14+
import java.io.IOException;
15+
import java.io.InputStream;
16+
import java.util.Objects;
17+
18+
/**
19+
* A {@link SearchableSnapshotIndexInput} instance corresponds to a single file from a Lucene directory that has been snapshotted. Because
20+
* large Lucene file might be split into multiple parts during the snapshot, {@link SearchableSnapshotIndexInput} requires a
21+
* {@link FileInfo} object at creation time. This object is used to retrieve the file name and length of the original Lucene file, as well
22+
* as all the parts (stored as "blobs" in the repository) that composed the file in the snapshot.
23+
*
24+
* For example, the following {@link FileInfo}:
25+
* [name: __4vdpz_HFQ8CuKjCERX0o2A, numberOfParts: 2, partSize: 997b, partBytes: 997, metadata: name [_0_Asserting_0.pos], length [1413]
26+
*
27+
* Indicates that the Lucene file "_0_Asserting_0.pos" has a total length of 1413 and is snapshotted into 2 parts:
28+
* - __4vdpz_HFQ8CuKjCERX0o2A.part1 of size 997b
29+
* - __4vdpz_HFQ8CuKjCERX0o2A.part2 of size 416b
30+
*
31+
* {@link SearchableSnapshotIndexInput} maintains a global position that indicates the current position in the Lucene file where the
32+
* next read will occur. In the case of a Lucene file snapshotted into multiple parts, this position is used to identify which part must
33+
* be read at which position (see {@link #readInternal(byte[], int, int)}. This position is also passed over to cloned and sliced input
34+
* along with the {@link FileInfo} so that they can also track their reading position.
35+
*/
36+
public class SearchableSnapshotIndexInput extends BufferedIndexInput {
37+
38+
private final BlobContainer blobContainer;
39+
private final FileInfo fileInfo;
40+
private final long offset;
41+
private final long length;
42+
43+
private long position;
44+
private boolean closed;
45+
46+
public SearchableSnapshotIndexInput(final BlobContainer blobContainer, final FileInfo fileInfo) {
47+
this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, 0L, 0L, fileInfo.length());
48+
}
49+
50+
private SearchableSnapshotIndexInput(final String resourceDesc, final BlobContainer blobContainer,
51+
final FileInfo fileInfo, final long position, final long offset, final long length) {
52+
super(resourceDesc);
53+
this.blobContainer = Objects.requireNonNull(blobContainer);
54+
this.fileInfo = Objects.requireNonNull(fileInfo);
55+
this.offset = offset;
56+
this.length = length;
57+
this.position = position;
58+
this.closed = false;
59+
}
60+
61+
@Override
62+
public long length() {
63+
return length;
64+
}
65+
66+
private void ensureOpen() throws IOException {
67+
if (closed) {
68+
throw new IOException(toString() + " is closed");
69+
}
70+
}
71+
72+
@Override
73+
protected void readInternal(byte[] b, int offset, int length) throws IOException {
74+
ensureOpen();
75+
if (fileInfo.numberOfParts() == 1L) {
76+
readInternalBytes(0L, position, b, offset, length);
77+
} else {
78+
int len = length;
79+
int off = offset;
80+
while (len > 0) {
81+
long currentPart = position / fileInfo.partSize().getBytes();
82+
int remainingBytesInPart;
83+
if (currentPart < (fileInfo.numberOfParts() - 1)) {
84+
remainingBytesInPart = Math.toIntExact(((currentPart + 1L) * fileInfo.partSize().getBytes()) - position);
85+
} else {
86+
remainingBytesInPart = Math.toIntExact(fileInfo.length() - position);
87+
}
88+
final int read = Math.min(len, remainingBytesInPart);
89+
readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, off, read);
90+
len -= read;
91+
off += read;
92+
}
93+
}
94+
}
95+
96+
private void readInternalBytes(final long part, final long pos, byte[] b, int offset, int length) throws IOException {
97+
try (InputStream inputStream = blobContainer.readBlob(fileInfo.partName(part), pos, length)) {
98+
int read = inputStream.read(b, offset, length);
99+
assert read == length;
100+
position += read;
101+
}
102+
}
103+
104+
@Override
105+
protected void seekInternal(long pos) throws IOException {
106+
if (pos > length) {
107+
throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length + "] for " + toString());
108+
} else if (pos < 0L) {
109+
throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
110+
}
111+
this.position = offset + pos;
112+
}
113+
114+
@Override
115+
public BufferedIndexInput clone() {
116+
return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length);
117+
}
118+
119+
@Override
120+
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
121+
if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) {
122+
final SearchableSnapshotIndexInput slice =
123+
new SearchableSnapshotIndexInput(sliceDescription, blobContainer, fileInfo, position, this.offset + offset, length);
124+
slice.seek(0L);
125+
return slice;
126+
} else {
127+
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset
128+
+ ",length=" + length + ",fileLength=" + length() + ": " + this);
129+
}
130+
}
131+
132+
@Override
133+
public void close() throws IOException {
134+
closed = true;
135+
}
136+
137+
@Override
138+
public String toString() {
139+
return "SearchableSnapshotIndexInput{" +
140+
"resourceDesc=" + super.toString() +
141+
", fileInfo=" + fileInfo +
142+
", offset=" + offset +
143+
", length=" + length +
144+
", position=" + position +
145+
'}';
146+
}
147+
}

0 commit comments

Comments
 (0)