Skip to content

Commit 2d56ae2

Browse files
authored
Retry SLM retention after currently running snapshot completes (#45802)
* Retry SLM retention after currently running snapshot completes. This commit adds a ClusterStateObserver to wait until the currently running snapshot is complete before proceeding with snapshot deletion. SLM retention waits up to the maximum allowed deletion time for the snapshot to complete; however, the time spent waiting is not counted against the time limit on the actual deletions. Relates to #43663 * Increase timeout waiting for snapshot completion * Apply patch from https://github.com/original-brownbear/elasticsearch/commit/2374316f0d1912c9e1498bece195546a1dc60bce.patch * Rename test variables
1 parent fcd0a30 commit 2d56ae2

File tree

6 files changed

+289
-65
lines changed

6 files changed

+289
-65
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.carrotsearch.hppc.cursors.ObjectCursor;
2323
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2424
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.action.ActionRunnable;
2526
import org.elasticsearch.action.support.ActionFilters;
2627
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2728
import org.elasticsearch.client.node.NodeClient;
@@ -121,10 +122,13 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
121122
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
122123
.snapshots(snapshots).timeout(request.masterNodeTimeout());
123124
client.executeLocally(TransportNodesSnapshotsStatus.TYPE, nodesRequest,
124-
ActionListener.map(
125-
listener, nodeSnapshotStatuses ->
126-
buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
127-
nodeSnapshotStatuses)));
125+
ActionListener.wrap(
126+
nodeSnapshotStatuses ->
127+
threadPool.executor(ThreadPool.Names.GENERIC).execute(
128+
ActionRunnable.wrap(listener, l -> l.onResponse(
129+
buildResponse(
130+
request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
131+
nodeSnapshotStatuses)))), listener::onFailure));
128132
} else {
129133
// We don't have any in-progress shards, just return current stats
130134
listener.onResponse(buildResponse(request, currentSnapshots, null));

x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java

Lines changed: 162 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -208,38 +208,33 @@ public void testPolicyManualExecution() throws Exception {
208208
assertThat(EntityUtils.toString(badResp.getResponse().getEntity()),
209209
containsString("no such snapshot lifecycle policy [" + policyName + "-bad]"));
210210

211-
Response goodResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute"));
211+
final String snapshotName = executePolicy(policyName);
212212

213-
try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
214-
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(goodResp.getEntity()))) {
215-
final String snapshotName = parser.mapStrings().get("snapshot_name");
216-
217-
// Check that the executed snapshot is created
218-
assertBusy(() -> {
219-
try {
220-
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
221-
Map<String, Object> snapshotResponseMap;
222-
try (InputStream is = response.getEntity().getContent()) {
223-
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
224-
}
225-
assertThat(snapshotResponseMap.size(), greaterThan(0));
226-
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
227-
assertNotNull(metadata);
228-
assertThat(metadata.get("policy"), equalTo(policyName));
229-
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);
230-
} catch (ResponseException e) {
231-
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
213+
// Check that the executed snapshot is created
214+
assertBusy(() -> {
215+
try {
216+
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
217+
Map<String, Object> snapshotResponseMap;
218+
try (InputStream is = response.getEntity().getContent()) {
219+
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
232220
}
221+
assertThat(snapshotResponseMap.size(), greaterThan(0));
222+
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
223+
assertNotNull(metadata);
224+
assertThat(metadata.get("policy"), equalTo(policyName));
225+
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);
226+
} catch (ResponseException e) {
227+
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
228+
}
233229

234-
Map<String, Object> stats = getSLMStats();
235-
Map<String, Object> policyStats = (Map<String, Object>) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName());
236-
Map<String, Object> policyIdStats = (Map<String, Object>) policyStats.get(policyName);
237-
int snapsTaken = (int) policyIdStats.get(SnapshotLifecycleStats.SnapshotPolicyStats.SNAPSHOTS_TAKEN.getPreferredName());
238-
int totalTaken = (int) stats.get(SnapshotLifecycleStats.TOTAL_TAKEN.getPreferredName());
239-
assertThat(snapsTaken, equalTo(1));
240-
assertThat(totalTaken, equalTo(1));
241-
});
242-
}
230+
Map<String, Object> stats = getSLMStats();
231+
Map<String, Object> policyStats = (Map<String, Object>) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName());
232+
Map<String, Object> policyIdStats = (Map<String, Object>) policyStats.get(policyName);
233+
int snapsTaken = (int) policyIdStats.get(SnapshotLifecycleStats.SnapshotPolicyStats.SNAPSHOTS_TAKEN.getPreferredName());
234+
int totalTaken = (int) stats.get(SnapshotLifecycleStats.TOTAL_TAKEN.getPreferredName());
235+
assertThat(snapsTaken, equalTo(1));
236+
assertThat(totalTaken, equalTo(1));
237+
});
243238
}
244239

