From 797945a53334f717ee74c62699acd865509d0cd9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 6 Aug 2019 11:52:13 -0400 Subject: [PATCH 1/8] Do not create engine under IndexShard#mutex --- .../index/shard/EngineReference.java | 84 +++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 88 +++++++------ .../index/shard/EngineReferenceTests.java | 116 ++++++++++++++++++ .../index/shard/IndexShardTests.java | 37 ++++++ 4 files changed, 290 insertions(+), 35 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/shard/EngineReference.java create mode 100644 server/src/test/java/org/elasticsearch/index/shard/EngineReferenceTests.java diff --git a/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java b/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java new file mode 100644 index 0000000000000..dabaa5ad2158e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.engine.Engine; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Used in {@link IndexShard} to hold the reference to the current {@link Engine}. + */ +final class EngineReference implements Closeable { + private volatile boolean closed; + private volatile Engine current; + + Engine get() { + return current; + } + + /** + * Closes the current engine and replaces with the new engine. If this method succeeds, the ownership of the new engine + * is transferred to this reference. Thus, we must not call close directly on the new engine. If this reference was closed, + * this method will throw {@link AlreadyClosedException} and the caller need to release the new engine. + * + * @throws AlreadyClosedException if this holder was closed already + * @throws IOException if fail to close the current engine + */ + void swapReference(Engine newEngine) throws IOException, AlreadyClosedException { + final Engine toClose; + synchronized (this) { + if (closed) { + assert current == null; + throw new AlreadyClosedException("engine reference was closed"); + } + toClose = current; + current = newEngine; + } + IOUtils.close(toClose); + } + + void flushAndClose() throws IOException { + doClose(true); + } + + @Override + public void close() throws IOException { + doClose(false); + } + + private void doClose(boolean flush) throws IOException { + final Engine toClose; + synchronized (this) { + assert closed == false || current == null; + toClose = current; + current = null; + closed = true; + } + if (toClose != null && flush) { + toClose.flushAndClose(); + } else { + IOUtils.close(toClose); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a98f501946bc0..4632ae784158c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -213,7 +213,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - protected final AtomicReference currentEngineReference = new AtomicReference<>(); + private final Object engineMutex = new Object(); + private final EngineReference currentEngineReference = new EngineReference(); final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; @@ -1194,7 +1195,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { store.incRef(); try { Engine engine; - synchronized (mutex) { + synchronized (engineMutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. // That can be done out of mutex, since the engine can be closed half way. @@ -1317,15 +1318,14 @@ public void close(String reason, boolean flushEngine) throws IOException { try { changeState(IndexShardState.CLOSED, reason); } finally { - final Engine engine = this.currentEngineReference.getAndSet(null); try { - if (engine != null && flushEngine) { - engine.flushAndClose(); + if (flushEngine) { + currentEngineReference.flushAndClose(); } } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close(engine, globalCheckpointListeners, refreshListeners); + IOUtils.close(currentEngineReference, globalCheckpointListeners, refreshListeners); indexShardOperationPermits.close(); } } @@ -1341,13 +1341,13 @@ public IndexShard postRecovery(String reason) if (state == IndexShardState.STARTED) { throw new IndexShardStartedException(shardId); } - // we need to refresh again to expose all operations that were index until now. Otherwise - // we may not expose operations that were indexed with a refresh listener that was immediately - // responded to in addRefreshListener. - getEngine().refresh("post_recovery"); recoveryState.setStage(RecoveryState.Stage.DONE); changeState(IndexShardState.POST_RECOVERY, reason); } + // we need to refresh again to expose all operations that were index until now. Otherwise + // we may not expose operations that were indexed with a refresh listener that was immediately + // responded to in addRefreshListener. + refresh("post_recovery"); return this; } @@ -1420,9 +1420,7 @@ public long recoverLocallyUpToGlobalCheckpoint() { getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint); logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint)); } finally { - synchronized (mutex) { - IOUtils.close(currentEngineReference.getAndSet(null)); - } + currentEngineReference.swapReference(null); } } catch (Exception e) { logger.debug(new ParameterizedMessage("failed to recover shard locally up to global checkpoint {}", globalCheckpoint), e); @@ -1581,6 +1579,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { } private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException { + assert Thread.holdsLock(mutex) == false : "opening engine under mutex [" + Thread.currentThread() + "]"; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1593,13 +1592,17 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty() : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - synchronized (mutex) { - verifyNotClosed(); + synchronized (engineMutex) { assert currentEngineReference.get() == null : "engine is running"; // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). - final Engine newEngine = engineFactory.newReadWriteEngine(config); + Engine newEngine = engineFactory.newReadWriteEngine(config); onNewEngine(newEngine); - currentEngineReference.set(newEngine); + try { + currentEngineReference.swapReference(newEngine); + newEngine = null; + } finally { + IOUtils.close(newEngine); + } // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); @@ -1634,7 +1637,7 @@ private void onNewEngine(Engine newEngine) { public void performRecoveryRestart() throws IOException { synchronized (mutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(currentEngineReference.getAndSet(null)); + currentEngineReference.swapReference(null); resetRecoveryStage(); } } @@ -2666,7 +2669,11 @@ private DocumentMapperForType docMapper(String type) { } private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { - Sort indexSort = indexSortSupplier.get(); + final Sort indexSort = indexSortSupplier.get(); + final Engine.Warmer warmer = reader -> { + assert Thread.holdsLock(mutex) == false : "warming engine under mutex [" + Thread.currentThread() + "]"; + this.warmer.warm(reader); + }; return new EngineConfig(shardId, shardRouting.allocationId().getId(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), mapperService != null ? mapperService.indexAnalyzer() : null, @@ -3296,6 +3303,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ void resetEngineToGlobalCheckpoint() throws IOException { + assert Thread.holdsLock(mutex) == false : "resetting engine under mutex [" + Thread.currentThread() + "]"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']'; sync(); // persist the global checkpoint to disk @@ -3307,15 +3315,14 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - synchronized (mutex) { - verifyNotClosed(); - // we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata, + synchronized (engineMutex) { + // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. - final Engine readOnlyEngine = + Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (mutex) { + synchronized (engineMutex) { // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); } @@ -3323,24 +3330,30 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (mutex) { + synchronized (engineMutex) { return newEngineReference.get().acquireSafeIndexCommit(); } } @Override public void close() throws IOException { - assert Thread.holdsLock(mutex); - - Engine newEngine = newEngineReference.get(); - if (newEngine == currentEngineReference.get()) { - // we successfully installed the new engine so do not close it. - newEngine = null; + Engine newEngine; + synchronized (engineMutex) { + newEngine = newEngineReference.get(); + if (newEngine == currentEngineReference.get()) { + // we successfully installed the new engine so do not close it. + newEngine = null; + } } IOUtils.close(super::close, newEngine); } }; - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + try { + currentEngineReference.swapReference(readOnlyEngine); + readOnlyEngine = null; + } finally { + IOUtils.close(readOnlyEngine); + } newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); } @@ -3349,9 +3362,14 @@ public void close() throws IOException { // TODO: add a dedicate recovery stats for the reset translog }); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); - synchronized (mutex) { - verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); + synchronized (engineMutex) { + Engine newEngine = newEngineReference.get(); + try { + currentEngineReference.swapReference(newEngine); + newEngine = null; + } finally { + IOUtils.close(newEngine); + } // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); diff --git a/server/src/test/java/org/elasticsearch/index/shard/EngineReferenceTests.java b/server/src/test/java/org/elasticsearch/index/shard/EngineReferenceTests.java new file mode 100644 index 0000000000000..0c2e1e0dfcaf9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/EngineReferenceTests.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Phaser; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class EngineReferenceTests extends ESTestCase { + + public void testSimple() throws Exception { + final EngineReference engineReference = new EngineReference(); + assertNull(engineReference.get()); + List engines = new ArrayList<>(); + int iterations = randomIntBetween(0, 10); + for (int i = 0; i < iterations; i++) { + Engine oldEngine = engineReference.get(); + Engine newEngine = mock(Engine.class); + engineReference.swapReference(newEngine); + engines.add(newEngine); + assertSame(newEngine, engineReference.get()); + if (oldEngine != null) { + verify(oldEngine, times(1)).close(); + } + } + if (randomBoolean()) { + engineReference.flushAndClose(); + if (engines.isEmpty() == false) { + Engine flushedEngine = engines.remove(engines.size() - 1); + verify(flushedEngine, times(1)).flushAndClose(); + } + } else { + engineReference.close(); + } + assertNull(engineReference.get()); + for (Engine engine : engines) { + verify(engine, times(1)).close(); + } + Engine newEngine = mock(Engine.class); + AlreadyClosedException ace = expectThrows(AlreadyClosedException.class, () -> engineReference.swapReference(newEngine)); + assertThat(ace.getMessage(), containsString("engine reference was closed")); + verifyZeroInteractions(newEngine); + } + + public void testSwapAndCloseConcurrently() throws Exception { + final EngineReference engineReference = new EngineReference(); + final Phaser phaser = new Phaser(2); + Thread closeThread = new Thread(() -> { + try { + phaser.arriveAndAwaitAdvance(); + engineReference.close(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + closeThread.start(); + phaser.arriveAndAwaitAdvance(); + List engines = new ArrayList<>(); + int iterations = randomIntBetween(0, 100); + for (int i = 0; i < iterations; i++) { + Engine newEngine = mock(Engine.class); + try { + Engine oldEngine = engineReference.get(); + engineReference.swapReference(newEngine); + assertThat(engineReference.get(), either(sameInstance(newEngine)).or(nullValue())); + if (oldEngine != null) { + verify(oldEngine, times(1)).close(); + } + engines.add(newEngine); + } catch (AlreadyClosedException ace) { + verifyZeroInteractions(newEngine); + assertThat(ace.getMessage(), containsString("engine reference was closed")); + } + } + closeThread.join(); + assertNull(engineReference.get()); + for (Engine engine : engines) { + verify(engine, times(1)).close(); + } + Engine newEngine = mock(Engine.class); + AlreadyClosedException ace = expectThrows(AlreadyClosedException.class, () -> engineReference.swapReference(newEngine)); + assertThat(ace.getMessage(), containsString("engine reference was closed")); + verifyZeroInteractions(newEngine); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3e507a3cfb685..5663cd5022ef9 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -79,10 +79,12 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.DeleteResult; +import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; @@ -4123,4 +4125,39 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs)); closeShards(readonlyShard); } + + public void testCloseShardWhileEngineIsWarming() throws Exception { + CountDownLatch warmerStarted = new CountDownLatch(1); + CountDownLatch warmerBlocking = new CountDownLatch(1); + IndexShard shard = newShard(true, Settings.EMPTY, config -> { + Engine.Warmer warmer = reader -> { + try { + warmerStarted.countDown(); + warmerBlocking.await(); + config.getWarmer().warm(reader); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + }; + EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), + config.getIndexSettings(), warmer, config.getStore(), config.getMergePolicy(), config.getAnalyzer(), + config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), + config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + return new InternalEngine(configWithWarmer); + }); + Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); + recoveryThread.start(); + try { + warmerStarted.await(); + shard.close("testing", false); + assertThat(shard.state, equalTo(IndexShardState.CLOSED)); + } finally { + warmerBlocking.countDown(); + } + recoveryThread.join(); + shard.store().close(); + } } From 16cd233c0f11a7109be45a969ca21a5266393c86 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 6 Aug 2019 22:20:35 -0400 Subject: [PATCH 2/8] remove volatile --- .../java/org/elasticsearch/index/shard/EngineReference.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java b/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java index dabaa5ad2158e..83b1baad2c1ed 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java +++ b/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java @@ -30,7 +30,7 @@ * Used in {@link IndexShard} to hold the reference to the current {@link Engine}. */ final class EngineReference implements Closeable { - private volatile boolean closed; + private boolean closed; private volatile Engine current; Engine get() { From bd56da818a9d572fa431b65af4043470152f018e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 8 Aug 2019 18:16:54 -0400 Subject: [PATCH 3/8] Create and swap engine in two steps --- .../index/shard/EngineReference.java | 84 ------------- .../elasticsearch/index/shard/IndexShard.java | 109 +++++++++------- .../index/shard/EngineReferenceTests.java | 116 ------------------ 3 files changed, 62 insertions(+), 247 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/index/shard/EngineReference.java delete mode 100644 server/src/test/java/org/elasticsearch/index/shard/EngineReferenceTests.java diff --git a/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java b/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java deleted file mode 100644 index 83b1baad2c1ed..0000000000000 --- a/server/src/main/java/org/elasticsearch/index/shard/EngineReference.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.shard; - -import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.index.engine.Engine; - -import java.io.Closeable; -import java.io.IOException; - -/** - * Used in {@link IndexShard} to hold the reference to the current {@link Engine}. - */ -final class EngineReference implements Closeable { - private boolean closed; - private volatile Engine current; - - Engine get() { - return current; - } - - /** - * Closes the current engine and replaces with the new engine. If this method succeeds, the ownership of the new engine - * is transferred to this reference. Thus, we must not call close directly on the new engine. If this reference was closed, - * this method will throw {@link AlreadyClosedException} and the caller need to release the new engine. - * - * @throws AlreadyClosedException if this holder was closed already - * @throws IOException if fail to close the current engine - */ - void swapReference(Engine newEngine) throws IOException, AlreadyClosedException { - final Engine toClose; - synchronized (this) { - if (closed) { - assert current == null; - throw new AlreadyClosedException("engine reference was closed"); - } - toClose = current; - current = newEngine; - } - IOUtils.close(toClose); - } - - void flushAndClose() throws IOException { - doClose(true); - } - - @Override - public void close() throws IOException { - doClose(false); - } - - private void doClose(boolean flush) throws IOException { - final Engine toClose; - synchronized (this) { - assert closed == false || current == null; - toClose = current; - current = null; - closed = true; - } - if (toClose != null && flush) { - toClose.flushAndClose(); - } else { - IOUtils.close(toClose); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4632ae784158c..741d1c7b9e140 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -214,7 +214,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile IndexShardState state; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm private final Object engineMutex = new Object(); - private final EngineReference currentEngineReference = new EngineReference(); + private final AtomicReference currentEngineReference = new AtomicReference<>(); final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; @@ -1191,20 +1191,23 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException { * @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present. */ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { + assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex"; Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - Engine engine; synchronized (engineMutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. - // That can be done out of mutex, since the engine can be closed half way. - engine = getEngineOrNull(); - if (engine == null) { + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. + synchronized (mutex) { + final Engine engine = getEngineOrNull(); + if (engine != null) { + indexCommit = engine.acquireLastIndexCommit(false); + } + } + if (indexCommit == null) { return store.getMetadata(null, true); } } - indexCommit = engine.acquireLastIndexCommit(false); return store.getMetadata(indexCommit.getIndexCommit()); } finally { store.decRef(); @@ -1318,14 +1321,15 @@ public void close(String reason, boolean flushEngine) throws IOException { try { changeState(IndexShardState.CLOSED, reason); } finally { + final Engine engine = this.currentEngineReference.getAndSet(null); try { - if (flushEngine) { - currentEngineReference.flushAndClose(); + if (engine != null && flushEngine) { + engine.flushAndClose(); } } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close(currentEngineReference, globalCheckpointListeners, refreshListeners); + IOUtils.close(engine, globalCheckpointListeners, refreshListeners); indexShardOperationPermits.close(); } } @@ -1347,7 +1351,7 @@ public IndexShard postRecovery(String reason) // we need to refresh again to expose all operations that were index until now. Otherwise // we may not expose operations that were indexed with a refresh listener that was immediately // responded to in addRefreshListener. - refresh("post_recovery"); + getEngine().refresh("post_recovery"); return this; } @@ -1420,7 +1424,9 @@ public long recoverLocallyUpToGlobalCheckpoint() { getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint); logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint)); } finally { - currentEngineReference.swapReference(null); + synchronized (mutex) { + IOUtils.close(currentEngineReference.getAndSet(null)); + } } } catch (Exception e) { logger.debug(new ParameterizedMessage("failed to recover shard locally up to global checkpoint {}", globalCheckpoint), e); @@ -1579,7 +1585,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { } private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException { - assert Thread.holdsLock(mutex) == false : "opening engine under mutex [" + Thread.currentThread() + "]"; + assert Thread.holdsLock(mutex) == false : "opening engine under mutex"; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1593,19 +1599,25 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); synchronized (engineMutex) { - assert currentEngineReference.get() == null : "engine is running"; // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). - Engine newEngine = engineFactory.newReadWriteEngine(config); - onNewEngine(newEngine); + final Engine newEngine = engineFactory.newReadWriteEngine(config); + boolean success = false; try { - currentEngineReference.swapReference(newEngine); - newEngine = null; + synchronized (mutex) { + verifyNotClosed(); + assert currentEngineReference.get() == null : "engine is running"; + onNewEngine(newEngine); + currentEngineReference.set(newEngine); + // We set active because we are now writing operations to the engine; this way, + // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. + active.set(true); + success = true; + } } finally { - IOUtils.close(newEngine); + if (success == false) { + newEngine.close(); + } } - // We set active because we are now writing operations to the engine; this way, - // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. - active.set(true); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -1628,6 +1640,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException { } private void onNewEngine(Engine newEngine) { + assert Thread.holdsLock(engineMutex); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); } @@ -1637,7 +1650,7 @@ private void onNewEngine(Engine newEngine) { public void performRecoveryRestart() throws IOException { synchronized (mutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - currentEngineReference.swapReference(null); + IOUtils.close(currentEngineReference.getAndSet(null)); resetRecoveryStage(); } } @@ -2671,8 +2684,10 @@ private DocumentMapperForType docMapper(String type) { private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { final Sort indexSort = indexSortSupplier.get(); final Engine.Warmer warmer = reader -> { - assert Thread.holdsLock(mutex) == false : "warming engine under mutex [" + Thread.currentThread() + "]"; - this.warmer.warm(reader); + assert Thread.holdsLock(mutex) == false : "warming engine under mutex"; + if (this.warmer != null) { + this.warmer.warm(reader); + } }; return new EngineConfig(shardId, shardRouting.allocationId().getId(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), @@ -3303,7 +3318,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ void resetEngineToGlobalCheckpoint() throws IOException { - assert Thread.holdsLock(mutex) == false : "resetting engine under mutex [" + Thread.currentThread() + "]"; + assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']'; sync(); // persist the global checkpoint to disk @@ -3318,11 +3333,11 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED synchronized (engineMutex) { // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. - Engine readOnlyEngine = + final Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (engineMutex) { + synchronized (mutex) { // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); } @@ -3330,29 +3345,34 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (engineMutex) { + synchronized (mutex) { return newEngineReference.get().acquireSafeIndexCommit(); } } @Override public void close() throws IOException { - Engine newEngine; - synchronized (engineMutex) { - newEngine = newEngineReference.get(); - if (newEngine == currentEngineReference.get()) { - // we successfully installed the new engine so do not close it. - newEngine = null; - } + assert Thread.holdsLock(mutex); + + Engine newEngine = newEngineReference.get(); + if (newEngine == currentEngineReference.get()) { + // we successfully installed the new engine so do not close it. + newEngine = null; } IOUtils.close(super::close, newEngine); } }; + boolean success = false; try { - currentEngineReference.swapReference(readOnlyEngine); - readOnlyEngine = null; + synchronized (mutex) { + verifyNotClosed(); + IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + success = true; + } } finally { - IOUtils.close(readOnlyEngine); + if (success == false) { + readOnlyEngine.close(); + } } newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); @@ -3362,14 +3382,9 @@ public void close() throws IOException { // TODO: add a dedicate recovery stats for the reset translog }); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); - synchronized (engineMutex) { - Engine newEngine = newEngineReference.get(); - try { - currentEngineReference.swapReference(newEngine); - newEngine = null; - } finally { - IOUtils.close(newEngine); - } + synchronized (mutex) { + verifyNotClosed(); + IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); diff --git a/server/src/test/java/org/elasticsearch/index/shard/EngineReferenceTests.java b/server/src/test/java/org/elasticsearch/index/shard/EngineReferenceTests.java deleted file mode 100644 index 0c2e1e0dfcaf9..0000000000000 --- a/server/src/test/java/org/elasticsearch/index/shard/EngineReferenceTests.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.shard; - -import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Phaser; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.either; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; - -public class EngineReferenceTests extends ESTestCase { - - public void testSimple() throws Exception { - final EngineReference engineReference = new EngineReference(); - assertNull(engineReference.get()); - List engines = new ArrayList<>(); - int iterations = randomIntBetween(0, 10); - for (int i = 0; i < iterations; i++) { - Engine oldEngine = engineReference.get(); - Engine newEngine = mock(Engine.class); - engineReference.swapReference(newEngine); - engines.add(newEngine); - assertSame(newEngine, engineReference.get()); - if (oldEngine != null) { - verify(oldEngine, times(1)).close(); - } - } - if (randomBoolean()) { - engineReference.flushAndClose(); - if (engines.isEmpty() == false) { - Engine flushedEngine = engines.remove(engines.size() - 1); - verify(flushedEngine, times(1)).flushAndClose(); - } - } else { - engineReference.close(); - } - assertNull(engineReference.get()); - for (Engine engine : engines) { - verify(engine, times(1)).close(); - } - Engine newEngine = mock(Engine.class); - AlreadyClosedException ace = expectThrows(AlreadyClosedException.class, () -> engineReference.swapReference(newEngine)); - assertThat(ace.getMessage(), containsString("engine reference was closed")); - verifyZeroInteractions(newEngine); - } - - public void testSwapAndCloseConcurrently() throws Exception { - final EngineReference engineReference = new EngineReference(); - final Phaser phaser = new Phaser(2); - Thread closeThread = new Thread(() -> { - try { - phaser.arriveAndAwaitAdvance(); - engineReference.close(); - } catch (IOException e) { - throw new AssertionError(e); - } - }); - closeThread.start(); - phaser.arriveAndAwaitAdvance(); - List engines = new ArrayList<>(); - int iterations = randomIntBetween(0, 100); - for (int i = 0; i < iterations; i++) { - Engine newEngine = mock(Engine.class); - try { - Engine oldEngine = engineReference.get(); - engineReference.swapReference(newEngine); - assertThat(engineReference.get(), either(sameInstance(newEngine)).or(nullValue())); - if (oldEngine != null) { - verify(oldEngine, times(1)).close(); - } - engines.add(newEngine); - } catch (AlreadyClosedException ace) { - verifyZeroInteractions(newEngine); - assertThat(ace.getMessage(), containsString("engine reference was closed")); - } - } - closeThread.join(); - assertNull(engineReference.get()); - for (Engine engine : engines) { - verify(engine, times(1)).close(); - } - Engine newEngine = mock(Engine.class); - AlreadyClosedException ace = expectThrows(AlreadyClosedException.class, () -> engineReference.swapReference(newEngine)); - assertThat(ace.getMessage(), containsString("engine reference was closed")); - verifyZeroInteractions(newEngine); - } -} From f8025155c225093c58c84eabdc4c63d4fdb587cb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 13 Aug 2019 17:01:06 -0400 Subject: [PATCH 4/8] addRefreshListener --- .../elasticsearch/index/shard/IndexShard.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3530b6fd3af7b..202a362b09419 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -213,8 +213,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; + // ensure happens-before relation between addRefreshListener() and postRecovery() + private final Object postRecoveryMutex = new Object(); private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private final Object engineMutex = new Object(); + private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex private final AtomicReference currentEngineReference = new AtomicReference<>(); final EngineFactory engineFactory; @@ -1338,23 +1340,24 @@ public void close(String reason, boolean flushEngine) throws IOException { } } - public IndexShard postRecovery(String reason) - throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { - synchronized (mutex) { - if (state == IndexShardState.CLOSED) { - throw new IndexShardClosedException(shardId); - } - if (state == IndexShardState.STARTED) { - throw new IndexShardStartedException(shardId); + public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { + synchronized (postRecoveryMutex) { + // we need to refresh again to expose all operations that were index until now. Otherwise + // we may not expose operations that were indexed with a refresh listener that was immediately + // responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener + // and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard). + getEngine().refresh("post_recovery"); + synchronized (mutex) { + if (state == IndexShardState.CLOSED) { + throw new IndexShardClosedException(shardId); + } + if (state == IndexShardState.STARTED) { + throw new IndexShardStartedException(shardId); + } + recoveryState.setStage(RecoveryState.Stage.DONE); + changeState(IndexShardState.POST_RECOVERY, reason); } - recoveryState.setStage(RecoveryState.Stage.DONE); - changeState(IndexShardState.POST_RECOVERY, reason); } - // we need to refresh again to expose all operations that were index until now. Otherwise - // we may not expose operations that were indexed with a refresh listener that was immediately - // responded to in addRefreshListener. - getEngine().refresh("post_recovery"); - return this; } /** @@ -3257,10 +3260,10 @@ public void addRefreshListener(Translog.Location location, Consumer lis if (isReadAllowed()) { readAllowed = true; } else { - // check again under mutex. this is important to create a happens before relationship + // check again under postRecoveryMutex. this is important to create a happens before relationship // between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond // to a listener before a refresh actually happened that contained that operation. - synchronized (mutex) { + synchronized (postRecoveryMutex) { readAllowed = isReadAllowed(); } } From 681696020c666cdbf29d3743a38eaeb332c4993a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 13 Aug 2019 23:12:48 -0400 Subject: [PATCH 5/8] add test warmer --- .../index/engine/EngineTestCase.java | 46 ++++++++++--------- .../index/shard/IndexShardTestCase.java | 15 +++++- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index c6b286d38d36c..a545b77866ce4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1128,37 +1128,39 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio } public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException { - if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false || engine instanceof InternalEngine == false) { - return; - } try { engine.refresh("test"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { - DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader()); - Set seqNos = new HashSet<>(); - for (LeafReaderContext leaf : reader.leaves()) { - NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - int docId; - while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - assertTrue(seqNoDocValues.advanceExact(docId)); - long seqNo = seqNoDocValues.longValue(); - assertThat(seqNo, greaterThanOrEqualTo(0L)); - if (primaryTermDocValues.advanceExact(docId)) { - if (seqNos.add(seqNo) == false) { - final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); - leaf.reader().document(docId, idFieldVisitor); - throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId()); - } - } - } - } + assertAtMostOneLuceneDocumentPerSequenceNumber(engine.config().getIndexSettings(), searcher.getDirectoryReader()); } } catch (AlreadyClosedException ignored) { } } + public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexSettings indexSettings, + DirectoryReader reader) throws IOException { + Set seqNos = new HashSet<>(); + final DirectoryReader wrappedReader = indexSettings.isSoftDeleteEnabled() ? Lucene.wrapAllDocsLive(reader) : reader; + for (LeafReaderContext leaf : wrappedReader.leaves()) { + NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + int docId; + while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertTrue(seqNoDocValues.advanceExact(docId)); + long seqNo = seqNoDocValues.longValue(); + assertThat(seqNo, greaterThanOrEqualTo(0L)); + if (primaryTermDocValues.advanceExact(docId)) { + if (seqNos.add(seqNo) == false) { + final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); + leaf.reader().document(docId, idFieldVisitor); + throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId()); + } + } + } + } + } + public static MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 4b621e5fe5153..bc5a368c47daa 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -375,7 +375,7 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe indexSettings.getSettings(), "index"); mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_RECOVERY); SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap()); - final Engine.Warmer warmer = reader -> {}; + final Engine.Warmer warmer = createTestWarmer(indexSettings); ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings); indexShard = new IndexShard( @@ -860,4 +860,17 @@ public static Translog getTranslog(IndexShard shard) { public static ReplicationTracker getReplicationTracker(IndexShard indexShard) { return indexShard.getReplicationTracker(); } + + public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { + return reader -> { + // This isn't a warmer but sometimes verify the content in the reader + if (randomBoolean()) { + try { + EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(indexSettings, reader); + } catch (IOException e) { + throw new AssertionError(e); + } + } + }; + } } From 67badc6621b98249745711f70490cda37cf275ec Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 14 Aug 2019 12:14:47 -0400 Subject: [PATCH 6/8] close if failed to install reference --- .../elasticsearch/index/shard/IndexShard.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 202a362b09419..a71616be7d320 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1606,9 +1606,8 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t synchronized (engineMutex) { // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); - boolean success = false; - try { - synchronized (mutex) { + synchronized (mutex) { + try { verifyNotClosed(); assert currentEngineReference.get() == null : "engine is running"; onNewEngine(newEngine); @@ -1616,11 +1615,10 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); - success = true; - } - } finally { - if (success == false) { - newEngine.close(); + } finally { + if (currentEngineReference.get() != newEngine) { + newEngine.close(); + } } } } @@ -3372,16 +3370,14 @@ public void close() throws IOException { IOUtils.close(super::close, newEngine); } }; - boolean success = false; - try { - synchronized (mutex) { + synchronized (mutex) { + try { verifyNotClosed(); IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); - success = true; - } - } finally { - if (success == false) { - readOnlyEngine.close(); + } finally { + if (currentEngineReference.get() != readOnlyEngine) { + readOnlyEngine.close(); + } } } newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); From c8b2529e6933b4ebdd0d16222421ee2a7784b764 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 14 Aug 2019 12:59:43 -0400 Subject: [PATCH 7/8] use engineMutex while resetting engine --- .../elasticsearch/index/shard/IndexShard.java | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a71616be7d320..e64322adf2c12 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3345,7 +3345,10 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (mutex) { + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); + } // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); } @@ -3353,7 +3356,10 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (mutex) { + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); + } return newEngineReference.get().acquireSafeIndexCommit(); } } @@ -3380,8 +3386,18 @@ public void close() throws IOException { } } } - newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); - onNewEngine(newEngineReference.get()); + final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)); + synchronized (mutex) { + try { + verifyNotClosed(); + newEngineReference.set(newReadWriteEngine); + onNewEngine(newReadWriteEngine); + } finally { + if (newEngineReference.get() != newReadWriteEngine) { + newReadWriteEngine.close(); + } + } + } } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { From e48d23df94340487fe27c2396c8286aabf43e715 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 14 Aug 2019 14:18:32 -0400 Subject: [PATCH 8/8] only verify internal engine --- .../org/elasticsearch/index/shard/IndexShard.java | 2 +- .../elasticsearch/index/engine/EngineTestCase.java | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e64322adf2c12..25598ed607cf6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3394,7 +3394,7 @@ public void close() throws IOException { onNewEngine(newReadWriteEngine); } finally { if (newEngineReference.get() != newReadWriteEngine) { - newReadWriteEngine.close(); + newReadWriteEngine.close(); // shard was closed } } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index fe4915fbf33d5..b67108a16c19b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1130,13 +1130,15 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio } public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException { - try { - engine.refresh("test"); - try (Engine.Searcher searcher = engine.acquireSearcher("test")) { - assertAtMostOneLuceneDocumentPerSequenceNumber(engine.config().getIndexSettings(), searcher.getDirectoryReader()); + if (engine instanceof InternalEngine) { + try { + engine.refresh("test"); + try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + assertAtMostOneLuceneDocumentPerSequenceNumber(engine.config().getIndexSettings(), searcher.getDirectoryReader()); + } + } catch (AlreadyClosedException ignored) { + // engine was closed } - } catch (AlreadyClosedException ignored) { - } }