Skip to content

Commit 379e847

Browse files
authored
Refresh should not acquire readLock (#48414)
Today, we hold the engine readLock while refreshing. Although this choice simplifies reasoning about correctness, it can block IndexShard from closing if warming an external reader takes a long time. The current implementation of refresh does not need to hold the readLock, because ReferenceManager handles errors correctly if the engine is closed midway through a refresh. This PR is a prerequisite for solving #47186.
1 parent 27c1902 commit 379e847

File tree

2 files changed

+46
-6
lines changed

2 files changed

+46
-6
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -1563,14 +1563,12 @@ public boolean maybeRefresh(String source) throws EngineException {
15631563
}
15641564

15651565
final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
1566-
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
1567-
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
15681566
// both refresh types will result in an internal refresh but only the external will also
15691567
// pass the new reader reference to the external reader manager.
15701568
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
15711569
boolean refreshed;
1572-
try (ReleasableLock lock = readLock.acquire()) {
1573-
ensureOpen();
1570+
try {
1571+
// refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
15741572
if (store.tryIncRef()) {
15751573
// increment the ref just to ensure nobody closes the store during a refresh
15761574
try {

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+44-2
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@
136136
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
137137
import org.elasticsearch.test.IndexSettingsModule;
138138
import org.elasticsearch.test.VersionUtils;
139+
import org.elasticsearch.threadpool.ThreadPool;
139140
import org.hamcrest.MatcherAssert;
140141

141142
import java.io.Closeable;
@@ -5761,7 +5762,7 @@ public void testMaxSeqNoInCommitUserData() throws Exception {
57615762
assertMaxSeqNoInCommitUserData(engine);
57625763
}
57635764

5764-
public void testRefreshAndFailEngineConcurrently() throws Exception {
5765+
public void testRefreshAndCloseEngineConcurrently() throws Exception {
57655766
AtomicBoolean stopped = new AtomicBoolean();
57665767
Semaphore indexedDocs = new Semaphore(0);
57675768
Thread indexer = new Thread(() -> {
@@ -5791,7 +5792,11 @@ public void testRefreshAndFailEngineConcurrently() throws Exception {
57915792
refresher.start();
57925793
indexedDocs.acquire(randomIntBetween(1, 100));
57935794
try {
5794-
engine.failEngine("test", new IOException("simulated error"));
5795+
if (randomBoolean()) {
5796+
engine.failEngine("test", new IOException("simulated error"));
5797+
} else {
5798+
engine.close();
5799+
}
57955800
} finally {
57965801
stopped.set(true);
57975802
indexer.join();
@@ -6133,4 +6138,41 @@ public void afterRefresh(boolean didRefresh) {
61336138
}
61346139
}
61356140
}
6141+
6142+
public void testRefreshDoesNotBlockClosing() throws Exception {
6143+
final CountDownLatch refreshStarted = new CountDownLatch(1);
6144+
final CountDownLatch engineClosed = new CountDownLatch(1);
6145+
final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() {
6146+
6147+
@Override
6148+
public void beforeRefresh() {
6149+
refreshStarted.countDown();
6150+
try {
6151+
engineClosed.await();
6152+
} catch (InterruptedException e) {
6153+
throw new AssertionError(e);
6154+
}
6155+
}
6156+
6157+
@Override
6158+
public void afterRefresh(boolean didRefresh) {
6159+
assertFalse(didRefresh);
6160+
}
6161+
};
6162+
try (Store store = createStore()) {
6163+
final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null,
6164+
refreshListener, null, null, engine.config().getCircuitBreakerService());
6165+
try (InternalEngine engine = createEngine(config)) {
6166+
if (randomBoolean()) {
6167+
engine.index(indexForDoc(createParsedDoc("id", null)));
6168+
}
6169+
threadPool.executor(ThreadPool.Names.REFRESH).execute(() ->
6170+
expectThrows(AlreadyClosedException.class,
6171+
() -> engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true)));
6172+
refreshStarted.await();
6173+
engine.close();
6174+
engineClosed.countDown();
6175+
}
6176+
}
6177+
}
61366178
}

0 commit comments

Comments
 (0)