From c5509c46c5633ff6aca661074f8c81011d2a26a2 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Thu, 18 Aug 2022 15:31:27 +0300 Subject: [PATCH 1/7] Inactive shard flush should wait for ongoing one org.elasticsearch.indices.flush.FlushIT#testFlushOnInactive would sometimes fail in the following case: * SHARD_MEMORY_INTERVAL_TIME_SETTING is set very low, e.g., 10ms * The regularly scheduled multiple flushes proceed to org.elasticsearch.index.shard.IndexShard#flushOnIdle * There, the first flush will handle e.g., the first document that was indexed. The second flush will arrive shortly after, before the first flush finishes. * The second flush will find that wasActive = true (due to the indexing of the remaining documents), and will set it to false. * However, the second flush will not be executed because waitIfOngoing = false, and there is the ongoing first flush. * No other flush is scheduled (since any next regularly scheduled flush will find wasActive = false), which creates the problem. Solution: if a flush request does not happen, revert active flag, so that a next flush request can happen. Fixes #87888 --- .../diskusage/IndexDiskUsageAnalyzerIT.java | 4 ++-- .../org/elasticsearch/index/engine/Engine.java | 3 ++- .../index/engine/InternalEngine.java | 5 +++-- .../index/engine/ReadOnlyEngine.java | 4 ++-- .../elasticsearch/index/shard/IndexShard.java | 18 +++++++++++++----- 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java index 424a06dddf84c..135b02305c67a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java @@ -66,12 +66,12 @@ public static class EngineTestPlugin extends Plugin implements EnginePlugin { public Optional getEngineFactory(IndexSettings indexSettings) { return Optional.of(config -> new InternalEngine(config) { @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException { final ShardId shardId = config.getShardId(); if (failOnFlushShards.contains(shardId)) { throw new EngineException(shardId, "simulated IO"); } - super.flush(force, waitIfOngoing); + return super.flush(force, waitIfOngoing); } }); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0b1e5902370d9..9658c141ea16b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1015,8 +1015,9 @@ public boolean refreshNeeded() { * @param force if true a lucene commit is executed even if no changes need to be committed. * @param waitIfOngoing if true this call will block until all currently running flushes have finished. * Otherwise this call will return without blocking. + * @return true if the flush happened, else false (e.g., if it did not wait for an ongoing request) */ - public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException; + public abstract boolean flush(boolean force, boolean waitIfOngoing) throws EngineException; /** * Flushes the state of the engine including the transaction log, clearing memory and persisting diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7a70b5ae6b8cc..feb47c1d31a79 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1895,7 +1895,7 @@ public boolean shouldPeriodicallyFlush() { } @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException { ensureOpen(); if (force && waitIfOngoing == false) { assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing; @@ -1908,7 +1908,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { if (flushLock.tryLock() == false) { // if we can't get the lock right away we block if needed otherwise barf if (waitIfOngoing == false) { - return; + return false; } logger.trace("waiting for in-flight flush to finish"); flushLock.lock(); @@ -1967,6 +1967,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { if (engineConfig.isEnableGcDeletes()) { pruneDeletedTombstones(); } + return true; } private void refreshLastCommittedSegmentInfos() { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index fbb7ef439d027..e47c3cf984c2b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -437,8 +437,8 @@ public boolean shouldPeriodicallyFlush() { } @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - // noop + public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException { + return true; // noop } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cb24bd2b9c6e1..6ea720f9efe27 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1343,8 +1343,9 @@ public BulkStats bulkStats() { * Executes the given flush request against the engine. * * @param request the flush request + * @return true if the flush happened, else false (e.g., if it did not wait for an ongoing request) */ - public void flush(FlushRequest request) { + public boolean flush(FlushRequest request) { final boolean waitIfOngoing = request.waitIfOngoing(); final boolean force = request.force(); logger.trace("flush with {}", request); @@ -1355,8 +1356,11 @@ public void flush(FlushRequest request) { */ verifyNotClosed(); final long time = System.nanoTime(); - getEngine().flush(force, waitIfOngoing); - flushMetric.inc(System.nanoTime() - time); + boolean flushHappened = getEngine().flush(force, waitIfOngoing); + if (flushHappened) { + flushMetric.inc(System.nanoTime() - time); + } + return flushHappened; } /** @@ -2188,8 +2192,12 @@ public void onFailure(Exception e) { @Override protected void doRun() { - flush(new FlushRequest().waitIfOngoing(false).force(false)); - periodicFlushMetric.inc(); + if (flush(new FlushRequest().waitIfOngoing(false).force(false))) { + periodicFlushMetric.inc(); + } else { + // In case the flush did not happen, revert active flag so that a next flushOnIdle request can happen (#87888) + active.set(true); + } } }); } From a1285b30bc8b8a4143f0c91c39b4077d50a6e037 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Thu, 18 Aug 2022 16:44:41 +0300 Subject: [PATCH 2/7] Fix test --- .../close/TransportVerifyShardBeforeCloseActionTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 51db5b4f7c648..8dd0e89e1cbbe 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -62,7 +62,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -170,7 +170,7 @@ private void executeOnPrimaryOrReplica(boolean phase1) throws Throwable { public void testShardIsFlushed() throws Throwable { final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); - doNothing().when(indexShard).flush(flushRequest.capture()); + doReturn(true).when(indexShard).flush(flushRequest.capture()); executeOnPrimaryOrReplica(); verify(indexShard, times(1)).flush(any(FlushRequest.class)); assertThat(flushRequest.getValue().force(), is(true)); From bdb7a4b19f452acc081fd44eb147cfe9114abf08 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Thu, 18 Aug 2022 19:14:55 +0300 Subject: [PATCH 3/7] New test for concurrent flushes And fix some PR review feedback --- .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 2 + .../elasticsearch/index/shard/IndexShard.java | 11 ++-- .../index/shard/IndexShardTests.java | 58 +++++++++++++++++++ 4 files changed, 65 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 9658c141ea16b..31adaf7358c3a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1015,7 +1015,7 @@ public boolean refreshNeeded() { * @param force if true a lucene commit is executed even if no changes need to be committed. * @param waitIfOngoing if true this call will block until all currently running flushes have finished. * Otherwise this call will return without blocking. - * @return true if the flush happened, else false (e.g., if it did not wait for an ongoing request) + * @return false if the flush did not wait for an ongoing request and returned, else true */ public abstract boolean flush(boolean force, boolean waitIfOngoing) throws EngineException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index feb47c1d31a79..6a2d887d7f7e4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1908,6 +1908,7 @@ public boolean flush(boolean force, boolean waitIfOngoing) throws EngineExceptio if (flushLock.tryLock() == false) { // if we can't get the lock right away we block if needed otherwise barf if (waitIfOngoing == false) { + logger.trace("returning as there is an in-flight flush that we do not need to wait for"); return false; } logger.trace("waiting for in-flight flush to finish"); @@ -1960,6 +1961,7 @@ public boolean flush(boolean force, boolean waitIfOngoing) throws EngineExceptio throw ex; } finally { flushLock.unlock(); + logger.trace("released flush lock"); } } // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6ea720f9efe27..4a6be7a03208d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1343,7 +1343,7 @@ public BulkStats bulkStats() { * Executes the given flush request against the engine. * * @param request the flush request - * @return true if the flush happened, else false (e.g., if it did not wait for an ongoing request) + * @return false if the flush did not wait for an ongoing request and returned, else true */ public boolean flush(FlushRequest request) { final boolean waitIfOngoing = request.waitIfOngoing(); @@ -1357,9 +1357,7 @@ public boolean flush(FlushRequest request) { verifyNotClosed(); final long time = System.nanoTime(); boolean flushHappened = getEngine().flush(force, waitIfOngoing); - if (flushHappened) { - flushMetric.inc(System.nanoTime() - time); - } + flushMetric.inc(System.nanoTime() - time); return flushHappened; } @@ -2192,12 +2190,11 @@ public void onFailure(Exception e) { @Override protected void doRun() { - if (flush(new FlushRequest().waitIfOngoing(false).force(false))) { - periodicFlushMetric.inc(); - } else { + if (flush(new FlushRequest().waitIfOngoing(false).force(false)) == false) { // In case the flush did not happen, revert active flag so that a next flushOnIdle request can happen (#87888) active.set(true); } + periodicFlushMetric.inc(); } }); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 91f7d262cec7a..575f05c630838 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -116,6 +116,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.FieldMaskingReader; import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryFactory; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -3897,6 +3898,63 @@ public void testFlushOnIdle() throws Exception { closeShards(shard); } + @TestLogging(reason = "testing traces of concurrent flushes", value = "org.elasticsearch.index.engine.Engine:TRACE") + public void testFlushOnIdleConcurrentFlushDoesNotWait() throws Exception { + final MockLogAppender mockLogAppender = new MockLogAppender(); + try { + IndexShard shard = newStartedShard(); + final int manyDocs = randomIntBetween(50, 100); + for (int i = 0; i < manyDocs; i++) { + indexDoc(shard, "_doc", Integer.toString(i)); + } + + mockLogAppender.start(); + Loggers.addAppender(LogManager.getLogger(Engine.class), mockLogAppender); + + shard.flushOnIdle(0); // flush happens in the background using the flush threadpool + assertFalse(shard.isActive()); + + // Wait for log message that flush acquired lock immediately + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "should see first flush getting lock immediately", + Engine.class.getCanonicalName(), + Level.TRACE, + "acquired flush lock immediately" + ) + ); + assertBusy(mockLogAppender::assertAllExpectationsMatched); + + // While the previous flushOnIdle request is happening, immediately issue a second flush request with waitIfOngoing=false + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "should see second flush returning since it will not wait for the ongoing flush", + Engine.class.getCanonicalName(), + Level.TRACE, + "returning as there is an in-flight flush that we do not need to wait for" + ) + ); + assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false))); + mockLogAppender.assertAllExpectationsMatched(); + + // Wait for first flush to log a message that it released the flush lock + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "should see first flush releasing lock", + Engine.class.getCanonicalName(), + Level.TRACE, + "released flush lock" + ) + ); + assertBusy(mockLogAppender::assertAllExpectationsMatched); + + closeShards(shard); + } finally { + Loggers.removeAppender(LogManager.getLogger(Engine.class), mockLogAppender); + mockLogAppender.stop(); + } + } + public void testOnCloseStats() throws IOException { final IndexShard indexShard = newStartedShard(true); From d1cae136bf65384ff16cfe71eb9239acdcf8d709 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Mon, 22 Aug 2022 11:07:38 +0300 Subject: [PATCH 4/7] Apply suggestions from code review Co-authored-by: Henning Andersen <33268011+henningandersen@users.noreply.github.com> --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6a2d887d7f7e4..bfe94bbb11bc5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1908,7 +1908,7 @@ public boolean flush(boolean force, boolean waitIfOngoing) throws EngineExceptio if (flushLock.tryLock() == false) { // if we can't get the lock right away we block if needed otherwise barf if (waitIfOngoing == false) { - logger.trace("returning as there is an in-flight flush that we do not need to wait for"); + logger.trace("detected an in-flight flush, not blocking to wait for it's completion"); return false; } logger.trace("waiting for in-flight flush to finish"); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4a6be7a03208d..879eb6cae49f1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1343,7 +1343,7 @@ public BulkStats bulkStats() { * Executes the given flush request against the engine. * * @param request the flush request - * @return false if the flush did not wait for an ongoing request and returned, else true + * @return false if waitIfOngoing==true and an ongoing request is detected, else true. If false is returned, no flush happened. */ public boolean flush(FlushRequest request) { final boolean waitIfOngoing = request.waitIfOngoing(); @@ -2191,7 +2191,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { if (flush(new FlushRequest().waitIfOngoing(false).force(false)) == false) { - // In case the flush did not happen, revert active flag so that a next flushOnIdle request can happen (#87888) + // In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request will retry (#87888) active.set(true); } periodicFlushMetric.inc(); From 92e9b3f9f497941f55bb0259eb130653c3558552 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Mon, 22 Aug 2022 12:22:03 +0300 Subject: [PATCH 5/7] Fix test and comments --- .../elasticsearch/index/shard/IndexShard.java | 6 +++-- .../index/shard/IndexShardTests.java | 24 +++++++++++++++---- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 879eb6cae49f1..adb54cfa01c3a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1343,7 +1343,8 @@ public BulkStats bulkStats() { * Executes the given flush request against the engine. * * @param request the flush request - * @return false if waitIfOngoing==true and an ongoing request is detected, else true. If false is returned, no flush happened. + * @return false if waitIfOngoing==true and an ongoing request is detected, else true. + * If false is returned, no flush happened. */ public boolean flush(FlushRequest request) { final boolean waitIfOngoing = request.waitIfOngoing(); @@ -2191,7 +2192,8 @@ public void onFailure(Exception e) { @Override protected void doRun() { if (flush(new FlushRequest().waitIfOngoing(false).force(false)) == false) { - // In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request will retry (#87888) + // In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request + // will retry (#87888) active.set(true); } periodicFlushMetric.inc(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 575f05c630838..a57b7a40e0dd3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -14,6 +14,7 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; @@ -3902,8 +3903,20 @@ public void testFlushOnIdle() throws Exception { public void testFlushOnIdleConcurrentFlushDoesNotWait() throws Exception { final MockLogAppender mockLogAppender = new MockLogAppender(); try { - IndexShard shard = newStartedShard(); - final int manyDocs = randomIntBetween(50, 100); + CountDownLatch readyToCompleteFlushLatch = new CountDownLatch(1); + IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { + @Override + protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException { + try { + readyToCompleteFlushLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + super.commitIndexWriter(writer, translog); + } + }); + + final int manyDocs = randomIntBetween(1, 5); for (int i = 0; i < manyDocs; i++) { indexDoc(shard, "_doc", Integer.toString(i)); } @@ -3925,18 +3938,21 @@ public void testFlushOnIdleConcurrentFlushDoesNotWait() throws Exception { ); assertBusy(mockLogAppender::assertAllExpectationsMatched); - // While the previous flushOnIdle request is happening, immediately issue a second flush request with waitIfOngoing=false + // While the previous flushOnIdle request is happening, issue a second flush request with waitIfOngoing=false mockLogAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "should see second flush returning since it will not wait for the ongoing flush", Engine.class.getCanonicalName(), Level.TRACE, - "returning as there is an in-flight flush that we do not need to wait for" + "detected an in-flight flush, not blocking to wait for it's completion" ) ); assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false))); mockLogAppender.assertAllExpectationsMatched(); + // Allow first flush to complete + readyToCompleteFlushLatch.countDown(); + // Wait for first flush to log a message that it released the flush lock mockLogAppender.addExpectation( new MockLogAppender.SeenEventExpectation( From d23a14515a60feb24db81c9040bad3b1c29d3e78 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Mon, 22 Aug 2022 16:44:49 +0300 Subject: [PATCH 6/7] Test active flag as well Fix some javadoc --- .../elasticsearch/index/engine/Engine.java | 3 ++- .../elasticsearch/index/shard/IndexShard.java | 4 ++-- .../index/shard/IndexShardTests.java | 19 +++++++++++++++---- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 31adaf7358c3a..952ef783a2ea5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1015,7 +1015,8 @@ public boolean refreshNeeded() { * @param force if true a lucene commit is executed even if no changes need to be committed. * @param waitIfOngoing if true this call will block until all currently running flushes have finished. * Otherwise this call will return without blocking. - * @return false if the flush did not wait for an ongoing request and returned, else true + * @return false if waitIfOngoing==false and an ongoing request is detected, else true. + * If false is returned, no flush happened. */ public abstract boolean flush(boolean force, boolean waitIfOngoing) throws EngineException; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index adb54cfa01c3a..e3f2ea350a8c6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1343,8 +1343,8 @@ public BulkStats bulkStats() { * Executes the given flush request against the engine. * * @param request the flush request - * @return false if waitIfOngoing==true and an ongoing request is detected, else true. - * If false is returned, no flush happened. + * @return false if waitIfOngoing==false and an ongoing request is detected, else true. + * If false is returned, no flush happened. */ public boolean flush(FlushRequest request) { final boolean waitIfOngoing = request.waitIfOngoing(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a57b7a40e0dd3..fcb17db9319d0 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3916,8 +3916,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl } }); - final int manyDocs = randomIntBetween(1, 5); - for (int i = 0; i < manyDocs; i++) { + for (int i = 0; i < 3; i++) { indexDoc(shard, "_doc", Integer.toString(i)); } @@ -3938,7 +3937,10 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl ); assertBusy(mockLogAppender::assertAllExpectationsMatched); - // While the previous flushOnIdle request is happening, issue a second flush request with waitIfOngoing=false + // While the first flush is happening, index one more doc (to turn the shard's active flag to true), + // and issue a second flushOnIdle request which should not wait for the ongoing flush + indexDoc(shard, "_doc", Integer.toString(3)); + assertTrue(shard.isActive()); mockLogAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "should see second flush returning since it will not wait for the ongoing flush", @@ -3947,8 +3949,11 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl "detected an in-flight flush, not blocking to wait for it's completion" ) ); + shard.flushOnIdle(0); + assertBusy(mockLogAppender::assertAllExpectationsMatched); + + // A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false))); - mockLogAppender.assertAllExpectationsMatched(); // Allow first flush to complete readyToCompleteFlushLatch.countDown(); @@ -3964,6 +3969,12 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl ); assertBusy(mockLogAppender::assertAllExpectationsMatched); + // The second flush (that did not happen) should have returned false and turned the active flag to true + assertTrue(shard.isActive()); + + // After all the previous flushes, issue a final flush (for any remaining documents) that should return true + assertTrue(shard.flush(new FlushRequest())); + closeShards(shard); } finally { Loggers.removeAppender(LogManager.getLogger(Engine.class), mockLogAppender); From 7531b4fec0374383998bf1e50050ac96d8f069a7 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Mon, 22 Aug 2022 17:32:01 +0300 Subject: [PATCH 7/7] Improve test comments --- .../elasticsearch/index/shard/IndexShardTests.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index fcb17db9319d0..1ce2685952d25 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3923,7 +3923,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl mockLogAppender.start(); Loggers.addAppender(LogManager.getLogger(Engine.class), mockLogAppender); - shard.flushOnIdle(0); // flush happens in the background using the flush threadpool + // Issue the first flushOnIdle request. The flush happens in the background using the flush threadpool. + shard.flushOnIdle(0); assertFalse(shard.isActive()); // Wait for log message that flush acquired lock immediately @@ -3955,10 +3956,10 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl // A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false))); - // Allow first flush to complete + // Allow first flushOnIdle to complete readyToCompleteFlushLatch.countDown(); - // Wait for first flush to log a message that it released the flush lock + // Wait for first flushOnIdle to log a message that it released the flush lock mockLogAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "should see first flush releasing lock", @@ -3969,10 +3970,10 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl ); assertBusy(mockLogAppender::assertAllExpectationsMatched); - // The second flush (that did not happen) should have returned false and turned the active flag to true + // The second flushOnIdle (that did not happen) should have turned the active flag to true assertTrue(shard.isActive()); - // After all the previous flushes, issue a final flush (for any remaining documents) that should return true + // After all the previous flushes are done, issue a final flush (for any remaining documents) that should return true assertTrue(shard.flush(new FlushRequest())); closeShards(shard);