Skip to content

Commit 6681cc0

Browse files
Fix RepoCleanup not Removed on Master-Failover (#49217)
The logic for `cleanupInProgress()` was backwards everywhere (method itself and all but one user). Also, we weren't checking it when removing a repository. This lead to a bug (in the one spot that didn't use the method backwards) that prevented the cleanup cluster state entry from ever being removed from the cluster state if master failed over during the cleanup process. This change corrects the backwards logic, adds a test that makes sure the cleanup is always removed and adds a check that prevents repository removal during cleanup to the repositories service. Also, the failure handling logic in the cleanup action was broken. Repeated invocation would lead to the cleanup being removed from the cluster state even if it was in progress. Fixed by adding a flag that indicates whether or not any removal of the cleanup task from the cluster state must be executed. Sorry for mixing this in here, but I had to fix it in the same PR, as the first test (for master-failover) otherwise would often just delete the blocked cleanup action as a result of a transport master action retry.
1 parent d8f2900 commit 6681cc0

File tree

6 files changed

+144
-10
lines changed

6 files changed

+144
-10
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public TransportCleanupRepositoryAction(TransportService transportService, Clust
9393
clusterService.addStateApplier(event -> {
9494
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) {
9595
final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state().custom(RepositoryCleanupInProgress.TYPE);
96-
if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.cleanupInProgress() == false) {
96+
if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.hasCleanupInProgress() == false) {
9797
return;
9898
}
9999
clusterService.submitStateUpdateTask("clean up repository cleanup task after master failover",
@@ -122,7 +122,7 @@ private static ClusterState removeInProgressCleanup(final ClusterState currentSt
122122
RepositoryCleanupInProgress cleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
123123
if (cleanupInProgress != null) {
124124
boolean changed = false;
125-
if (cleanupInProgress.cleanupInProgress() == false) {
125+
if (cleanupInProgress.hasCleanupInProgress()) {
126126
cleanupInProgress = new RepositoryCleanupInProgress();
127127
changed = true;
128128
}
@@ -172,10 +172,13 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
172172
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
173173
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
174174
new ClusterStateUpdateTask() {
175+
176+
private boolean startedCleanup = false;
177+
175178
@Override
176179
public ClusterState execute(ClusterState currentState) {
177180
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
178-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
181+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
179182
throw new IllegalStateException(
180183
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
181184
+ repositoryCleanupInProgress + "]");
@@ -202,6 +205,7 @@ public void onFailure(String source, Exception e) {
202205

203206
@Override
204207
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
208+
startedCleanup = true;
205209
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
206210
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
207211
l -> blobStoreRepository.cleanup(
@@ -218,6 +222,11 @@ private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResul
218222
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
219223
}
220224
assert failure != null || result != null;
225+
if (startedCleanup == false) {
226+
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
227+
listener.onFailure(failure);
228+
return;
229+
}
221230
clusterService.submitStateUpdateTask(
222231
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
223232
new ClusterStateUpdateTask() {

server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,13 @@ public static Entry startedEntry(String repository, long repositoryStateId) {
5151
return new Entry(repository, repositoryStateId);
5252
}
5353

54-
public boolean cleanupInProgress() {
54+
public boolean hasCleanupInProgress() {
5555
// TODO: Should we allow parallelism across repositories here maybe?
56-
return entries.isEmpty();
56+
return entries.isEmpty() == false;
57+
}
58+
59+
public List<Entry> entries() {
60+
return List.copyOf(entries);
5761
}
5862

5963
@Override
@@ -106,6 +110,10 @@ public Entry(String repository, long repositoryStateId) {
106110
this.repositoryStateId = repositoryStateId;
107111
}
108112

113+
public String repository() {
114+
return repository;
115+
}
116+
109117
@Override
110118
public void writeTo(StreamOutput out) throws IOException {
111119
out.writeString(repository);

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,7 @@ private static void validate(final String repositoryName) {
435435
}
436436
}
437437

438-
439-
private void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
438+
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
440439
if (SnapshotsService.isRepositoryInUse(clusterState, repository) || RestoreService.isRepositoryInUse(clusterState, repository)) {
441440
throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
442441
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ public ClusterState execute(ClusterState currentState) {
275275
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
276276
}
277277
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
278-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
278+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
279279
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
280280
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
281281
}
@@ -1185,7 +1185,7 @@ public ClusterState execute(ClusterState currentState) {
11851185
"cannot delete - another snapshot is currently being deleted in [" + deletionsInProgress + "]");
11861186
}
11871187
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
1188-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
1188+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
11891189
throw new ConcurrentSnapshotExecutionException(snapshot.getRepository(), snapshot.getSnapshotId().getName(),
11901190
"cannot delete snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
11911191
}
@@ -1344,6 +1344,14 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi
13441344
}
13451345
}
13461346
}
1347+
final RepositoryCleanupInProgress repositoryCleanupInProgress = clusterState.custom(RepositoryCleanupInProgress.TYPE);
1348+
if (repositoryCleanupInProgress != null) {
1349+
for (RepositoryCleanupInProgress.Entry entry : repositoryCleanupInProgress.entries()) {
1350+
if (entry.repository().equals(repository)) {
1351+
return true;
1352+
}
1353+
}
1354+
}
13471355
return false;
13481356
}
13491357

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.repositories.blobstore;
20+
21+
import org.elasticsearch.action.ActionRunnable;
22+
import org.elasticsearch.action.support.PlainActionFuture;
23+
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.unit.ByteSizeUnit;
26+
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.repositories.RepositoriesService;
28+
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
29+
import org.elasticsearch.test.ESIntegTestCase;
30+
31+
import java.io.ByteArrayInputStream;
32+
import java.util.concurrent.TimeUnit;
33+
34+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
35+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
36+
37+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
38+
public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase {
39+
40+
public void testMasterFailoverDuringCleanup() throws Exception {
41+
startBlockedCleanup("test-repo");
42+
43+
logger.info("--> stopping master node");
44+
internalCluster().stopCurrentMasterNode();
45+
46+
logger.info("--> wait for cleanup to finish and disappear from cluster state");
47+
assertBusy(() -> {
48+
RepositoryCleanupInProgress cleanupInProgress =
49+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
50+
assertFalse(cleanupInProgress.hasCleanupInProgress());
51+
}, 30, TimeUnit.SECONDS);
52+
}
53+
54+
public void testRepeatCleanupsDontRemove() throws Exception {
55+
final String masterNode = startBlockedCleanup("test-repo");
56+
57+
logger.info("--> sending another cleanup");
58+
assertThrows(client().admin().cluster().prepareCleanupRepository("test-repo").execute(), IllegalStateException.class);
59+
60+
logger.info("--> ensure cleanup is still in progress");
61+
final RepositoryCleanupInProgress cleanup =
62+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
63+
assertTrue(cleanup.hasCleanupInProgress());
64+
65+
logger.info("--> unblocking master node");
66+
unblockNode("test-repo", masterNode);
67+
68+
logger.info("--> wait for cleanup to finish and disappear from cluster state");
69+
assertBusy(() -> {
70+
RepositoryCleanupInProgress cleanupInProgress =
71+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
72+
assertFalse(cleanupInProgress.hasCleanupInProgress());
73+
}, 30, TimeUnit.SECONDS);
74+
}
75+
76+
private String startBlockedCleanup(String repoName) throws Exception {
77+
logger.info("--> starting two master nodes and one data node");
78+
internalCluster().startMasterOnlyNodes(2);
79+
internalCluster().startDataOnlyNodes(1);
80+
81+
logger.info("--> creating repository");
82+
assertAcked(client().admin().cluster().preparePutRepository(repoName)
83+
.setType("mock").setSettings(Settings.builder()
84+
.put("location", randomRepoPath())
85+
.put("compress", randomBoolean())
86+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
87+
88+
logger.info("--> snapshot");
89+
client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
90+
.setWaitForCompletion(true).get();
91+
92+
final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
93+
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
94+
95+
logger.info("--> creating a garbage data blob");
96+
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
97+
repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore()
98+
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
99+
garbageFuture.get();
100+
101+
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
102+
103+
logger.info("--> starting repository cleanup");
104+
client().admin().cluster().prepareCleanupRepository(repoName).execute();
105+
106+
logger.info("--> waiting for block to kick in on " + masterNode);
107+
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
108+
return masterNode;
109+
}
110+
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ public static boolean okayToDeleteSnapshots(ClusterState state) {
440440

441441
// Cannot delete while a repository is being cleaned
442442
final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
443-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
443+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
444444
return false;
445445
}
446446

0 commit comments

Comments
 (0)