diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index ae143add71ace..4748a6caec095 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.node.NodeClient; @@ -121,10 +122,13 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request, new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()])) .snapshots(snapshots).timeout(request.masterNodeTimeout()); client.executeLocally(TransportNodesSnapshotsStatus.TYPE, nodesRequest, - ActionListener.map( - listener, nodeSnapshotStatuses -> - buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())), - nodeSnapshotStatuses))); + ActionListener.wrap( + nodeSnapshotStatuses -> + threadPool.executor(ThreadPool.Names.GENERIC).execute( + ActionRunnable.wrap(listener, l -> l.onResponse( + buildResponse( + request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())), + nodeSnapshotStatuses)))), listener::onFailure)); } else { // We don't have any in-progress shards, just return current stats listener.onResponse(buildResponse(request, currentSnapshots, null)); diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 37fc062c6f569..1f11a824a0f13 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -208,38 +208,33 @@ public void testPolicyManualExecution() throws Exception { assertThat(EntityUtils.toString(badResp.getResponse().getEntity()), containsString("no such snapshot lifecycle policy [" + policyName + "-bad]")); - Response goodResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute")); + final String snapshotName = executePolicy(policyName); - try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(goodResp.getEntity()))) { - final String snapshotName = parser.mapStrings().get("snapshot_name"); - - // Check that the executed snapshot is created - assertBusy(() -> { - try { - Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); - Map snapshotResponseMap; - try (InputStream is = response.getEntity().getContent()) { - snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); - } - assertThat(snapshotResponseMap.size(), greaterThan(0)); - final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); - assertNotNull(metadata); - assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); - } catch (ResponseException e) { - fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + // Check that the executed snapshot is created + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + Map snapshotResponseMap; + try (InputStream is = response.getEntity().getContent()) { + snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); } + assertThat(snapshotResponseMap.size(), greaterThan(0)); + final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); + assertNotNull(metadata); + assertThat(metadata.get("policy"), equalTo(policyName)); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); + } catch (ResponseException e) { + fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + } - Map stats = getSLMStats(); - Map policyStats = (Map) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName()); - Map policyIdStats = (Map) policyStats.get(policyName); - int snapsTaken = (int) policyIdStats.get(SnapshotLifecycleStats.SnapshotPolicyStats.SNAPSHOTS_TAKEN.getPreferredName()); - int totalTaken = (int) stats.get(SnapshotLifecycleStats.TOTAL_TAKEN.getPreferredName()); - assertThat(snapsTaken, equalTo(1)); - assertThat(totalTaken, equalTo(1)); - }); - } + Map stats = getSLMStats(); + Map policyStats = (Map) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName()); + Map policyIdStats = (Map) policyStats.get(policyName); + int snapsTaken = (int) policyIdStats.get(SnapshotLifecycleStats.SnapshotPolicyStats.SNAPSHOTS_TAKEN.getPreferredName()); + int totalTaken = (int) stats.get(SnapshotLifecycleStats.TOTAL_TAKEN.getPreferredName()); + assertThat(snapsTaken, equalTo(1)); + assertThat(totalTaken, equalTo(1)); + }); } @SuppressWarnings("unchecked") @@ -261,31 +256,25 @@ public void testBasicTimeBasedRetenion() throws Exception { new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1), null, null)); // Manually create a snapshot - Response executeResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute")); + final String snapshotName = executePolicy(policyName); - final String snapshotName; - try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeResp.getEntity()))) { - snapshotName = parser.mapStrings().get("snapshot_name"); - - // Check that the executed snapshot is created - assertBusy(() -> { - try { - Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); - Map snapshotResponseMap; - try (InputStream is = response.getEntity().getContent()) { - snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); - } - assertThat(snapshotResponseMap.size(), greaterThan(0)); - final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); - assertNotNull(metadata); - assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); - } catch (ResponseException e) { - fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + // Check that the executed snapshot is created + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + Map snapshotResponseMap; + try (InputStream is = response.getEntity().getContent()) { + snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); } - }); - } + assertThat(snapshotResponseMap.size(), greaterThan(0)); + final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); + assertNotNull(metadata); + assertThat(metadata.get("policy"), equalTo(policyName)); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); + } catch (ResponseException e) { + fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + } + }); // Run retention every second ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest(); @@ -391,6 +380,127 @@ public void testSnapshotInProgress() throws Exception { } } + @SuppressWarnings("unchecked") + public void testRetentionWhileSnapshotInProgress() throws Exception { + final String indexName = "test"; + final String slowPolicy = "slow"; + final String fastPolicy = "fast"; + final String slowRepo = "slow-repo"; + final String fastRepo = "fast-repo"; + int docCount = 20; + for (int i = 0; i < docCount; i++) { + index(client(), indexName, "" + i, "foo", "bar"); + } + + // Create snapshot repos, one fast and one slow + initializeRepo(slowRepo, "1b"); + initializeRepo(fastRepo, "10mb"); + + createSnapshotPolicy(slowPolicy, "snap", "1 2 3 4 5 ?", slowRepo, indexName, true, + new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); + createSnapshotPolicy(fastPolicy, "snap", "1 2 3 4 5 ?", fastRepo, indexName, true, + new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); + + // Create a snapshot and wait for it to be complete (need something that can be deleted) + final String completedSnapshotName = executePolicy(fastPolicy); + assertBusy(() -> { + try { + Response getResp = client().performRequest(new Request("GET", + "/_snapshot/" + fastRepo + "/" + completedSnapshotName + "/_status")); + try (InputStream content = getResp.getEntity().getContent()) { + Map snaps = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); + logger.info("--> waiting for snapshot {} to be successful, got: {}", completedSnapshotName, snaps); + List> snaps2 = (List>) snaps.get("snapshots"); + assertThat(snaps2.get(0).get("state"), equalTo("SUCCESS")); + } + } catch (NullPointerException | ResponseException e) { + fail("unable to retrieve completed snapshot: " + e); + } + }, 60, TimeUnit.SECONDS); + + // Take another snapshot + final String slowSnapshotName = executePolicy(slowPolicy); + + // Check that the executed snapshot shows up in the SLM output as in_progress + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_slm/policy" + (randomBoolean() ? "" : "?human"))); + Map policyResponseMap; + try (InputStream content = response.getEntity().getContent()) { + policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); + } + assertThat(policyResponseMap.size(), greaterThan(0)); + Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(slowPolicy)) + .map(policy -> (Map) policy.get("in_progress")); + + if (inProgress.isPresent()) { + Map inProgressMap = inProgress.get(); + assertThat(inProgressMap.get("name"), equalTo(slowSnapshotName)); + assertNotNull(inProgressMap.get("uuid")); + assertThat(inProgressMap.get("state"), equalTo("STARTED")); + assertThat((long) inProgressMap.get("start_time_millis"), greaterThan(0L)); + assertNull(inProgressMap.get("failure")); + } else { + fail("expected in_progress to contain a running snapshot, but the response was " + policyResponseMap); + } + } catch (ResponseException e) { + fail("expected policy to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + } + }); + + // Run retention every second + ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest(); + req.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?")); + try (XContentBuilder builder = jsonBuilder()) { + req.toXContent(builder, ToXContent.EMPTY_PARAMS); + Request r = new Request("PUT", "/_cluster/settings"); + r.setJsonEntity(Strings.toString(builder)); + client().performRequest(r); + } + + // Cancel the snapshot since it is not going to complete quickly, do it in a thread because + // cancelling the snapshot can take a long time and we might as well check retention while + // its deleting + Thread t = new Thread(() -> { + try { + assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + slowRepo + "/" + slowSnapshotName))); + } catch (IOException e) { + fail("should not have thrown " + e); + } + }); + t.start(); + + // Check that the snapshot created by the policy has been removed by retention + assertBusy(() -> { + // We expect a failed response because the snapshot should not exist + try { + logger.info("--> checking to see if snapshot has been deleted..."); + Response response = client().performRequest(new Request("GET", "/_snapshot/" + slowRepo + "/" + completedSnapshotName)); + assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); + } catch (ResponseException e) { + assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); + } + }, 60, TimeUnit.SECONDS); + + t.join(5000); + } + + /** + * Execute the given policy and return the generated snapshot name + */ + private String executePolicy(String policyId) { + try { + Response executeRepsonse = client().performRequest(new Request("PUT", "/_slm/policy/" + policyId + "/_execute")); + try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeRepsonse.getEntity()))) { + return parser.mapStrings().get("snapshot_name"); + } + } catch (Exception e) { + fail("failed to execute policy " + policyId + " - got: " + e); + throw new RuntimeException(e); + } + } + @SuppressWarnings("unchecked") private static Map extractMetadata(Map snapshotResponseMap, String snapshotPrefix) { List> snapResponse = ((List>) snapshotResponseMap.get("responses")).stream() diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 27881d3e695a3..b210c8f992964 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -161,7 +161,7 @@ public Collection createComponents(Client client, ClusterService cluster snapshotLifecycleService.set(new SnapshotLifecycleService(settings, () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); snapshotRetentionService.set(new SnapshotRetentionService(settings, - () -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get()), + () -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get(), threadPool), clusterService, getClock())); return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get()); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index fa292e14fc8f8..10423207fc338 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; @@ -16,12 +17,15 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; @@ -63,14 +67,16 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { private final Client client; private final ClusterService clusterService; private final LongSupplier nowNanoSupplier; + private final ThreadPool threadPool; private final SnapshotHistoryStore historyStore; public SnapshotRetentionTask(Client client, ClusterService clusterService, LongSupplier nowNanoSupplier, - SnapshotHistoryStore historyStore) { + SnapshotHistoryStore historyStore, ThreadPool threadPool) { this.client = new OriginSettingClient(client, ClientHelper.INDEX_LIFECYCLE_ORIGIN); this.clusterService = clusterService; this.nowNanoSupplier = nowNanoSupplier; this.historyStore = historyStore; + this.threadPool = threadPool; } @Override @@ -126,7 +132,7 @@ public void onResponse(Map> allSnapshots) { .collect(Collectors.toList()))); // Finally, delete the snapshots that need to be deleted - deleteSnapshots(snapshotsToBeDeleted, maxDeletionTime, slmStats); + maybeDeleteSnapshots(snapshotsToBeDeleted, maxDeletionTime, slmStats); updateStateWithStats(slmStats); } finally { @@ -246,15 +252,60 @@ static String getPolicyId(SnapshotInfo snapshotInfo) { " to have a policy in its metadata, but it did not")); } - void deleteSnapshots(Map> snapshotsToDelete, - TimeValue maximumTime, - SnapshotLifecycleStats slmStats) { + /** + * Maybe delete the given snapshots. If a snapshot is currently running according to the cluster + * state, this waits (using a {@link ClusterStateObserver} until a cluster state with no running + * snapshots before executing the blocking + * {@link #deleteSnapshots(Map, TimeValue, SnapshotLifecycleStats)} request. At most, we wait + * for the maximum allowed deletion time before timing out waiting for a state with no + * running snapshots. + * + * It's possible the task may still run into a SnapshotInProgressException, if a snapshot is + * started between the state retrieved here and the actual deletion. Since is is expected to be + * a rare case, no special handling is present. + */ + private void maybeDeleteSnapshots(Map> snapshotsToDelete, + TimeValue maximumTime, + SnapshotLifecycleStats slmStats) { int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum(); if (count == 0) { logger.debug("no snapshots are eligible for deletion"); return; } + ClusterState state = clusterService.state(); + if (snapshotInProgress(state)) { + logger.debug("a snapshot is currently running, rescheduling SLM retention for after snapshot has completed"); + ClusterStateObserver observer = new ClusterStateObserver(clusterService, maximumTime, logger, threadPool.getThreadContext()); + CountDownLatch latch = new CountDownLatch(1); + observer.waitForNextChange( + new NoSnapshotRunningListener(observer, + newState -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> { + try { + deleteSnapshots(snapshotsToDelete, maximumTime, slmStats); + } finally { + latch.countDown(); + } + }), + e -> { + latch.countDown(); + throw new ElasticsearchException(e); + })); + try { + latch.await(); + } catch (InterruptedException e) { + throw new ElasticsearchException(e); + } + } else { + deleteSnapshots(snapshotsToDelete, maximumTime, slmStats); + } + } + + void deleteSnapshots(Map> snapshotsToDelete, + TimeValue maximumTime, + SnapshotLifecycleStats slmStats) { + int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum(); + logger.info("starting snapshot retention deletion for [{}] snapshots", count); long startTime = nowNanoSupplier.getAsLong(); final AtomicInteger deleted = new AtomicInteger(0); @@ -290,7 +341,7 @@ void deleteSnapshots(Map> snapshotsToDelete, // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); - logger.trace("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime); + logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime); if (elapsedDeletionTime.compareTo(maximumTime) > 0) { logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," + " maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]", @@ -354,4 +405,60 @@ public void onFailure(Exception e) { void updateStateWithStats(SnapshotLifecycleStats newStats) { clusterService.submitStateUpdateTask("update_slm_stats", new UpdateSnapshotLifecycleStatsTask(newStats)); } + + public static boolean snapshotInProgress(ClusterState state) { + SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) { + // No snapshots are running, new state is acceptable to proceed + return false; + } + + // There are snapshots + return true; + } + + /** + * A {@link ClusterStateObserver.Listener} that invokes the given function with the new state, + * once no snapshots are running. If a snapshot is still running it registers a new listener + * and tries again. Passes any exceptions to the original exception listener if they occur. + */ + class NoSnapshotRunningListener implements ClusterStateObserver.Listener { + + private final Consumer reRun; + private final Consumer exceptionConsumer; + private final ClusterStateObserver observer; + + NoSnapshotRunningListener(ClusterStateObserver observer, + Consumer reRun, + Consumer exceptionConsumer) { + this.observer = observer; + this.reRun = reRun; + this.exceptionConsumer = exceptionConsumer; + } + + @Override + public void onNewClusterState(ClusterState state) { + try { + if (snapshotInProgress(state)) { + observer.waitForNextChange(this); + } else { + logger.debug("retrying SLM snapshot retention deletion after snapshot has completed"); + reRun.accept(state); + } + } catch (Exception e) { + exceptionConsumer.accept(e); + } + } + + @Override + public void onClusterServiceClose() { + // This means the cluster is being shut down, so nothing to do here + } + + @Override + public void onTimeout(TimeValue timeout) { + exceptionConsumer.accept( + new IllegalStateException("slm retention snapshot deletion out while waiting for ongoing snapshots to complete")); + } + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index ff42f48961f3b..8c96055302e5e 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -68,7 +68,7 @@ public void testJobsAreScheduled() { private static class FakeRetentionTask extends SnapshotRetentionTask { FakeRetentionTask() { - super(mock(Client.class), null, System::nanoTime, mock(SnapshotHistoryStore.class)); + super(mock(Client.class), null, System::nanoTime, mock(SnapshotHistoryStore.class), mock(ThreadPool.class)); } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 86e7a34214bbb..423882f31f112 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -180,6 +180,7 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { deletedSnapshotsInHistory.add(historyItem.getSnapshotName()); historyLatch.countDown(); }), + threadPool, () -> { List snaps = new ArrayList<>(2); snaps.add(eligibleSnapshot); @@ -273,6 +274,7 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception deletedSnapshotsInHistory.add(historyItem.getSnapshotName()); historyLatch.countDown(); }), + threadPool, () -> { List snaps = Arrays.asList(snap1, snap2, snap3, snap4, snap5); logger.info("--> retrieving snapshots [{}]", snaps); @@ -342,10 +344,11 @@ private static class MockSnapshotRetentionTask extends SnapshotRetentionTask { MockSnapshotRetentionTask(Client client, ClusterService clusterService, SnapshotHistoryStore historyStore, + ThreadPool threadPool, Supplier>> snapshotRetriever, DeleteSnapshotMock deleteRunner, LongSupplier nanoSupplier) { - super(client, clusterService, nanoSupplier, historyStore); + super(client, clusterService, nanoSupplier, historyStore, threadPool); this.snapshotRetriever = snapshotRetriever; this.deleteRunner = deleteRunner; }