245240
@SuppressWarnings("unchecked")
@@ -261,31 +256,25 @@ public void testBasicTimeBasedRetenion() throws Exception {
261256
new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1), null, null));
262257

263258
// Manually create a snapshot
264-
Response executeResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute"));
259+
final String snapshotName = executePolicy(policyName);
265260

266-
final String snapshotName;
267-
try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
268-
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeResp.getEntity()))) {
269-
snapshotName = parser.mapStrings().get("snapshot_name");
270-
271-
// Check that the executed snapshot is created
272-
assertBusy(() -> {
273-
try {
274-
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
275-
Map<String, Object> snapshotResponseMap;
276-
try (InputStream is = response.getEntity().getContent()) {
277-
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
278-
}
279-
assertThat(snapshotResponseMap.size(), greaterThan(0));
280-
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
281-
assertNotNull(metadata);
282-
assertThat(metadata.get("policy"), equalTo(policyName));
283-
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);
284-
} catch (ResponseException e) {
285-
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
261+
// Check that the executed snapshot is created
262+
assertBusy(() -> {
263+
try {
264+
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
265+
Map<String, Object> snapshotResponseMap;
266+
try (InputStream is = response.getEntity().getContent()) {
267+
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
286268
}
287-
});
288-
}
269+
assertThat(snapshotResponseMap.size(), greaterThan(0));
270+
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
271+
assertNotNull(metadata);
272+
assertThat(metadata.get("policy"), equalTo(policyName));
273+
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);
274+
} catch (ResponseException e) {
275+
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
276+
}
277+
});
289278

290279
// Run retention every second
291280
ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest();
@@ -391,6 +380,127 @@ public void testSnapshotInProgress() throws Exception {
391380
}
392381
}
393382

