Skip to content

Commit 104ba40

Browse files
authored
Trigger explicit loading of blob container and snapshot in the pre-recovery hook (#54729)
In #53961 we deferred the construction of the Searchable Snapshot Directory's BlobContainer and BlobStoreIndexShardSnapshot instances so that these objects are created when they are first accessed, which we expect to be during the recovery process. At the same time, assertions were added to ensure that the construction is actually executed under a generic or snapshot thread. Sadly, this is not always the case because there is always a short window of time between the creation of the IndexShard and the time the objects are created during the recovery process. It is also possible that other components of Elasticsearch trigger the creation of the blob container and snapshot. We identified the following places: - computing avg shard size of index shards in IndexService.createShard() (this is triggered when relocating a primary shard under the cluster state applier thread) - computing indices stats in TransportIndicesStatsAction which calls indexShard.storeStats() which calls estimateSizeInBytes() (this is triggered by InternalClusterInfoService under the management thread pool) - computing shard store metadata in IndexShard.snapshotStoreMetadata which calls failIfCorrupted(Directory) (this is triggered by TransportNodesListShardStoreMetadata, executed under the fetch_shard_store thread pool) - TransportNodesListGatewayStartedShards should also use failIfCorrupted(Directory) at some point (triggered by the GatewayAllocator and executed under the fetch_shard_started thread pool) This commit changes the way the BlobContainer and BlobStoreIndexShardSnapshot instances are created so that it no longer happens on first access; instead, the objects are created by a dedicated loadSnapshot() method. This method is explicitly called during the pre-recovery phase. Until this method is called, the SearchableSnapshotDirectory acts as if it were empty: the listAll() method returns an empty array.
Having this behavior allows the identified access points to not fail and to not trigger the snapshot loading before we explicitly load it in the pre-recovery hook.
1 parent 676d441 commit 104ba40

File tree

6 files changed

+131
-37
lines changed

6 files changed

+131
-37
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.lucene.index.IndexFileNames;
99
import org.apache.lucene.store.BaseDirectory;
1010
import org.apache.lucene.store.Directory;
11+
import org.apache.lucene.store.FilterDirectory;
1112
import org.apache.lucene.store.IOContext;
1213
import org.apache.lucene.store.IndexInput;
1314
import org.apache.lucene.store.IndexOutput;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.repositories.Repository;
3334
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
3435
import org.elasticsearch.snapshots.SnapshotId;
36+
import org.elasticsearch.threadpool.ThreadPool;
3537
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
3638

3739
import java.io.FileNotFoundException;
@@ -41,6 +43,7 @@
4143
import java.util.Collection;
4244
import java.util.Collections;
4345
import java.util.HashSet;
46+
import java.util.List;
4447
import java.util.Map;
4548
import java.util.Objects;
4649
import java.util.Set;
@@ -49,8 +52,8 @@
4952
import java.util.function.Supplier;
5053

5154
import static org.apache.lucene.store.BufferedIndexInput.bufferSize;
52-
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
5355
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
56+
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
5457
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
5558
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
5659
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
@@ -70,8 +73,8 @@
7073
*/
7174
public class SearchableSnapshotDirectory extends BaseDirectory {
7275

73-
private final Supplier<BlobContainer> blobContainer;
74-
private final Supplier<BlobStoreIndexShardSnapshot> snapshot;
76+
private final Supplier<BlobContainer> blobContainerSupplier;
77+
private final Supplier<BlobStoreIndexShardSnapshot> snapshotSupplier;
7578
private final SnapshotId snapshotId;
7679
private final IndexId indexId;
7780
private final ShardId shardId;
@@ -84,6 +87,11 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
8487
private final Path cacheDir;
8588
private final AtomicBoolean closed;
8689

90+
// volatile fields are updated once under `this` lock, all together, iff loaded is not true.
91+
private volatile BlobStoreIndexShardSnapshot snapshot;
92+
private volatile BlobContainer blobContainer;
93+
private volatile boolean loaded;
94+
8795
public SearchableSnapshotDirectory(
8896
Supplier<BlobContainer> blobContainer,
8997
Supplier<BlobStoreIndexShardSnapshot> snapshot,
@@ -96,8 +104,8 @@ public SearchableSnapshotDirectory(
96104
Path cacheDir
97105
) {
98106
super(new SingleInstanceLockFactory());
99-
this.snapshot = Objects.requireNonNull(snapshot);
100-
this.blobContainer = Objects.requireNonNull(blobContainer);
107+
this.snapshotSupplier = Objects.requireNonNull(snapshot);
108+
this.blobContainerSupplier = Objects.requireNonNull(blobContainer);
101109
this.snapshotId = Objects.requireNonNull(snapshotId);
102110
this.indexId = Objects.requireNonNull(indexId);
103111
this.shardId = Objects.requireNonNull(shardId);
@@ -109,20 +117,71 @@ public SearchableSnapshotDirectory(
109117
this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
110118
this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings));
111119
this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
120+
this.loaded = false;
121+
assert invariant();
122+
}
123+
124+
private synchronized boolean invariant() {
125+
assert loaded != (snapshot == null);
126+
assert loaded != (blobContainer == null);
127+
return true;
128+
}
129+
130+
protected final boolean assertCurrentThreadMayLoadSnapshot() {
131+
final String threadName = Thread.currentThread().getName();
132+
assert threadName.contains('[' + ThreadPool.Names.GENERIC + ']')
133+
// Unit tests access the blob store on the main test thread; simplest just to permit this rather than have them override this
134+
// method somehow.
135+
|| threadName.startsWith("TEST-") : "current thread [" + Thread.currentThread() + "] may not load " + snapshotId;
136+
return true;
137+
}
138+
139+
/**
140+
* Loads the snapshot if and only if it the snapshot is not loaded yet.
141+
*
142+
* @return true if the snapshot was loaded by executing this method, false otherwise
143+
*/
144+
public boolean loadSnapshot() {
145+
boolean alreadyLoaded = this.loaded;
146+
if (alreadyLoaded == false) {
147+
synchronized (this) {
148+
alreadyLoaded = this.loaded;
149+
if (alreadyLoaded == false) {
150+
this.blobContainer = blobContainerSupplier.get();
151+
this.snapshot = snapshotSupplier.get();
152+
this.loaded = true;
153+
}
154+
}
155+
}
156+
assert assertCurrentThreadMayLoadSnapshot();
157+
assert invariant();
158+
return alreadyLoaded == false;
112159
}
113160

161+
@Nullable
114162
public BlobContainer blobContainer() {
115-
final BlobContainer blobContainer = this.blobContainer.get();
163+
final BlobContainer blobContainer = this.blobContainer;
116164
assert blobContainer != null;
117165
return blobContainer;
118166
}
119167

168+
@Nullable
120169
public BlobStoreIndexShardSnapshot snapshot() {
121-
final BlobStoreIndexShardSnapshot snapshot = this.snapshot.get();
170+
final BlobStoreIndexShardSnapshot snapshot = this.snapshot;
122171
assert snapshot != null;
123172
return snapshot;
124173
}
125174

175+
private List<BlobStoreIndexShardSnapshot.FileInfo> files() {
176+
if (loaded == false) {
177+
return List.of();
178+
}
179+
final List<BlobStoreIndexShardSnapshot.FileInfo> files = snapshot().indexFiles();
180+
assert files != null;
181+
assert files.size() > 0;
182+
return files;
183+
}
184+
126185
public SnapshotId getSnapshotId() {
127186
return snapshotId;
128187
}
@@ -145,8 +204,7 @@ public IndexInputStats getStats(String fileName) {
145204
}
146205

147206
private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws FileNotFoundException {
148-
return snapshot().indexFiles()
149-
.stream()
207+
return files().stream()
150208
.filter(fileInfo -> fileInfo.physicalName().equals(name))
151209
.findFirst()
152210
.orElseThrow(() -> new FileNotFoundException(name));
@@ -155,11 +213,7 @@ private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws
155213
@Override
156214
public final String[] listAll() {
157215
ensureOpen();
158-
return snapshot().indexFiles()
159-
.stream()
160-
.map(BlobStoreIndexShardSnapshot.FileInfo::physicalName)
161-
.sorted(String::compareTo)
162-
.toArray(String[]::new);
216+
return files().stream().map(BlobStoreIndexShardSnapshot.FileInfo::physicalName).sorted(String::compareTo).toArray(String[]::new);
163217
}
164218

165219
@Override
@@ -274,7 +328,7 @@ private boolean isExcludedFromCache(String name) {
274328

275329
@Override
276330
public String toString() {
277-
return this.getClass().getSimpleName() + "@" + snapshot().snapshot() + " lockFactory=" + lockFactory;
331+
return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory;
278332
}
279333

280334
public static Directory create(
@@ -321,4 +375,19 @@ public static Directory create(
321375
)
322376
);
323377
}
378+
379+
public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) {
380+
while (dir != null) {
381+
if (dir instanceof SearchableSnapshotDirectory) {
382+
return (SearchableSnapshotDirectory) dir;
383+
} else if (dir instanceof InMemoryNoOpCommitDirectory) {
384+
dir = ((InMemoryNoOpCommitDirectory) dir).getRealDirectory();
385+
} else if (dir instanceof FilterDirectory) {
386+
dir = ((FilterDirectory) dir).getDelegate();
387+
} else {
388+
dir = null;
389+
}
390+
}
391+
return null;
392+
}
324393
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
package org.elasticsearch.xpack.searchablesnapshots;
77

88
import org.apache.lucene.index.SegmentInfos;
9+
import org.elasticsearch.cluster.routing.RecoverySource;
910
import org.elasticsearch.index.IndexSettings;
1011
import org.elasticsearch.index.seqno.SequenceNumbers;
1112
import org.elasticsearch.index.shard.IndexEventListener;
1213
import org.elasticsearch.index.shard.IndexShard;
1314
import org.elasticsearch.index.shard.ShardId;
15+
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
1416
import org.elasticsearch.index.translog.Translog;
1517
import org.elasticsearch.index.translog.TranslogException;
1618
import org.elasticsearch.threadpool.ThreadPool;
@@ -24,9 +26,22 @@ public class SearchableSnapshotIndexEventListener implements IndexEventListener
2426
@Override
2527
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
2628
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
29+
ensureSnapshotIsLoaded(indexShard);
2730
associateNewEmptyTranslogWithIndex(indexShard);
2831
}
2932

33+
private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
34+
final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory());
35+
assert directory != null;
36+
37+
final boolean success = directory.loadSnapshot();
38+
assert directory.listAll().length > 0 : "expecting directory listing to be non-empty";
39+
assert success
40+
|| indexShard.routingEntry()
41+
.recoverySource()
42+
.getType() == RecoverySource.Type.PEER : "loading snapshot must not be called twice unless we are retrying a peer recovery";
43+
}
44+
3045
private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {
3146
final ShardId shardId = indexShard.shardId();
3247
assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId;

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.searchablesnapshots.action;
77

8-
import org.apache.lucene.store.Directory;
9-
import org.apache.lucene.store.FilterDirectory;
108
import org.elasticsearch.ResourceNotFoundException;
119
import org.elasticsearch.action.support.ActionFilters;
1210
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
@@ -20,11 +18,9 @@
2018
import org.elasticsearch.cluster.routing.ShardRouting;
2119
import org.elasticsearch.cluster.routing.ShardsIterator;
2220
import org.elasticsearch.cluster.service.ClusterService;
23-
import org.elasticsearch.common.Nullable;
2421
import org.elasticsearch.common.io.stream.Writeable;
2522
import org.elasticsearch.common.settings.Settings;
2623
import org.elasticsearch.index.shard.IndexShard;
27-
import org.elasticsearch.index.store.InMemoryNoOpCommitDirectory;
2824
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
2925
import org.elasticsearch.indices.IndicesService;
3026
import org.elasticsearch.license.XPackLicenseState;
@@ -37,6 +33,7 @@
3733
import java.util.Objects;
3834

3935
import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
36+
import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory;
4037
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY;
4138

4239
public abstract class AbstractTransportSearchableSnapshotsAction<
@@ -123,20 +120,4 @@ protected abstract ShardOperationResult executeShardOperation(
123120
ShardRouting shardRouting,
124121
SearchableSnapshotDirectory directory
125122
) throws IOException;
126-
127-
@Nullable
128-
private static SearchableSnapshotDirectory unwrapDirectory(Directory dir) {
129-
while (dir != null) {
130-
if (dir instanceof SearchableSnapshotDirectory) {
131-
return (SearchableSnapshotDirectory) dir;
132-
} else if (dir instanceof InMemoryNoOpCommitDirectory) {
133-
dir = ((InMemoryNoOpCommitDirectory) dir).getRealDirectory();
134-
} else if (dir instanceof FilterDirectory) {
135-
dir = ((FilterDirectory) dir).getDelegate();
136-
} else {
137-
dir = null;
138-
}
139-
}
140-
return null;
141-
}
142123
}

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.hamcrest.Matchers.equalTo;
3939
import static org.hamcrest.Matchers.greaterThan;
4040
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
41+
import static org.hamcrest.Matchers.is;
4142
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4243
import static org.hamcrest.Matchers.notNullValue;
4344
import static org.hamcrest.Matchers.nullValue;
@@ -593,6 +594,11 @@ protected IndexInputStats createIndexInputStats(long fileLength) {
593594
cacheService.start();
594595
assertThat(directory.getStats(fileName), nullValue());
595596

597+
final boolean loaded = directory.loadSnapshot();
598+
assertThat("Failed to load snapshot", loaded, is(true));
599+
assertThat("Snapshot should be loaded", directory.snapshot(), notNullValue());
600+
assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue());
601+
596602
test.apply(fileName, fileContent, directory);
597603
}
598604
}

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@
9696
import static org.hamcrest.Matchers.equalTo;
9797
import static org.hamcrest.Matchers.greaterThan;
9898
import static org.hamcrest.Matchers.hasSize;
99+
import static org.hamcrest.Matchers.is;
99100
import static org.hamcrest.Matchers.lessThanOrEqualTo;
101+
import static org.hamcrest.Matchers.sameInstance;
100102

