Skip to content

Commit 773e7e3

Browse files
DaveCTurnertlrx
andauthored
Defer repo ops in searchable snapshot restore (#54211)
A searchable snapshots `Directory` requires a `BlobContainer` and a `BlobStoreIndexShardSnapshot`, which require us to read some data from the repository. Today we construct these objects on the cluster applier thread, blocking that thread on remote operations. This commit defers their construction until the restore process starts, so that they can happen on a more appropriate thread. It also reinstates the assertion that constructing the blob container only occurs on the snapshot or generic threadpool, and adds an assertion that blobs are only accessed on snapshot or generic or search threads too. Co-authored-by: Tanguy Leroux <[email protected]>
1 parent f8edeee commit 773e7e3

File tree

9 files changed

+103
-40
lines changed

9 files changed

+103
-40
lines changed

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -1027,11 +1027,9 @@ public long getRestoreThrottleTimeInNanos() {
10271027
}
10281028

10291029
protected void assertSnapshotOrGenericThread() {
1030-
// NORELEASE
1031-
/*
1032-
assert Thread.currentThread().getName().contains(ThreadPool.Names.SNAPSHOT)
1033-
|| Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC) :
1034-
"Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";*/
1030+
assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
1031+
|| Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') :
1032+
"Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";
10351033
}
10361034

10371035
@Override

test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class ESIndexInputTestCase extends ESTestCase {
4242

4343
@BeforeClass
4444
public static void createExecutor() {
45-
final String name = getTestClass().getSimpleName() + "#randomReadAndSlice";
45+
final String name = "TEST-" + getTestClass().getSimpleName() + "#randomReadAndSlice";
4646
executor = EsExecutors.newFixed(name, 10, 0, EsExecutors.daemonThreadFactory(name), new ThreadContext(Settings.EMPTY), false);
4747
}
4848

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java

+25
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77

88
import org.apache.lucene.store.BufferedIndexInput;
99
import org.apache.lucene.store.IOContext;
10+
import org.elasticsearch.cluster.service.ClusterApplierService;
1011
import org.elasticsearch.common.blobstore.BlobContainer;
1112
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
1213
import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
14+
import org.elasticsearch.threadpool.ThreadPool;
1315

1416
import java.io.IOException;
1517
import java.io.InputStream;
@@ -42,6 +44,7 @@ public BaseSearchableSnapshotIndexInput(
4244
}
4345

4446
protected InputStream openInputStream(final long position, final long length) throws IOException {
47+
assert assertCurrentThreadMayAccessBlobStore();
4548
final long startPart = getPartNumberForPosition(position);
4649
final long endPart = getPartNumberForPosition(position + length);
4750
if ((startPart == endPart) || fileInfo.numberOfParts() == 1L) {
@@ -61,6 +64,28 @@ protected InputStream openSlice(long slice) throws IOException {
6164
}
6265
}
6366

67+
protected final boolean assertCurrentThreadMayAccessBlobStore() {
68+
final String threadName = Thread.currentThread().getName();
69+
assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT + ']')
70+
|| threadName.contains('[' + ThreadPool.Names.GENERIC + ']')
71+
|| threadName.contains('[' + ThreadPool.Names.SEARCH + ']')
72+
|| threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']')
73+
74+
// Today processExistingRecoveries considers all shards and constructs a shard store snapshot on this thread, this needs
75+
// addressing. TODO NORELEASE
76+
|| threadName.contains('[' + ThreadPool.Names.FETCH_SHARD_STORE + ']')
77+
78+
// Today for as-yet-unknown reasons we sometimes try and compute the snapshot size on the cluster applier thread, which needs
79+
// addressing. TODO NORELEASE
80+
|| threadName.contains('[' + ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME + ']')
81+
82+
// Unit tests access the blob store on the main test thread; simplest just to permit this rather than have them override this
83+
// method somehow.
84+
|| threadName.startsWith("TEST-")
85+
: "current thread [" + Thread.currentThread() + "] may not read " + fileInfo;
86+
return true;
87+
}
88+
6489
private long getPartNumberForPosition(long position) {
6590
ensureValidPosition(position);
6691
final long part = position / fileInfo.partSize().getBytes();

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

+38-19
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
import org.elasticsearch.common.blobstore.BlobContainer;
1919
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
2020
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.util.LazyInitializable;
2122
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2223
import org.elasticsearch.index.IndexSettings;
2324
import org.elasticsearch.index.shard.ShardId;
2425
import org.elasticsearch.index.shard.ShardPath;
2526
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
26-
import org.elasticsearch.index.store.cache.CachedBlobContainerIndexInput;
2727
import org.elasticsearch.index.store.cache.CacheFile;
2828
import org.elasticsearch.index.store.cache.CacheKey;
29+
import org.elasticsearch.index.store.cache.CachedBlobContainerIndexInput;
2930
import org.elasticsearch.index.store.direct.DirectBlobContainerIndexInput;
3031
import org.elasticsearch.repositories.IndexId;
3132
import org.elasticsearch.repositories.RepositoriesService;
@@ -46,9 +47,10 @@
4647
import java.util.Set;
4748
import java.util.concurrent.atomic.AtomicBoolean;
4849
import java.util.function.LongSupplier;
50+
import java.util.function.Supplier;
4951

50-
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
5152
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
53+
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
5254
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
5355
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
5456
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
@@ -68,8 +70,8 @@
6870
*/
6971
public class SearchableSnapshotDirectory extends BaseDirectory {
7072

71-
private final BlobStoreIndexShardSnapshot snapshot;
72-
private final BlobContainer blobContainer;
73+
private final Supplier<BlobContainer> blobContainer;
74+
private final Supplier<BlobStoreIndexShardSnapshot> snapshot;
7375
private final SnapshotId snapshotId;
7476
private final IndexId indexId;
7577
private final ShardId shardId;
@@ -78,13 +80,13 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
7880
private final CacheService cacheService;
7981
private final boolean useCache;
8082
private final Set<String> excludedFileTypes;
81-
private final long uncachedChunkSize;
83+
private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize()
8284
private final Path cacheDir;
8385
private final AtomicBoolean closed;
8486

8587
public SearchableSnapshotDirectory(
86-
BlobContainer blobContainer,
87-
BlobStoreIndexShardSnapshot snapshot,
88+
Supplier<BlobContainer> blobContainer,
89+
Supplier<BlobStoreIndexShardSnapshot> snapshot,
8890
SnapshotId snapshotId,
8991
IndexId indexId,
9092
ShardId shardId,
@@ -106,15 +108,21 @@ public SearchableSnapshotDirectory(
106108
this.closed = new AtomicBoolean(false);
107109
this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
108110
this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings));
109-
this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes() < 0 ?
110-
blobContainer.readBlobPreferredLength() :
111-
SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
111+
this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
112112
}
113113

114114
public BlobContainer blobContainer() {
115+
final BlobContainer blobContainer = this.blobContainer.get();
116+
assert blobContainer != null;
115117
return blobContainer;
116118
}
117119

120+
public BlobStoreIndexShardSnapshot snapshot() {
121+
final BlobStoreIndexShardSnapshot snapshot = this.snapshot.get();
122+
assert snapshot != null;
123+
return snapshot;
124+
}
125+
118126
public SnapshotId getSnapshotId() {
119127
return snapshotId;
120128
}
@@ -141,7 +149,7 @@ public long statsCurrentTimeNanos() {
141149
}
142150

143151
private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws FileNotFoundException {
144-
return snapshot.indexFiles()
152+
return snapshot().indexFiles()
145153
.stream()
146154
.filter(fileInfo -> fileInfo.physicalName().equals(name))
147155
.findFirst()
@@ -151,7 +159,7 @@ private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws
151159
@Override
152160
public final String[] listAll() {
153161
ensureOpen();
154-
return snapshot.indexFiles()
162+
return snapshot().indexFiles()
155163
.stream()
156164
.map(BlobStoreIndexShardSnapshot.FileInfo::physicalName)
157165
.sorted(String::compareTo)
@@ -244,7 +252,16 @@ public IndexInput openInput(final String name, final IOContext context) throws I
244252
if (useCache && isExcludedFromCache(name) == false) {
245253
return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats);
246254
} else {
247-
return new DirectBlobContainerIndexInput(blobContainer, fileInfo, context, uncachedChunkSize, BufferedIndexInput.BUFFER_SIZE);
255+
return new DirectBlobContainerIndexInput(
256+
blobContainer(), fileInfo, context, getUncachedChunkSize(), BufferedIndexInput.BUFFER_SIZE);
257+
}
258+
}
259+
260+
private long getUncachedChunkSize() {
261+
if (uncachedChunkSize < 0) {
262+
return blobContainer().readBlobPreferredLength();
263+
} else {
264+
return uncachedChunkSize;
248265
}
249266
}
250267

@@ -255,7 +272,7 @@ private boolean isExcludedFromCache(String name) {
255272

256273
@Override
257274
public String toString() {
258-
return this.getClass().getSimpleName() + "@" + snapshot.snapshot() + " lockFactory=" + lockFactory;
275+
return this.getClass().getSimpleName() + "@" + snapshot().snapshot() + " lockFactory=" + lockFactory;
259276
}
260277

261278
public static Directory create(RepositoriesService repositories,
@@ -271,21 +288,23 @@ public static Directory create(RepositoriesService repositories,
271288
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
272289

273290
final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()));
274-
final BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id());
275-
276291
final SnapshotId snapshotId = new SnapshotId(
277292
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
278293
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
279294
);
280-
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);
295+
296+
final LazyInitializable<BlobContainer, RuntimeException> lazyBlobContainer
297+
= new LazyInitializable<>(() -> blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id()));
298+
final LazyInitializable<BlobStoreIndexShardSnapshot, RuntimeException> lazySnapshot
299+
= new LazyInitializable<>(() -> blobStoreRepository.loadShardSnapshot(lazyBlobContainer.getOrCompute(), snapshotId));
281300