383+
@SuppressWarnings("unchecked")
384+
public void testRetentionWhileSnapshotInProgress() throws Exception {
385+
final String indexName = "test";
386+
final String slowPolicy = "slow";
387+
final String fastPolicy = "fast";
388+
final String slowRepo = "slow-repo";
389+
final String fastRepo = "fast-repo";
390+
int docCount = 20;
391+
for (int i = 0; i < docCount; i++) {
392+
index(client(), indexName, "" + i, "foo", "bar");
393+
}
394+
395+
// Create snapshot repos, one fast and one slow
396+
initializeRepo(slowRepo, "1b");
397+
initializeRepo(fastRepo, "10mb");
398+
399+
createSnapshotPolicy(slowPolicy, "snap", "1 2 3 4 5 ?", slowRepo, indexName, true,
400+
new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null));
401+
createSnapshotPolicy(fastPolicy, "snap", "1 2 3 4 5 ?", fastRepo, indexName, true,
402+
new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null));
403+
404+
// Create a snapshot and wait for it to be complete (need something that can be deleted)
405+
final String completedSnapshotName = executePolicy(fastPolicy);
406+
assertBusy(() -> {
407+
try {
408+
Response getResp = client().performRequest(new Request("GET",
409+
"/_snapshot/" + fastRepo + "/" + completedSnapshotName + "/_status"));
410+
try (InputStream content = getResp.getEntity().getContent()) {
411+
Map<String, Object> snaps = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true);
412+
logger.info("--> waiting for snapshot {} to be successful, got: {}", completedSnapshotName, snaps);
413+
List<Map<String, Object>> snaps2 = (List<Map<String, Object>>) snaps.get("snapshots");
414+
assertThat(snaps2.get(0).get("state"), equalTo("SUCCESS"));
415+
}
416+
} catch (NullPointerException | ResponseException e) {
417+
fail("unable to retrieve completed snapshot: " + e);
418+
}
419+
}, 60, TimeUnit.SECONDS);
420+
421+
// Take another snapshot
422+
final String slowSnapshotName = executePolicy(slowPolicy);
423+
424+
// Check that the executed snapshot shows up in the SLM output as in_progress
425+
assertBusy(() -> {
426+
try {
427+
Response response = client().performRequest(new Request("GET", "/_slm/policy" + (randomBoolean() ? "" : "?human")));
428+
Map<String, Object> policyResponseMap;
429+
try (InputStream content = response.getEntity().getContent()) {
430+
policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true);
431+
}
432+
assertThat(policyResponseMap.size(), greaterThan(0));
433+
Optional<Map<String, Object>> inProgress = Optional.ofNullable((Map<String, Object>) policyResponseMap.get(slowPolicy))
434+
.map(policy -> (Map<String, Object>) policy.get("in_progress"));
435+
436+
if (inProgress.isPresent()) {
437+
Map<String, Object> inProgressMap = inProgress.get();
438+
assertThat(inProgressMap.get("name"), equalTo(slowSnapshotName));
439+
assertNotNull(inProgressMap.get("uuid"));
440+
assertThat(inProgressMap.get("state"), equalTo("STARTED"));
441+
assertThat((long) inProgressMap.get("start_time_millis"), greaterThan(0L));
442+
assertNull(inProgressMap.get("failure"));
443+
} else {
444+
fail("expected in_progress to contain a running snapshot, but the response was " + policyResponseMap);
445+
}
446+
} catch (ResponseException e) {
447+
fail("expected policy to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
448+
}
449+
});
450+
451+
// Run retention every second
452+
ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest();
453+
req.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?"));
454+
try (XContentBuilder builder = jsonBuilder()) {
455+
req.toXContent(builder, ToXContent.EMPTY_PARAMS);
456+
Request r = new Request("PUT", "/_cluster/settings");
457+
r.setJsonEntity(Strings.toString(builder));
458+
client().performRequest(r);
459+
}
460+
461+
// Cancel the snapshot since it is not going to complete quickly, do it in a thread because
462+
// cancelling the snapshot can take a long time and we might as well check retention while
463+
// its deleting
464+
Thread t = new Thread(() -> {
465+
try {
466+
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + slowRepo + "/" + slowSnapshotName)));
467+
} catch (IOException e) {
468+
fail("should not have thrown " + e);
469+
}
470+
});
471+
t.start();
472+
473+
// Check that the snapshot created by the policy has been removed by retention
474+
assertBusy(() -> {
475+
// We expect a failed response because the snapshot should not exist
476+
try {
477+
logger.info("--> checking to see if snapshot has been deleted...");
478+
Response response = client().performRequest(new Request("GET", "/_snapshot/" + slowRepo + "/" + completedSnapshotName));
479+
assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception"));
480+
} catch (ResponseException e) {
481+
assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
482+
}
483+
}, 60, TimeUnit.SECONDS);
484+
485+
t.join(5000);
486+
}
487+
488+
/**
489+
* Execute the given policy and return the generated snapshot name
*
* Issues a PUT request to /_slm/policy/{policyId}/_execute and reads the "snapshot_name"
* entry from the JSON response body. Any exception raised along the way is reported
* through fail(...).
*
* @param policyId id of the snapshot lifecycle policy to execute
* @return the value of "snapshot_name" in the execute response
490+
*/
491+
private String executePolicy(String policyId) {
492+
try {
493+
Response executeRepsonse = client().performRequest(new Request("PUT", "/_slm/policy/" + policyId + "/_execute"));
494+
try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
495+
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeRepsonse.getEntity()))) {
// The response body is parsed as a flat string-to-string map.
496+
return parser.mapStrings().get("snapshot_name");
497+
}
498+
} catch (Exception e) {
499+
fail("failed to execute policy " + policyId + " - got: " + e);
// Only reached if fail(...) returns normally; presumably it always throws, in which case
// this statement just gives the compiler a terminating path -- confirm.
500+
throw new RuntimeException(e);
501+
}
502+
}
503+
394504
@SuppressWarnings("unchecked")
395505
private static Map<String, Object> extractMetadata(Map<String, Object> snapshotResponseMap, String snapshotPrefix) {
396506
List<Map<String, Object>> snapResponse = ((List<Map<String, Object>>) snapshotResponseMap.get("responses")).stream()

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
161161
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
162162
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
163163
snapshotRetentionService.set(new SnapshotRetentionService(settings,
164-
() -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get()),
164+
() -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get(), threadPool),
165165
clusterService, getClock()));
166166
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(),
167167
snapshotRetentionService.get());

0 commit comments

Comments
 (0)