Commit c6abe74

Close and acquire commit during reset engine fix (elastic#41584) (elastic#41709)

If a shard is closed while its engine is being reset, IndexEventListener.afterIndexShardClosed would be called while there is still an active IndexWriter on the shard. In integration tests this leads to an exception during the check-index step invoked from MockFSIndexStore.Listener. The fix creates both the read-only placeholder engine and the new read-write engine under the shard mutex, delegates commit acquisition on the placeholder to the new engine, and has the placeholder's close method clean up the new engine if the swap never completed (a simplified stand-alone sketch follows below).

Relates to elastic#38561
1 parent 990be1f commit c6abe74
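
For orientation, the shape of the fix in the IndexShard.java diff further down can be reduced to a small stand-alone sketch: a placeholder engine whose commit acquisition is delegated, under a shared mutex, to the replacement engine being built, and whose close() also closes that replacement if it was never installed. All names in this sketch are illustrative stand-ins, not Elasticsearch classes; it only shows the locking and delegation pattern under those assumptions, not the real implementation.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: "Engine", "placeholder", etc. are hypothetical stand-ins, not Elasticsearch classes.
public class EngineResetSketch {

    interface Engine extends Closeable {
        String acquireSafeCommit();
    }

    static class SimpleEngine implements Engine {
        private final String name;

        SimpleEngine(String name) {
            this.name = name;
        }

        @Override
        public String acquireSafeCommit() {
            return "commit from " + name;
        }

        @Override
        public void close() {
            System.out.println("closed " + name);
        }
    }

    private static final Object mutex = new Object();
    private static final AtomicReference<Engine> current = new AtomicReference<>();
    private static final AtomicReference<Engine> replacement = new AtomicReference<>();

    // Placeholder installed while the real engine is rebuilt: commit acquisition is routed,
    // under the mutex, to the engine being built, and close() cleans that engine up if the
    // swap never happened (e.g. the shard was closed mid-reset).
    static Engine placeholder() {
        return new Engine() {
            @Override
            public String acquireSafeCommit() {
                synchronized (mutex) {
                    return replacement.get().acquireSafeCommit();
                }
            }

            @Override
            public void close() throws IOException {
                assert Thread.holdsLock(mutex);
                Engine newEngine = replacement.get();
                if (newEngine == current.get()) {
                    // the new engine was installed, so it is closed through the normal path
                    newEngine = null;
                }
                if (newEngine != null) {
                    newEngine.close();
                }
            }
        };
    }

    public static void main(String[] args) throws IOException {
        synchronized (mutex) {
            // create placeholder and replacement under the mutex, as the real change does
            current.set(placeholder());
            replacement.set(new SimpleEngine("new read-write engine"));
        }
        // ...translog replay would happen here, outside the mutex...
        synchronized (mutex) {
            Engine previous = current.getAndSet(replacement.get());
            previous.close(); // placeholder sees the new engine installed and leaves it alone
        }
        System.out.println(current.get().acquireSafeCommit());
        current.get().close();
    }
}

In the actual diff below, the placeholder corresponds to the anonymous ReadOnlyEngine subclass and newEngineReference plays the role of replacement.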

2 files changed: +168 −31 lines changed


server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 51 additions & 31 deletions
@@ -32,6 +32,7 @@
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.SetOnce;
 import org.apache.lucene.util.ThreadInterruptedException;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchException;
@@ -3067,42 +3068,61 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
         final TranslogStats translogStats = translogStats();
         // flush to make sure the latest commit, which will be opened by the read-only engine, includes all operations.
         flush(new FlushRequest().waitIfOngoing(true));
+
+        SetOnce<Engine> newEngineReference = new SetOnce<>();
+        final long globalCheckpoint = getGlobalCheckpoint();
         synchronized (mutex) {
             verifyNotClosed();
-            // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
-            final Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity());
+            // we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
+            // acquireXXXCommit and close works.
+            final Engine readOnlyEngine =
+                new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity()) {
+                    @Override
+                    public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
+                        synchronized (mutex) {
+                            // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
+                            return newEngineReference.get().acquireLastIndexCommit(false);
+                        }
+                    }
+
+                    @Override
+                    public IndexCommitRef acquireSafeIndexCommit() {
+                        synchronized (mutex) {
+                            return newEngineReference.get().acquireSafeIndexCommit();
+                        }
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        assert Thread.holdsLock(mutex);
+
+                        Engine newEngine = newEngineReference.get();
+                        if (newEngine == currentEngineReference.get()) {
+                            // we successfully installed the new engine so do not close it.
+                            newEngine = null;
+                        }
+                        IOUtils.close(super::close, newEngine);
+                    }
+                };
             IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
+            newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig()));
+            onNewEngine(newEngineReference.get());
         }
-
-        Engine newEngine = null;
-        try {
-            final long globalCheckpoint = getGlobalCheckpoint();
-            synchronized (mutex) {
-                assert currentEngineReference.get() instanceof ReadOnlyEngine : "another write engine is running";
-                verifyNotClosed();
-                // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
-                newEngine = engineFactory.newReadWriteEngine(newEngineConfig());
-                onNewEngine(newEngine);
-            }
-            final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
-                engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
-                    // TODO: add a dedicate recovery stats for the reset translog
-                });
-            newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
-            synchronized (mutex) {
-                verifyNotClosed();
-                IOUtils.close(currentEngineReference.getAndSet(newEngine));
-                // We set active because we are now writing operations to the engine; this way,
-                // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
-                active.set(true);
-                newEngine = null;
-            }
-            // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
-            // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
-            onSettingsChanged();
-        } finally {
-            IOUtils.close(newEngine);
+        final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
+            engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
+                // TODO: add a dedicate recovery stats for the reset translog
+            });
+        newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
+        synchronized (mutex) {
+            verifyNotClosed();
+            IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
+            // We set active because we are now writing operations to the engine; this way,
+            // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
+            active.set(true);
         }
+        // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
+        // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
+        onSettingsChanged();
     }

     /**

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 117 additions & 0 deletions
@@ -125,6 +125,7 @@
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.FieldMaskingReader;
 import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Assert;

@@ -3697,6 +3698,122 @@ public void testResetEngine() throws Exception {
         closeShard(shard, false);
     }

+    /**
+     * This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. Closing a shard while engine is inside
+     * resetEngineToGlobalCheckpoint can lead to check index failure in integration tests.
+     */
+    public void testCloseShardWhileResettingEngine() throws Exception {
+        CountDownLatch readyToCloseLatch = new CountDownLatch(1);
+        CountDownLatch closeDoneLatch = new CountDownLatch(1);
+        IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
+            @Override
+            public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
+                                                      long recoverUpToSeqNo) throws IOException {
+                readyToCloseLatch.countDown();
+                try {
+                    closeDoneLatch.await();
+                } catch (InterruptedException e) {
+                    throw new AssertionError(e);
+                }
+                return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
+            }
+        });
+
+        Thread closeShardThread = new Thread(() -> {
+            try {
+                readyToCloseLatch.await();
+                shard.close("testing", false);
+                // in integration tests, this is done as a listener on IndexService.
+                MockFSDirectoryService.checkIndex(logger, shard.store(), shard.shardId);
+            } catch (InterruptedException | IOException e) {
+                throw new AssertionError(e);
+            } finally {
+                closeDoneLatch.countDown();
+            }
+        });
+
+        closeShardThread.start();
+
+        final CountDownLatch engineResetLatch = new CountDownLatch(1);
+        shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getGlobalCheckpoint(), 0L,
+            ActionListener.wrap(r -> {
+                try (Releasable dummy = r) {
+                    shard.resetEngineToGlobalCheckpoint();
+                } finally {
+                    engineResetLatch.countDown();
+                }
+            }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
+
+        engineResetLatch.await();
+
+        closeShardThread.join();
+
+        // close store.
+        closeShard(shard, false);
+    }
+
+    /**
+     * This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. While engine is inside
+     * resetEngineToGlobalCheckpoint snapshot metadata could fail
+     */
+    public void testSnapshotWhileResettingEngine() throws Exception {
+        CountDownLatch readyToSnapshotLatch = new CountDownLatch(1);
+        CountDownLatch snapshotDoneLatch = new CountDownLatch(1);
+        IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
+            @Override
+            public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
+                                                      long recoverUpToSeqNo) throws IOException {
+                InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
+                readyToSnapshotLatch.countDown();
+                try {
+                    snapshotDoneLatch.await();
+                } catch (InterruptedException e) {
+                    throw new AssertionError(e);
+                }
+                return internalEngine;
+            }
+        });
+
+        indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
+        final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint());
+        shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
+
+        Thread snapshotThread = new Thread(() -> {
+            try {
+                readyToSnapshotLatch.await();
+                shard.snapshotStoreMetadata();
+                try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(randomBoolean())) {
+                    shard.store().getMetadata(indexCommitRef.getIndexCommit());
+                }
+                try (Engine.IndexCommitRef indexCommitRef = shard.acquireSafeIndexCommit()) {
+                    shard.store().getMetadata(indexCommitRef.getIndexCommit());
+                }
+            } catch (InterruptedException | IOException e) {
+                throw new AssertionError(e);
+            } finally {
+                snapshotDoneLatch.countDown();
+            }
+        });
+
+        snapshotThread.start();
+
+        final CountDownLatch engineResetLatch = new CountDownLatch(1);
+        shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getGlobalCheckpoint(), 0L,
+            ActionListener.wrap(r -> {
+                try (Releasable dummy = r) {
+                    shard.resetEngineToGlobalCheckpoint();
+                } finally {
+                    engineResetLatch.countDown();
+                }
+            }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
+
+        engineResetLatch.await();
+
+        snapshotThread.join();
+
+        closeShard(shard, false);
+    }
+
     public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
         final IndexShard replica = newStartedShard(false);
         indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));
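
Both new tests rely on the same coordination recipe: override a hook inside the engine (recoverFromTranslog) so the reset pauses at a known point, run the racing action (close or snapshot) on another thread, then let the reset finish. The following is a minimal stand-alone sketch of just that latch handshake; the thread names and print statements are illustrative and not part of the Elasticsearch test framework.

import java.util.concurrent.CountDownLatch;

// Stand-alone illustration of the latch handshake used by both tests above.
public class PauseAndRaceSketch {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch readyToRace = new CountDownLatch(1);
        CountDownLatch raceDone = new CountDownLatch(1);

        // Stands in for the reset: it pauses at the hook until the racing action has run.
        Thread resetThread = new Thread(() -> {
            System.out.println("reset: reached the hook (recoverFromTranslog in the tests)");
            readyToRace.countDown();
            try {
                raceDone.await();
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
            System.out.println("reset: resuming and completing");
        });

        // Stands in for the racing action: close or snapshot while the reset is in flight.
        Thread racingThread = new Thread(() -> {
            try {
                readyToRace.await();
                System.out.println("race: closing / snapshotting mid-reset");
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            } finally {
                raceDone.countDown();
            }
        });

        resetThread.start();
        racingThread.start();
        resetThread.join();
        racingThread.join();
    }
}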