282301
final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID());
283302
Files.createDirectories(cacheDir);
284303

285304
return new InMemoryNoOpCommitDirectory(
286305
new SearchableSnapshotDirectory(
287-
blobContainer,
288-
snapshot,
306+
lazyBlobContainer::getOrCompute,
307+
lazySnapshot::getOrCompute,
289308
snapshotId,
290309
indexId,
291310
shardPath.getShardId(),

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java

+1
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ public String toString() {
258258
}
259259

260260
private InputStream openBlobStream(int part, long pos, long length) throws IOException {
261+
assert assertCurrentThreadMayAccessBlobStore();
261262
return blobContainer.readBlob(fileInfo.partName(part), pos, length);
262263
}
263264

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,13 @@ private void testDirectories(final CheckedBiConsumer<Directory, Directory, Excep
349349
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
350350
.put(Environment.PATH_REPO_SETTING.getKey(), repositoryPath.toAbsolutePath())
351351
.putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()).build(), null),
352-
NamedXContentRegistry.EMPTY, BlobStoreTestUtil.mockClusterService(repositoryMetaData));
352+
NamedXContentRegistry.EMPTY, BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
353+
354+
@Override
355+
protected void assertSnapshotOrGenericThread() {
356+
// eliminate thread name check as we create repo manually on test/main threads
357+
}
358+
};
353359
repository.start();
354360
releasables.add(repository::stop);
355361

