diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 2a10b69eb8a99..369ba6fa2eb8d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1027,11 +1027,9 @@ public long getRestoreThrottleTimeInNanos() { } protected void assertSnapshotOrGenericThread() { - // NORELEASE - /* - assert Thread.currentThread().getName().contains(ThreadPool.Names.SNAPSHOT) - || Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC) : - "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";*/ + assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']') + || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : + "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread."; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java index 73685ce6896ce..78ca476334214 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java @@ -42,7 +42,7 @@ public class ESIndexInputTestCase extends ESTestCase { @BeforeClass public static void createExecutor() { - final String name = getTestClass().getSimpleName() + "#randomReadAndSlice"; + final String name = "TEST-" + getTestClass().getSimpleName() + "#randomReadAndSlice"; executor = EsExecutors.newFixed(name, 10, 0, EsExecutors.daemonThreadFactory(name), new ThreadContext(Settings.EMPTY), false); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 13dc37787b5e2..140f92084ddc3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -7,9 +7,11 @@ import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -42,6 +44,7 @@ public BaseSearchableSnapshotIndexInput( } protected InputStream openInputStream(final long position, final long length) throws IOException { + assert assertCurrentThreadMayAccessBlobStore(); final long startPart = getPartNumberForPosition(position); final long endPart = getPartNumberForPosition(position + length); if ((startPart == endPart) || fileInfo.numberOfParts() == 1L) { @@ -61,6 +64,28 @@ protected InputStream openSlice(long slice) throws IOException { } } + protected final boolean assertCurrentThreadMayAccessBlobStore() { + final String threadName = Thread.currentThread().getName(); + assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT + ']') + || threadName.contains('[' + ThreadPool.Names.GENERIC + ']') + || threadName.contains('[' + ThreadPool.Names.SEARCH + ']') + || threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']') + + // Today processExistingRecoveries considers all shards and constructs a shard store snapshot on this thread, this needs + // addressing. TODO NORELEASE + || threadName.contains('[' + ThreadPool.Names.FETCH_SHARD_STORE + ']') + + // Today for as-yet-unknown reasons we sometimes try and compute the snapshot size on the cluster applier thread, which needs + // addressing. TODO NORELEASE + || threadName.contains('[' + ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME + ']') + + // Unit tests access the blob store on the main test thread; simplest just to permit this rather than have them override this + // method somehow. + || threadName.startsWith("TEST-") + : "current thread [" + Thread.currentThread() + "] may not read " + fileInfo; + return true; + } + private long getPartNumberForPosition(long position) { ensureValidPosition(position); final long part = position / fileInfo.partSize().getBytes(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 6dc147dbf2e8c..a7580555dd2cb 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -18,14 +18,15 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.LazyInitializable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; -import org.elasticsearch.index.store.cache.CachedBlobContainerIndexInput; import org.elasticsearch.index.store.cache.CacheFile; import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.index.store.cache.CachedBlobContainerIndexInput; import org.elasticsearch.index.store.direct.DirectBlobContainerIndexInput; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; @@ -46,9 +47,10 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; +import java.util.function.Supplier; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; @@ -68,8 +70,8 @@ */ public class SearchableSnapshotDirectory extends BaseDirectory { - private final BlobStoreIndexShardSnapshot snapshot; - private final BlobContainer blobContainer; + private final Supplier blobContainer; + private final Supplier snapshot; private final SnapshotId snapshotId; private final IndexId indexId; private final ShardId shardId; @@ -78,13 +80,13 @@ public class SearchableSnapshotDirectory extends BaseDirectory { private final CacheService cacheService; private final boolean useCache; private final Set excludedFileTypes; - private final long uncachedChunkSize; + private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize() private final Path cacheDir; private final AtomicBoolean closed; public SearchableSnapshotDirectory( - BlobContainer blobContainer, - BlobStoreIndexShardSnapshot snapshot, + Supplier blobContainer, + Supplier snapshot, SnapshotId snapshotId, IndexId indexId, ShardId shardId, @@ -106,15 +108,21 @@ public SearchableSnapshotDirectory( this.closed = new AtomicBoolean(false); this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings); this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); - this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes() < 0 ? - blobContainer.readBlobPreferredLength() : - SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes(); + this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes(); } public BlobContainer blobContainer() { + final BlobContainer blobContainer = this.blobContainer.get(); + assert blobContainer != null; return blobContainer; } + public BlobStoreIndexShardSnapshot snapshot() { + final BlobStoreIndexShardSnapshot snapshot = this.snapshot.get(); + assert snapshot != null; + return snapshot; + } + public SnapshotId getSnapshotId() { return snapshotId; } @@ -141,7 +149,7 @@ public long statsCurrentTimeNanos() { } private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws FileNotFoundException { - return snapshot.indexFiles() + return snapshot().indexFiles() .stream() .filter(fileInfo -> fileInfo.physicalName().equals(name)) .findFirst() @@ -151,7 +159,7 @@ private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws @Override public final String[] listAll() { ensureOpen(); - return snapshot.indexFiles() + return snapshot().indexFiles() .stream() .map(BlobStoreIndexShardSnapshot.FileInfo::physicalName) .sorted(String::compareTo) @@ -244,7 +252,16 @@ public IndexInput openInput(final String name, final IOContext context) throws I if (useCache && isExcludedFromCache(name) == false) { return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats); } else { - return new DirectBlobContainerIndexInput(blobContainer, fileInfo, context, uncachedChunkSize, BufferedIndexInput.BUFFER_SIZE); + return new DirectBlobContainerIndexInput( + blobContainer(), fileInfo, context, getUncachedChunkSize(), BufferedIndexInput.BUFFER_SIZE); + } + } + + private long getUncachedChunkSize() { + if (uncachedChunkSize < 0) { + return blobContainer().readBlobPreferredLength(); + } else { + return uncachedChunkSize; } } @@ -255,7 +272,7 @@ private boolean isExcludedFromCache(String name) { @Override public String toString() { - return this.getClass().getSimpleName() + "@" + snapshot.snapshot() + " lockFactory=" + lockFactory; + return this.getClass().getSimpleName() + "@" + snapshot().snapshot() + " lockFactory=" + lockFactory; } public static Directory create(RepositoriesService repositories, @@ -271,21 +288,23 @@ public static Directory create(RepositoriesService repositories, final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())); - final BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id()); - final SnapshotId snapshotId = new SnapshotId( SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) ); - final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId); + + final LazyInitializable lazyBlobContainer + = new LazyInitializable<>(() -> blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id())); + final LazyInitializable lazySnapshot + = new LazyInitializable<>(() -> blobStoreRepository.loadShardSnapshot(lazyBlobContainer.getOrCompute(), snapshotId)); final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID()); Files.createDirectories(cacheDir); return new InMemoryNoOpCommitDirectory( new SearchableSnapshotDirectory( - blobContainer, - snapshot, + lazyBlobContainer::getOrCompute, + lazySnapshot::getOrCompute, snapshotId, indexId, shardPath.getShardId(), diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index 8749d8eed6419..0d1f077a34bbc 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -258,6 +258,7 @@ public String toString() { } private InputStream openBlobStream(int part, long pos, long length) throws IOException { + assert assertCurrentThreadMayAccessBlobStore(); return blobContainer.readBlob(fileInfo.partName(part), pos, length); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 7939337c1bf87..2757a5e9cb3e4 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -349,7 +349,13 @@ private void testDirectories(final CheckedBiConsumer 0L, + try (Directory snapshotDirectory = new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot, snapshotId, indexId, + shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean()).build(), () -> 0L, cacheService, cacheDir)) { consumer.accept(directory, snapshotDirectory); } @@ -432,8 +438,9 @@ public void testClearCache() throws Exception { final ShardId shardId = new ShardId(new Index("_name", "_id"), 0); final Path cacheDir = createTempDir(); - try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId, - shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService, cacheDir)) { + try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot, snapshotId, + indexId, shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService, + cacheDir)) { final byte[] buffer = new byte[1024]; for (int i = 0; i < randomIntBetween(10, 50); i++) { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputStatsTests.java index 2d7ace671c523..75e0de8da6d29 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputStatsTests.java @@ -451,7 +451,7 @@ private static void executeTestCase( try (CacheService ignored = cacheService; SearchableSnapshotDirectory directory = - new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId, shardId, indexSettings, + new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot, snapshotId, indexId, shardId, indexSettings, () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS), cacheService, createTempDir()) { @Override protected IndexInputStats createIndexInputStats(long fileLength) { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index 71b1af6ac6012..cc24afbcbac2c 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -55,15 +55,18 @@ public void testRandomReads() throws IOException { final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metaData, new ByteSizeValue(input.length))), 0L, 0L, 0, 0L); - BlobContainer blobContainer = singleBlobContainer(blobName, input); + final BlobContainer singleBlobContainer = singleBlobContainer(blobName, input); + final BlobContainer blobContainer; if (input.length <= cacheService.getCacheSize()) { - blobContainer = new CountingBlobContainer(blobContainer, cacheService.getRangeSize()); + blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize()); + } else { + blobContainer = singleBlobContainer; } final Path cacheDir = createTempDir(); - try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId, - shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService, cacheDir - )) { + try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot, + snapshotId, indexId, shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), + () -> 0L, cacheService, cacheDir)) { try (IndexInput indexInput = directory.openInput(fileName, newIOContext(random()))) { assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); @@ -102,8 +105,8 @@ public void testThrowsEOFException() throws IOException { final BlobContainer blobContainer = singleBlobContainer(blobName, input); final Path cacheDir = createTempDir(); - try (SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory( blobContainer, snapshot, - snapshotId, indexId, shardId, Settings.EMPTY, () -> 0L, cacheService, cacheDir)) { + try (SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory(() -> blobContainer, + () -> snapshot, snapshotId, indexId, shardId, Settings.EMPTY, () -> 0L, cacheService, cacheDir)) { try (IndexInput indexInput = searchableSnapshotDirectory.openInput(fileName, newIOContext(random()))) { final byte[] buffer = new byte[input.length + 1]; final IOException exception = expectThrows(IOException.class, () -> indexInput.readBytes(buffer, 0, buffer.length)); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java index 1cd35a68e223b..53c7317937c78 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java @@ -191,8 +191,7 @@ public void testClearCache() throws Exception { final long bytesInCacheBeforeClear = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName)); assertThat(bytesInCacheBeforeClear, greaterThan(0L)); - final Request request = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_searchable_snapshots/cache/clear"); - assertOK(client().performRequest(request)); + clearCache(restoredIndexName); final long bytesInCacheAfterClear = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName)); assertThat(bytesInCacheAfterClear, equalTo(bytesInCacheBeforeClear)); @@ -205,7 +204,18 @@ public void testClearCache() throws Exception { }); } + private void clearCache(String restoredIndexName) throws IOException { + final Request request = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_searchable_snapshots/cache/clear"); + assertOK(client().performRequest(request)); + } + public void assertSearchResults(String indexName, int numDocs, Boolean ignoreThrottled) throws IOException { + + if (randomBoolean()) { + logger.info("clearing searchable snapshots cache for [{}] before search", indexName); + clearCache(indexName); + } + final int randomTieBreaker = randomIntBetween(0, numDocs - 1); Map searchResults; switch (randomInt(3)) {