Commit 2a0b430

Closed shard should never open new engine (#47186)
We should not open a new engine if the shard is closed. This assumption was broken in #45263, where we stopped verifying the shard state before creating an engine and only verified it before swapping the engine reference. As a result, snapshotting the store metadata or running checkIndex on a closed shard can fail when an IndexWriter is still holding the index lock. Closes #47060
1 parent 342a8c2 commit 2a0b430
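
A minimal sketch of the invariant this commit restores: the shard state is verified under engineMutex before a new engine (and therefore a new IndexWriter) is created, so a concurrently closed shard can no longer end up holding the index lock. The names below (engineMutex, currentEngineReference, verifyNotClosed, engineFactory) mirror IndexShard, but this is an illustrative stand-alone class, not the actual implementation:

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class EngineOpener {
    private final Object engineMutex = new Object();
    private final AtomicReference<AutoCloseable> currentEngineReference = new AtomicReference<>();
    private volatile boolean closed;

    void openEngine(Supplier<AutoCloseable> engineFactory) {
        synchronized (engineMutex) {
            // Check the shard state BEFORE creating the engine; previously the
            // check only guarded the reference swap, so a closed shard could
            // still briefly create an IndexWriter and take the index lock.
            verifyNotClosed();
            assert currentEngineReference.get() == null : "engine is running";
            currentEngineReference.set(engineFactory.get());
        }
    }

    private void verifyNotClosed() {
        if (closed) {
            throw new IllegalStateException("shard is closed");
        }
    }
}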

3 files changed: +42 −57 lines changed
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+25-49
@@ -1190,11 +1190,9 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
             synchronized (engineMutex) {
                 // if the engine is not running, we can access the store directly, but we need to make sure no one starts
                 // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
-                synchronized (mutex) {
-                    final Engine engine = getEngineOrNull();
-                    if (engine != null) {
-                        indexCommit = engine.acquireLastIndexCommit(false);
-                    }
+                final Engine engine = getEngineOrNull();
+                if (engine != null) {
+                    indexCommit = engine.acquireLastIndexCommit(false);
                 }
                 if (indexCommit == null) {
                     return store.getMetadata(null, true);
@@ -1318,9 +1316,11 @@ public CacheHelper getReaderCacheHelper() {
     }

     public void close(String reason, boolean flushEngine) throws IOException {
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             try {
-                changeState(IndexShardState.CLOSED, reason);
+                synchronized (mutex) {
+                    changeState(IndexShardState.CLOSED, reason);
+                }
             } finally {
                 final Engine engine = this.currentEngineReference.getAndSet(null);
                 try {
@@ -1375,6 +1375,7 @@ public void prepareForIndexRecovery() {
      * This is the first operation after the local checkpoint of the safe commit if exists.
      */
     public long recoverLocallyUpToGlobalCheckpoint() {
+        assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
@@ -1426,7 +1427,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
             getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
             logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
         } finally {
-            synchronized (mutex) {
+            synchronized (engineMutex) {
                 IOUtils.close(currentEngineReference.getAndSet(null));
             }
         }
@@ -1601,23 +1602,15 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
             : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
             + "] but got " + getRetentionLeases();
         synchronized (engineMutex) {
+            assert currentEngineReference.get() == null : "engine is running";
+            verifyNotClosed();
             // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
             final Engine newEngine = engineFactory.newReadWriteEngine(config);
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    assert currentEngineReference.get() == null : "engine is running";
-                    onNewEngine(newEngine);
-                    currentEngineReference.set(newEngine);
-                    // We set active because we are now writing operations to the engine; this way,
-                    // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
-                    active.set(true);
-                } finally {
-                    if (currentEngineReference.get() != newEngine) {
-                        newEngine.close();
-                    }
-                }
-            }
+            onNewEngine(newEngine);
+            currentEngineReference.set(newEngine);
+            // We set active because we are now writing operations to the engine; this way,
+            // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
+            active.set(true);
         }
         // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
         // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1648,7 +1641,8 @@ private void onNewEngine(Engine newEngine) {
      * called if recovery has to be restarted after network error / delay **
      */
     public void performRecoveryRestart() throws IOException {
-        synchronized (mutex) {
+        assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
+        synchronized (engineMutex) {
             assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
             IOUtils.close(currentEngineReference.getAndSet(null));
             resetRecoveryStage();
@@ -3333,7 +3327,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
      * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
      */
     void resetEngineToGlobalCheckpoint() throws IOException {
-        assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
+        assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
         assert getActiveOperationsCount() == OPERATIONS_BLOCKED
             : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
         sync(); // persist the global checkpoint to disk
@@ -3346,6 +3340,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
         final long globalCheckpoint = getLastKnownGlobalCheckpoint();
         assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
         synchronized (engineMutex) {
+            verifyNotClosed();
             // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
             // acquireXXXCommit and close works.
             final Engine readOnlyEngine =
@@ -3373,7 +3368,7 @@ public IndexCommitRef acquireSafeIndexCommit() {

                     @Override
                     public void close() throws IOException {
-                        assert Thread.holdsLock(mutex);
+                        assert Thread.holdsLock(engineMutex);

                         Engine newEngine = newEngineReference.get();
                         if (newEngine == currentEngineReference.get()) {
@@ -3383,36 +3378,17 @@ public void close() throws IOException {
                         IOUtils.close(super::close, newEngine);
                     }
                 };
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
-                } finally {
-                    if (currentEngineReference.get() != readOnlyEngine) {
-                        readOnlyEngine.close();
-                    }
-                }
-            }
-            final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    newEngineReference.set(newReadWriteEngine);
-                    onNewEngine(newReadWriteEngine);
-                } finally {
-                    if (newEngineReference.get() != newReadWriteEngine) {
-                        newReadWriteEngine.close(); // shard was closed
-                    }
-                }
-            }
+            IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
+            newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
+            onNewEngine(newEngineReference.get());
         }
         final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
             engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
                 // TODO: add a dedicate recovery stats for the reset translog
             });
         newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
         newEngineReference.get().refresh("reset_engine");
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             verifyNotClosed();
             IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
             // We set active because we are now writing operations to the engine; this way,
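
Taken together, the hunks above settle on a single lock order: engineMutex is acquired before mutex (as in close()), and engine references are only created or swapped under engineMutex, while the new assertions in recoverLocallyUpToGlobalCheckpoint, performRecoveryRestart, and resetEngineToGlobalCheckpoint reject callers that already hold mutex. A small stand-alone sketch of that ordering (field and method names are illustrative, not the actual IndexShard code):

class ShardLocking {
    private final Object engineMutex = new Object(); // guards engine creation and the engine reference
    private final Object mutex = new Object();       // guards shard state transitions

    void close() {
        synchronized (engineMutex) {      // outer lock: engine lifecycle
            synchronized (mutex) {        // inner lock: only the CLOSED state transition
                // changeState(IndexShardState.CLOSED, reason) happens here
            }
            // the engine reference is cleared afterwards, still under engineMutex
        }
    }

    void openEngine() {
        synchronized (engineMutex) {
            // verifyNotClosed() and engine creation run under the same lock,
            // so close() cannot interleave between the check and the creation
        }
    }
}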

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

+10-3
@@ -134,6 +134,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -537,15 +538,21 @@ public final void ensureAllSearchContextsReleased() throws Exception {
     // TODO: can we do this cleaner???

     /** MockFSDirectoryService sets this: */
-    public static boolean checkIndexFailed;
+    public static final List<Exception> checkIndexFailures = new CopyOnWriteArrayList<>();

     @Before
     public final void resetCheckIndexStatus() throws Exception {
-        checkIndexFailed = false;
+        checkIndexFailures.clear();
     }

     public final void ensureCheckIndexPassed() {
-        assertFalse("at least one shard failed CheckIndex", checkIndexFailed);
+        if (checkIndexFailures.isEmpty() == false) {
+            final AssertionError e = new AssertionError("at least one shard failed CheckIndex");
+            for (Exception failure : checkIndexFailures) {
+                e.addSuppressed(failure);
+            }
+            throw e;
+        }
     }

     // -----------------------------------------------------------------
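
The change above replaces the single checkIndexFailed boolean with a thread-safe list so that every failing shard is reported, each failure attached as a suppressed exception. A stand-alone sketch of that pattern (class and method names here are illustrative, not the actual ESTestCase API):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CheckIndexFailureCollector {
    // CopyOnWriteArrayList because shards may run CheckIndex on different threads.
    static final List<Exception> failures = new CopyOnWriteArrayList<>();

    static void ensurePassed() {
        if (failures.isEmpty() == false) {
            // Surface all per-shard failures at once instead of a single flag.
            AssertionError error = new AssertionError("at least one shard failed CheckIndex");
            for (Exception failure : failures) {
                error.addSuppressed(failure);
            }
            throw error;
        }
    }
}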

test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java

+7-5
@@ -84,17 +84,19 @@ public static void checkIndex(Logger logger, Store store, ShardId shardId) {
                     CheckIndex.Status status = store.checkIndex(out);
                     out.flush();
                     if (!status.clean) {
-                        ESTestCase.checkIndexFailed = true;
-                        logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
-                        throw new IOException("index check failure");
+                        IOException failure = new IOException("failed to check index for shard " + shardId +
+                            ";index files [" + Arrays.toString(dir.listAll()) + "] os [" + os.bytes().utf8ToString() + "]");
+                        ESTestCase.checkIndexFailures.add(failure);
+                        throw failure;
                     } else {
                         if (logger.isDebugEnabled()) {
                             logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
                         }
                     }
                 } catch (LockObtainFailedException e) {
-                    ESTestCase.checkIndexFailed = true;
-                    throw new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                    IllegalStateException failure = new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                    ESTestCase.checkIndexFailures.add(failure);
+                    throw failure;
                 }
             } catch (Exception e) {
                 logger.warn("failed to check index", e);