@@ -373,8 +379,8 @@ private void testDirectories(final CheckedBiConsumer<Directory, Directory, Excep
373379
releasables.add(cacheService);
374380
cacheService.start();
375381

376-
try (Directory snapshotDirectory = new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId, shardId,
377-
Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean()).build(), () -> 0L,
382+
try (Directory snapshotDirectory = new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot, snapshotId, indexId,
383+
shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean()).build(), () -> 0L,
378384
cacheService, cacheDir)) {
379385
consumer.accept(directory, snapshotDirectory);
380386
}
@@ -432,8 +438,9 @@ public void testClearCache() throws Exception {
432438
final ShardId shardId = new ShardId(new Index("_name", "_id"), 0);
433439

434440
final Path cacheDir = createTempDir();
435-
try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId,
436-
shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService, cacheDir)) {
441+
try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot, snapshotId,
442+
indexId, shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService,
443+
cacheDir)) {
437444

438445
final byte[] buffer = new byte[1024];
439446
for (int i = 0; i < randomIntBetween(10, 50); i++) {

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputStatsTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ private static void executeTestCase(
451451

452452
try (CacheService ignored = cacheService;
453453
SearchableSnapshotDirectory directory =
454-
new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId, shardId, indexSettings,
454+
new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot, snapshotId, indexId, shardId, indexSettings,
455455
() -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS), cacheService, createTempDir()) {
456456
@Override
457457
protected IndexInputStats createIndexInputStats(long fileLength) {

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,18 @@ public void testRandomReads() throws IOException {
5555
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L,
5656
List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metaData, new ByteSizeValue(input.length))), 0L, 0L, 0, 0L);
5757

58-
BlobContainer blobContainer = singleBlobContainer(blobName, input);
58+
final BlobContainer singleBlobContainer = singleBlobContainer(blobName, input);
59+
final BlobContainer blobContainer;
5960
if (input.length <= cacheService.getCacheSize()) {
60-
blobContainer = new CountingBlobContainer(blobContainer, cacheService.getRangeSize());
61+
blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize());
62+
} else {
63+
blobContainer = singleBlobContainer;
6164
}
6265

6366
final Path cacheDir = createTempDir();
64-
try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId,
65-
shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService, cacheDir
66-
)) {
67+
try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot,
68+
snapshotId, indexId, shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(),
69+
() -> 0L, cacheService, cacheDir)) {
6770
try (IndexInput indexInput = directory.openInput(fileName, newIOContext(random()))) {
6871
assertEquals(input.length, indexInput.length());
6972
assertEquals(0, indexInput.getFilePointer());
@@ -102,8 +105,8 @@ public void testThrowsEOFException() throws IOException {
102105
final BlobContainer blobContainer = singleBlobContainer(blobName, input);
103106

104107
final Path cacheDir = createTempDir();
105-
try (SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory( blobContainer, snapshot,
106-
snapshotId, indexId, shardId, Settings.EMPTY, () -> 0L, cacheService, cacheDir)) {
108+
try (SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory(() -> blobContainer,
109+
() -> snapshot, snapshotId, indexId, shardId, Settings.EMPTY, () -> 0L, cacheService, cacheDir)) {
107110
try (IndexInput indexInput = searchableSnapshotDirectory.openInput(fileName, newIOContext(random()))) {
108111
final byte[] buffer = new byte[input.length + 1];
109112
final IOException exception = expectThrows(IOException.class, () -> indexInput.readBytes(buffer, 0, buffer.length));

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,7 @@ public void testClearCache() throws Exception {
191191
final long bytesInCacheBeforeClear = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName));
192192
assertThat(bytesInCacheBeforeClear, greaterThan(0L));
193193

194-
final Request request = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_searchable_snapshots/cache/clear");
195-
assertOK(client().performRequest(request));
194+
clearCache(restoredIndexName);
196195

197196
final long bytesInCacheAfterClear = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName));
198197
assertThat(bytesInCacheAfterClear, equalTo(bytesInCacheBeforeClear));
@@ -205,7 +204,18 @@ public void testClearCache() throws Exception {
205204
});
206205
}
207206

207+
private void clearCache(String restoredIndexName) throws IOException {
208+
final Request request = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_searchable_snapshots/cache/clear");
209+
assertOK(client().performRequest(request));
210+
}
211+
208212
public void assertSearchResults(String indexName, int numDocs, Boolean ignoreThrottled) throws IOException {
213+
214+
if (randomBoolean()) {
215+
logger.info("clearing searchable snapshots cache for [{}] before search", indexName);
216+
clearCache(indexName);
217+
}
218+
209219
final int randomTieBreaker = randomIntBetween(0, numDocs - 1);
210220
Map<String, Object> searchResults;
211221
switch (randomInt(3)) {

0 commit comments

Comments
 (0)