Skip to content

Commit 23cfe40

Browse files
committed
Add base framework for snapshot retention (#43605)
* Add base framework for snapshot retention This adds a basic `SnapshotRetentionService` and `SnapshotRetentionTask` to start as the basis for SLM's retention implementation. Relates to #38461 * Remove extraneous 'public' * Use a local var instead of reading class var repeatedly
1 parent 4824148 commit 23cfe40

File tree

6 files changed

+301
-4
lines changed

6 files changed

+301
-4
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
*/
66
package org.elasticsearch.xpack.core.indexlifecycle;
77

8+
import org.elasticsearch.common.Strings;
89
import org.elasticsearch.common.settings.Setting;
910
import org.elasticsearch.common.unit.TimeValue;
11+
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
1012

1113
/**
1214
* Class encapsulating settings related to Index Lifecycle Management X-Pack Plugin
@@ -17,6 +19,8 @@ public class LifecycleSettings {
1719
public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete";
1820

1921
public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled";
22+
public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule";
23+
2024

2125
public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(LIFECYCLE_POLL_INTERVAL,
2226
TimeValue.timeValueMinutes(10), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope);
@@ -27,4 +31,15 @@ public class LifecycleSettings {
2731

2832
public static final Setting<Boolean> SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true,
2933
Setting.Property.NodeScope);
34+
public static final Setting<String> SLM_RETENTION_SCHEDULE_SETTING = Setting.simpleString(SLM_RETENTION_SCHEDULE, str -> {
35+
try {
36+
if (Strings.hasText(str)) {
37+
// Test that the setting is a valid cron syntax
38+
new CronSchedule(str);
39+
}
40+
} catch (Exception e) {
41+
throw new IllegalArgumentException("invalid cron expression [" + str + "] for SLM retention schedule [" +
42+
SLM_RETENTION_SCHEDULE + "]", e);
43+
}
44+
}, Setting.Property.Dynamic, Setting.Property.NodeScope);
3045
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@
8989
import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction;
9090
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService;
9191
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask;
92+
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotRetentionService;
93+
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotRetentionTask;
9294
import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction;
9395
import org.elasticsearch.xpack.snapshotlifecycle.action.RestExecuteSnapshotLifecycleAction;
9496
import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction;
@@ -111,6 +113,7 @@
111113
public class IndexLifecycle extends Plugin implements ActionPlugin {
112114
private final SetOnce<IndexLifecycleService> indexLifecycleInitialisationService = new SetOnce<>();
113115
private final SetOnce<SnapshotLifecycleService> snapshotLifecycleService = new SetOnce<>();
116+
private final SetOnce<SnapshotRetentionService> snapshotRetentionService = new SetOnce<>();
114117
private final SetOnce<SnapshotHistoryStore> snapshotHistoryStore = new SetOnce<>();
115118
private Settings settings;
116119
private boolean enabled;
@@ -132,7 +135,8 @@ public List<Setting<?>> getSettings() {
132135
LifecycleSettings.LIFECYCLE_NAME_SETTING,
133136
LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING,
134137
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING,
135-
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING);
138+
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING,
139+
LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
136140
}
137141

138142
@Override
@@ -150,7 +154,10 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
150154
snapshotHistoryStore.set(new SnapshotHistoryStore(settings, client, getClock().getZone()));
151155
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
152156
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
153-
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get());
157+
snapshotRetentionService.set(new SnapshotRetentionService(settings, () -> new SnapshotRetentionTask(client, clusterService),
158+
clusterService, getClock()));
159+
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(),
160+
snapshotRetentionService.get());
154161
}
155162

156163
@Override
@@ -240,7 +247,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
240247
@Override
241248
public void close() {
242249
try {
243-
IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
250+
IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotRetentionService.get());
244251
} catch (IOException e) {
245252
throw new ElasticsearchException("unable to close index lifecycle services", e);
246253
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.snapshotlifecycle;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.cluster.LocalNodeMasterListener;
12+
import org.elasticsearch.cluster.service.ClusterService;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.threadpool.ThreadPool;
16+
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
17+
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
18+
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
19+
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;
20+
21+
import java.io.Closeable;
22+
import java.time.Clock;
23+
import java.util.function.Supplier;
24+
25+
/**
26+
* The {@code SnapshotRetentionService} is responsible for scheduling the periodic kickoff of SLM's
27+
* snapshot retention. This means that when the retention schedule setting is configured, the
28+
* scheduler schedules a job that, when triggered, will delete snapshots according to the retention
29+
* policy configured in the {@link SnapshotLifecyclePolicy}.
30+
*/
31+
public class SnapshotRetentionService implements LocalNodeMasterListener, Closeable {
32+
33+
static final String SLM_RETENTION_JOB_ID = "slm-retention-job";
34+
35+
private static final Logger logger = LogManager.getLogger(SnapshotRetentionService.class);
36+
37+
private final SchedulerEngine scheduler;
38+
39+
private volatile String slmRetentionSchedule;
40+
41+
public SnapshotRetentionService(Settings settings,
42+
Supplier<SnapshotRetentionTask> taskSupplier,
43+
ClusterService clusterService,
44+
Clock clock) {
45+
this.scheduler = new SchedulerEngine(settings, clock);
46+
this.scheduler.register(taskSupplier.get());
47+
this.slmRetentionSchedule = LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING.get(settings);
48+
clusterService.addLocalNodeMasterListener(this);
49+
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
50+
this::setUpdateSchedule);
51+
}
52+
53+
void setUpdateSchedule(String retentionSchedule) {
54+
this.slmRetentionSchedule = retentionSchedule;
55+
// The schedule has changed, so reschedule the retention job
56+
rescheduleRetentionJob();
57+
}
58+
59+
// Only used for testing
60+
SchedulerEngine getScheduler() {
61+
return this.scheduler;
62+
}
63+
64+
@Override
65+
public void onMaster() {
66+
rescheduleRetentionJob();
67+
}
68+
69+
@Override
70+
public void offMaster() {
71+
cancelRetentionJob();
72+
}
73+
74+
private void rescheduleRetentionJob() {
75+
final String schedule = this.slmRetentionSchedule;
76+
if (Strings.hasText(schedule)) {
77+
final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
78+
new CronSchedule(schedule));
79+
logger.debug("scheduling SLM retention job for [{}]", schedule);
80+
this.scheduler.add(retentionJob);
81+
} else {
82+
// The schedule has been unset, so cancel the scheduled retention job
83+
cancelRetentionJob();
84+
}
85+
}
86+
87+
private void cancelRetentionJob() {
88+
this.scheduler.scheduledJobIds().forEach(this.scheduler::remove);
89+
}
90+
91+
@Override
92+
public String executorName() {
93+
return ThreadPool.Names.SNAPSHOT;
94+
}
95+
96+
@Override
97+
public void close() {
98+
this.scheduler.stop();
99+
}
100+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.snapshotlifecycle;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.service.ClusterService;
14+
import org.elasticsearch.snapshots.SnapshotInfo;
15+
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
16+
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;
17+
18+
import java.util.Collection;
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Set;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.stream.Collectors;
25+
26+
/**
27+
* The {@code SnapshotRetentionTask} is invoked by the scheduled job from the
28+
* {@link SnapshotRetentionService}. It is responsible for retrieving the snapshots for repositories
29+
* that have an SLM policy configured, and then deleting the snapshots that fall outside the
30+
* retention policy.
31+
*/
32+
public class SnapshotRetentionTask implements SchedulerEngine.Listener {
33+
34+
private static final Logger logger = LogManager.getLogger(SnapshotRetentionTask.class);
35+
private static final AtomicBoolean running = new AtomicBoolean(false);
36+
37+
private final Client client;
38+
private final ClusterService clusterService;
39+
40+
public SnapshotRetentionTask(Client client, ClusterService clusterService) {
41+
this.client = client;
42+
this.clusterService = clusterService;
43+
}
44+
45+
@Override
46+
public void triggered(SchedulerEngine.Event event) {
47+
assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) :
48+
"expected id to be " + SnapshotRetentionService.SLM_RETENTION_JOB_ID + " but it was " + event.getJobName();
49+
if (running.compareAndSet(false, true)) {
50+
try {
51+
logger.info("starting SLM retention snapshot cleanup task");
52+
final ClusterState state = clusterService.state();
53+
54+
// Find all SLM policies that have retention enabled
55+
final Map<String, SnapshotLifecyclePolicy> policiesWithRetention = getAllPoliciesWithRetentionEnabled(state);
56+
57+
// For those policies (there may be more than one for the same repo),
58+
// return the repos that we need to get the snapshots for
59+
final Set<String> repositioriesToFetch = policiesWithRetention.values().stream()
60+
.map(SnapshotLifecyclePolicy::getRepository)
61+
.collect(Collectors.toSet());
62+
63+
// Find all the snapshots that are past their retention date
64+
// TODO: include min/max snapshot count as a criteria for deletion also
65+
final List<SnapshotInfo> snapshotsToBeDeleted = getAllSnapshots(repositioriesToFetch).stream()
66+
.filter(snapshot -> snapshotEligibleForDeletion(snapshot, policiesWithRetention))
67+
.collect(Collectors.toList());
68+
69+
// Finally, delete the snapshots that need to be deleted
70+
deleteSnapshots(snapshotsToBeDeleted);
71+
72+
} finally {
73+
running.set(false);
74+
}
75+
} else {
76+
logger.debug("snapshot lifecycle retention task started, but a task is already running, skipping");
77+
}
78+
}
79+
80+
static Map<String, SnapshotLifecyclePolicy> getAllPoliciesWithRetentionEnabled(final ClusterState state) {
81+
// TODO: fill me in
82+
return Collections.emptyMap();
83+
}
84+
85+
static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map<String, SnapshotLifecyclePolicy> policies) {
86+
// TODO: fill me in
87+
return false;
88+
}
89+
90+
List<SnapshotInfo> getAllSnapshots(Collection<String> repositories) {
91+
// TODO: fill me in
92+
return Collections.emptyList();
93+
}
94+
95+
void deleteSnapshots(List<SnapshotInfo> snapshotsToDelete) {
96+
// TODO: fill me in
97+
logger.info("deleting {}", snapshotsToDelete);
98+
}
99+
}

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public static SnapshotLifecyclePolicy createPolicy(String id, String schedule) {
331331
return new SnapshotLifecyclePolicy(id, randomAlphaOfLength(4), schedule, randomAlphaOfLength(4), config);
332332
}
333333

