Time-bound deletion of snapshots in retention delete function #45065

Merged (7 commits, Aug 7, 2019)
Changes from all commits

@@ -20,6 +20,7 @@ public class LifecycleSettings {

public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled";
public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule";
public static final String SLM_RETENTION_DURATION = "slm.retention_duration";


public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(LIFECYCLE_POLL_INTERVAL,
@@ -42,4 +43,6 @@ public class LifecycleSettings {
SLM_RETENTION_SCHEDULE + "]", e);
}
}, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<TimeValue> SLM_RETENTION_DURATION_SETTING = Setting.timeSetting(SLM_RETENTION_DURATION,
TimeValue.timeValueHours(1), TimeValue.timeValueMillis(500), Setting.Property.Dynamic, Setting.Property.NodeScope);
}
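
The hunks above register a new slm.retention_duration setting (1 hour by default, 500 ms minimum) as a Dynamic, NodeScope TimeValue setting; it caps how long a single retention run may spend deleting snapshots. A minimal sketch of how a consumer can read it, mirroring what SnapshotRetentionTask does further down in this diff; the wrapper class and method name are illustrative, not part of the change:

    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.xpack.core.ilm.LifecycleSettings;

    class RetentionDurationExample {
        // Returns the configured deletion-time cap, falling back to the 1h
        // default when slm.retention_duration is not set on the cluster.
        static TimeValue maxDeletionTime(ClusterState state) {
            return LifecycleSettings.SLM_RETENTION_DURATION_SETTING.get(state.metaData().settings());
        }
    }

Because the setting is registered as Dynamic, the cap can be changed at runtime through the cluster update settings API without a node restart.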

@@ -383,7 +383,8 @@ public void testDeleteDuringSnapshot() throws Exception {
// index document so snapshot actually does something
indexDocument();
// start snapshot
request = new Request("PUT", "/_snapshot/repo/snapshot");
String snapName = "snapshot-" + randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
request = new Request("PUT", "/_snapshot/repo/" + snapName);
request.addParameter("wait_for_completion", "false");
request.setJsonEntity("{\"indices\": \"" + index + "\"}");
assertOK(client().performRequest(request));
@@ -392,8 +393,8 @@ public void testDeleteDuringSnapshot() throws Exception {
// assert that index was deleted
assertBusy(() -> assertFalse(indexExists(index)), 2, TimeUnit.MINUTES);
// assert that snapshot is still in progress and clean up
assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS"));
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot")));
assertThat(getSnapshotState(snapName), equalTo("SUCCESS"));
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/" + snapName)));
}

public void testReadOnly() throws Exception {

@@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
@@ -293,12 +294,13 @@ public void testBasicTimeBasedRetenion() throws Exception {
assertBusy(() -> {
// We expect a failed response because the snapshot should not exist
try {
logger.info("--> checking to see if snapshot has been deleted...");
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception"));
} catch (ResponseException e) {
assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
}
});
}, 60, TimeUnit.SECONDS);

Request delReq = new Request("DELETE", "/_slm/policy/" + policyName);
assertOK(client().performRequest(delReq));
@@ -424,7 +426,6 @@ private void inializeRepo(String repoName) throws IOException {
.startObject("settings")
.field("compress", randomBoolean())
.field("location", System.getProperty("tests.path.repo"))
.field("max_snapshot_bytes_per_sec", "256b")
.endObject()
.endObject()));
assertOK(client().performRequest(request));
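
The test changes above give the busy-wait an explicit 60-second budget (the single-argument assertBusy overload waits only 10 seconds) and drop the artificial 256-byte-per-second snapshot throttle from the test repository, so the snapshot can complete and be reaped by retention within the window. A small self-contained sketch of the timed assertBusy overload, assuming the Elasticsearch test framework is on the classpath; the test class, method, and condition are placeholders:

    import java.util.concurrent.TimeUnit;
    import org.elasticsearch.test.ESTestCase;

    public class BusyWaitExampleTests extends ESTestCase {
        public void testConditionWithinAMinute() throws Exception {
            long readyAt = System.currentTimeMillis() + 200; // stand-in for "snapshot has been deleted"
            // assertBusy retries the block with backoff until it stops throwing,
            // and only fails the test once the 60-second budget is exhausted.
            assertBusy(() -> assertTrue(System.currentTimeMillis() >= readyAt), 60, TimeUnit.SECONDS);
        }
    }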

@@ -154,7 +154,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
snapshotHistoryStore.set(new SnapshotHistoryStore(settings, client, getClock().getZone()));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
snapshotRetentionService.set(new SnapshotRetentionService(settings, () -> new SnapshotRetentionTask(client, clusterService),
snapshotRetentionService.set(new SnapshotRetentionService(settings,
() -> new SnapshotRetentionTask(client, clusterService, System::nanoTime),
clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(),
snapshotRetentionService.get());
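
The plugin wiring above now hands SnapshotRetentionTask its clock as a LongSupplier bound to System::nanoTime in production. nanoTime is monotonic, which is what you want when measuring elapsed deletion time, and exposing it as a constructor argument gives tests a seam to substitute a controllable clock (FakeRetentionTask at the bottom of this diff simply passes System::nanoTime through). A hedged sketch of such a controllable clock; FakeNanoClock is illustrative and not part of the change:

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.LongSupplier;

    // A controllable nano-clock for tests: pass asSupplier() to the
    // SnapshotRetentionTask constructor in place of System::nanoTime, then
    // advance time explicitly to drive the elapsed-time check deterministically.
    class FakeNanoClock {
        private final AtomicLong nanos = new AtomicLong();

        LongSupplier asSupplier() {
            return nanos::get;
        }

        void advanceNanos(long byNanos) {
            nanos.addAndGet(byNanos);
        }
    }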

@@ -18,8 +18,12 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
@@ -35,6 +39,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
@@ -50,10 +55,12 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {

private final Client client;
private final ClusterService clusterService;
private final LongSupplier nowNanoSupplier;

public SnapshotRetentionTask(Client client, ClusterService clusterService) {
public SnapshotRetentionTask(Client client, ClusterService clusterService, LongSupplier nowNanoSupplier) {
this.client = new OriginSettingClient(client, ClientHelper.INDEX_LIFECYCLE_ORIGIN);
this.clusterService = clusterService;
this.nowNanoSupplier = nowNanoSupplier;
}

@Override
@@ -64,6 +71,7 @@ public void triggered(SchedulerEngine.Event event) {
try {
logger.info("starting SLM retention snapshot cleanup task");
final ClusterState state = clusterService.state();
final TimeValue maxDeletionTime = LifecycleSettings.SLM_RETENTION_DURATION_SETTING.get(state.metaData().settings());

// Find all SLM policies that have retention enabled
final Map<String, SnapshotLifecyclePolicy> policiesWithRetention = getAllPoliciesWithRetentionEnabled(state);
@@ -74,7 +82,7 @@
.map(SnapshotLifecyclePolicy::getRepository)
.collect(Collectors.toSet());

getAllSnapshots(repositioriesToFetch, new ActionListener<>() {
getAllSuccessfulSnapshots(repositioriesToFetch, new ActionListener<>() {
@Override
public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
// Find all the snapshots that are past their retention date
@@ -85,7 +93,7 @@ public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
.collect(Collectors.toList())));

// Finally, delete the snapshots that need to be deleted
deleteSnapshots(snapshotsToBeDeleted);
deleteSnapshots(snapshotsToBeDeleted, maxDeletionTime);
}

@Override
@@ -160,8 +168,8 @@ static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map<String, Li
return eligible;
}

void getAllSnapshots(Collection<String> repositories, ActionListener<Map<String, List<SnapshotInfo>>> listener,
Consumer<Exception> errorHandler) {
void getAllSuccessfulSnapshots(Collection<String> repositories, ActionListener<Map<String, List<SnapshotInfo>>> listener,
Consumer<Exception> errorHandler) {
if (repositories.isEmpty()) {
// Skip retrieving anything if there are no repositories to fetch
listener.onResponse(Collections.emptyMap());
@@ -175,7 +183,11 @@ void getAllSnapshots(Collection<String> repositories, ActionListener<Map<String,
public void onResponse(final GetSnapshotsResponse resp) {
Map<String, List<SnapshotInfo>> snapshots = new HashMap<>();
repositories.forEach(repo -> {
snapshots.put(repo, resp.getSnapshots(repo));
snapshots.put(repo,
// Only return snapshots in the SUCCESS state
resp.getSnapshots(repo).stream()
.filter(info -> info.state() == SnapshotState.SUCCESS)
.collect(Collectors.toList()));
});
listener.onResponse(snapshots);
}
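
With the rename to getAllSuccessfulSnapshots, only snapshots in the SUCCESS state are ever handed to the retention logic, so in-progress, partial, or failed snapshots cannot be deleted by this path. The same filter as a standalone helper, purely for illustration:

    import java.util.List;
    import java.util.stream.Collectors;
    import org.elasticsearch.snapshots.SnapshotInfo;
    import org.elasticsearch.snapshots.SnapshotState;

    class SuccessfulSnapshotsFilter {
        // Keeps only fully successful snapshots; everything else is left untouched.
        static List<SnapshotInfo> onlySuccessful(List<SnapshotInfo> snapshots) {
            return snapshots.stream()
                .filter(info -> info.state() == SnapshotState.SUCCESS)
                .collect(Collectors.toList());
        }
    }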
@@ -188,42 +200,64 @@ public void onFailure(Exception e) {
});
}

void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete) {
// TODO: make this more resilient and possibly only delete for a certain amount of time
void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete, TimeValue maximumTime) {
int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum();
if (count == 0) {
logger.debug("no snapshots are eligible for deletion");
return;
}

logger.info("starting snapshot retention deletion for [{}] snapshots", count);
snapshotsToDelete.forEach((repo, snapshots) -> {
snapshots.forEach(info -> {
logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId());
CountDownLatch latch = new CountDownLatch(1);
client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName())
.execute(new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
logger.debug("[{}] snapshot [{}] deleted successfully", repo, info.snapshotId());
}
}

@Override
public void onFailure(Exception e) {
logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
repo, info.snapshotId()), e);
}
}, latch));
try {
// Deletes cannot occur simultaneously, so wait for this
// deletion to complete before attempting the next one
latch.await();
} catch (InterruptedException e) {
logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted",
repo, info.snapshotId()), e);
long startTime = nowNanoSupplier.getAsLong();
int deleted = 0;
for (Map.Entry<String, List<SnapshotInfo>> entry : snapshotsToDelete.entrySet()) {
String repo = entry.getKey();
List<SnapshotInfo> snapshots = entry.getValue();
for (SnapshotInfo info : snapshots) {
deleteSnapshot(repo, info.snapshotId());
deleted++;
// Check whether we have exceeded the maximum time allowed to spend deleting
// snapshots, if we have, short-circuit the rest of the deletions
TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime);
logger.trace("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime);
if (elapsedDeletionTime.compareTo(maximumTime) > 0) {
logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," +
" maximum allowed time: [{}], deleted {} out of {} snapshots scheduled for deletion",
elapsedDeletionTime, maximumTime, deleted, count);
return;
}
});
});
}
}
}

/**
* Delete the given snapshot from the repository in blocking manner
*/
void deleteSnapshot(String repo, SnapshotId snapshot) {
logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot);
CountDownLatch latch = new CountDownLatch(1);
client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName())
.execute(new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot);
}
}

@Override
public void onFailure(Exception e) {
logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
repo, snapshot), e);
}
}, latch));
try {
// Deletes cannot occur simultaneously, so wait for this
// deletion to complete before attempting the next one
latch.await();
} catch (InterruptedException e) {
logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted",
repo, snapshot), e);
}
}
}
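
The reworked deleteSnapshots measures elapsed time from the injected nano-clock and stops issuing further deletes once the slm.retention_duration cap is exceeded. Each deleteSnapshot call still blocks on its latch, and the check only runs between deletes, so the cap is a soft limit: a single slow delete can overrun it. A minimal sketch of that bounded-loop shape under the same assumptions; BoundedLoopSketch is illustrative only:

    import java.util.List;
    import java.util.function.LongSupplier;
    import org.elasticsearch.common.unit.TimeValue;

    class BoundedLoopSketch {
        // Runs units of blocking work in order, short-circuiting once the
        // elapsed time (measured via the supplied nano-clock) exceeds maxTime.
        static int runUpTo(List<Runnable> work, TimeValue maxTime, LongSupplier nanoClock) {
            long start = nanoClock.getAsLong();
            int done = 0;
            for (Runnable unit : work) {
                unit.run();   // blocking, like the latched snapshot delete above
                done++;
                TimeValue elapsed = TimeValue.timeValueNanos(nanoClock.getAsLong() - start);
                if (elapsed.compareTo(maxTime) > 0) {
                    break;    // skip the remaining work, as deleteSnapshots returns early
                }
            }
            return done;
        }
    }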

@@ -67,7 +67,7 @@ public void testJobsAreScheduled() {

private static class FakeRetentionTask extends SnapshotRetentionTask {
FakeRetentionTask() {
super(mock(Client.class), null);
super(mock(Client.class), null, System::nanoTime);
}

@Override