Skip to content

Fix BlobStoreRepositoryCleanupIT Leaking Futures #69446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 24, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
*/
package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
Expand All @@ -17,16 +20,18 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase {

public void testMasterFailoverDuringCleanup() throws Exception {
startBlockedCleanup("test-repo");
final ActionFuture<CleanupRepositoryResponse> cleanupFuture = startBlockedCleanup("test-repo");

final int nodeCount = internalCluster().numDataAndMasterNodes();
logger.info("--> stopping master node");
Expand All @@ -37,10 +42,12 @@ public void testMasterFailoverDuringCleanup() throws Exception {
logger.info("--> wait for cleanup to finish and disappear from cluster state");
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false);

cleanupFuture.get();
}

public void testRepeatCleanupsDontRemove() throws Exception {
final String masterNode = startBlockedCleanup("test-repo");
final ActionFuture<CleanupRepositoryResponse> cleanupFuture = startBlockedCleanup("test-repo");

logger.info("--> sending another cleanup");
assertFutureThrows(client().admin().cluster().prepareCleanupRepository("test-repo").execute(), IllegalStateException.class);
Expand All @@ -51,14 +58,19 @@ public void testRepeatCleanupsDontRemove() throws Exception {
assertTrue(cleanup.hasCleanupInProgress());

logger.info("--> unblocking master node");
unblockNode("test-repo", masterNode);
unblockNode("test-repo", internalCluster().getMasterName());

logger.info("--> wait for cleanup to finish and disappear from cluster state");
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false);

final ExecutionException e = expectThrows(ExecutionException.class, cleanupFuture::get);
final Throwable ioe = ExceptionsHelper.unwrap(e, IOException.class);
assertThat(ioe, instanceOf(IOException.class));
assertThat(ioe.getMessage(), is("exception after block"));
}

private String startBlockedCleanup(String repoName) throws Exception {
private ActionFuture<CleanupRepositoryResponse> startBlockedCleanup(String repoName) throws Exception {
logger.info("--> starting two master nodes and one data node");
internalCluster().startMasterOnlyNodes(2);
internalCluster().startDataOnlyNodes(1);
Expand All @@ -80,13 +92,16 @@ private String startBlockedCleanup(String repoName) throws Exception {
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);

logger.info("--> starting repository cleanup");
client().admin().cluster().prepareCleanupRepository(repoName).execute();
// running from a non-master client because shutting down a master while a request to it is pending might result in the future
// never completing
final ActionFuture<CleanupRepositoryResponse> future =
internalCluster().nonMasterClient().admin().cluster().prepareCleanupRepository(repoName).execute();

final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName);
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress());
return masterNode;
return future;
}

public void testCleanupOldIndexN() throws ExecutionException, InterruptedException {
Expand Down