Skip to content

Commit bc9fa62

Browse files
Fix RepoCleanup not Removed on Master-Failover (#49217) (#49240)
The logic for `cleanupInProgress()` was backwards everywhere (method itself and all but one user). Also, we weren't checking it when removing a repository. This lead to a bug (in the one spot that didn't use the method backwards) that prevented the cleanup cluster state entry from ever being removed from the cluster state if master failed over during the cleanup process. This change corrects the backwards logic, adds a test that makes sure the cleanup is always removed and adds a check that prevents repository removal during cleanup to the repositories service. Also, the failure handling logic in the cleanup action was broken. Repeated invocation would lead to the cleanup being removed from the cluster state even if it was in progress. Fixed by adding a flag that indicates whether or not any removal of the cleanup task from the cluster state must be executed. Sorry for mixing this in here, but I had to fix it in the same PR, as the first test (for master-failover) otherwise would often just delete the blocked cleanup action as a result of a transport master action retry.
1 parent 4b31e70 commit bc9fa62

File tree

6 files changed

+145
-10
lines changed

6 files changed

+145
-10
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public TransportCleanupRepositoryAction(TransportService transportService, Clust
9191
clusterService.addStateApplier(event -> {
9292
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) {
9393
final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state().custom(RepositoryCleanupInProgress.TYPE);
94-
if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.cleanupInProgress() == false) {
94+
if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.hasCleanupInProgress() == false) {
9595
return;
9696
}
9797
clusterService.submitStateUpdateTask("clean up repository cleanup task after master failover",
@@ -120,7 +120,7 @@ private static ClusterState removeInProgressCleanup(final ClusterState currentSt
120120
RepositoryCleanupInProgress cleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
121121
if (cleanupInProgress != null) {
122122
boolean changed = false;
123-
if (cleanupInProgress.cleanupInProgress() == false) {
123+
if (cleanupInProgress.hasCleanupInProgress()) {
124124
cleanupInProgress = new RepositoryCleanupInProgress();
125125
changed = true;
126126
}
@@ -170,10 +170,13 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
170170
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
171171
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
172172
new ClusterStateUpdateTask() {
173+
174+
private boolean startedCleanup = false;
175+
173176
@Override
174177
public ClusterState execute(ClusterState currentState) {
175178
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
176-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
179+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
177180
throw new IllegalStateException(
178181
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress");
179182
}
@@ -197,6 +200,7 @@ public void onFailure(String source, Exception e) {
197200

198201
@Override
199202
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
203+
startedCleanup = true;
200204
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
201205
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
202206
l -> blobStoreRepository.cleanup(
@@ -211,6 +215,11 @@ private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResul
211215
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
212216
}
213217
assert failure != null || result != null;
218+
if (startedCleanup == false) {
219+
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
220+
listener.onFailure(failure);
221+
return;
222+
}
214223
clusterService.submitStateUpdateTask(
215224
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
216225
new ClusterStateUpdateTask() {

server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.xcontent.XContentBuilder;
2727

2828
import java.io.IOException;
29+
import java.util.ArrayList;
2930
import java.util.Arrays;
3031
import java.util.List;
3132

@@ -51,9 +52,13 @@ public static Entry startedEntry(String repository, long repositoryStateId) {
5152
return new Entry(repository, repositoryStateId);
5253
}
5354

54-
public boolean cleanupInProgress() {
55+
public boolean hasCleanupInProgress() {
5556
// TODO: Should we allow parallelism across repositories here maybe?
56-
return entries.isEmpty();
57+
return entries.isEmpty() == false;
58+
}
59+
60+
public List<Entry> entries() {
61+
return new ArrayList<>(entries);
5762
}
5863

5964
@Override
@@ -106,6 +111,10 @@ public Entry(String repository, long repositoryStateId) {
106111
this.repositoryStateId = repositoryStateId;
107112
}
108113

114+
public String repository() {
115+
return repository;
116+
}
117+
109118
@Override
110119
public void writeTo(StreamOutput out) throws IOException {
111120
out.writeString(repository);

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,7 @@ private static void validate(final String repositoryName) {
429429
}
430430
}
431431

432-
433-
private void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
432+
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
434433
if (SnapshotsService.isRepositoryInUse(clusterState, repository) || RestoreService.isRepositoryInUse(clusterState, repository)) {
435434
throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
436435
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public ClusterState execute(ClusterState currentState) {
278278
"cannot snapshot while a snapshot deletion is in-progress");
279279
}
280280
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
281-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
281+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
282282
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
283283
"cannot snapshot while a repository cleanup is in-progress");
284284
}
@@ -1178,7 +1178,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
11781178
"cannot delete - another snapshot is currently being deleted");
11791179
}
11801180
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
1181-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
1181+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
11821182
throw new ConcurrentSnapshotExecutionException(snapshot.getRepository(), snapshot.getSnapshotId().getName(),
11831183
"cannot delete snapshot while a repository cleanup is in-progress");
11841184
}
@@ -1335,6 +1335,14 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi
13351335
}
13361336
}
13371337
}
1338+
final RepositoryCleanupInProgress repositoryCleanupInProgress = clusterState.custom(RepositoryCleanupInProgress.TYPE);
1339+
if (repositoryCleanupInProgress != null) {
1340+
for (RepositoryCleanupInProgress.Entry entry : repositoryCleanupInProgress.entries()) {
1341+
if (entry.repository().equals(repository)) {
1342+
return true;
1343+
}
1344+
}
1345+
}
13381346
return false;
13391347
}
13401348

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.repositories.blobstore;
20+
21+
import org.elasticsearch.action.ActionRunnable;
22+
import org.elasticsearch.action.support.PlainActionFuture;
23+
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.unit.ByteSizeUnit;
26+
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.repositories.RepositoriesService;
28+
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
29+
import org.elasticsearch.test.ESIntegTestCase;
30+
31+
import java.io.ByteArrayInputStream;
32+
import java.util.concurrent.TimeUnit;
33+
34+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
35+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
36+
37+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
38+
public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase {
39+
40+
public void testMasterFailoverDuringCleanup() throws Exception {
41+
startBlockedCleanup("test-repo");
42+
43+
logger.info("--> stopping master node");
44+
internalCluster().stopCurrentMasterNode();
45+
46+
logger.info("--> wait for cleanup to finish and disappear from cluster state");
47+
assertBusy(() -> {
48+
RepositoryCleanupInProgress cleanupInProgress =
49+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
50+
assertFalse(cleanupInProgress.hasCleanupInProgress());
51+
}, 30, TimeUnit.SECONDS);
52+
}
53+
54+
public void testRepeatCleanupsDontRemove() throws Exception {
55+
final String masterNode = startBlockedCleanup("test-repo");
56+
57+
logger.info("--> sending another cleanup");
58+
assertThrows(client().admin().cluster().prepareCleanupRepository("test-repo").execute(), IllegalStateException.class);
59+
60+
logger.info("--> ensure cleanup is still in progress");
61+
final RepositoryCleanupInProgress cleanup =
62+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
63+
assertTrue(cleanup.hasCleanupInProgress());
64+
65+
logger.info("--> unblocking master node");
66+
unblockNode("test-repo", masterNode);
67+
68+
logger.info("--> wait for cleanup to finish and disappear from cluster state");
69+
assertBusy(() -> {
70+
RepositoryCleanupInProgress cleanupInProgress =
71+
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
72+
assertFalse(cleanupInProgress.hasCleanupInProgress());
73+
}, 30, TimeUnit.SECONDS);
74+
}
75+
76+
private String startBlockedCleanup(String repoName) throws Exception {
77+
logger.info("--> starting two master nodes and one data node");
78+
internalCluster().startMasterOnlyNodes(2);
79+
internalCluster().startDataOnlyNodes(1);
80+
81+
logger.info("--> creating repository");
82+
assertAcked(client().admin().cluster().preparePutRepository(repoName)
83+
.setType("mock").setSettings(Settings.builder()
84+
.put("location", randomRepoPath())
85+
.put("compress", randomBoolean())
86+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
87+
88+
logger.info("--> snapshot");
89+
client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
90+
.setWaitForCompletion(true).get();
91+
92+
final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
93+
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
94+
95+
logger.info("--> creating a garbage data blob");
96+
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
97+
repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore()
98+
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
99+
garbageFuture.get();
100+
101+
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
102+
103+
logger.info("--> starting repository cleanup");
104+
client().admin().cluster().prepareCleanupRepository(repoName).execute();
105+
106+
logger.info("--> waiting for block to kick in on " + masterNode);
107+
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
108+
return masterNode;
109+
}
110+
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ public static boolean okayToDeleteSnapshots(ClusterState state) {
458458

459459
// Cannot delete while a repository is being cleaned
460460
final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
461-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
461+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
462462
return false;
463463
}
464464

0 commit comments

Comments
 (0)