diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicyItem.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicyItem.java index 9db4a0652e730..70f108957d731 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicyItem.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicyItem.java @@ -211,6 +211,22 @@ public static SnapshotInProgress fromEntry(SnapshotsInProgress.Entry entry) { entry.state(), entry.startTime(), entry.failure()); } + public SnapshotId getSnapshotId() { + return snapshotId; + } + + public SnapshotsInProgress.State getState() { + return state; + } + + public long getStartTime() { + return startTime; + } + + public String getFailure() { + return failure; + } + @Override public void writeTo(StreamOutput out) throws IOException { this.snapshotId.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/GetSnapshotLifecycleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/GetSnapshotLifecycleAction.java index 96d7c19f56f2c..5821f19fc9b21 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/GetSnapshotLifecycleAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/GetSnapshotLifecycleAction.java @@ -94,6 +94,10 @@ public Response(StreamInput in) throws IOException { this.lifecycles = in.readList(SnapshotLifecyclePolicyItem::new); } + public List getPolicies() { + return this.lifecycles; + } + @Override public String toString() { return Strings.toString(this); diff --git a/x-pack/plugin/ilm/build.gradle b/x-pack/plugin/ilm/build.gradle index 17b66741a7a9f..74c239414a390 100644 --- a/x-pack/plugin/ilm/build.gradle +++ b/x-pack/plugin/ilm/build.gradle @@ -30,4 +30,3 @@ gradle.projectsEvaluated { } integTest.enabled = false - diff --git a/x-pack/plugin/ilm/qa/multi-node/build.gradle b/x-pack/plugin/ilm/qa/multi-node/build.gradle index 1746cc8f840ce..8e8df478d1235 100644 --- a/x-pack/plugin/ilm/qa/multi-node/build.gradle +++ b/x-pack/plugin/ilm/qa/multi-node/build.gradle @@ -25,8 +25,4 @@ testClusters.integTest { setting 'xpack.ml.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' setting 'indices.lifecycle.poll_interval', '1000ms' - // TODO: Find a way to run these tests with more than one snapshot pool thread. Currently we need to limit to one thread so that the - // rate limiting settings in SnapshotLifecycleIT doesn't result in blocked snapshot threads because multiple threads overshoot - // the limit simultaneously and the rate limiter then moves to wait minutes to make up for this. - setting 'thread_pool.snapshot.max', '1' } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java similarity index 72% rename from x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java rename to x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java index a25a90bf4199f..43194823281d7 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java @@ -9,7 +9,6 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -41,7 +40,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -49,15 +47,13 @@ import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX; import static org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT.getStepKeyForIndex; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.startsWith; -@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/46205") -public class SnapshotLifecycleIT extends ESRestTestCase { +public class SnapshotLifecycleRestIT extends ESRestTestCase { @Override protected boolean waitForAllSnapshotsWiped() { @@ -335,183 +331,6 @@ public void testBasicTimeBasedRetenion() throws Exception { } } - @SuppressWarnings("unchecked") - public void testSnapshotInProgress() throws Exception { - final String indexName = "test"; - final String policyName = "test-policy"; - final String repoId = "my-repo"; - int docCount = 20; - for (int i = 0; i < docCount; i++) { - index(client(), indexName, "" + i, "foo", "bar"); - } - - // Create a snapshot repo - initializeRepo(repoId, "1b"); - - createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true); - - Response executeRepsonse = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute")); - - try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeRepsonse.getEntity()))) { - final String snapshotName = parser.mapStrings().get("snapshot_name"); - - // Check that the executed snapshot shows up in the SLM output - assertBusy(() -> { - try { - Response response = client().performRequest(new Request("GET", "/_slm/policy" + (randomBoolean() ? "" : "?human"))); - Map policyResponseMap; - try (InputStream content = response.getEntity().getContent()) { - policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); - } - assertThat(policyResponseMap.size(), greaterThan(0)); - Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(policyName)) - .map(policy -> (Map) policy.get("in_progress")); - - if (inProgress.isPresent()) { - Map inProgressMap = inProgress.get(); - assertThat(inProgressMap.get("name"), equalTo(snapshotName)); - assertNotNull(inProgressMap.get("uuid")); - assertThat(inProgressMap.get("state"), equalTo("STARTED")); - assertThat((long) inProgressMap.get("start_time_millis"), greaterThan(0L)); - assertNull(inProgressMap.get("failure")); - } else { - fail("expected in_progress to contain a running snapshot, but the response was " + policyResponseMap); - } - } catch (ResponseException e) { - fail("expected policy to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); - } - }); - - // Cancel the snapshot since it is not going to complete quickly - try { - client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName)); - } catch (Exception e) { - // ignore - } - } - } - - @SuppressWarnings("unchecked") - public void testRetentionWhileSnapshotInProgress() throws Exception { - final String indexName = "test"; - final String slowPolicy = "slow"; - final String fastPolicy = "fast"; - final String slowRepo = "slow-repo"; - final String fastRepo = "fast-repo"; - int docCount = 20; - for (int i = 0; i < docCount; i++) { - index(client(), indexName, "" + i, "foo", "bar"); - } - - // Create snapshot repos, one fast and one slow - initializeRepo(slowRepo, "1b"); - initializeRepo(fastRepo, "10mb"); - - createSnapshotPolicy(slowPolicy, "slow-snap", "1 2 3 4 5 ?", slowRepo, indexName, true, - new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); - createSnapshotPolicy(fastPolicy, "fast-snap", "1 2 3 4 5 ?", fastRepo, indexName, true, - new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); - - // Create a snapshot and wait for it to be complete (need something that can be deleted) - final String completedSnapshotName = executePolicy(fastPolicy); - assertBusy(() -> { - try { - Response getResp = client().performRequest(new Request("GET", - "/_snapshot/" + fastRepo + "/" + completedSnapshotName + "/_status")); - try (InputStream content = getResp.getEntity().getContent()) { - Map snaps = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); - logger.info("--> waiting for snapshot {} to be successful, got: {}", completedSnapshotName, snaps); - List> snaps2 = (List>) snaps.get("snapshots"); - assertThat(snaps2.get(0).get("state"), equalTo("SUCCESS")); - - // Check that no in_progress snapshots show up - Response response = client().performRequest(new Request("GET", "/_slm/policy")); - Map policyResponseMap; - try (InputStream content2 = response.getEntity().getContent()) { - policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content2, true); - } - assertThat(policyResponseMap.size(), greaterThan(0)); - Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(slowPolicy)) - .map(policy -> (Map) policy.get("in_progress")); - - // Ensure no snapshots are running - assertFalse("expected no in progress snapshots but got " + inProgress.orElse(null), inProgress.isPresent()); - } - } catch (NullPointerException | ResponseException e) { - fail("unable to retrieve completed snapshot: " + e); - } - }, 60, TimeUnit.SECONDS); - - // Take another snapshot - final String slowSnapshotName = executePolicy(slowPolicy); - - // Check that the executed snapshot shows up in the SLM output as in_progress - assertBusy(() -> { - try { - Response response = client().performRequest(new Request("GET", "/_slm/policy")); - Map policyResponseMap; - try (InputStream content = response.getEntity().getContent()) { - policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); - } - logger.info("--> checking for 'slow-*' snapshot to show up in policy response, got: " + policyResponseMap); - assertThat(policyResponseMap.size(), greaterThan(0)); - Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(slowPolicy)) - .map(policy -> (Map) policy.get("in_progress")); - - if (inProgress.isPresent()) { - Map inProgressMap = inProgress.get(); - assertThat(inProgressMap.get("name"), equalTo(slowSnapshotName)); - assertNotNull(inProgressMap.get("uuid")); - assertThat(inProgressMap.get("state"), anyOf(equalTo("STARTED"), equalTo("INIT"))); - assertThat((long) inProgressMap.get("start_time_millis"), greaterThan(0L)); - assertNull(inProgressMap.get("failure")); - } else { - fail("expected in_progress to contain a running snapshot, but the response was " + policyResponseMap); - } - } catch (ResponseException e) { - fail("expected policy to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); - } - }, 60, TimeUnit.SECONDS); - - // Run retention every second - ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest(); - req.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?")); - try (XContentBuilder builder = jsonBuilder()) { - req.toXContent(builder, ToXContent.EMPTY_PARAMS); - Request r = new Request("PUT", "/_cluster/settings"); - r.setJsonEntity(Strings.toString(builder)); - client().performRequest(r); - } - - // Cancel the snapshot since it is not going to complete quickly, do it in a thread because - // cancelling the snapshot can take a long time and we might as well check retention while - // its deleting - Thread t = new Thread(() -> { - try { - assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + slowRepo + "/" + slowSnapshotName))); - } catch (IOException e) { - fail("should not have thrown " + e); - } - }); - t.start(); - - // Check that the snapshot created by the policy has been removed by retention - assertBusy(() -> { - // We expect a failed response because the snapshot should not exist - try { - Response response = client().performRequest(new Request("GET", "/_snapshot/" + slowRepo + "/" + completedSnapshotName)); - String resp = EntityUtils.toString(response.getEntity()); - logger.info("--> checking to see if snapshot has been deleted, got: " + resp); - assertThat(resp, containsString("snapshot_missing_exception")); - } catch (ResponseException e) { - assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); - } - }, 60, TimeUnit.SECONDS); - - t.join(5000); - } - /** * Execute the given policy and return the generated snapshot name */ diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java new file mode 100644 index 0000000000000..52d5244aeb08c --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java @@ -0,0 +1,281 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.slm; + +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.snapshots.SnapshotMissingException; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; +import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyItem; +import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; +import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotLifecycleAction; +import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleAction; +import org.elasticsearch.xpack.core.slm.action.PutSnapshotLifecycleAction; +import org.elasticsearch.xpack.ilm.IndexLifecycle; +import org.junit.After; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +/** + * Tests for Snapshot Lifecycle Management that require a slow or blocked snapshot repo (using {@link MockRepository} + */ +public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { + + @After + public void resetSLMSettings() { + // unset retention settings + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put(LifecycleSettings.SLM_RETENTION_SCHEDULE, (String) null) + .build()) + .get(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockRepository.Plugin.class, LocalStateCompositeXPackPlugin.class, IndexLifecycle.class); + } + + public void testSnapshotInProgress() throws Exception { + final String indexName = "test"; + final String policyName = "test-policy"; + final String repoId = "my-repo"; + int docCount = 20; + for (int i = 0; i < docCount; i++) { + index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar")); + } + + // Create a snapshot repo + initializeRepo(repoId); + + logger.info("--> creating policy {}", policyName); + createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true); + + logger.info("--> blocking master from completing snapshot"); + blockMasterFromFinalizingSnapshotOnIndexFile(repoId); + + logger.info("--> executing snapshot lifecycle"); + final String snapshotName = executePolicy(policyName); + + // Check that the executed snapshot shows up in the SLM output + assertBusy(() -> { + GetSnapshotLifecycleAction.Response getResp = + client().execute(GetSnapshotLifecycleAction.INSTANCE, new GetSnapshotLifecycleAction.Request(policyName)).get(); + logger.info("--> checking for in progress snapshot..."); + + assertThat(getResp.getPolicies().size(), greaterThan(0)); + SnapshotLifecyclePolicyItem item = getResp.getPolicies().get(0); + assertNotNull(item.getSnapshotInProgress()); + SnapshotLifecyclePolicyItem.SnapshotInProgress inProgress = item.getSnapshotInProgress(); + assertThat(inProgress.getSnapshotId().getName(), equalTo(snapshotName)); + assertThat(inProgress.getStartTime(), greaterThan(0L)); + assertThat(inProgress.getState(), anyOf(equalTo(SnapshotsInProgress.State.INIT), equalTo(SnapshotsInProgress.State.STARTED))); + assertNull(inProgress.getFailure()); + }); + + logger.info("--> unblocking snapshots"); + unblockRepo(repoId); + + // Cancel/delete the snapshot + try { + client().admin().cluster().prepareDeleteSnapshot(repoId, snapshotName).get(); + } catch (SnapshotMissingException e) { + // ignore + } + } + + public void testRetentionWhileSnapshotInProgress() throws Exception { + final String indexName = "test"; + final String policyId = "slm-policy"; + final String repoId = "slm-repo"; + int docCount = 20; + for (int i = 0; i < docCount; i++) { + index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar")); + } + + initializeRepo(repoId); + + logger.info("--> creating policy {}", policyId); + createSnapshotPolicy(policyId, "snap", "1 2 3 4 5 ?", repoId, indexName, true, + new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); + + // Create a snapshot and wait for it to be complete (need something that can be deleted) + final String completedSnapshotName = executePolicy(policyId); + logger.info("--> kicked off snapshot {}", completedSnapshotName); + assertBusy(() -> { + try { + SnapshotsStatusResponse s = + client().admin().cluster().prepareSnapshotStatus(repoId).setSnapshots(completedSnapshotName).get(); + assertThat("expected a snapshot but none were returned", s.getSnapshots().size(), equalTo(1)); + SnapshotStatus status = s.getSnapshots().get(0); + logger.info("--> waiting for snapshot {} to be completed, got: {}", completedSnapshotName, status.getState()); + assertThat(status.getState(), equalTo(SnapshotsInProgress.State.SUCCESS)); + } catch (SnapshotMissingException e) { + logger.error("expected a snapshot but it was missing", e); + fail("expected a snapshot with name " + completedSnapshotName + " but it does not exist"); + } + }); + + // Take another snapshot, but before doing that, block it from completing + logger.info("--> blocking nodes from completing snapshot"); + blockAllDataNodes(repoId); + final String secondSnapName = executePolicy(policyId); + + // Check that the executed snapshot shows up in the SLM output as in_progress + assertBusy(() -> { + GetSnapshotLifecycleAction.Response getResp = + client().execute(GetSnapshotLifecycleAction.INSTANCE, new GetSnapshotLifecycleAction.Request(policyId)).get(); + logger.info("--> checking for in progress snapshot..."); + + assertThat(getResp.getPolicies().size(), greaterThan(0)); + SnapshotLifecyclePolicyItem item = getResp.getPolicies().get(0); + assertNotNull(item.getSnapshotInProgress()); + SnapshotLifecyclePolicyItem.SnapshotInProgress inProgress = item.getSnapshotInProgress(); + assertThat(inProgress.getSnapshotId().getName(), equalTo(secondSnapName)); + assertThat(inProgress.getStartTime(), greaterThan(0L)); + assertThat(inProgress.getState(), anyOf(equalTo(SnapshotsInProgress.State.INIT), equalTo(SnapshotsInProgress.State.STARTED))); + assertNull(inProgress.getFailure()); + }); + + // Run retention every second + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?") + .build()) + .get(); + // Guarantee that retention gets a chance to run before unblocking, I know sleeps are not + // ideal, but we don't currently have a way to force retention to run, so waiting at least + // a second is the best we can do for now. + Thread.sleep(1500); + + logger.info("--> unblocking snapshots"); + unblockRepo(repoId); + unblockAllDataNodes(repoId); + + // Check that the snapshot created by the policy has been removed by retention + assertBusy(() -> { + // Trigger a cluster state update so that it re-checks for a snapshot in progress + client().admin().cluster().prepareReroute().get(); + logger.info("--> waiting for snapshot to be deleted"); + try { + SnapshotsStatusResponse s = + client().admin().cluster().prepareSnapshotStatus(repoId).setSnapshots(completedSnapshotName).get(); + assertNull("expected no snapshot but one was returned", s.getSnapshots().get(0)); + } catch (SnapshotMissingException e) { + // Great, we wanted it to be deleted! + } + }); + + // Cancel/delete the snapshot + try { + client().admin().cluster().prepareDeleteSnapshot(repoId, secondSnapName).get(); + } catch (SnapshotMissingException e) { + // ignore + } + } + + private void initializeRepo(String repoName) { + client().admin().cluster().preparePutRepository(repoName) + .setType("mock") + .setSettings(Settings.builder() + .put("compress", randomBoolean()) + .put("location", randomAlphaOfLength(6)) + .build()) + .get(); + } + + private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId, + String indexPattern, boolean ignoreUnavailable) { + createSnapshotPolicy(policyName, snapshotNamePattern, schedule, repoId, indexPattern, + ignoreUnavailable, SnapshotRetentionConfiguration.EMPTY); + } + + private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId, + String indexPattern, boolean ignoreUnavailable, + SnapshotRetentionConfiguration retention) { + Map snapConfig = new HashMap<>(); + snapConfig.put("indices", Collections.singletonList(indexPattern)); + snapConfig.put("ignore_unavailable", ignoreUnavailable); + if (randomBoolean()) { + Map metadata = new HashMap<>(); + int fieldCount = randomIntBetween(2,5); + for (int i = 0; i < fieldCount; i++) { + metadata.put(randomValueOtherThanMany(key -> "policy".equals(key) || metadata.containsKey(key), + () -> randomAlphaOfLength(5)), randomAlphaOfLength(4)); + } + } + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule, + repoId, snapConfig, retention); + + PutSnapshotLifecycleAction.Request putLifecycle = new PutSnapshotLifecycleAction.Request(policyName, policy); + try { + client().execute(PutSnapshotLifecycleAction.INSTANCE, putLifecycle).get(); + } catch (Exception e) { + logger.error("failed to create slm policy", e); + fail("failed to create policy " + policy + " got: " + e); + } + } + + /** + * Execute the given policy and return the generated snapshot name + */ + private String executePolicy(String policyId) { + ExecuteSnapshotLifecycleAction.Request executeReq = new ExecuteSnapshotLifecycleAction.Request(policyId); + ExecuteSnapshotLifecycleAction.Response resp = null; + try { + resp = client().execute(ExecuteSnapshotLifecycleAction.INSTANCE, executeReq).get(); + return resp.getSnapshotName(); + } catch (Exception e) { + logger.error("failed to execute policy", e); + fail("failed to execute policy " + policyId + " got: " + e); + return "bad"; + } + } + + public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) { + final String masterName = internalCluster().getMasterName(); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName) + .repository(repositoryName)).setBlockOnWriteIndexFile(true); + return masterName; + } + + public static String unblockRepo(final String repositoryName) { + final String masterName = internalCluster().getMasterName(); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName) + .repository(repositoryName)).unblock(); + return masterName; + } + + public static void blockAllDataNodes(String repository) { + for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + ((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true); + } + } + + public static void unblockAllDataNodes(String repository) { + for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + ((MockRepository)repositoriesService.repository(repository)).unblock(); + } + } +}