Commit d029e18

Closed shard should never open new engine (#47186)
We should not open a new engine if the shard is closed. This assumption was broken in #45263, where we stopped verifying the shard state before creating an engine and only verified it before swapping in the engine reference. As a result, we can fail to snapshot the store metadata of, or run checkIndex on, a closed shard if some IndexWriter is still holding the index lock. Closes #47060
1 parent 13aef72 commit d029e18
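
The essence of the fix in IndexShard.java below is that the shard state is now checked up front, under engineMutex, before a new engine is ever constructed, instead of only before the engine reference is swapped in. A minimal sketch of that pattern, assuming the field and helper names from the diff (engineMutex, verifyNotClosed(), currentEngineReference, onNewEngine()) and a simplified surrounding class:

    // Hypothetical, simplified sketch; not the actual IndexShard code.
    private void openEngine(EngineConfig config) throws IOException {
        synchronized (engineMutex) {
            // Fail fast: a closed shard must never get as far as creating an engine.
            verifyNotClosed();
            assert currentEngineReference.get() == null : "engine is running";
            final Engine newEngine = engineFactory.newReadWriteEngine(config);
            // Publish under the same mutex, so close() cannot slip in between
            // creating the engine and setting the reference.
            onNewEngine(newEngine);
            currentEngineReference.set(newEngine);
        }
    }

Because the state check and the publication both happen inside one engineMutex block, the cleanup code that previously had to close a freshly created engine when the shard turned out to be closed is no longer needed.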

3 files changed: 42 additions, 57 deletions

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 25 additions & 49 deletions
@@ -1158,11 +1158,9 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
         synchronized (engineMutex) {
             // if the engine is not running, we can access the store directly, but we need to make sure no one starts
             // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
-            synchronized (mutex) {
-                final Engine engine = getEngineOrNull();
-                if (engine != null) {
-                    indexCommit = engine.acquireLastIndexCommit(false);
-                }
+            final Engine engine = getEngineOrNull();
+            if (engine != null) {
+                indexCommit = engine.acquireLastIndexCommit(false);
             }
             if (indexCommit == null) {
                 return store.getMetadata(null, true);
@@ -1286,9 +1284,11 @@ public CacheHelper getReaderCacheHelper() {
     }

     public void close(String reason, boolean flushEngine) throws IOException {
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             try {
-                changeState(IndexShardState.CLOSED, reason);
+                synchronized (mutex) {
+                    changeState(IndexShardState.CLOSED, reason);
+                }
             } finally {
                 final Engine engine = this.currentEngineReference.getAndSet(null);
                 try {
@@ -1343,6 +1343,7 @@ public void prepareForIndexRecovery() {
      * This is the first operation after the local checkpoint of the safe commit if exists.
      */
     public long recoverLocallyUpToGlobalCheckpoint() {
+        assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
@@ -1394,7 +1395,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
             getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
             logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
         } finally {
-            synchronized (mutex) {
+            synchronized (engineMutex) {
                 IOUtils.close(currentEngineReference.getAndSet(null));
             }
         }
@@ -1569,23 +1570,15 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
             : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
             + "] but got " + getRetentionLeases();
         synchronized (engineMutex) {
+            assert currentEngineReference.get() == null : "engine is running";
+            verifyNotClosed();
             // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
             final Engine newEngine = engineFactory.newReadWriteEngine(config);
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    assert currentEngineReference.get() == null : "engine is running";
-                    onNewEngine(newEngine);
-                    currentEngineReference.set(newEngine);
-                    // We set active because we are now writing operations to the engine; this way,
-                    // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
-                    active.set(true);
-                } finally {
-                    if (currentEngineReference.get() != newEngine) {
-                        newEngine.close();
-                    }
-                }
-            }
+            onNewEngine(newEngine);
+            currentEngineReference.set(newEngine);
+            // We set active because we are now writing operations to the engine; this way,
+            // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
+            active.set(true);
         }
         // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
         // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1616,7 +1609,8 @@ private void onNewEngine(Engine newEngine) {
      * called if recovery has to be restarted after network error / delay **
      */
     public void performRecoveryRestart() throws IOException {
-        synchronized (mutex) {
+        assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
+        synchronized (engineMutex) {
             assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
             IOUtils.close(currentEngineReference.getAndSet(null));
             resetRecoveryStage();
@@ -3288,7 +3282,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
      * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
      */
     void resetEngineToGlobalCheckpoint() throws IOException {
-        assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
+        assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
         assert getActiveOperationsCount() == OPERATIONS_BLOCKED
             : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
         sync(); // persist the global checkpoint to disk
@@ -3301,6 +3295,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
         final long globalCheckpoint = getLastKnownGlobalCheckpoint();
         assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
         synchronized (engineMutex) {
+            verifyNotClosed();
             // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
             // acquireXXXCommit and close works.
             final Engine readOnlyEngine =
@@ -3328,7 +3323,7 @@ public IndexCommitRef acquireSafeIndexCommit() {

                 @Override
                 public void close() throws IOException {
-                    assert Thread.holdsLock(mutex);
+                    assert Thread.holdsLock(engineMutex);

                     Engine newEngine = newEngineReference.get();
                     if (newEngine == currentEngineReference.get()) {
@@ -3338,36 +3333,17 @@ public void close() throws IOException {
                     IOUtils.close(super::close, newEngine);
                 }
             };
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
-                } finally {
-                    if (currentEngineReference.get() != readOnlyEngine) {
-                        readOnlyEngine.close();
-                    }
-                }
-            }
-            final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    newEngineReference.set(newReadWriteEngine);
-                    onNewEngine(newReadWriteEngine);
-                } finally {
-                    if (newEngineReference.get() != newReadWriteEngine) {
-                        newReadWriteEngine.close(); // shard was closed
-                    }
-                }
-            }
+            IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
+            newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
+            onNewEngine(newEngineReference.get());
         }
         final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
             engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
                 // TODO: add a dedicate recovery stats for the reset translog
             });
         newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
         newEngineReference.get().refresh("reset_engine");
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             verifyNotClosed();
             IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
             // We set active because we are now writing operations to the engine; this way,
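
A second thread running through this diff is the lock ordering between the two monitors: close() now takes engineMutex first and mutex inside it, while the recovery and reset paths assert that the caller does not already hold mutex before they take engineMutex. A self-contained toy sketch of that convention (hypothetical class, not Elasticsearch code):

    // Toy illustration of the "engineMutex before mutex" ordering the diff enforces.
    public class LockOrderSketch {
        private final Object engineMutex = new Object(); // guards engine lifecycle
        private final Object mutex = new Object();        // guards shard state

        void close() {
            synchronized (engineMutex) {       // outer lock first
                synchronized (mutex) {         // inner lock second
                    // state transition to CLOSED happens with both locks held
                }
                // the engine reference is released while still holding engineMutex
            }
        }

        void recoverLocally() {
            // Taking engineMutex while already holding mutex would invert the order
            // and could deadlock against close(), hence the up-front assertion.
            assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
            synchronized (engineMutex) {
                // the engine is created or dropped here
            }
        }
    }

Wherever both locks are needed, engineMutex is acquired first; a method that might already hold mutex must not then take engineMutex, which is exactly what the new assertions guard against.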

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 10 additions & 3 deletions
@@ -133,6 +133,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -511,15 +512,21 @@ public final void ensureAllSearchContextsReleased() throws Exception {
     // TODO: can we do this cleaner???

     /** MockFSDirectoryService sets this: */
-    public static boolean checkIndexFailed;
+    public static final List<Exception> checkIndexFailures = new CopyOnWriteArrayList<>();

     @Before
     public final void resetCheckIndexStatus() throws Exception {
-        checkIndexFailed = false;
+        checkIndexFailures.clear();
     }

     public final void ensureCheckIndexPassed() {
-        assertFalse("at least one shard failed CheckIndex", checkIndexFailed);
+        if (checkIndexFailures.isEmpty() == false) {
+            final AssertionError e = new AssertionError("at least one shard failed CheckIndex");
+            for (Exception failure : checkIndexFailures) {
+                e.addSuppressed(failure);
+            }
+            throw e;
+        }
     }

     // -----------------------------------------------------------------
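
The test-framework change replaces the single checkIndexFailed flag with a list of the actual failures, so the final assertion can surface every underlying cause instead of a bare boolean. A standalone sketch of the mechanism using plain JDK APIs (hypothetical class name and shard labels, not part of the test framework):

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class SuppressedFailuresDemo {
        public static void main(String[] args) {
            // Failures recorded from different shards, possibly on different threads.
            List<Exception> failures = new CopyOnWriteArrayList<>();
            failures.add(new IllegalStateException("IndexWriter is still open on shard [index][0]"));
            failures.add(new IOException("failed to check index for shard [index][1]"));

            if (failures.isEmpty() == false) {
                AssertionError e = new AssertionError("at least one shard failed CheckIndex");
                failures.forEach(e::addSuppressed);
                // The stack trace lists both failures as suppressed exceptions,
                // so neither is hidden behind the summary message.
                e.printStackTrace();
            }
        }
    }

Using a CopyOnWriteArrayList keeps the shared list safe to append to from the checkIndex hook below, which may run concurrently with other shard closures.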

test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java

Lines changed: 7 additions & 5 deletions
@@ -83,17 +83,19 @@ public static void checkIndex(Logger logger, Store store, ShardId shardId) {
                 CheckIndex.Status status = store.checkIndex(out);
                 out.flush();
                 if (!status.clean) {
-                    ESTestCase.checkIndexFailed = true;
-                    logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
-                    throw new IOException("index check failure");
+                    IOException failure = new IOException("failed to check index for shard " + shardId +
+                        ";index files [" + Arrays.toString(dir.listAll()) + "] os [" + os.bytes().utf8ToString() + "]");
+                    ESTestCase.checkIndexFailures.add(failure);
+                    throw failure;
                 } else {
                     if (logger.isDebugEnabled()) {
                         logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
                     }
                 }
             } catch (LockObtainFailedException e) {
-                ESTestCase.checkIndexFailed = true;
-                throw new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                IllegalStateException failure = new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                ESTestCase.checkIndexFailures.add(failure);
+                throw failure;
             }
         } catch (Exception e) {
             logger.warn("failed to check index", e);
