From 5b3960d3021e19dcd7f3ffd05e62f301d008a658 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Nov 2019 10:00:55 +0100 Subject: [PATCH 1/7] SearchableSnapshotDirectory --- .../index/store/BlobBytesReader.java | 13 + .../store/SearchableSnapshotDirectory.java | 121 ++++++ .../store/SearchableSnapshotIndexInput.java | 182 +++++++++ .../SearchableSnapshotDirectoryTests.java | 361 ++++++++++++++++++ .../SearchableSnapshotIndexInputTests.java | 134 +++++++ 5 files changed, 811 insertions(+) create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java create mode 100644 x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java create mode 100644 x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java new file mode 100644 index 0000000000000..d98ce81bcc4f0 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.store; + +import java.io.IOException; + +public interface BlobBytesReader { + + void readBlobBytes(String name, long from, int length, byte[] buffer, int offset) throws IOException; +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java new file mode 100644 index 0000000000000..f22bfd29ef1ad --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.store; + +import org.apache.lucene.store.BaseDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.SingleInstanceLockFactory; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; +import java.util.Set; + +/** + * Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshot are immutable this + * implementation does not allow modification of the directory files and only supports {@link #listAll()}, {@link #fileLength(String)} and + * {@link #openInput(String, IOContext)} methods. + * + * To create a {@link SearchableSnapshotDirectory} both the list of the snapshot files and a way to read these files must be provided. The + * definition of the snapshot files are provided using a {@link BlobStoreIndexShardSnapshot} object which contains the name of the snapshot + * and all the files it contains along with their metadata. Because there is no one-to-one relationship between the original shard files + * and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by Lucene with + * the one (or the ones) corresponding blob(s) in the snapshot. + */ +public class SearchableSnapshotDirectory extends BaseDirectory { + + private final BlobStoreIndexShardSnapshot snapshot; + private final BlobBytesReader reader; + + protected SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobBytesReader reader) { + super(new SingleInstanceLockFactory()); + this.snapshot = Objects.requireNonNull(snapshot); + this.reader = Objects.requireNonNull(reader); + } + + private FileInfo fileInfo(final String name) throws FileNotFoundException { + return snapshot.indexFiles().stream() + .filter(fileInfo -> fileInfo.physicalName().equals(name)) + .findFirst() + .orElseThrow(() -> new FileNotFoundException(name)); + } + + @Override + public String[] listAll() throws IOException { + ensureOpen(); + return snapshot.indexFiles().stream() + .map(FileInfo::physicalName) + .sorted(String::compareTo) + .toArray(String[]::new); + } + + @Override + public long fileLength(final String name) throws IOException { + ensureOpen(); + return fileInfo(name).length(); + } + + @Override + public IndexInput openInput(final String name, final IOContext context) throws IOException { + ensureOpen(); + return new SearchableSnapshotIndexInput(reader, fileInfo(name)); + } + + @Override + public void close() throws IOException { + isOpen = false; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "@" + snapshot.snapshot() + " lockFactory=" + lockFactory; + } + + @Override + public Set getPendingDeletions() throws IOException { + throw unsupportedException(); + } + + @Override + public void sync(Collection names) throws IOException { + throw unsupportedException(); + } + + @Override + public void syncMetaData() throws IOException { + throw unsupportedException(); + } + + @Override + public void deleteFile(String name) throws IOException { + throw unsupportedException(); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + throw unsupportedException(); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { + throw unsupportedException(); + } + + @Override + public void rename(String source, String dest) throws IOException { + throw unsupportedException(); + } + + private static UnsupportedOperationException unsupportedException() { + return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java new file mode 100644 index 0000000000000..2585bd3bda4aa --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java @@ -0,0 +1,182 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.store; + +import org.apache.lucene.store.BufferedIndexInput; +import org.apache.lucene.store.IndexInput; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Objects; + +/** + * A {@link SearchableSnapshotIndexInput} instance corresponds to a single file from a Lucene directory that has been snapshotted. Because + * large Lucene file might be split into multiple parts during the snapshot, {@link SearchableSnapshotIndexInput} requires a + * {@link FileInfo} object at creation time. This object is used to retrieve the file name and length of the original Lucene file, as well + * as all the parts (stored as "blobs" in the repository) that composed the file in the snapshot. + * + * For example, the following {@link FileInfo}: + * [name: __4vdpz_HFQ8CuKjCERX0o2A, numberOfParts: 2, partSize: 997b, partBytes: 997, metadata: name [_0_Asserting_0.pos], length [1413] + * + * Indicates that the Lucene file "_0_Asserting_0.pos" has a total length of 1413 and is snapshotted into 2 parts: + * - __4vdpz_HFQ8CuKjCERX0o2A.part1 of size 997b + * - __4vdpz_HFQ8CuKjCERX0o2A.part2 of size 416b + * + * {@link SearchableSnapshotIndexInput} maintains a global position that indicates the current position in the Lucene file where the + * next read will occur. In the case of a Lucene file snapshotted into multiple parts, this position is used to identify which part must + * be read at which position (see {@link #readInternal(byte[], int, int)}. This position is also passed over to cloned and sliced input + * along with the {@link FileInfo} so that they can also track their reading position. + */ +public class SearchableSnapshotIndexInput extends BufferedIndexInput { + + private final BlobBytesReader reader; + private final FileInfo fileInfo; + + private long position; + private boolean closed; + + public SearchableSnapshotIndexInput(final BlobBytesReader reader, final FileInfo fileInfo) { + this("SearchableSnapshotIndexInput(" + fileInfo + ")", reader, fileInfo, 0L); + } + + private SearchableSnapshotIndexInput(final String resourceDesc, final BlobBytesReader reader, + final FileInfo fileInfo, final long position) { + super(resourceDesc); + this.reader = Objects.requireNonNull(reader); + this.fileInfo = Objects.requireNonNull(fileInfo); + this.position = position; + this.closed = false; + } + + @Override + public long length() { + return fileInfo.length(); + } + + private void ensureOpen() throws IOException { + if (closed) { + throw new IOException(toString() + " is closed"); + } + } + + @Override + protected void readInternal(byte[] b, int offset, int length) throws IOException { + ensureOpen(); + if (fileInfo.numberOfParts() == 1L) { + readInternalBytes(0L, position, b, offset, length); + } else { + int len = length; + int off = offset; + while (len > 0) { + long currentPart = position / fileInfo.partSize().getBytes(); + int remainingBytesInPart; + if (currentPart < (fileInfo.numberOfParts() - 1)) { + remainingBytesInPart = Math.toIntExact(((currentPart + 1L) * fileInfo.partSize().getBytes()) - position); + } else { + remainingBytesInPart = Math.toIntExact(fileInfo.length() - position); + } + final int read = Math.min(len, remainingBytesInPart); + readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, off, read); + len -= read; + off += read; + } + } + } + + private void readInternalBytes(final long part, final long pos, byte[] b, int offset, int length) throws IOException { + reader.readBlobBytes(fileInfo.partName(part), pos, length, b, offset); + position += length; + } + + @Override + protected void seekInternal(long pos) throws IOException { + if (pos > fileInfo.length()) { + throw new EOFException("Reading past end of file [position=" + pos + ", length=" + fileInfo.length() + "] for " + toString()); + } else if (pos < 0L) { + throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); + } + this.position = pos; + } + + @Override + public BufferedIndexInput clone() { + return new SearchableSnapshotIndexInput("clone(" + this + ")", reader, fileInfo, position); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { + final Slice slice = new Slice(sliceDescription, offset, length, this); + slice.seek(0L); + return slice; + } else { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + + ",length=" + length + ",fileLength=" + length() + ": " + this); + } + } + + @Override + public void close() throws IOException { + closed = true; + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "(resourceDesc=" + super.toString() + + ", name=" + fileInfo.physicalName() + + ", length=" + fileInfo.length() + + ", sizeOfParts=" + fileInfo.partSize() + + ", numberOfParts=" + fileInfo.numberOfParts() + ")"; + } + + /** + * A slice created from a {@link SearchableSnapshotIndexInput}. + * + * The slice overrides the {@link #length()} and {@link #seekInternal(long)} + * methods so that it adjust the values according to initial offset position + * from which the slice was created. + */ + private static class Slice extends SearchableSnapshotIndexInput { + + private final long offset; + private final long length; + + Slice(String sliceDescription, long offset, long length, SearchableSnapshotIndexInput base) { + super(base.toString() + " [slice=" + sliceDescription + "]", base.reader, base.fileInfo, base.position); + this.offset = offset; + this.length = length; + } + + @Override + public long length() { + return length; + } + + @Override + protected void seekInternal(long pos) throws IOException { + super.seekInternal(offset + pos); + } + + @Override + public BufferedIndexInput clone() { + return new Slice("clone(" + this + ")", offset, length, this); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { + final Slice slice = new Slice(sliceDescription, offset + this.offset, length, this); + slice.seek(0L); + return slice; + } else { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + + ",length=" + length + ",fileLength=" + length() + ": " + this); + } + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java new file mode 100644 index 0000000000000..a8c336288e211 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -0,0 +1,361 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.store; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.Terms; +import org.apache.lucene.search.CheckHits; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermInSetQuery; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TermRangeQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.BytesRefs; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_CODEC; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class SearchableSnapshotDirectoryTests extends ESTestCase { + + public void testListAll() throws Exception { + testDirectories((directory, searchableDirectory) -> + assertThat(searchableDirectory.listAll(), equalTo(Arrays.stream(directory.listAll()) + .filter(file -> "write.lock".equals(file) == false) + .filter(file -> file.startsWith("extra") == false) + .toArray(String[]::new)))); + } + + public void testFileLength() throws Exception { + testDirectories((directory, searchableDirectory) -> + Arrays.stream(directory.listAll()) + .filter(file -> "write.lock".equals(file) == false) + .filter(file -> file.startsWith("extra") == false) + .forEach(file -> { + try { + assertThat(searchableDirectory.fileLength(file), equalTo(directory.fileLength(file))); + } catch (IOException e) { + throw new AssertionError(e); + } + })); + } + + public void testIndexSearcher() throws Exception { + testDirectories((directory, searchableDirectory) -> { + try (DirectoryReader reader = DirectoryReader.open(directory)) { + final IndexSearcher searcher = newSearcher(reader); + + try (DirectoryReader searchableReader = DirectoryReader.open(searchableDirectory)) { + final IndexSearcher searchableSearcher = newSearcher(searchableReader); + { + Query query = new MatchAllDocsQuery(); + assertThat(searchableSearcher.count(query), equalTo(searcher.count(query))); + CheckHits.checkEqual(query, + searchableSearcher.search(query, 10).scoreDocs, + searcher.search(query, 10).scoreDocs); + } + { + Query query = new TermQuery(new Term("text", "fox")); + assertThat(searchableSearcher.count(query), equalTo(searcher.count(query))); + CheckHits.checkEqual(query, + searchableSearcher.search(query, 10).scoreDocs, + searcher.search(query, 10).scoreDocs); + } + { + Query query = new TermInSetQuery("text", List.of(new BytesRef("quick"), new BytesRef("lazy"))); + assertThat(searchableSearcher.count(query), equalTo(searcher.count(query))); + CheckHits.checkEqual(query, + searchableSearcher.search(query, 10).scoreDocs, + searcher.search(query, 10).scoreDocs); + } + { + Query query = new TermRangeQuery("rank", + BytesRefs.toBytesRef(randomLongBetween(0L, 500L)), + BytesRefs.toBytesRef(randomLongBetween(501L, 1000L)), + randomBoolean(), randomBoolean()); + assertThat(searchableSearcher.count(query), equalTo(searcher.count(query))); + CheckHits.checkEqual(query, + searchableSearcher.search(query, 10).scoreDocs, + searcher.search(query, 10).scoreDocs); + } + } + } + }); + } + + public void testDirectoryReader() throws Exception { + testDirectories((directory, searchableDirectory) -> { + try (DirectoryReader reader = DirectoryReader.open(directory)) { + try (DirectoryReader searchableReader = DirectoryReader.open(searchableDirectory)) { + assertThat(searchableReader.leaves(), hasSize(reader.leaves().size())); + assertThat(searchableReader.maxDoc(), equalTo(reader.maxDoc())); + assertThat(searchableReader.getVersion(), equalTo(reader.getVersion())); + assertThat(searchableReader.getIndexCommit().getGeneration(), equalTo(reader.getIndexCommit().getGeneration())); + + String field = randomFrom("id", "text"); + Terms terms = reader.leaves().get(0).reader().terms(field); + Terms searchableTerms = searchableReader.leaves().get(0).reader().terms(field); + assertThat(searchableTerms.size(), equalTo(terms.size())); + assertThat(searchableTerms.getDocCount(), equalTo(terms.getDocCount())); + assertThat(searchableTerms.getMin(), equalTo(terms.getMin())); + assertThat(searchableTerms.getMax(), equalTo(terms.getMax())); + } + } + }); + } + + public void testReadByte() throws Exception { + testIndexInputs((indexInput, searchableIndexInput) -> { + try { + for (int i = 0; i < 10; i++) { + if (randomBoolean()) { + long position = randomLongBetween(0L, indexInput.length()); + indexInput.seek(position); + searchableIndexInput.seek(position); + } + assertThat("File pointers values should be the same before reading a byte", + searchableIndexInput.getFilePointer(), equalTo(indexInput.getFilePointer())); + + if (indexInput.getFilePointer() < indexInput.length()) { + assertThat(searchableIndexInput.readByte(), equalTo(indexInput.readByte())); + } else { + expectThrows(EOFException.class, searchableIndexInput::readByte); + } + assertThat("File pointers values should be the same after reading a byte", + searchableIndexInput.getFilePointer(), equalTo(indexInput.getFilePointer())); + } + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testReadBytes() throws Exception { + final byte[] buffer = new byte[8192]; + final byte[] searchableBuffer = new byte[buffer.length]; + + testIndexInputs((indexInput, searchableIndexInput) -> { + try { + if (randomBoolean()) { + long position = randomLongBetween(0L, indexInput.length()); + indexInput.seek(position); + searchableIndexInput.seek(position); + } + assertThat("File pointers values should be the same before reading a byte", + searchableIndexInput.getFilePointer(), equalTo(indexInput.getFilePointer())); + + int available = Math.toIntExact(indexInput.length() - indexInput.getFilePointer()); + if (available == 0) { + expectThrows(EOFException.class, () -> searchableIndexInput.readBytes(searchableBuffer, 0, searchableBuffer.length)); + return; + } + + int length = randomIntBetween(1, Math.min(available, buffer.length)); + + Arrays.fill(buffer, (byte) 0); + indexInput.readBytes(buffer, 0, length); + + Arrays.fill(searchableBuffer, (byte) 0); + searchableIndexInput.readBytes(searchableBuffer, 0, length); + + assertThat("File pointers values should be the same after reading a byte", + searchableIndexInput.getFilePointer(), equalTo(indexInput.getFilePointer())); + assertArrayEquals(searchableBuffer, buffer); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + /** + * This method : + * - sets up a default {@link Directory} and index random documents + * - snapshots the directory using a FS repository + * - creates a {@link SearchableSnapshotDirectory} instance based on the snapshotted files + * - consumes the default and the searchable snapshot directories using the {@link CheckedBiConsumer}. + */ + private void testDirectories(final CheckedBiConsumer consumer) throws Exception { + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("_index", Settings.builder() + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + .build()); + final ShardId shardId = new ShardId(indexSettings.getIndex(), randomIntBetween(0, 10)); + final List releasables = new ArrayList<>(); + + try (Directory directory = newDirectory()) { + final IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); + try (IndexWriter writer = new IndexWriter(directory, indexWriterConfig)) { + final int nbDocs = scaledRandomIntBetween(0, 1_000); + final List words = List.of("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"); + for (int i = 0; i < nbDocs; i++) { + final Document doc = new Document(); + doc.add(new StringField("id", "" + i, Field.Store.YES)); + String text = String.join(" ", randomSubsetOf(randomIntBetween(1, words.size()), words)); + doc.add(new TextField("text", text, Field.Store.YES)); + doc.add(new NumericDocValuesField("rank", i)); + writer.addDocument(doc); + } + if (randomBoolean()) { + writer.flush(); + } + if (randomBoolean()) { + writer.forceMerge(1, true); + } + writer.commit(); + } + + final SetOnce searchableSnapshotDirectory = new SetOnce<>(); + + final ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName()); + releasables.add(() -> terminate(threadPool)); + + final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); + store.incRef(); + releasables.add(store::decRef); + try { + final SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + final IndexCommit indexCommit = Lucene.getIndexCommit(segmentInfos, store.directory()); + + Path repositoryPath = createTempDir(); + Settings.Builder repositorySettings = Settings.builder().put("location", repositoryPath); + boolean compress = randomBoolean(); + if (compress) { + repositorySettings.put("compress", randomBoolean()); + } + if (randomBoolean()) { + repositorySettings.put("base_path", randomAlphaOfLengthBetween(3, 10)); + } + if (randomBoolean()) { + repositorySettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); + } + final BlobStoreRepository repository = new FsRepository( + new RepositoryMetaData("_fs", FsRepository.TYPE, repositorySettings.build()), + new Environment(Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), repositoryPath.toAbsolutePath()) + .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()).build(), null), + NamedXContentRegistry.EMPTY, threadPool); + repository.start(); + releasables.add(repository::stop); + + final CountDownLatch latch = new CountDownLatch(1); + threadPool.generic().submit(() -> { + final PlainActionFuture future = PlainActionFuture.newFuture(); + SnapshotId snapshotId = new SnapshotId("_snapshot", UUIDs.randomBase64UUID(random())); + IndexId indexId = new IndexId(indexSettings.getIndex().getName(), UUIDs.randomBase64UUID(random())); + IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); + repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true, future); + future.actionGet(); + + // TODO: fix this + try { + BlobContainer shardContainer = repository.blobStore().blobContainer(repository.basePath() + .add("indices").add(indexId.getId()).add(Integer.toString(shardId.id()))); + BlobStoreIndexShardSnapshot blobs = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, + BlobStoreIndexShardSnapshot::fromXContent, NamedXContentRegistry.EMPTY, true) + .read(shardContainer, snapshotId.getUUID()); + searchableSnapshotDirectory.set(new SearchableSnapshotDirectory(blobs, + (name, from, length, buffer, offset) -> { + try (InputStream stream = shardContainer.readBlob(name)) { + stream.skip(from); + int read = stream.read(buffer, offset, length); + assert read == length; + } + })); + } catch (IOException e) { + logger.error("failed to build searchable snapshot directory instance", e); + } finally { + latch.countDown(); + } + }); + latch.await(30L, TimeUnit.SECONDS); + + try (Directory searchableDirectory = searchableSnapshotDirectory.get()) { + consumer.accept(directory, searchableDirectory); + } + } finally { + Releasables.close(releasables); + } + } + } + + private void testIndexInputs(final CheckedBiConsumer consumer) throws Exception { + testDirectories((directory, searchableDirectory) -> { + for (String fileName : randomSubsetOf(Arrays.asList(searchableDirectory.listAll()))) { + final IOContext context = newIOContext(random()); + try (IndexInput indexInput = directory.openInput(fileName, context)) { + final List closeables = new ArrayList<>(); + try { + IndexInput searchableIndexInput = searchableDirectory.openInput(fileName, context); + closeables.add(searchableIndexInput); + if (randomBoolean()) { + searchableIndexInput = searchableIndexInput.clone(); + } + consumer.accept(indexInput, searchableIndexInput); + } finally { + IOUtils.close(closeables); + } + } + } + }); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java new file mode 100644 index 0000000000000..3fc7d3c826814 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.store; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.Version; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.test.ESTestCase; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class SearchableSnapshotIndexInputTests extends ESTestCase { + + private SearchableSnapshotIndexInput createIndexInput(final byte[] input) { + StoreFileMetaData metadata = new StoreFileMetaData("test", (long) input.length, "_checksum", Version.LATEST); + long partSize = (long) (randomBoolean() ? input.length : randomIntBetween(1, input.length)); + FileInfo fileInfo = new FileInfo(randomAlphaOfLength(5), metadata, new ByteSizeValue(partSize, ByteSizeUnit.BYTES)); + + BlobBytesReader reader = (name, from, len, buffer, offset) -> { + if (fileInfo.numberOfParts() == 1L) { + if (name.equals(fileInfo.name()) == false || name.contains(".part")) { + throw new IOException("Unexpected part name " + name); + } + System.arraycopy(input, Math.toIntExact(from), buffer, offset, len); + } else { + if (name.startsWith(fileInfo.name()) == false || name.contains(".part") == false) { + throw new IOException("Unexpected part name " + name); + } + long partNumber = Long.parseLong(name.substring(name.indexOf(".part") + ".part".length())); + if (partNumber < 0 || partNumber >= fileInfo.numberOfParts()) { + throw new IOException("Unexpected part number " + name); + } + System.arraycopy(input, Math.toIntExact(partNumber * partSize + from), buffer, offset, len); + } + }; + return new SearchableSnapshotIndexInput(reader, fileInfo); + } + + public void testRandomReads() throws IOException { + for (int i = 0; i < 100; i++) { + byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); + SearchableSnapshotIndexInput indexInput = createIndexInput(input); + assertEquals(input.length, indexInput.length()); + assertEquals(0, indexInput.getFilePointer()); + byte[] output = randomReadAndSlice(indexInput, input.length); + assertArrayEquals(input, output); + } + } + + public void testRandomOverflow() throws IOException { + for (int i = 0; i < 100; i++) { + byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); + SearchableSnapshotIndexInput indexInput = createIndexInput(input); + int firstReadLen = randomIntBetween(0, input.length - 1); + randomReadAndSlice(indexInput, firstReadLen); + int bytesLeft = input.length - firstReadLen; + int secondReadLen = bytesLeft + randomIntBetween(1, 100); + expectThrows(EOFException.class, () -> indexInput.readBytes(new byte[secondReadLen], 0, secondReadLen)); + } + } + + public void testSeekOverflow() throws IOException { + for (int i = 0; i < 100; i++) { + byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); + SearchableSnapshotIndexInput indexInput = createIndexInput(input); + int firstReadLen = randomIntBetween(0, input.length - 1); + randomReadAndSlice(indexInput, firstReadLen); + expectThrows(IOException.class, () -> { + switch (randomIntBetween(0, 2)) { + case 0: + indexInput.seek(Integer.MAX_VALUE + 4L); + break; + case 1: + indexInput.seek(-randomIntBetween(1, 10)); + break; + default: + int seek = input.length + randomIntBetween(1, 100); + indexInput.seek(seek); + break; + } + }); + } + } + + private byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOException { + int readPos = (int) indexInput.getFilePointer(); + byte[] output = new byte[length]; + while (readPos < length) { + switch (randomIntBetween(0, 3)) { + case 0: + // Read by one byte at a time + output[readPos++] = indexInput.readByte(); + break; + case 1: + // Read several bytes into target + int len = randomIntBetween(1, length - readPos); + indexInput.readBytes(output, readPos, len); + readPos += len; + break; + case 2: + // Read several bytes into 0-offset target + len = randomIntBetween(1, length - readPos); + byte[] temp = new byte[len]; + indexInput.readBytes(temp, 0, len); + System.arraycopy(temp, 0, output, readPos, len); + readPos += len; + break; + case 3: + // Read using slice + len = randomIntBetween(1, length - readPos); + IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput.toString(), readPos, len); + temp = randomReadAndSlice(slice, len); + // assert that position in the original input didn't change + assertEquals(readPos, indexInput.getFilePointer()); + System.arraycopy(temp, 0, output, readPos, len); + readPos += len; + indexInput.seek(readPos); + assertEquals(readPos, indexInput.getFilePointer()); + break; + default: + fail(); + } + assertEquals(readPos, indexInput.getFilePointer()); + } + return output; + } +} From e2438cb275e7dddefe24e1e66a87add37b06b698 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 11 Dec 2019 13:58:45 +0100 Subject: [PATCH 2/7] few adjustments --- .../common/blobstore/BlobContainer.java | 16 +++ .../common/blobstore/fs/FsBlobContainer.java | 8 ++ .../plugins/RepositoryPlugin.java | 10 ++ .../blobstore/BlobStoreRepository.java | 8 +- .../plugin/searchable-snapshots/build.gradle | 2 - .../index/store/BlobBytesReader.java | 13 --- .../store/SearchableSnapshotDirectory.java | 19 ++-- .../store/SearchableSnapshotIndexInput.java | 97 ++++++------------- .../SearchableSnapshots.java | 93 +++++++++++++++++- .../SearchableSnapshotDirectoryTests.java | 74 +++++++------- .../SearchableSnapshotIndexInputTests.java | 63 +++++++----- 11 files changed, 248 insertions(+), 155 deletions(-) delete mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 83de4aba8e629..25da12ed876d5 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -49,6 +49,22 @@ public interface BlobContainer { */ InputStream readBlob(String blobName) throws IOException; + /** + * Creates a new {@link InputStream} that can be used to read the given blob starting from + * a specific {@code position} in the blob. The {@code length} is an indication of the + * number of bytes that are expected to be read from the {@link InputStream}. + * + * @param blobName The name of the blob to get an {@link InputStream} for. + * @param position The position in the blob where the next byte will be read. + * @param length An indication of the number of bytes to be read. + * @return The {@code InputStream} to read the blob. + * @throws NoSuchFileException if the blob does not exist + * @throws IOException if the blob can not be read. + */ + default InputStream readBlob(final String blobName, final long position, final int length) throws IOException { + throw new UnsupportedOperationException(); // NORELEASE + } + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name. * This method assumes the container does not already contain a blob of the same blobName. If a blob by the diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index d3a9731b2f656..8b9bdce8d3d66 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -163,6 +163,14 @@ public InputStream readBlob(String name) throws IOException { } } + @Override + public InputStream readBlob(String blobName, long position, int length) throws IOException { + final InputStream inputStream = readBlob(blobName); + long skipped = inputStream.skip(position); // NORELEASE + assert skipped == position; + return inputStream; + } + @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { if (failIfAlreadyExists == false) { diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index 1ac61b27fd1ae..4011834c31cbc 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.Repository; /** @@ -58,4 +59,13 @@ default Map getInternalRepositories(Environment env, ClusterService clusterService) { return Collections.emptyMap(); } + + /** + * Passes down the current {@link RepositoriesModule} to repository plugins. + * + * @param module the current {@link RepositoriesModule} + */ + default void onRepositoriesModule(RepositoriesModule module) { + // NORELEASE + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 838bede23bf54..e3d66caa6d6e5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -910,7 +910,7 @@ private BlobContainer shardContainer(IndexId indexId, ShardId shardId) { return shardContainer(indexId, shardId.getId()); } - private BlobContainer shardContainer(IndexId indexId, int shardId) { + public BlobContainer shardContainer(IndexId indexId, int shardId) { return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId))); } @@ -942,9 +942,11 @@ public long getRestoreThrottleTimeInNanos() { } protected void assertSnapshotOrGenericThread() { + // NORELEASE + /* assert Thread.currentThread().getName().contains(ThreadPool.Names.SNAPSHOT) || Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC) : - "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread."; + "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";*/ } @Override @@ -1663,7 +1665,7 @@ private static List unusedBlobs(Set blobs, Set surviving /** * Loads information about shard snapshot */ - private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { + public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { try { return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID()); } catch (NoSuchFileException ex) { diff --git a/x-pack/plugin/searchable-snapshots/build.gradle b/x-pack/plugin/searchable-snapshots/build.gradle index b7c722ee8148b..dd928bfbcc41c 100644 --- a/x-pack/plugin/searchable-snapshots/build.gradle +++ b/x-pack/plugin/searchable-snapshots/build.gradle @@ -18,5 +18,3 @@ dependencies { // installing them as individual plugins for integ tests doesn't make sense, // so we disable integ tests integTest.enabled = false - -test.enabled = false diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java deleted file mode 100644 index d98ce81bcc4f0..0000000000000 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.index.store; - -import java.io.IOException; - -public interface BlobBytesReader { - - void readBlobBytes(String name, long from, int length, byte[] buffer, int offset) throws IOException; -} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index f22bfd29ef1ad..9a4ca9fab66ee 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.SingleInstanceLockFactory; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -25,21 +26,21 @@ * implementation does not allow modification of the directory files and only supports {@link #listAll()}, {@link #fileLength(String)} and * {@link #openInput(String, IOContext)} methods. * - * To create a {@link SearchableSnapshotDirectory} both the list of the snapshot files and a way to read these files must be provided. The - * definition of the snapshot files are provided using a {@link BlobStoreIndexShardSnapshot} object which contains the name of the snapshot - * and all the files it contains along with their metadata. Because there is no one-to-one relationship between the original shard files - * and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by Lucene with - * the one (or the ones) corresponding blob(s) in the snapshot. + * To create a {@link SearchableSnapshotDirectory} both the list of the snapshot files and a {@link BlobContainer} to read these files must + * be provided. The definition of the snapshot files are provided using a {@link BlobStoreIndexShardSnapshot} object which contains the name + * of the snapshot and all the files it contains along with their metadata. Because there is no one-to-one relationship between the original + * shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by + * Lucene with the one (or the ones) corresponding blob(s) in the snapshot. */ public class SearchableSnapshotDirectory extends BaseDirectory { private final BlobStoreIndexShardSnapshot snapshot; - private final BlobBytesReader reader; + private final BlobContainer blobContainer; - protected SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobBytesReader reader) { + public SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) { super(new SingleInstanceLockFactory()); this.snapshot = Objects.requireNonNull(snapshot); - this.reader = Objects.requireNonNull(reader); + this.blobContainer = Objects.requireNonNull(blobContainer); } private FileInfo fileInfo(final String name) throws FileNotFoundException { @@ -67,7 +68,7 @@ public long fileLength(final String name) throws IOException { @Override public IndexInput openInput(final String name, final IOContext context) throws IOException { ensureOpen(); - return new SearchableSnapshotIndexInput(reader, fileInfo(name)); + return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name)); } @Override diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java index 2585bd3bda4aa..9ce1620a5d49d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java @@ -7,10 +7,12 @@ import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IndexInput; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.util.Objects; /** @@ -33,28 +35,32 @@ */ public class SearchableSnapshotIndexInput extends BufferedIndexInput { - private final BlobBytesReader reader; + private final BlobContainer blobContainer; private final FileInfo fileInfo; + private final long offset; + private final long length; private long position; private boolean closed; - public SearchableSnapshotIndexInput(final BlobBytesReader reader, final FileInfo fileInfo) { - this("SearchableSnapshotIndexInput(" + fileInfo + ")", reader, fileInfo, 0L); + public SearchableSnapshotIndexInput(final BlobContainer blobContainer, final FileInfo fileInfo) { + this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, 0L, 0L, fileInfo.length()); } - private SearchableSnapshotIndexInput(final String resourceDesc, final BlobBytesReader reader, - final FileInfo fileInfo, final long position) { + private SearchableSnapshotIndexInput(final String resourceDesc, final BlobContainer blobContainer, + final FileInfo fileInfo, final long position, final long offset, final long length) { super(resourceDesc); - this.reader = Objects.requireNonNull(reader); + this.blobContainer = Objects.requireNonNull(blobContainer); this.fileInfo = Objects.requireNonNull(fileInfo); + this.offset = offset; + this.length = length; this.position = position; this.closed = false; } @Override public long length() { - return fileInfo.length(); + return length; } private void ensureOpen() throws IOException { @@ -88,29 +94,33 @@ protected void readInternal(byte[] b, int offset, int length) throws IOException } private void readInternalBytes(final long part, final long pos, byte[] b, int offset, int length) throws IOException { - reader.readBlobBytes(fileInfo.partName(part), pos, length, b, offset); - position += length; + try (InputStream inputStream = blobContainer.readBlob(fileInfo.partName(part), pos, length)) { + int read = inputStream.read(b, offset, length); + assert read == length; + position += read; + } } @Override protected void seekInternal(long pos) throws IOException { - if (pos > fileInfo.length()) { - throw new EOFException("Reading past end of file [position=" + pos + ", length=" + fileInfo.length() + "] for " + toString()); + if (pos > length) { + throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length + "] for " + toString()); } else if (pos < 0L) { throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); } - this.position = pos; + this.position = offset + pos; } @Override public BufferedIndexInput clone() { - return new SearchableSnapshotIndexInput("clone(" + this + ")", reader, fileInfo, position); + return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length); } @Override public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { - final Slice slice = new Slice(sliceDescription, offset, length, this); + final SearchableSnapshotIndexInput slice = + new SearchableSnapshotIndexInput(sliceDescription, blobContainer, fileInfo, position, this.offset + offset, length); slice.seek(0L); return slice; } else { @@ -126,57 +136,12 @@ public void close() throws IOException { @Override public String toString() { - return getClass().getSimpleName() - + "(resourceDesc=" + super.toString() - + ", name=" + fileInfo.physicalName() - + ", length=" + fileInfo.length() - + ", sizeOfParts=" + fileInfo.partSize() - + ", numberOfParts=" + fileInfo.numberOfParts() + ")"; - } - - /** - * A slice created from a {@link SearchableSnapshotIndexInput}. - * - * The slice overrides the {@link #length()} and {@link #seekInternal(long)} - * methods so that it adjust the values according to initial offset position - * from which the slice was created. - */ - private static class Slice extends SearchableSnapshotIndexInput { - - private final long offset; - private final long length; - - Slice(String sliceDescription, long offset, long length, SearchableSnapshotIndexInput base) { - super(base.toString() + " [slice=" + sliceDescription + "]", base.reader, base.fileInfo, base.position); - this.offset = offset; - this.length = length; - } - - @Override - public long length() { - return length; - } - - @Override - protected void seekInternal(long pos) throws IOException { - super.seekInternal(offset + pos); - } - - @Override - public BufferedIndexInput clone() { - return new Slice("clone(" + this + ")", offset, length, this); - } - - @Override - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { - if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { - final Slice slice = new Slice(sliceDescription, offset + this.offset, length, this); - slice.seek(0L); - return slice; - } else { - throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset - + ",length=" + length + ",fileLength=" + length() + ": " + this); - } - } + return "SearchableSnapshotIndexInput{" + + "resourceDesc=" + super.toString() + + ", fileInfo=" + fileInfo + + ", offset=" + offset + + ", length=" + length + + ", position=" + position + + '}'; } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 71af60c9e9706..268d40845a2d6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -5,11 +5,102 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; +import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; /** * Plugin for Searchable Snapshots feature */ -public class SearchableSnapshots extends Plugin { +public class SearchableSnapshots extends Plugin implements IndexStorePlugin, RepositoryPlugin { + + public static final Setting EPHEMERAL_INDEX_REPOSITORY_SETTING = + Setting.simpleString("index.ephemeral.repository", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + + public static final Setting EPHEMERAL_INDEX_SNAPSHOT_SETTING = + Setting.simpleString("index.ephemeral.snapshot", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + + private final SetOnce repositoriesService; + private final SetOnce threadPool; + + public SearchableSnapshots() { + this.repositoriesService = new SetOnce<>(); + this.threadPool = new SetOnce<>(); + } + + @Override + public List> getSettings() { + return List.of(EPHEMERAL_INDEX_REPOSITORY_SETTING, EPHEMERAL_INDEX_SNAPSHOT_SETTING); + } + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry) { + + this.threadPool.set(threadPool); + return List.of(); + } + + @Override + public void onRepositoriesModule(RepositoriesModule repositoriesModule) { + repositoriesService.set(repositoriesModule.getRepositoryService()); // should we use some SPI mechanism? + } + + @Override + public Map getDirectoryFactories() { + return Map.of("ephemeral", newDirectoryFactory(repositoriesService::get)); + } + + public static DirectoryFactory newDirectoryFactory(final Supplier repositoriesService) { + return (indexSettings, shardPath) -> { + final RepositoriesService repositories = repositoriesService.get(); + assert repositories != null; + + final Repository repository = repositories.repository(EPHEMERAL_INDEX_REPOSITORY_SETTING.get(indexSettings.getSettings())); + if (repository instanceof BlobStoreRepository == false) { + throw new IllegalArgumentException("Repository [" + repository + "] does not support searchable snapshots" ); + } + + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + BlobContainer blobContainer = blobStoreRepository.shardContainer(new IndexId("_ephemeral", + shardPath.getShardId().getIndex().getUUID()), shardPath.getShardId().id()); + BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, + new SnapshotId("_ephemeral", EPHEMERAL_INDEX_SNAPSHOT_SETTING.get(indexSettings.getSettings()))); + return new SearchableSnapshotDirectory(snapshot, blobContainer); + }; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index a8c336288e211..ccba2999bd2bf 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -34,7 +34,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.BytesRefs; @@ -44,13 +43,16 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.plugins.IndexStorePlugin.DirectoryFactory; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.DummyShardLock; @@ -58,22 +60,21 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; import java.io.Closeable; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_CODEC; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SearchableSnapshotDirectoryTests extends ESTestCase { @@ -287,49 +288,48 @@ private void testDirectories(final CheckedBiConsumer future = PlainActionFuture.newFuture(); threadPool.generic().submit(() -> { - final PlainActionFuture future = PlainActionFuture.newFuture(); - SnapshotId snapshotId = new SnapshotId("_snapshot", UUIDs.randomBase64UUID(random())); - IndexId indexId = new IndexId(indexSettings.getIndex().getName(), UUIDs.randomBase64UUID(random())); IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true, future); future.actionGet(); - - // TODO: fix this - try { - BlobContainer shardContainer = repository.blobStore().blobContainer(repository.basePath() - .add("indices").add(indexId.getId()).add(Integer.toString(shardId.id()))); - BlobStoreIndexShardSnapshot blobs = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, - BlobStoreIndexShardSnapshot::fromXContent, NamedXContentRegistry.EMPTY, true) - .read(shardContainer, snapshotId.getUUID()); - searchableSnapshotDirectory.set(new SearchableSnapshotDirectory(blobs, - (name, from, length, buffer, offset) -> { - try (InputStream stream = shardContainer.readBlob(name)) { - stream.skip(from); - int read = stream.read(buffer, offset, length); - assert read == length; - } - })); - } catch (IOException e) { - logger.error("failed to build searchable snapshot directory instance", e); - } finally { - latch.countDown(); - } }); - latch.await(30L, TimeUnit.SECONDS); + future.actionGet(); + + final RepositoriesService repositories = mock(RepositoriesService.class); + when(repositories.repository(eq(repositoryName))).thenReturn(repository); + + final IndexSettings ephemeralIndexSettings = IndexSettingsModule.newIndexSettings("_searchable_snapshot_index", + Settings.builder() + .put(indexSettings.getSettings()) + .put(SearchableSnapshots.EPHEMERAL_INDEX_REPOSITORY_SETTING.getKey(), repositoryName) + .put(SearchableSnapshots.EPHEMERAL_INDEX_SNAPSHOT_SETTING.getKey(), snapshotId.getUUID()) + .build()); + + Path tmpDir = createTempDir().resolve(indexId.getId()).resolve(Integer.toString(shardId.id())); + ShardId ephemeralShardId = new ShardId(new Index(indexId.getName(), indexId.getId()), shardId.id()); + ShardPath ephemeralShardPath = new ShardPath(false, tmpDir, tmpDir, ephemeralShardId); - try (Directory searchableDirectory = searchableSnapshotDirectory.get()) { + final DirectoryFactory factory = SearchableSnapshots.newDirectoryFactory(() -> repositories); + try (Directory searchableDirectory = factory.newDirectory(ephemeralIndexSettings, ephemeralShardPath)) { consumer.accept(directory, searchableDirectory); } } finally { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java index 3fc7d3c826814..e48bddec6ce53 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java @@ -7,46 +7,61 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.Version; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.test.ESTestCase; +import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; import java.nio.charset.StandardCharsets; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class SearchableSnapshotIndexInputTests extends ESTestCase { - private SearchableSnapshotIndexInput createIndexInput(final byte[] input) { - StoreFileMetaData metadata = new StoreFileMetaData("test", (long) input.length, "_checksum", Version.LATEST); - long partSize = (long) (randomBoolean() ? input.length : randomIntBetween(1, input.length)); - FileInfo fileInfo = new FileInfo(randomAlphaOfLength(5), metadata, new ByteSizeValue(partSize, ByteSizeUnit.BYTES)); + private SearchableSnapshotIndexInput createIndexInput(final byte[] input) throws IOException { + final long partSize = (long) (randomBoolean() ? input.length : randomIntBetween(1, input.length)); + final FileInfo fileInfo = new FileInfo(randomAlphaOfLength(5), + new StoreFileMetaData("test", (long) input.length, "_checksum", Version.LATEST), + new ByteSizeValue(partSize, ByteSizeUnit.BYTES)); - BlobBytesReader reader = (name, from, len, buffer, offset) -> { - if (fileInfo.numberOfParts() == 1L) { - if (name.equals(fileInfo.name()) == false || name.contains(".part")) { - throw new IOException("Unexpected part name " + name); - } - System.arraycopy(input, Math.toIntExact(from), buffer, offset, len); - } else { - if (name.startsWith(fileInfo.name()) == false || name.contains(".part") == false) { - throw new IOException("Unexpected part name " + name); - } - long partNumber = Long.parseLong(name.substring(name.indexOf(".part") + ".part".length())); - if (partNumber < 0 || partNumber >= fileInfo.numberOfParts()) { - throw new IOException("Unexpected part number " + name); + final BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.readBlob(anyString(), anyLong(), anyInt())) + .thenAnswer(invocationOnMock -> { + String name = (String) invocationOnMock.getArguments()[0]; + long position = (long) invocationOnMock.getArguments()[1]; + int length = (int) invocationOnMock.getArguments()[2]; + + if (fileInfo.numberOfParts() == 1L) { + if (name.equals(fileInfo.name()) == false || name.contains(".part")) { + throw new IOException("Unexpected part name " + name); + } + return new ByteArrayInputStream(input, Math.toIntExact(position), length); + } else { + if (name.startsWith(fileInfo.name()) == false || name.contains(".part") == false) { + throw new IOException("Unexpected part name " + name); + } + long partNumber = Long.parseLong(name.substring(name.indexOf(".part") + ".part".length())); + if (partNumber < 0 || partNumber >= fileInfo.numberOfParts()) { + throw new IOException("Unexpected part number " + name); + } + return new ByteArrayInputStream(input, Math.toIntExact(partNumber * partSize + position), length); } - System.arraycopy(input, Math.toIntExact(partNumber * partSize + from), buffer, offset, len); - } - }; - return new SearchableSnapshotIndexInput(reader, fileInfo); + }); + return new SearchableSnapshotIndexInput(blobContainer, fileInfo); } public void testRandomReads() throws IOException { for (int i = 0; i < 100; i++) { byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); - SearchableSnapshotIndexInput indexInput = createIndexInput(input); + IndexInput indexInput = createIndexInput(input); assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); byte[] output = randomReadAndSlice(indexInput, input.length); @@ -57,7 +72,7 @@ public void testRandomReads() throws IOException { public void testRandomOverflow() throws IOException { for (int i = 0; i < 100; i++) { byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); - SearchableSnapshotIndexInput indexInput = createIndexInput(input); + IndexInput indexInput = createIndexInput(input); int firstReadLen = randomIntBetween(0, input.length - 1); randomReadAndSlice(indexInput, firstReadLen); int bytesLeft = input.length - firstReadLen; @@ -69,7 +84,7 @@ public void testRandomOverflow() throws IOException { public void testSeekOverflow() throws IOException { for (int i = 0; i < 100; i++) { byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); - SearchableSnapshotIndexInput indexInput = createIndexInput(input); + IndexInput indexInput = createIndexInput(input); int firstReadLen = randomIntBetween(0, input.length - 1); randomReadAndSlice(indexInput, firstReadLen); expectThrows(IOException.class, () -> { From dce48082adbac5e4d52792d5c6137fa90ff1b129 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 12 Dec 2019 16:14:08 +0100 Subject: [PATCH 3/7] Apply suggestions from code review Co-Authored-By: David Turner --- .../index/store/SearchableSnapshotDirectory.java | 16 ++++++++-------- .../searchablesnapshots/SearchableSnapshots.java | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 9a4ca9fab66ee..bcad968cec456 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -72,7 +72,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I } @Override - public void close() throws IOException { + public void close() { isOpen = false; } @@ -82,37 +82,37 @@ public String toString() { } @Override - public Set getPendingDeletions() throws IOException { + public Set getPendingDeletions() { throw unsupportedException(); } @Override - public void sync(Collection names) throws IOException { + public void sync(Collection names) { throw unsupportedException(); } @Override - public void syncMetaData() throws IOException { + public void syncMetaData() { throw unsupportedException(); } @Override - public void deleteFile(String name) throws IOException { + public void deleteFile(String name) { throw unsupportedException(); } @Override - public IndexOutput createOutput(String name, IOContext context) throws IOException { + public IndexOutput createOutput(String name, IOContext context) { throw unsupportedException(); } @Override - public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { throw unsupportedException(); } @Override - public void rename(String source, String dest) throws IOException { + public void rename(String source, String dest) { throw unsupportedException(); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 268d40845a2d6..6721d51ece67b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -40,10 +40,10 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, RepositoryPlugin { public static final Setting EPHEMERAL_INDEX_REPOSITORY_SETTING = - Setting.simpleString("index.ephemeral.repository", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + Setting.simpleString("index.store.snapshot.repository_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex); public static final Setting EPHEMERAL_INDEX_SNAPSHOT_SETTING = - Setting.simpleString("index.ephemeral.snapshot", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + Setting.simpleString("index.store.snapshot.snapshot_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex); private final SetOnce repositoriesService; private final SetOnce threadPool; @@ -81,7 +81,7 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) { @Override public Map getDirectoryFactories() { - return Map.of("ephemeral", newDirectoryFactory(repositoriesService::get)); + return Map.of("snapshot", newDirectoryFactory(repositoriesService::get)); } public static DirectoryFactory newDirectoryFactory(final Supplier repositoriesService) { @@ -95,7 +95,7 @@ public static DirectoryFactory newDirectoryFactory(final Supplier Date: Thu, 12 Dec 2019 17:12:04 +0100 Subject: [PATCH 4/7] Apply feedback --- .../SearchableSnapshots.java | 24 ++-- .../SearchableSnapshotDirectoryTests.java | 130 +++++++++--------- .../SearchableSnapshotIndexInputTests.java | 25 ++-- 3 files changed, 97 insertions(+), 82 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 6721d51ece67b..287b0fc408c68 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -39,12 +39,18 @@ */ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, RepositoryPlugin { - public static final Setting EPHEMERAL_INDEX_REPOSITORY_SETTING = + public static final Setting SNAPSHOT_REPOSITORY_SETTING = Setting.simpleString("index.store.snapshot.repository_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex); - public static final Setting EPHEMERAL_INDEX_SNAPSHOT_SETTING = + public static final Setting SNAPSHOT_SNAPSHOT_NAME_SETTING = + Setting.simpleString("index.store.snapshot.snapshot_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + + public static final Setting SNAPSHOT_SNAPSHOT_ID_SETTING = Setting.simpleString("index.store.snapshot.snapshot_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + public static final Setting SNAPSHOT_INDEX_ID_SETTING = + Setting.simpleString("index.store.snapshot.index_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + private final SetOnce repositoriesService; private final SetOnce threadPool; @@ -55,7 +61,7 @@ public SearchableSnapshots() { @Override public List> getSettings() { - return List.of(EPHEMERAL_INDEX_REPOSITORY_SETTING, EPHEMERAL_INDEX_SNAPSHOT_SETTING); + return List.of(SNAPSHOT_REPOSITORY_SETTING, SNAPSHOT_SNAPSHOT_ID_SETTING, SNAPSHOT_INDEX_ID_SETTING); } @Override @@ -89,16 +95,18 @@ public static DirectoryFactory newDirectoryFactory(final Supplier - assertThat(searchableDirectory.listAll(), equalTo(Arrays.stream(directory.listAll()) + testDirectories((directory, snapshotDirectory) -> + assertThat(snapshotDirectory.listAll(), equalTo(Arrays.stream(directory.listAll()) .filter(file -> "write.lock".equals(file) == false) .filter(file -> file.startsWith("extra") == false) .toArray(String[]::new)))); } public void testFileLength() throws Exception { - testDirectories((directory, searchableDirectory) -> + testDirectories((directory, snapshotDirectory) -> Arrays.stream(directory.listAll()) .filter(file -> "write.lock".equals(file) == false) .filter(file -> file.startsWith("extra") == false) .forEach(file -> { try { - assertThat(searchableDirectory.fileLength(file), equalTo(directory.fileLength(file))); + assertThat("File [" + file + "] length mismatch", + snapshotDirectory.fileLength(file), equalTo(directory.fileLength(file))); } catch (IOException e) { throw new AssertionError(e); } @@ -101,42 +102,34 @@ public void testFileLength() throws Exception { } public void testIndexSearcher() throws Exception { - testDirectories((directory, searchableDirectory) -> { + testDirectories((directory, snapshotDirectory) -> { try (DirectoryReader reader = DirectoryReader.open(directory)) { final IndexSearcher searcher = newSearcher(reader); - try (DirectoryReader searchableReader = DirectoryReader.open(searchableDirectory)) { - final IndexSearcher searchableSearcher = newSearcher(searchableReader); + try (DirectoryReader snapshotReader = DirectoryReader.open(snapshotDirectory)) { + final IndexSearcher snapshotSearcher = newSearcher(snapshotReader); { Query query = new MatchAllDocsQuery(); - assertThat(searchableSearcher.count(query), equalTo(searcher.count(query))); - CheckHits.checkEqual(query, - searchableSearcher.search(query, 10).scoreDocs, - searcher.search(query, 10).scoreDocs); + assertThat(snapshotSearcher.count(query), equalTo(searcher.count(query))); + CheckHits.checkEqual(query, snapshotSearcher.search(query, 10).scoreDocs, searcher.search(query, 10).scoreDocs); } { Query query = new TermQuery(new Term("text", "fox")); - assertThat(searchableSearcher.count(query), equalTo(searcher.count(query))); - CheckHits.checkEqual(query, - searchableSearcher.search(query, 10).scoreDocs, - searcher.search(query, 10).scoreDocs); + assertThat(snapshotSearcher.count(query), equalTo(searcher.count(query))); + CheckHits.checkEqual(query, snapshotSearcher.search(query, 10).scoreDocs, searcher.search(query, 10).scoreDocs); } { Query query = new TermInSetQuery("text", List.of(new BytesRef("quick"), new BytesRef("lazy"))); - assertThat(searchableSearcher.count(query), equalTo(searcher.count(query))); - CheckHits.checkEqual(query, - searchableSearcher.search(query, 10).scoreDocs, - searcher.search(query, 10).scoreDocs); + assertThat(snapshotSearcher.count(query), equalTo(searcher.count(query))); + CheckHits.checkEqual(query, snapshotSearcher.search(query, 10).scoreDocs, searcher.search(query, 10).scoreDocs); } { Query query = new TermRangeQuery("rank", BytesRefs.toBytesRef(randomLongBetween(0L, 500L)), BytesRefs.toBytesRef(randomLongBetween(501L, 1000L)), randomBoolean(), randomBoolean()); - assertThat(searchableSearcher.count(query), equalTo(searcher.count(query))); - CheckHits.checkEqual(query, - searchableSearcher.search(query, 10).scoreDocs, - searcher.search(query, 10).scoreDocs); + assertThat(snapshotSearcher.count(query), equalTo(searcher.count(query))); + CheckHits.checkEqual(query, snapshotSearcher.search(query, 10).scoreDocs, searcher.search(query, 10).scoreDocs); } } } @@ -144,45 +137,45 @@ public void testIndexSearcher() throws Exception { } public void testDirectoryReader() throws Exception { - testDirectories((directory, searchableDirectory) -> { + testDirectories((directory, snapshotDirectory) -> { try (DirectoryReader reader = DirectoryReader.open(directory)) { - try (DirectoryReader searchableReader = DirectoryReader.open(searchableDirectory)) { - assertThat(searchableReader.leaves(), hasSize(reader.leaves().size())); - assertThat(searchableReader.maxDoc(), equalTo(reader.maxDoc())); - assertThat(searchableReader.getVersion(), equalTo(reader.getVersion())); - assertThat(searchableReader.getIndexCommit().getGeneration(), equalTo(reader.getIndexCommit().getGeneration())); + try (DirectoryReader snapshotReader = DirectoryReader.open(snapshotDirectory)) { + assertThat(snapshotReader.leaves(), hasSize(reader.leaves().size())); + assertThat(snapshotReader.maxDoc(), equalTo(reader.maxDoc())); + assertThat(snapshotReader.getVersion(), equalTo(reader.getVersion())); + assertThat(snapshotReader.getIndexCommit().getGeneration(), equalTo(reader.getIndexCommit().getGeneration())); String field = randomFrom("id", "text"); Terms terms = reader.leaves().get(0).reader().terms(field); - Terms searchableTerms = searchableReader.leaves().get(0).reader().terms(field); - assertThat(searchableTerms.size(), equalTo(terms.size())); - assertThat(searchableTerms.getDocCount(), equalTo(terms.getDocCount())); - assertThat(searchableTerms.getMin(), equalTo(terms.getMin())); - assertThat(searchableTerms.getMax(), equalTo(terms.getMax())); + Terms snapshotTerms = snapshotReader.leaves().get(0).reader().terms(field); + assertThat(snapshotTerms.size(), equalTo(terms.size())); + assertThat(snapshotTerms.getDocCount(), equalTo(terms.getDocCount())); + assertThat(snapshotTerms.getMin(), equalTo(terms.getMin())); + assertThat(snapshotTerms.getMax(), equalTo(terms.getMax())); } } }); } public void testReadByte() throws Exception { - testIndexInputs((indexInput, searchableIndexInput) -> { + testIndexInputs((indexInput, snapshotIndexInput) -> { try { for (int i = 0; i < 10; i++) { if (randomBoolean()) { long position = randomLongBetween(0L, indexInput.length()); indexInput.seek(position); - searchableIndexInput.seek(position); + snapshotIndexInput.seek(position); } assertThat("File pointers values should be the same before reading a byte", - searchableIndexInput.getFilePointer(), equalTo(indexInput.getFilePointer())); + snapshotIndexInput, indexInput, IndexInput::getFilePointer); if (indexInput.getFilePointer() < indexInput.length()) { - assertThat(searchableIndexInput.readByte(), equalTo(indexInput.readByte())); + assertThat("Read byte result should be the same", snapshotIndexInput, indexInput, IndexInput::readByte); } else { - expectThrows(EOFException.class, searchableIndexInput::readByte); + expectThrows(EOFException.class, snapshotIndexInput::readByte); } assertThat("File pointers values should be the same after reading a byte", - searchableIndexInput.getFilePointer(), equalTo(indexInput.getFilePointer())); + snapshotIndexInput, indexInput, IndexInput::getFilePointer); } } catch (IOException e) { throw new AssertionError(e); @@ -192,21 +185,21 @@ public void testReadByte() throws Exception { public void testReadBytes() throws Exception { final byte[] buffer = new byte[8192]; - final byte[] searchableBuffer = new byte[buffer.length]; + final byte[] snapshotBuffer = new byte[buffer.length]; - testIndexInputs((indexInput, searchableIndexInput) -> { + testIndexInputs((indexInput, snapshotIndexInput) -> { try { if (randomBoolean()) { long position = randomLongBetween(0L, indexInput.length()); indexInput.seek(position); - searchableIndexInput.seek(position); + snapshotIndexInput.seek(position); } assertThat("File pointers values should be the same before reading a byte", - searchableIndexInput.getFilePointer(), equalTo(indexInput.getFilePointer())); + snapshotIndexInput, indexInput, IndexInput::getFilePointer); int available = Math.toIntExact(indexInput.length() - indexInput.getFilePointer()); if (available == 0) { - expectThrows(EOFException.class, () -> searchableIndexInput.readBytes(searchableBuffer, 0, searchableBuffer.length)); + expectThrows(EOFException.class, () -> snapshotIndexInput.readBytes(snapshotBuffer, 0, snapshotBuffer.length)); return; } @@ -215,12 +208,12 @@ public void testReadBytes() throws Exception { Arrays.fill(buffer, (byte) 0); indexInput.readBytes(buffer, 0, length); - Arrays.fill(searchableBuffer, (byte) 0); - searchableIndexInput.readBytes(searchableBuffer, 0, length); + Arrays.fill(snapshotBuffer, (byte) 0); + snapshotIndexInput.readBytes(snapshotBuffer, 0, length); assertThat("File pointers values should be the same after reading a byte", - searchableIndexInput.getFilePointer(), equalTo(indexInput.getFilePointer())); - assertArrayEquals(searchableBuffer, buffer); + snapshotIndexInput, indexInput, IndexInput::getFilePointer); + assertArrayEquals(snapshotBuffer, buffer); } catch (IOException e) { throw new AssertionError(e); } @@ -264,8 +257,6 @@ private void testDirectories(final CheckedBiConsumer searchableSnapshotDirectory = new SetOnce<>(); - final ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName()); releasables.add(() -> terminate(threadPool)); @@ -317,20 +308,22 @@ private void testDirectories(final CheckedBiConsumer repositories); - try (Directory searchableDirectory = factory.newDirectory(ephemeralIndexSettings, ephemeralShardPath)) { - consumer.accept(directory, searchableDirectory); + try (Directory snapshotDirectory = factory.newDirectory(tmpIndexSettings, tmpShardPath)) { + consumer.accept(directory, snapshotDirectory); } } finally { Releasables.close(releasables); @@ -339,18 +332,18 @@ private void testDirectories(final CheckedBiConsumer consumer) throws Exception { - testDirectories((directory, searchableDirectory) -> { - for (String fileName : randomSubsetOf(Arrays.asList(searchableDirectory.listAll()))) { + testDirectories((directory, snapshotDirectory) -> { + for (String fileName : randomSubsetOf(Arrays.asList(snapshotDirectory.listAll()))) { final IOContext context = newIOContext(random()); try (IndexInput indexInput = directory.openInput(fileName, context)) { final List closeables = new ArrayList<>(); try { - IndexInput searchableIndexInput = searchableDirectory.openInput(fileName, context); - closeables.add(searchableIndexInput); + IndexInput snapshotIndexInput = snapshotDirectory.openInput(fileName, context); + closeables.add(snapshotIndexInput); if (randomBoolean()) { - searchableIndexInput = searchableIndexInput.clone(); + snapshotIndexInput = snapshotIndexInput.clone(); } - consumer.accept(indexInput, searchableIndexInput); + consumer.accept(indexInput, snapshotIndexInput); } finally { IOUtils.close(closeables); } @@ -358,4 +351,11 @@ private void testIndexInputs(final CheckedBiConsumer void assertThat(String reason, IndexInput actual, IndexInput expected, + CheckedFunction eval) throws IOException { + assertThat(reason + + "\n\t actual index input: " + actual.toString() + + "\n\texpected index input: " + expected.toString(), eval.apply(actual), equalTo(eval.apply(expected))); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java index e48bddec6ce53..47c786c6c8415 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java @@ -18,6 +18,13 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.startsWith; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; @@ -38,20 +45,20 @@ private SearchableSnapshotIndexInput createIndexInput(final byte[] input) throws String name = (String) invocationOnMock.getArguments()[0]; long position = (long) invocationOnMock.getArguments()[1]; int length = (int) invocationOnMock.getArguments()[2]; + assertThat("Reading [" + length + "] bytes from [" + name + "] at [" + position + "] exceeds part size [" + partSize + "]", + position + length, lessThanOrEqualTo(partSize)); if (fileInfo.numberOfParts() == 1L) { - if (name.equals(fileInfo.name()) == false || name.contains(".part")) { - throw new IOException("Unexpected part name " + name); - } + assertThat("Unexpected blob name [" + name + "]", name, equalTo(fileInfo.name())); return new ByteArrayInputStream(input, Math.toIntExact(position), length); + } else { - if (name.startsWith(fileInfo.name()) == false || name.contains(".part") == false) { - throw new IOException("Unexpected part name " + name); - } + assertThat("Unexpected blob name [" + name + "]", name, allOf(startsWith(fileInfo.name()), containsString(".part"))); + long partNumber = Long.parseLong(name.substring(name.indexOf(".part") + ".part".length())); - if (partNumber < 0 || partNumber >= fileInfo.numberOfParts()) { - throw new IOException("Unexpected part number " + name); - } + assertThat("Unexpected part number [" + partNumber + "] for [" + name + "]", partNumber, + allOf(greaterThanOrEqualTo(0L), lessThan(fileInfo.numberOfParts()))); + return new ByteArrayInputStream(input, Math.toIntExact(partNumber * partSize + position), length); } }); From 8ad717425e704b5c235f20d6dd67ac45e2cd3e74 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 13 Dec 2019 09:35:33 +0100 Subject: [PATCH 5/7] Add missing setting --- .../xpack/searchablesnapshots/SearchableSnapshots.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 287b0fc408c68..3a7ed8551ed50 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -61,7 +61,10 @@ public SearchableSnapshots() { @Override public List> getSettings() { - return List.of(SNAPSHOT_REPOSITORY_SETTING, SNAPSHOT_SNAPSHOT_ID_SETTING, SNAPSHOT_INDEX_ID_SETTING); + return List.of(SNAPSHOT_REPOSITORY_SETTING, + SNAPSHOT_SNAPSHOT_NAME_SETTING, + SNAPSHOT_SNAPSHOT_ID_SETTING, + SNAPSHOT_INDEX_ID_SETTING); } @Override From a2de0d2815e71b4de4cab624d4ff1ca43dc0d466 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 13 Dec 2019 10:18:52 +0100 Subject: [PATCH 6/7] random seeking --- .../store/SearchableSnapshotIndexInputTests.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java index 47c786c6c8415..65034a33aacea 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java @@ -115,7 +115,7 @@ private byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOEx int readPos = (int) indexInput.getFilePointer(); byte[] output = new byte[length]; while (readPos < length) { - switch (randomIntBetween(0, 3)) { + switch (randomIntBetween(0, 4)) { case 0: // Read by one byte at a time output[readPos++] = indexInput.readByte(); @@ -146,6 +146,18 @@ private byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOEx indexInput.seek(readPos); assertEquals(readPos, indexInput.getFilePointer()); break; + case 4: + // Seek at a random position and read a single byte, + // then seek back to original position + final int lastReadPos = readPos; + readPos = randomIntBetween(0, length - 1); + indexInput.seek(readPos); + assertEquals(readPos, indexInput.getFilePointer()); + randomReadAndSlice(indexInput, 1); + readPos = lastReadPos; + indexInput.seek(readPos); + assertEquals(readPos, indexInput.getFilePointer()); + break; default: fail(); } From 9cba9a9245d3ff9347096ff8e3f0bc2966c236f3 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 13 Dec 2019 11:03:48 +0100 Subject: [PATCH 7/7] override output value so we can check the random read succeded --- .../index/store/SearchableSnapshotIndexInputTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java index 65034a33aacea..5de5cb8e0c2ce 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java @@ -153,7 +153,9 @@ private byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOEx readPos = randomIntBetween(0, length - 1); indexInput.seek(readPos); assertEquals(readPos, indexInput.getFilePointer()); - randomReadAndSlice(indexInput, 1); + final int bytesToRead = 1; + temp = randomReadAndSlice(indexInput, readPos + bytesToRead); + System.arraycopy(temp, readPos, output, readPos, bytesToRead); readPos = lastReadPos; indexInput.seek(readPos); assertEquals(readPos, indexInput.getFilePointer());