Skip to content

Commit 33f11e6

Browse files
authored
Fail shard if IndexShard#storeStats runs into an IOException (#32241)
Fail shard if IndexShard#storeStats runs into an IOException. Closes #29008
1 parent 2610022 commit 33f11e6

File tree

5 files changed

+113
-20
lines changed

5 files changed

+113
-20
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+1
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,7 @@ public StoreStats storeStats() {
917917
try {
918918
return store.stats();
919919
} catch (IOException e) {
920+
failShard("Failing shard because of exception during storeStats", e);
920921
throw new ElasticsearchException("io exception while building 'store stats'", e);
921922
}
922923
}

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+83-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.lucene.search.TermQuery;
2828
import org.apache.lucene.search.TopDocs;
2929
import org.apache.lucene.store.AlreadyClosedException;
30+
import org.apache.lucene.store.Directory;
31+
import org.apache.lucene.store.FilterDirectory;
3032
import org.apache.lucene.store.IOContext;
3133
import org.apache.lucene.util.Constants;
3234
import org.elasticsearch.Version;
@@ -112,6 +114,7 @@
112114
import org.elasticsearch.test.FieldMaskingReader;
113115
import org.elasticsearch.test.VersionUtils;
114116
import org.elasticsearch.threadpool.ThreadPool;
117+
import org.elasticsearch.ElasticsearchException;
115118

116119
import java.io.IOException;
117120
import java.nio.charset.Charset;
@@ -138,6 +141,7 @@
138141
import java.util.function.BiConsumer;
139142
import java.util.function.Consumer;
140143
import java.util.function.LongFunction;
144+
import java.util.function.Supplier;
141145
import java.util.stream.Collectors;
142146
import java.util.stream.IntStream;
143147

@@ -1162,6 +1166,81 @@ public void testShardStats() throws IOException {
11621166
closeShards(shard);
11631167
}
11641168

1169+
1170+
/**
 * Checks that {@code shard.storeStats()} throws an {@link ElasticsearchException} and that the
 * shard failure callback is triggered once the store's directory starts throwing.
 * <p>
 * The shard is built on a {@link FilterDirectory} whose {@code fileLength} and {@code listAll}
 * methods can be switched to throw after the shard has been recovered. The injected exception is
 * chosen at random: either a {@code CorruptIndexException} or a plain {@link IOException}.
 *
 * @throws IOException declared by the test method
 */
public void testShardStatsWithFailures() throws IOException {
1171+
allowShardFailures();
1172+
final ShardId shardId = new ShardId("index", "_na_", 0);
1173+
final ShardRouting shardRouting = newShardRouting(shardId, "node", true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, ShardRoutingState.INITIALIZING);
1174+
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
1175+
1176+
1177+
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
1178+
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
1179+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
1180+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
1181+
.build();
1182+
IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName())
1183+
.settings(settings)
1184+
.primaryTerm(0, 1)
1185+
.build();
1186+
1187+
// Override two Directory methods so that they can be made to fail on demand.
1188+
// An AtomicReference is used so the failure can be injected in the middle of the test rather than immediately.
1189+
// A Supplier<IOException> is used instead of an IOException instance to produce a meaningful stack trace
1190+
// (the stack trace is filled in when the exception is instantiated).
1191+
AtomicReference<Supplier<IOException>> exceptionToThrow = new AtomicReference<>();
1192+
AtomicBoolean throwWhenMarkingStoreCorrupted = new AtomicBoolean(false);
1193+
Directory directory = new FilterDirectory(newFSDirectory(shardPath.resolveIndex())) {
1194+
// The fileLength method is called inside the storeStats try block;
1195+
// it is not called when the store is marked as corrupted.
1196+
@Override
1197+
public long fileLength(String name) throws IOException {
1198+
Supplier<IOException> ex = exceptionToThrow.get();
1199+
if (ex == null) {
1200+
return super.fileLength(name);
1201+
} else {
1202+
throw ex.get();
1203+
}
1204+
}
1205+
1206+
// The listAll method is called when marking the store as corrupted; it only fails when explicitly enabled.
1207+
@Override
1208+
public String[] listAll() throws IOException {
1209+
Supplier<IOException> ex = exceptionToThrow.get();
1210+
if (throwWhenMarkingStoreCorrupted.get() && ex != null) {
1211+
throw ex.get();
1212+
} else {
1213+
return super.listAll();
1214+
}
1215+
}
1216+
};
1217+
1218+
// The custom store wraps the failure-injecting directory and is closed by try-with-resources.
try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
1219+
IndexShard shard = newShard(shardRouting, shardPath, metaData, store,
1220+
null, new InternalEngineFactory(), () -> {
1221+
}, EMPTY_EVENT_LISTENER);
1222+
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
1223+
shard.addShardFailureCallback((ig)->failureCallbackTriggered.set(true));
1224+
1225+
// Recovery runs before any exception supplier is set, so the directory still delegates normally here.
recoverShardFromStore(shard);
1226+
1227+
final boolean corruptIndexException = randomBoolean();
1228+
1229+
if (corruptIndexException) {
1230+
exceptionToThrow.set(() -> new CorruptIndexException("Test CorruptIndexException", "Test resource"));
1231+
throwWhenMarkingStoreCorrupted.set(randomBoolean());
1232+
} else {
1233+
exceptionToThrow.set(() -> new IOException("Test IOException"));
1234+
}
1235+
// Only the exception type is checked; the returned exception is not inspected further.
ElasticsearchException e = expectThrows(ElasticsearchException.class, shard::storeStats);
1236+
assertTrue(failureCallbackTriggered.get());
1237+
1238+
// The corruption marker is only asserted when a CorruptIndexException was injected and listAll was not set to fail.
if (corruptIndexException && !throwWhenMarkingStoreCorrupted.get()) {
1239+
assertTrue(store.isMarkedCorrupted());
1240+
}
1241+
}
1242+
}
1243+
11651244
public void testRefreshMetric() throws IOException {
11661245
IndexShard shard = newStartedShard();
11671246
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery
@@ -1868,6 +1947,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
18681947
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
18691948
shard.shardPath(),
18701949
shard.indexSettings().getIndexMetaData(),
1950+
null,
18711951
wrapper,
18721952
new InternalEngineFactory(),
18731953
() -> {},
@@ -2020,6 +2100,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
20202100
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
20212101
shard.shardPath(),
20222102
shard.indexSettings().getIndexMetaData(),
2103+
null,
20232104
wrapper,
20242105
new InternalEngineFactory(),
20252106
() -> {},
@@ -2506,7 +2587,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
25062587
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
25072588
.build();
25082589
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
2509-
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
2590+
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
25102591

25112592
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
25122593
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
@@ -3005,7 +3086,7 @@ public void testFlushOnInactive() throws Exception {
30053086
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
30063087
AtomicBoolean markedInactive = new AtomicBoolean();
30073088
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
3008-
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, new InternalEngineFactory(), () -> {
3089+
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> {
30093090
}, new IndexEventListener() {
30103091
@Override
30113092
public void onShardInactive(IndexShard indexShard) {

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
105105
shard.shardPath(),
106106
shard.indexSettings().getIndexMetaData(),
107107
null,
108+
null,
108109
new InternalEngineFactory(),
109110
() -> {},
110111
EMPTY_EVENT_LISTENER);

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -259,13 +259,14 @@ assert shardRoutings().stream()
259259

260260
public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
261261
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
262-
shardId,
263-
nodeId,
264-
false, ShardRoutingState.INITIALIZING,
265-
RecoverySource.PeerRecoverySource.INSTANCE);
262+
shardId,
263+
nodeId,
264+
false, ShardRoutingState.INITIALIZING,
265+
RecoverySource.PeerRecoverySource.INSTANCE);
266266

267267
final IndexShard newReplica =
268-
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
268+
newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting),
269+
() -> {}, EMPTY_EVENT_LISTENER);
269270
replicas.add(newReplica);
270271
updateAllocationIDsOnPrimary();
271272
return newReplica;

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

