Skip to content

Commit 1ef87c9

Browse files
committed
Refresh should not acquire readLock (elastic#48414)
Today, we hold the engine readLock while refreshing. Although this choice simplifies the correctness reasoning, it can block IndexShard from closing if warming an external reader takes time. The current implementation of refresh does not need to hold readLock, as ReferenceManager can handle errors correctly if the engine is closed midway through a refresh. This PR is a prerequisite for solving elastic#47186.
1 parent 2e3db51 commit 1ef87c9

File tree

2 files changed

+46
-6
lines changed

2 files changed

+46
-6
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1569,14 +1569,12 @@ public boolean maybeRefresh(String source) throws EngineException {
15691569
}
15701570

15711571
final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
1572-
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
1573-
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
15741572
// both refresh types will result in an internal refresh but only the external will also
15751573
// pass the new reader reference to the external reader manager.
15761574
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
15771575
boolean refreshed;
1578-
try (ReleasableLock lock = readLock.acquire()) {
1579-
ensureOpen();
1576+
try {
1577+
// refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
15801578
if (store.tryIncRef()) {
15811579
// increment the ref just to ensure nobody closes the store during a refresh
15821580
try {

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@
141141
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
142142
import org.elasticsearch.test.IndexSettingsModule;
143143
import org.elasticsearch.test.VersionUtils;
144+
import org.elasticsearch.threadpool.ThreadPool;
144145
import org.hamcrest.MatcherAssert;
145146

146147
import java.io.Closeable;
@@ -5777,7 +5778,7 @@ public void testMaxSeqNoInCommitUserData() throws Exception {
57775778
assertMaxSeqNoInCommitUserData(engine);
57785779
}
57795780

5780-
public void testRefreshAndFailEngineConcurrently() throws Exception {
5781+
public void testRefreshAndCloseEngineConcurrently() throws Exception {
57815782
AtomicBoolean stopped = new AtomicBoolean();
57825783
Semaphore indexedDocs = new Semaphore(0);
57835784
Thread indexer = new Thread(() -> {
@@ -5807,7 +5808,11 @@ public void testRefreshAndFailEngineConcurrently() throws Exception {
58075808
refresher.start();
58085809
indexedDocs.acquire(randomIntBetween(1, 100));
58095810
try {
5810-
engine.failEngine("test", new IOException("simulated error"));
5811+
if (randomBoolean()) {
5812+
engine.failEngine("test", new IOException("simulated error"));
5813+
} else {
5814+
engine.close();
5815+
}
58115816
} finally {
58125817
stopped.set(true);
58135818
indexer.join();
@@ -6149,4 +6154,41 @@ public void afterRefresh(boolean didRefresh) {
61496154
}
61506155
}
61516156
}
6157+
6158+
public void testRefreshDoesNotBlockClosing() throws Exception {
6159+
final CountDownLatch refreshStarted = new CountDownLatch(1);
6160+
final CountDownLatch engineClosed = new CountDownLatch(1);
6161+
final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() {
6162+
6163+
@Override
6164+
public void beforeRefresh() {
6165+
refreshStarted.countDown();
6166+
try {
6167+
engineClosed.await();
6168+
} catch (InterruptedException e) {
6169+
throw new AssertionError(e);
6170+
}
6171+
}
6172+
6173+
@Override
6174+
public void afterRefresh(boolean didRefresh) {
6175+
assertFalse(didRefresh);
6176+
}
6177+
};
6178+
try (Store store = createStore()) {
6179+
final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null,
6180+
refreshListener, null, null, engine.config().getCircuitBreakerService());
6181+
try (InternalEngine engine = createEngine(config)) {
6182+
if (randomBoolean()) {
6183+
engine.index(indexForDoc(createParsedDoc("id", null)));
6184+
}
6185+
threadPool.executor(ThreadPool.Names.REFRESH).execute(() ->
6186+
expectThrows(AlreadyClosedException.class,
6187+
() -> engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true)));
6188+
refreshStarted.await();
6189+
engine.close();
6190+
engineClosed.countDown();
6191+
}
6192+
}
6193+
}
61526194
}

0 commit comments

Comments
 (0)