Skip to content

Commit af0db76

Browse files
committed
Don't schedule SLM jobs when services have been stopped (elastic#48658)
This adds a guard to the SLM lifecycle and retention services that prevents new jobs from being scheduled once a service has been stopped. Previously, if the node was shut down the service would be stopped, but a subsequent cluster state update or local master election could still cause a job to attempt to be scheduled. This could lead to an uncaught `RejectedExecutionException`. Resolves elastic#47749
1 parent 4204b2c commit af0db76

File tree

4 files changed

+28
-3
lines changed

4 files changed

+28
-3
lines changed

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Map;
3131
import java.util.Optional;
3232
import java.util.Set;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.function.Supplier;
3435
import java.util.regex.Pattern;
3536
import java.util.stream.Collectors;
@@ -48,6 +49,7 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
4849
private final ClusterService clusterService;
4950
private final SnapshotLifecycleTask snapshotTask;
5051
private final Map<String, SchedulerEngine.Job> scheduledTasks = ConcurrentCollections.newConcurrentMap();
52+
private final AtomicBoolean running = new AtomicBoolean(true);
5153
private volatile boolean isMaster = false;
5254

5355
public SnapshotLifecycleService(Settings settings,
@@ -160,6 +162,10 @@ public void cleanupDeletedPolicies(final ClusterState state) {
160162
* the same version of a policy has already been scheduled it does not overwrite the job.
161163
*/
162164
public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) {
165+
if (this.running.get() == false) {
166+
return;
167+
}
168+
163169
final String jobId = getJobId(snapshotLifecyclePolicy);
164170
final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX);
165171

@@ -237,6 +243,8 @@ public String executorName() {
237243

238244
@Override
239245
public void close() {
240-
this.scheduler.stop();
246+
if (this.running.compareAndSet(true, false)) {
247+
this.scheduler.stop();
248+
}
241249
}
242250
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.Closeable;
2222
import java.time.Clock;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.function.Supplier;
2425

2526
/**
@@ -38,6 +39,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
3839
private final SchedulerEngine scheduler;
3940
private final SnapshotRetentionTask retentionTask;
4041
private final Clock clock;
42+
private final AtomicBoolean running = new AtomicBoolean(true);
4143

4244
private volatile String slmRetentionSchedule;
4345
private volatile boolean isMaster = false;
@@ -81,7 +83,7 @@ public void offMaster() {
8183

8284
private void rescheduleRetentionJob() {
8385
final String schedule = this.slmRetentionSchedule;
84-
if (this.isMaster && Strings.hasText(schedule)) {
86+
if (this.running.get() && this.isMaster && Strings.hasText(schedule)) {
8587
final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
8688
new CronSchedule(schedule));
8789
logger.debug("scheduling SLM retention job for [{}]", schedule);
@@ -113,6 +115,8 @@ public String executorName() {
113115

114116
@Override
115117
public void close() {
116-
this.scheduler.stop();
118+
if (this.running.compareAndSet(true, false)) {
119+
this.scheduler.stop();
120+
}
117121
}
118122
}

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java

+7
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ public void testNothingScheduledWhenNotRunning() {
133133
// Since the service is stopped, jobs should have been cancelled
134134
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
135135

136+
// No jobs should be scheduled when service is closed
137+
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
138+
sls.close();
139+
sls.onMaster();
140+
sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
141+
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
142+
136143
threadPool.shutdownNow();
137144
}
138145
}

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java

+6
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ public void testJobsAreScheduled() {
6666

6767
service.setUpdateSchedule("");
6868
assertThat(service.getScheduler().jobCount(), equalTo(0));
69+
70+
// Service should not schedule any jobs once closed
71+
service.close();
72+
service.onMaster();
73+
assertThat(service.getScheduler().jobCount(), equalTo(0));
74+
6975
threadPool.shutdownNow();
7076
}
7177
}

0 commit comments

Comments
 (0)