101103
public class SearchableSnapshotDirectoryTests extends ESTestCase {
102104

@@ -431,7 +433,7 @@ protected void assertSnapshotOrGenericThread() {
431433
cacheService.start();
432434

433435
try (
434-
Directory snapshotDirectory = new SearchableSnapshotDirectory(
436+
SearchableSnapshotDirectory snapshotDirectory = new SearchableSnapshotDirectory(
435437
() -> blobContainer,
436438
() -> snapshot,
437439
snapshotId,
@@ -443,6 +445,11 @@ protected void assertSnapshotOrGenericThread() {
443445
cacheDir
444446
)
445447
) {
448+
final boolean loaded = snapshotDirectory.loadSnapshot();
449+
assertThat("Failed to load snapshot", loaded, is(true));
450+
assertThat("Snapshot should be loaded", snapshotDirectory.snapshot(), sameInstance(snapshot));
451+
assertThat("BlobContainer should be loaded", snapshotDirectory.blobContainer(), sameInstance(blobContainer));
452+
446453
consumer.accept(directory, snapshotDirectory);
447454
}
448455
} finally {
@@ -520,6 +527,11 @@ public void testClearCache() throws Exception {
520527
)
521528
) {
522529

530+
final boolean loaded = directory.loadSnapshot();
531+
assertThat("Failed to load snapshot", loaded, is(true));
532+
assertThat("Snapshot should be loaded", directory.snapshot(), sameInstance(snapshot));
533+
assertThat("BlobContainer should be loaded", directory.blobContainer(), sameInstance(blobContainer));
534+
523535
final byte[] buffer = new byte[1024];
524536
for (int i = 0; i < randomIntBetween(10, 50); i++) {
525537
final BlobStoreIndexShardSnapshot.FileInfo fileInfo = randomFrom(randomFiles);

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import static org.elasticsearch.index.store.cache.TestUtils.singleSplitBlobContainer;
3737
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
3838
import static org.hamcrest.Matchers.equalTo;
39+
import static org.hamcrest.Matchers.is;
40+
import static org.hamcrest.Matchers.notNullValue;
3941

4042
public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
4143

@@ -88,6 +90,11 @@ public void testRandomReads() throws IOException {
8890
cacheDir
8991
)
9092
) {
93+
final boolean loaded = directory.loadSnapshot();
94+
assertThat("Failed to load snapshot", loaded, is(true));
95+
assertThat("Snapshot should be loaded", directory.snapshot(), notNullValue());
96+
assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue());
97+
9198
try (IndexInput indexInput = directory.openInput(fileName, newIOContext(random()))) {
9299
assertEquals(input.length, indexInput.length());
93100
assertEquals(0, indexInput.getFilePointer());
@@ -152,6 +159,11 @@ public void testThrowsEOFException() throws IOException {
152159
cacheDir
153160
)
154161
) {
162+
final boolean loaded = searchableSnapshotDirectory.loadSnapshot();
163+
assertThat("Failed to load snapshot", loaded, is(true));
164+
assertThat("Snapshot should be loaded", searchableSnapshotDirectory.snapshot(), notNullValue());
165+
assertThat("BlobContainer should be loaded", searchableSnapshotDirectory.blobContainer(), notNullValue());
166+
155167
try (IndexInput indexInput = searchableSnapshotDirectory.openInput(fileName, newIOContext(random()))) {
156168
final byte[] buffer = new byte[input.length + 1];
157169
final IOException exception = expectThrows(IOException.class, () -> indexInput.readBytes(buffer, 0, buffer.length));
@@ -294,5 +306,4 @@ public void close() throws IOException {
294306
this.container.totalBytes.add(bytesRead);
295307
}
296308
}
297-
298309
}

0 commit comments

Comments
 (0)