
Commit 4b893c1

TEST: Retry synced-flush if ongoing ops on primary (#30978)
When the last indexing operation completes, we fire a global checkpoint sync. Since a global checkpoint sync request is a replication request, it acquires an index shard permit on the primary when it executes. If this happens while we are issuing a synced-flush, the synced-flush request fails because it sees in-flight operations. We can avoid this situation by retrying the synced-flush when the current request fails due to ongoing operations on the primary.

Closes #29392
1 parent 1af6d20 commit 4b893c1

3 files changed: 26 additions, 45 deletions

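In essence, the fix wraps the synced-flush attempt in a retry. Below is a minimal standalone sketch of that idea, assuming it sits in the same package as SyncedFlushUtil; the class name, flushWithRetry, and the bounded maxRetries loop are illustrative only, while the actual change (see the SyncedFlushUtil.java diff below) relies on ESTestCase.assertBusy, which keeps retrying until a timeout.

```java
package org.elasticsearch.indices.flush;

import org.elasticsearch.index.shard.ShardId;

/**
 * Illustrative sketch only, not part of the commit: a bounded retry loop
 * spelling out what the assertBusy-based change in SyncedFlushUtil achieves.
 */
class SyncedFlushRetrySketch {

    static ShardsSyncedFlushResult flushWithRetry(SyncedFlushService service, ShardId shardId,
                                                  int maxRetries) throws InterruptedException {
        for (int attempt = 0; ; attempt++) {
            SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener =
                new SyncedFlushUtil.LatchedListener<>();
            service.attemptSyncedFlush(shardId, listener);
            listener.latch.await(); // block until this synced-flush round completes
            ShardsSyncedFlushResult result = listener.result;
            boolean racedWithPermit = result != null && result.failureReason() != null
                && result.failureReason().contains("ongoing operations on primary");
            if (racedWithPermit == false || attempt >= maxRetries) {
                // Success, a non-transient failure, or retries exhausted.
                return result;
            }
            // Otherwise the flush raced with the global checkpoint sync holding
            // a shard permit on the primary; issue another synced-flush.
        }
    }
}
```

Using assertBusy instead of an explicit budget fits the test framework better: it retries the block until the suite timeout and, if the condition never holds, fails the test with the last AssertionError.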

server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

Lines changed: 0 additions & 12 deletions
```diff
@@ -19,7 +19,6 @@
 package org.elasticsearch.indices.flush;
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -502,18 +501,7 @@ private InFlightOpsResponse performInFlightOps(InFlightOpsRequest request) {
         if (indexShard.routingEntry().primary() == false) {
             throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard");
         }
-        if (Assertions.ENABLED) {
-            if (logger.isTraceEnabled()) {
-                logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations());
-            }
-        }
         int opCount = indexShard.getActiveOperationsCount();
-        // Need to snapshot the debug info twice as it's updated concurrently with the permit count.
-        if (Assertions.ENABLED) {
-            if (logger.isTraceEnabled()) {
-                logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations());
-            }
-        }
         return new InFlightOpsResponse(opCount);
     }
```

server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java

Lines changed: 1 addition & 21 deletions
```diff
@@ -46,16 +46,13 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -103,7 +100,7 @@ public void onFailure(Exception e) {
         }
     }
 
-    public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException {
+    public void testSyncedFlush() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(2);
         prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)).get();
         ensureGreen();
@@ -246,16 +243,6 @@ private void indexDoc(Engine engine, String id) throws IOException {
         assertThat(indexResult.getFailure(), nullValue());
     }
 
-    private String syncedFlushDescription(ShardsSyncedFlushResult result) {
-        String detail = result.shardResponses().entrySet().stream()
-            .map(e -> "Shard [" + e.getKey() + "], result [" + e.getValue() + "]")
-            .collect(Collectors.joining(","));
-        return String.format(Locale.ROOT, "Total shards: [%d], failed: [%s], reason: [%s], detail: [%s]",
-            result.totalShards(), result.failed(), result.failureReason(), detail);
-    }
-
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
-    @TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE")
     public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
         final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@@ -281,7 +268,6 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
             indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
         }
         final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
-        logger.info("Partial seal: {}", syncedFlushDescription(partialResult));
         assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
         assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
         assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
@@ -297,8 +283,6 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
         assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
-    @TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE")
     public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
         final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@@ -315,11 +299,9 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
             index("test", "doc", Integer.toString(i));
         }
         final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
-        logger.info("First seal: {}", syncedFlushDescription(firstSeal));
         assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
         // Do not renew synced-flush
         final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
-        logger.info("Second seal: {}", syncedFlushDescription(secondSeal));
         assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
         assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
         // Shards were updated, renew synced flush.
@@ -328,7 +310,6 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
             index("test", "doc", Integer.toString(i));
         }
         final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
-        logger.info("Third seal: {}", syncedFlushDescription(thirdSeal));
         assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
         assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
         // Manually remove or change sync-id, renew synced flush.
@@ -344,7 +325,6 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
             assertThat(shard.commitStats().syncId(), nullValue());
         }
         final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
-        logger.info("Forth seal: {}", syncedFlushDescription(forthSeal));
         assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
         assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
     }
```

server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java

Lines changed: 25 additions & 12 deletions
```diff
@@ -29,6 +29,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.test.ESTestCase.assertBusy;
 
 /** Utils for SyncedFlush */
 public class SyncedFlushUtil {
@@ -40,21 +43,31 @@ private SyncedFlushUtil() {
     /**
      * Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
      */
-    public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) {
+    public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) throws Exception {
+        /*
+         * When the last indexing operation is completed, we will fire a global checkpoint sync.
+         * Since a global checkpoint sync request is a replication request, it will acquire an index
+         * shard permit on the primary when executing. If this happens at the same time while we are
+         * issuing the synced-flush, the synced-flush request will fail as it thinks there are
+         * in-flight operations. We can avoid such situation by continuing issuing another synced-flush
+         * if the synced-flush failed due to the ongoing operations on the primary.
+         */
         SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
-        logger.debug("Issue synced-flush on node [{}], shard [{}], cluster state [{}]",
-            service.nodeName(), shardId, cluster.clusterService(service.nodeName()).state());
-        LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
-        service.attemptSyncedFlush(shardId, listener);
-        try {
+        AtomicReference<LatchedListener<ShardsSyncedFlushResult>> listenerHolder = new AtomicReference<>();
+        assertBusy(() -> {
+            LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
+            listenerHolder.set(listener);
+            service.attemptSyncedFlush(shardId, listener);
             listener.latch.await();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+            if (listener.result != null && listener.result.failureReason() != null
+                && listener.result.failureReason().contains("ongoing operations on primary")) {
+                throw new AssertionError(listener.result.failureReason()); // cause the assert busy to retry
+            }
+        });
+        if (listenerHolder.get().error != null) {
+            throw ExceptionsHelper.convertToElastic(listenerHolder.get().error);
         }
-        if (listener.error != null) {
-            throw ExceptionsHelper.convertToElastic(listener.error);
-        }
-        return listener.result;
+        return listenerHolder.get().result;
     }
 
     public static final class LatchedListener<T> implements ActionListener<T> {
```
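For reference, callers such as the FlushIT tests above consume the helper unchanged apart from the broader throws clause. A condensed usage snippet (variables as in the tests in the diff above):

```java
// Inside an ESIntegTestCase test method declared `throws Exception`:
final ShardsSyncedFlushResult seal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
assertThat(seal.successfulShards(), equalTo(numberOfReplicas + 1)); // every active copy sealed
```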
