From cfa62246f4f1f98badfe2532fd9a20286d1be09d Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 29 Oct 2019 15:13:55 -0600 Subject: [PATCH] Don't schedule SLM jobs when services have been stopped This adds a guard for the SLM lifecycle and retention service that prevents new jobs from being scheduled once the service has been stopped. Previously, if the node were shut down, the service would be stopped, but a cluster state update or local master election could still cause a job to attempt to be scheduled. This could lead to an uncaught `RejectedExecutionException`. Resolves #47749 --- .../xpack/slm/SnapshotLifecycleService.java | 10 +++++++++- .../xpack/slm/SnapshotRetentionService.java | 8 ++++++-- .../xpack/slm/SnapshotLifecycleServiceTests.java | 7 +++++++ .../xpack/slm/SnapshotRetentionServiceTests.java | 6 ++++++ 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java index c0f8e65158771..4529621dfcc8e 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -48,6 +49,7 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea private final ClusterService clusterService; private final SnapshotLifecycleTask snapshotTask; private final Map scheduledTasks = ConcurrentCollections.newConcurrentMap(); + private final AtomicBoolean running = new AtomicBoolean(true); private volatile boolean isMaster = false; public SnapshotLifecycleService(Settings settings, @@ -160,6 +162,10 @@ public 
void cleanupDeletedPolicies(final ClusterState state) { * the same version of a policy has already been scheduled it does not overwrite the job. */ public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) { + if (this.running.get() == false) { + return; + } + final String jobId = getJobId(snapshotLifecyclePolicy); final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX); @@ -237,6 +243,8 @@ public String executorName() { @Override public void close() { - this.scheduler.stop(); + if (this.running.compareAndSet(true, false)) { + this.scheduler.stop(); + } } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java index 235df846b4897..0eefbbf92efbc 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.time.Clock; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; /** @@ -38,6 +39,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea private final SchedulerEngine scheduler; private final SnapshotRetentionTask retentionTask; private final Clock clock; + private final AtomicBoolean running = new AtomicBoolean(true); private volatile String slmRetentionSchedule; private volatile boolean isMaster = false; @@ -81,7 +83,7 @@ public void offMaster() { private void rescheduleRetentionJob() { final String schedule = this.slmRetentionSchedule; - if (this.isMaster && Strings.hasText(schedule)) { + if (this.running.get() && this.isMaster && Strings.hasText(schedule)) { final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID, new 
CronSchedule(schedule)); logger.debug("scheduling SLM retention job for [{}]", schedule); @@ -113,6 +115,8 @@ public String executorName() { @Override public void close() { - this.scheduler.stop(); + if (this.running.compareAndSet(true, false)) { + this.scheduler.stop(); + } } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java index 2a8868c480c14..8336e10482136 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java @@ -133,6 +133,13 @@ public void testNothingScheduledWhenNotRunning() { // Since the service is stopped, jobs should have been cancelled assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); + // No jobs should be scheduled when service is closed + state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats())); + sls.close(); + sls.onMaster(); + sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState)); + assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); + threadPool.shutdownNow(); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index 15ed96665f544..28cf4f2f71e83 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -65,6 +65,12 @@ public void testJobsAreScheduled() { service.setUpdateSchedule(""); assertThat(service.getScheduler().jobCount(), equalTo(0)); + + // Service should not schedule any jobs once closed + 
service.close(); + service.onMaster(); + assertThat(service.getScheduler().jobCount(), equalTo(0)); + threadPool.shutdownNow(); } }