Skip to content

Commit 25cc8e3

Browse files
Fix RepoCleanup not Removed on Master-Failover (#49217) (#49239)
The logic for `cleanupInProgress()` was backwards everywhere (method itself and all but one user). Also, we weren't checking it when removing a repository. This lead to a bug (in the one spot that didn't use the method backwards) that prevented the cleanup cluster state entry from ever being removed from the cluster state if master failed over during the cleanup process. This change corrects the backwards logic, adds a test that makes sure the cleanup is always removed and adds a check that prevents repository removal during cleanup to the repositories service. Also, the failure handling logic in the cleanup action was broken. Repeated invocation would lead to the cleanup being removed from the cluster state even if it was in progress. Fixed by adding a flag that indicates whether or not any removal of the cleanup task from the cluster state must be executed. Sorry for mixing this in here, but I had to fix it in the same PR, as the first test (for master-failover) otherwise would often just delete the blocked cleanup action as a result of a transport master action retry.
1 parent e4f6eae commit 25cc8e3

File tree

6 files changed

+145
-10
lines changed

6 files changed

+145
-10
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public TransportCleanupRepositoryAction(TransportService transportService, Clust
9292
clusterService.addStateApplier(event -> {
9393
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) {
9494
final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state().custom(RepositoryCleanupInProgress.TYPE);
95-
if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.cleanupInProgress() == false) {
95+
if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.hasCleanupInProgress() == false) {
9696
return;
9797
}
9898
clusterService.submitStateUpdateTask("clean up repository cleanup task after master failover",
@@ -121,7 +121,7 @@ private static ClusterState removeInProgressCleanup(final ClusterState currentSt
121121
RepositoryCleanupInProgress cleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
122122
if (cleanupInProgress != null) {
123123
boolean changed = false;
124-
if (cleanupInProgress.cleanupInProgress() == false) {
124+
if (cleanupInProgress.hasCleanupInProgress()) {
125125
cleanupInProgress = new RepositoryCleanupInProgress();
126126
changed = true;
127127
}
@@ -171,10 +171,13 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
171171
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
172172
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
173173
new ClusterStateUpdateTask() {
174+
175+
private boolean startedCleanup = false;
176+
174177
@Override
175178
public ClusterState execute(ClusterState currentState) {
176179
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
177-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
180+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
178181
throw new IllegalStateException(
179182
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
180183
+ repositoryCleanupInProgress + "]");
@@ -201,6 +204,7 @@ public void onFailure(String source, Exception e) {
201204

202205
@Override
203206
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
207+
startedCleanup = true;
204208
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
205209
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
206210
l -> blobStoreRepository.cleanup(
@@ -217,6 +221,11 @@ private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResul
217221
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
218222
}
219223
assert failure != null || result != null;
224+
if (startedCleanup == false) {
225+
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
226+
listener.onFailure(failure);
227+
return;
228+
}
220229
clusterService.submitStateUpdateTask(
221230
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
222231
new ClusterStateUpdateTask() {

server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.xcontent.XContentBuilder;
2727

2828
import java.io.IOException;
29+
import java.util.ArrayList;
2930
import java.util.Arrays;
3031
import java.util.List;
3132

@@ -51,9 +52,13 @@ public static Entry startedEntry(String repository, long repositoryStateId) {
5152
return new Entry(repository, repositoryStateId);
5253
}
5354

54-
public boolean cleanupInProgress() {
55+
public boolean hasCleanupInProgress() {
5556
// TODO: Should we allow parallelism across repositories here maybe?
56-
return entries.isEmpty();
57+
return entries.isEmpty() == false;
58+
}
59+
60+
public List<Entry> entries() {
61+
return new ArrayList<>(entries);
5762
}
5863

5964
@Override
@@ -106,6 +111,10 @@ public Entry(String repository, long repositoryStateId) {
106111
this.repositoryStateId = repositoryStateId;
107112
}
108113

114+
public String repository() {
115+
return repository;
116+
}
117+
109118
@Override
110119
public void writeTo(StreamOutput out) throws IOException {
111120
out.writeString(repository);

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,7 @@ private static void validate(final String repositoryName) {
435435
}
436436
}
437437

438-
439-
private void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
438+
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
440439
if (SnapshotsService.isRepositoryInUse(clusterState, repository) || RestoreService.isRepositoryInUse(clusterState, repository)) {
441440
throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
442441
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public ClusterState execute(ClusterState currentState) {
281281
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
282282
}
283283
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
284-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
284+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
285285
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
286286
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
287287
}
@@ -1198,7 +1198,7 @@ public ClusterState execute(ClusterState currentState) {
11981198
"cannot delete - another snapshot is currently being deleted in [" + deletionsInProgress + "]");
11991199
}
12001200
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
1201-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
1201+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
12021202
throw new ConcurrentSnapshotExecutionException(snapshot.getRepository(), snapshot.getSnapshotId().getName(),
12031203
"cannot delete snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
12041204
}
@@ -1357,6 +1357,14 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi
13571357
}
13581358
}
13591359
}
1360+
final RepositoryCleanupInProgress repositoryCleanupInProgress = clusterState.custom(RepositoryCleanupInProgress.TYPE);
1361+
if (repositoryCleanupInProgress != null) {
1362+
for (RepositoryCleanupInProgress.Entry entry : repositoryCleanupInProgress.entries()) {
1363+
if (entry.repository().equals(repository)) {
1364+
return true;
1365+
}
1366+
}
1367+
}
13601368
return false;
13611369
}
13621370

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.repositories.blobstore;
20+
21+
import org.elasticsearch.action.ActionRunnable;
22+
import org.elasticsearch.action.support.PlainActionFuture;
23+
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.unit.ByteSizeUnit;
26+
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.repositories.RepositoriesService;
28+
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
29+
import org.elasticsearch.test.ESIntegTestCase;
30+
31+
import java.io.ByteArrayInputStream;
32+
import java.util.concurrent.TimeUnit;
33+
34+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
35+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
36+
37+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
38+
public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase {
39+
40+
public void testMasterFailoverDuringCleanup() throws Exception {
41+
startBlockedCleanup("test-repo");
42+
43+
logger.info("--> stopping master node");
44+
internalCluster().stopCurrentMasterNode();
45+
46+
logger.info("--> wait for cleanup to finish and disappear from cluster state");
47+
assertBusy(() -> {
48+
RepositoryCleanupInProgress cleanupInProgress =
49+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
50+
assertFalse(cleanupInProgress.hasCleanupInProgress());
51+
}, 30, TimeUnit.SECONDS);
52+
}
53+
54+
public void testRepeatCleanupsDontRemove() throws Exception {
55+
final String masterNode = startBlockedCleanup("test-repo");
56+
57+
logger.info("--> sending another cleanup");
58+
assertThrows(client().admin().cluster().prepareCleanupRepository("test-repo").execute(), IllegalStateException.class);
59+
60+
logger.info("--> ensure cleanup is still in progress");
61+
final RepositoryCleanupInProgress cleanup =
62+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
63+
assertTrue(cleanup.hasCleanupInProgress());
64+
65+
logger.info("--> unblocking master node");
66+
unblockNode("test-repo", masterNode);
67+
68+
logger.info("--> wait for cleanup to finish and disappear from cluster state");
69+
assertBusy(() -> {
70+
RepositoryCleanupInProgress cleanupInProgress =
71+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
72+
assertFalse(cleanupInProgress.hasCleanupInProgress());
73+
}, 30, TimeUnit.SECONDS);
74+
}
75+
76+
private String startBlockedCleanup(String repoName) throws Exception {
77+
logger.info("--> starting two master nodes and one data node");
78+
internalCluster().startMasterOnlyNodes(2);
79+
internalCluster().startDataOnlyNodes(1);
80+
81+
logger.info("--> creating repository");
82+
assertAcked(client().admin().cluster().preparePutRepository(repoName)
83+
.setType("mock").setSettings(Settings.builder()
84+
.put("location", randomRepoPath())
85+
.put("compress", randomBoolean())
86+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
87+
88+
logger.info("--> snapshot");
89+
client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
90+
.setWaitForCompletion(true).get();
91+
92+
final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
93+
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
94+
95+
logger.info("--> creating a garbage data blob");
96+
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
97+
repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore()
98+
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
99+
garbageFuture.get();
100+
101+
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
102+
103+
logger.info("--> starting repository cleanup");
104+
client().admin().cluster().prepareCleanupRepository(repoName).execute();
105+
106+
logger.info("--> waiting for block to kick in on " + masterNode);
107+
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
108+
return masterNode;
109+
}
110+
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ public static boolean okayToDeleteSnapshots(ClusterState state) {
458458

459459
// Cannot delete while a repository is being cleaned
460460
final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
461-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
461+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
462462
return false;
463463
}
464464

0 commit comments

Comments
 (0)