diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index 42b5160417114..9b88790ba28e3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -91,7 +91,7 @@ public TransportCleanupRepositoryAction(TransportService transportService, Clust clusterService.addStateApplier(event -> { if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) { final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state().custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.cleanupInProgress() == false) { + if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.hasCleanupInProgress() == false) { return; } clusterService.submitStateUpdateTask("clean up repository cleanup task after master failover", @@ -120,7 +120,7 @@ private static ClusterState removeInProgressCleanup(final ClusterState currentSt RepositoryCleanupInProgress cleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); if (cleanupInProgress != null) { boolean changed = false; - if (cleanupInProgress.cleanupInProgress() == false) { + if (cleanupInProgress.hasCleanupInProgress()) { cleanupInProgress = new RepositoryCleanupInProgress(); changed = true; } @@ -170,10 +170,13 @@ private void cleanupRepo(String repositoryName, ActionListener blobStoreRepository.cleanup( @@ -211,6 +215,11 @@ private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResul "Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure); } assert failure != null || result != null; + if (startedCleanup == false) { + logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure); + listener.onFailure(failure); + return; + } clusterService.submitStateUpdateTask( "remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']', new ClusterStateUpdateTask() { diff --git a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java index e7c8e995dd61c..2acdfff2fa4ae 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -51,9 +52,13 @@ public static Entry startedEntry(String repository, long repositoryStateId) { return new Entry(repository, repositoryStateId); } - public boolean cleanupInProgress() { + public boolean hasCleanupInProgress() { // TODO: Should we allow parallelism across repositories here maybe? - return entries.isEmpty(); + return entries.isEmpty() == false; + } + + public List entries() { + return new ArrayList<>(entries); } @Override @@ -106,6 +111,10 @@ public Entry(String repository, long repositoryStateId) { this.repositoryStateId = repositoryStateId; } + public String repository() { + return repository; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(repository); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 15af20755b9f8..ba1c317ac4402 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -429,8 +429,7 @@ private static void validate(final String repositoryName) { } } - - private void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { + private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { if (SnapshotsService.isRepositoryInUse(clusterState, repository) || RestoreService.isRepositoryInUse(clusterState, repository)) { throw new IllegalStateException("trying to modify or unregister repository that is currently used "); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 3318c6ab46c85..24a13dc270b7e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -278,7 +278,7 @@ public ClusterState execute(ClusterState currentState) { "cannot snapshot while a snapshot deletion is in-progress"); } final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, "cannot snapshot while a repository cleanup is in-progress"); } @@ -1178,7 +1178,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { "cannot delete - another snapshot is currently being deleted"); } final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { throw new ConcurrentSnapshotExecutionException(snapshot.getRepository(), snapshot.getSnapshotId().getName(), "cannot delete snapshot while a repository cleanup is in-progress"); } @@ -1335,6 +1335,14 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi } } } + final RepositoryCleanupInProgress repositoryCleanupInProgress = clusterState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null) { + for (RepositoryCleanupInProgress.Entry entry : repositoryCleanupInProgress.entries()) { + if (entry.repository().equals(repository)) { + return true; + } + } + } return false; } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java new file mode 100644 index 0000000000000..7ee49e53e2658 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java @@ -0,0 +1,110 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.blobstore; + +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.ByteArrayInputStream; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase { + + public void testMasterFailoverDuringCleanup() throws Exception { + startBlockedCleanup("test-repo"); + + logger.info("--> stopping master node"); + internalCluster().stopCurrentMasterNode(); + + logger.info("--> wait for cleanup to finish and disappear from cluster state"); + assertBusy(() -> { + RepositoryCleanupInProgress cleanupInProgress = + client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE); + assertFalse(cleanupInProgress.hasCleanupInProgress()); + }, 30, TimeUnit.SECONDS); + } + + public void testRepeatCleanupsDontRemove() throws Exception { + final String masterNode = startBlockedCleanup("test-repo"); + + logger.info("--> sending another cleanup"); + assertThrows(client().admin().cluster().prepareCleanupRepository("test-repo").execute(), IllegalStateException.class); + + logger.info("--> ensure cleanup is still in progress"); + final RepositoryCleanupInProgress cleanup = + client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE); + assertTrue(cleanup.hasCleanupInProgress()); + + logger.info("--> unblocking master node"); + unblockNode("test-repo", masterNode); + + logger.info("--> wait for cleanup to finish and disappear from cluster state"); + assertBusy(() -> { + RepositoryCleanupInProgress cleanupInProgress = + client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE); + assertFalse(cleanupInProgress.hasCleanupInProgress()); + }, 30, TimeUnit.SECONDS); + } + + private String startBlockedCleanup(String repoName) throws Exception { + logger.info("--> starting two master nodes and one data node"); + internalCluster().startMasterOnlyNodes(2); + internalCluster().startDataOnlyNodes(1); + + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository(repoName) + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + logger.info("--> snapshot"); + client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap") + .setWaitForCompletion(true).get(); + + final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); + final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName); + + logger.info("--> creating a garbage data blob"); + final PlainActionFuture garbageFuture = PlainActionFuture.newFuture(); + repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore() + .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true))); + garbageFuture.get(); + + final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName); + + logger.info("--> starting repository cleanup"); + client().admin().cluster().prepareCleanupRepository(repoName).execute(); + + logger.info("--> waiting for block to kick in on " + masterNode); + waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60)); + return masterNode; + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index f600f13dc1655..d573a7a17e322 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -458,7 +458,7 @@ public static boolean okayToDeleteSnapshots(ClusterState state) { // Cannot delete while a repository is being cleaned final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { return false; }