334-
private static String randomSchedule() {
334+
public static String randomSchedule() {
335335
return randomIntBetween(0, 59) + " " +
336336
randomIntBetween(0, 59) + " " +
337337
randomIntBetween(0, 12) + " * * ?";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.snapshotlifecycle;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.cluster.node.DiscoveryNode;
11+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
12+
import org.elasticsearch.cluster.service.ClusterService;
13+
import org.elasticsearch.common.settings.ClusterSettings;
14+
import org.elasticsearch.common.settings.Setting;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.test.ClusterServiceUtils;
17+
import org.elasticsearch.test.ESTestCase;
18+
import org.elasticsearch.threadpool.TestThreadPool;
19+
import org.elasticsearch.threadpool.ThreadPool;
20+
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
21+
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
22+
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
23+
24+
import java.util.Collections;
25+
import java.util.HashSet;
26+
import java.util.Set;
27+
28+
import static org.hamcrest.Matchers.containsInAnyOrder;
29+
import static org.hamcrest.Matchers.equalTo;
30+
31+
public class SnapshotRetentionServiceTests extends ESTestCase {
32+
33+
private static final ClusterSettings clusterSettings;
34+
static {
35+
Set<Setting<?>> internalSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
36+
internalSettings.add(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
37+
clusterSettings = new ClusterSettings(Settings.EMPTY, internalSettings);
38+
}
39+
40+
public void testJobsAreScheduled() {
41+
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
42+
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
43+
ClockMock clock = new ClockMock();
44+
45+
try (ThreadPool threadPool = new TestThreadPool("test");
46+
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
47+
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
48+
FakeRetentionTask::new, clusterService, clock)) {
49+
assertThat(service.getScheduler().jobCount(), equalTo(0));
50+
51+
service.setUpdateSchedule(SnapshotLifecycleServiceTests.randomSchedule());
52+
assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID));
53+
54+
service.offMaster();
55+
assertThat(service.getScheduler().jobCount(), equalTo(0));
56+
57+
service.onMaster();
58+
assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID));
59+
60+
service.setUpdateSchedule("");
61+
assertThat(service.getScheduler().jobCount(), equalTo(0));
62+
threadPool.shutdownNow();
63+
}
64+
}
65+
66+
private static class FakeRetentionTask extends SnapshotRetentionTask {
67+
FakeRetentionTask() {
68+
super(null, null);
69+
}
70+
71+
@Override
72+
public void triggered(SchedulerEngine.Event event) {
73+
super.triggered(event);
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)