+22-13
Original file line numberDiff line numberDiff line change
@@ -163,15 +163,20 @@ public Settings threadPoolSettings() {
163163
return Settings.EMPTY;
164164
}
165165

166-
private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
167-
final ShardId shardId = shardPath.getShardId();
166+
167+
/**
 * Creates a {@link Store} for the shard identified by {@code shardPath}. Delegates to
 * {@link #createStore(ShardId, IndexSettings, Directory)}, passing the shard id taken from
 * {@code shardPath} and the directory returned by {@code newFSDirectory(shardPath.resolveIndex())}.
 *
 * @param indexSettings index settings passed through to the delegate overload
 * @param shardPath     shard path supplying the shard id and the index location
 * @return the store returned by the delegate overload
 * @throws IOException declared by this method; presumably raised while opening the directory (confirm)
 */
protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
168+
return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()));
169+
}
170+
171+
protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException {
168172
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
169173
@Override
170174
public Directory newDirectory() throws IOException {
171-
return newFSDirectory(shardPath.resolveIndex());
175+
return directory;
172176
}
173177
};
174178
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
179+
175180
}
176181

177182
/**
@@ -284,29 +289,32 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
284289
final ShardId shardId = routing.shardId();
285290
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
286291
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
287-
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
292+
return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
288293
EMPTY_EVENT_LISTENER, listeners);
289294
}
290295

291296
/**
292297
* creates a new initializing shard.
293-
* @param routing shard routing to use
294-
* @param shardPath path to use for shard data
295-
* @param indexMetaData indexMetaData for the shard, including any mapping
296-
* @param indexSearcherWrapper an optional wrapper to be used during searchers
297-
* @param globalCheckpointSyncer callback for syncing global checkpoints
298-
* @param indexEventListener index even listener
299-
* @param listeners an optional set of listeners to add to the shard
298+
* @param routing shard routing to use
299+
* @param shardPath path to use for shard data
300+
* @param indexMetaData indexMetaData for the shard, including any mapping
301+
* @param store an optional custom store to use. If null a default file based store will be created
302+
* @param indexSearcherWrapper an optional wrapper to be used during searchers
303+
* @param globalCheckpointSyncer callback for syncing global checkpoints
304+
* @param indexEventListener index event listener
305+
* @param listeners an optional set of listeners to add to the shard
300306
*/
301307
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
302-
@Nullable IndexSearcherWrapper indexSearcherWrapper,
308+
@Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper,
303309
@Nullable EngineFactory engineFactory,
304310
Runnable globalCheckpointSyncer,
305311
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
306312
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
307313
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
308314
final IndexShard indexShard;
309-
final Store store = createStore(indexSettings, shardPath);
315+
if (store == null) {
316+
store = createStore(indexSettings, shardPath);
317+
}
310318
boolean success = false;
311319
try {
312320
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
@@ -357,6 +365,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index
357365
current.shardPath(),
358366
current.indexSettings().getIndexMetaData(),
359367
null,
368+
null,
360369
current.engineFactory,
361370
current.getGlobalCheckpointSyncer(),
362371
EMPTY_EVENT_LISTENER, listeners);

0 commit comments

Comments
 (0)