Commit cdcfcfb
Ensure MockRepository is Unblocked on Node Close (elastic#62711)
`RepositoriesService#doClose` was never called, which led to mock repositories not unblocking until the `ThreadPool` interrupted all threads. Thus stopping a node that is blocked on a mock repository operation wastes `10s` in each test that does it (and there are quite a few of those, as it turns out).
1 parent 265387f commit cdcfcfb
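To see why an uncalled `doClose()` costs ~10s, here is a minimal, self-contained sketch of the pattern involved (all names are illustrative, not Elasticsearch API): a component that parks threads on a monitor only releases them promptly if its `close()` actually runs during node shutdown; otherwise the threads sit until the thread pool's shutdown interrupt reaches them.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (hypothetical names, not Elasticsearch API): a component
// that blocks threads on a monitor only releases them promptly if its
// close() is actually invoked during shutdown.
class BlockingRepo implements Closeable {
    private final Object lock = new Object();
    private boolean closed;

    // Stands in for a MockRepository operation blocked on a monitor.
    void blockedOperation() throws InterruptedException {
        synchronized (lock) {
            while (closed == false) {
                lock.wait(); // without close(), only a late interrupt ends this
            }
        }
    }

    @Override
    public void close() { // stands in for RepositoriesService#doClose
        synchronized (lock) {
            closed = true;
            lock.notifyAll(); // wake every blocked operation immediately
        }
    }
}

public class NodeCloseSketch {
    public static void main(String[] args) throws Exception {
        List<Closeable> toClose = new ArrayList<>(); // analogous to Node#close's list
        BlockingRepo repo = new BlockingRepo();
        toClose.add(repo); // the fix: register the component so close() runs
        Thread blocked = new Thread(() -> {
            try {
                repo.blockedOperation();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        blocked.start();
        Thread.sleep(100); // let the thread reach the blocking wait
        for (Closeable c : toClose) {
            c.close(); // shutdown path: releases the blocked thread at once
        }
        blocked.join(); // returns immediately instead of waiting ~10s
        System.out.println("blocked operation released on close");
    }
}
```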

5 files changed: 16 additions & 7 deletions

server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -42,9 +42,12 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
     public void testMasterFailoverDuringCleanup() throws Exception {
         startBlockedCleanup("test-repo");
 
+        final int nodeCount = internalCluster().numDataAndMasterNodes();
         logger.info("--> stopping master node");
         internalCluster().stopCurrentMasterNode();
 
+        ensureStableCluster(nodeCount - 1);
+
         logger.info("--> wait for cleanup to finish and disappear from cluster state");
         assertBusy(() -> {
             RepositoryCleanupInProgress cleanupInProgress =
@@ -102,6 +105,8 @@ private String startBlockedCleanup(String repoName) throws Exception {
 
         logger.info("--> waiting for block to kick in on " + masterNode);
         waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
+        awaitClusterState(state ->
+            state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress());
         return masterNode;
     }
 
```
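Two things change in this test: after stopping the master, `ensureStableCluster(nodeCount - 1)` waits for the survivors to re-form before any assertions run, and `startBlockedCleanup` now also waits until the cleanup is visible in the cluster state, not just blocked at the repository. The `assertBusy` wait that follows is the test framework's retry-until-timeout helper; a simplified, hedged sketch of that polling pattern (not the actual `ESTestCase` source):

```java
import java.util.concurrent.TimeUnit;

// Hedged sketch of assertBusy-style polling (simplified from what the
// Elasticsearch test framework provides): retry an assertion until it
// passes or the timeout is exhausted, rethrowing the last failure.
final class BusyAssert {
    static void assertBusy(Runnable assertion, long timeout, TimeUnit unit) throws Exception {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        long sleepMillis = 1;
        AssertionError last = null;
        do {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                last = e;
            }
            Thread.sleep(sleepMillis);
            sleepMillis = Math.min(sleepMillis * 2, 500); // back off between retries
        } while (System.nanoTime() < deadline);
        throw last; // timed out: surface the most recent failure
    }
}
```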

server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

Lines changed: 0 additions & 5 deletions
```diff
@@ -61,7 +61,6 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -1278,10 +1277,6 @@ private ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName,
             .setPartial(partial).execute();
     }
 
-    private void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
-        awaitClusterState(internalCluster().getMasterName(), statePredicate);
-    }
-
     // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
     // threads so that blocking some threads on one repository doesn't block other repositories from doing work
     private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
```

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 0 deletions
```diff
@@ -947,6 +947,7 @@ public synchronized void close() throws IOException {
         toClose.add(() -> stopWatch.stop().start("snapshot_service"));
         toClose.add(injector.getInstance(SnapshotsService.class));
         toClose.add(injector.getInstance(SnapshotShardsService.class));
+        toClose.add(injector.getInstance(RepositoriesService.class));
         toClose.add(() -> stopWatch.stop().start("client"));
         Releasables.close(injector.getInstance(Client.class));
         toClose.add(() -> stopWatch.stop().start("indices_cluster"));
```
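One added registration suffices because `RepositoriesService` is a lifecycle component (it has the `doClose()` hook the commit message names): closing it via the node's `toClose` list fires that hook, which is where `MockRepository` instances get unblocked. A simplified, hedged sketch of that dispatch (not the real `AbstractLifecycleComponent` source):

```java
import java.io.Closeable;
import java.io.IOException;

// Simplified sketch of the lifecycle dispatch (not the real Elasticsearch
// class): close() flips the lifecycle state and then invokes the subclass
// hook doClose() -- but only if someone actually calls close().
abstract class LifecycleComponentSketch implements Closeable {
    enum State { STARTED, STOPPED, CLOSED }

    protected volatile State lifecycle = State.STARTED;

    @Override
    public final void close() throws IOException {
        lifecycle = State.CLOSED;
        doClose(); // RepositoriesService#doClose would unblock MockRepository here
    }

    // Subclasses release resources (and blocked threads) here.
    protected abstract void doClose() throws IOException;
}
```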

test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

Lines changed: 4 additions & 0 deletions
```diff
@@ -440,6 +440,10 @@ protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
             state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
     }
 
+    protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
+        awaitClusterState(internalCluster().getMasterName(), statePredicate);
+    }
+
     protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
         final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
         final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
```
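The helper moved here from `ConcurrentSnapshotsIT` so that `BlobStoreRepositoryCleanupIT` can use it too; the new overload simply targets the current master. The two-argument variant (whose first lines show in the context above) observes a node's cluster state until the predicate matches; a self-contained, hedged sketch of that wait pattern, using hypothetical stand-in types rather than Elasticsearch's `ClusterService` machinery:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hedged sketch of "wait until a cluster-state predicate holds".
// StateSource and ClusterStateWatcher are hypothetical stand-ins, not
// Elasticsearch types.
class ClusterStateWatcher<S> {
    interface StateSource<S> {
        S currentState();
        void addListener(Runnable onChange); // invoked on every state update
    }

    private final StateSource<S> source;

    ClusterStateWatcher(StateSource<S> source) {
        this.source = source;
    }

    // Returns true if the predicate matched within the timeout.
    boolean await(Predicate<S> predicate, long timeoutMillis) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        final Runnable check = () -> {
            if (predicate.test(source.currentState())) {
                latch.countDown();
            }
        };
        source.addListener(check); // re-check after each cluster-state change
        check.run();               // the predicate may already hold right now
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```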

test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

Lines changed: 6 additions & 2 deletions
```diff
@@ -322,9 +322,13 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
             }
         }
 
-        private void blockExecutionAndMaybeWait(final String blobName) {
+        private void blockExecutionAndMaybeWait(final String blobName) throws IOException {
             logger.info("[{}] blocking I/O operation for file [{}] at path [{}]", metadata.name(), blobName, path());
-            if (blockExecution() && waitAfterUnblock > 0) {
+            final boolean wasBlocked = blockExecution();
+            if (wasBlocked && lifecycle.stoppedOrClosed()) {
+                throw new IOException("already closed");
+            }
+            if (wasBlocked && waitAfterUnblock > 0) {
                 try {
                     // Delay operation after unblocking
                     // So, we can start node shutdown while this operation is still running.
```
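The key detail is that `blockExecution()` can now return for two reasons: the test unblocked the repository, or `doClose()` unblocked it during node shutdown. The new `lifecycle.stoppedOrClosed()` check tells these apart and fails the I/O operation fast in the shutdown case. A hedged, self-contained sketch of that unblock-then-check pattern (names simplified from `MockRepository`, not its actual source):

```java
import java.io.IOException;

// Hedged sketch of the unblock-then-check pattern (simplified names):
// a thread released from a block must distinguish "the test unblocked me"
// from "the node is shutting down", and abort the I/O in the latter case.
class BlockingSketch {
    private final Object lock = new Object();
    private boolean blocked;
    private volatile boolean stoppedOrClosed; // set by the shutdown path below

    // Returns true if the calling thread was actually blocked at some point.
    private boolean blockExecution() throws InterruptedException {
        synchronized (lock) {
            boolean wasBlocked = false;
            while (blocked) {
                wasBlocked = true;
                lock.wait();
            }
            return wasBlocked;
        }
    }

    // Mirrors blockExecutionAndMaybeWait: fail fast if woken by shutdown.
    void blockExecutionAndMaybeWait(String blobName) throws IOException, InterruptedException {
        final boolean wasBlocked = blockExecution();
        if (wasBlocked && stoppedOrClosed) {
            throw new IOException("already closed, aborting I/O for [" + blobName + "]");
        }
        // ...otherwise the operation proceeds (optionally after a delay).
    }

    // Shutdown path, analogous to what doClose() triggers: unblock everyone
    // and mark the component as closed so released threads abort.
    void unblockOnShutdown() {
        synchronized (lock) {
            stoppedOrClosed = true;
            blocked = false;
            lock.notifyAll();
        }
    }
}
```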
