diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java index c5ce2c8716f6b..3a22d933c057a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java @@ -7,7 +7,10 @@ */ package org.elasticsearch.repositories.blobstore; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.RepositoryCleanupInProgress; @@ -17,16 +20,18 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; +import java.io.IOException; import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase { public void testMasterFailoverDuringCleanup() throws Exception { - startBlockedCleanup("test-repo"); + final ActionFuture cleanupFuture = startBlockedCleanup("test-repo"); final int nodeCount = internalCluster().numDataAndMasterNodes(); logger.info("--> stopping master node"); @@ -37,10 +42,12 @@ public void testMasterFailoverDuringCleanup() throws Exception { logger.info("--> wait for cleanup to finish and disappear from cluster state"); awaitClusterState(state -> state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false); + + cleanupFuture.get(); } public void testRepeatCleanupsDontRemove() throws Exception { - final String masterNode = startBlockedCleanup("test-repo"); + final ActionFuture cleanupFuture = startBlockedCleanup("test-repo"); logger.info("--> sending another cleanup"); assertFutureThrows(client().admin().cluster().prepareCleanupRepository("test-repo").execute(), IllegalStateException.class); @@ -51,14 +58,19 @@ public void testRepeatCleanupsDontRemove() throws Exception { assertTrue(cleanup.hasCleanupInProgress()); logger.info("--> unblocking master node"); - unblockNode("test-repo", masterNode); + unblockNode("test-repo", internalCluster().getMasterName()); logger.info("--> wait for cleanup to finish and disappear from cluster state"); awaitClusterState(state -> state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false); + + final ExecutionException e = expectThrows(ExecutionException.class, cleanupFuture::get); + final Throwable ioe = ExceptionsHelper.unwrap(e, IOException.class); + assertThat(ioe, instanceOf(IOException.class)); + assertThat(ioe.getMessage(), is("exception after block")); } - private String startBlockedCleanup(String repoName) throws Exception { + private ActionFuture startBlockedCleanup(String repoName) throws Exception { logger.info("--> starting two master nodes and one data node"); internalCluster().startMasterOnlyNodes(2); internalCluster().startDataOnlyNodes(1); @@ -80,13 +92,16 @@ private String startBlockedCleanup(String repoName) throws Exception { blockMasterFromFinalizingSnapshotOnIndexFile(repoName); logger.info("--> starting repository cleanup"); - client().admin().cluster().prepareCleanupRepository(repoName).execute(); + // running from a non-master client because shutting down a master while a request to it is pending might result in the future + // never completing + final ActionFuture future = + internalCluster().nonMasterClient().admin().cluster().prepareCleanupRepository(repoName).execute(); final String masterNode = internalCluster().getMasterName(); waitForBlock(masterNode, repoName); awaitClusterState(state -> state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress()); - return masterNode; + return future; } public void testCleanupOldIndexN() throws ExecutionException, InterruptedException {