Skip to content

Commit 879dd5a

Browse files
authored
Ensure cancelled jobs do not continue to run (#63762)
This commit ensures that jobs within the SchedulerEngine do not continue to run after they are cancelled. There was no synchronization between the cancel method of an ActiveSchedule and its run method, so an actively running schedule would go ahead and reschedule itself even if the cancel method had already been called. This commit adds synchronization between cancelling and the scheduling of the next run to ensure that the job really is cancelled. In real-life scenarios this could manifest as an SLM job running multiple times. This could happen if a job had been triggered and was then cancelled before completing its run, for example because the node was no longer the master node or because SLM was stopping/stopped. Closes #63754
1 parent d126afb commit 879dd5a

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ class ActiveSchedule implements Runnable {
194194
private final Schedule schedule;
195195
private final long startTime;
196196

197-
private volatile ScheduledFuture<?> future;
198-
private volatile long scheduledTime;
197+
private ScheduledFuture<?> future;
198+
private long scheduledTime;
199199

200200
ActiveSchedule(String name, Schedule schedule, long startTime) {
201201
this.name = name;
@@ -228,7 +228,11 @@ private void scheduleNextRun(long currentTime) {
228228
if (scheduledTime != -1) {
229229
long delay = Math.max(0, scheduledTime - currentTime);
230230
try {
231-
future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
231+
synchronized (this) {
232+
if (future == null || future.isCancelled() == false) {
233+
future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
234+
}
235+
}
232236
} catch (RejectedExecutionException e) {
233237
// ignoring rejections if the scheduler has been shut down already
234238
if (scheduler.isShutdown() == false) {
@@ -238,7 +242,7 @@ private void scheduleNextRun(long currentTime) {
238242
}
239243
}
240244

241-
public void cancel() {
245+
/**
 * Cancels this schedule by handing the current {@code future} to {@code FutureUtils.cancel}.
 *
 * The method is {@code synchronized} on this instance, which is the same monitor that
 * {@code scheduleNextRun} holds while it checks {@code future.isCancelled()} and schedules the
 * next run. A cancel therefore cannot interleave with that check-and-reschedule step.
 */
public synchronized void cancel() {
242246
// NOTE(review): future can still be null if nothing has been scheduled yet (scheduleNextRun
// checks for null); presumably FutureUtils.cancel tolerates a null argument — confirm.
FutureUtils.cancel(future);
243247
}
244248
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,23 @@
1111
import org.elasticsearch.common.collect.Tuple;
1212
import org.elasticsearch.common.settings.Settings;
1313
import org.elasticsearch.test.ESTestCase;
14+
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Job;
1415
import org.mockito.ArgumentCaptor;
1516

1617
import java.time.Clock;
1718
import java.util.ArrayList;
1819
import java.util.Collections;
1920
import java.util.List;
2021
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
2123
import java.util.concurrent.atomic.AtomicBoolean;
2224
import java.util.concurrent.atomic.AtomicInteger;
2325

2426
import static org.hamcrest.Matchers.any;
2527
import static org.hamcrest.Matchers.arrayWithSize;
2628
import static org.hamcrest.Matchers.equalTo;
2729
import static org.hamcrest.Matchers.instanceOf;
30+
import static org.hamcrest.Matchers.is;
2831
import static org.mockito.Matchers.argThat;
2932
import static org.mockito.Mockito.doAnswer;
3033
import static org.mockito.Mockito.mock;
@@ -150,6 +153,37 @@ public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkippe
150153
}
151154
}
152155

156+
/**
 * Checks that a job removed from the engine while its listener is still executing is not run again.
 *
 * The listener blocks on a latch so that the test can call {@code engine.remove(jobId)} while the
 * job is mid-run, then releases the listener and asserts that the invocation count stays at the
 * value observed before the removal.
 */
public void testCancellingDuringRunPreventsRescheduling() throws Exception {
157+
// Counted down by the listener to signal that the job has started running.
final CountDownLatch jobRunningLatch = new CountDownLatch(1);
158+
// Keeps the listener blocked until the test has removed the job.
final CountDownLatch listenerLatch = new CountDownLatch(1);
159+
// Number of times the listener has been invoked.
final AtomicInteger calledCount = new AtomicInteger(0);
160+
final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC());
161+
final String jobId = randomAlphaOfLength(4);
162+
try {
163+
engine.register(event -> {
164+
assertThat(event.getJobName(), is(jobId));
165+
calledCount.incrementAndGet();
166+
jobRunningLatch.countDown();
167+
try {
168+
// Hold the run open so the removal below happens while the job is still in progress.
listenerLatch.await();
169+
} catch (InterruptedException e) {
170+
// Restore the interrupt flag rather than swallowing the interruption.
Thread.currentThread().interrupt();
171+
}
172+
});
173+
// NOTE(review): the schedule lambda always returns 0; presumably this is the next scheduled
// time, which would make the job due immediately on every evaluation — confirm.
engine.add(new Job(jobId, ((startTime, now) -> 0)));
174+
175+
// Wait until the listener is executing, then record how often it has run so far.
jobRunningLatch.await();
176+
final int called = calledCount.get();
177+
assertEquals(1, called);
178+
// Remove the job while its listener is still blocked (presumably this cancels the
// underlying ActiveSchedule — confirm against SchedulerEngine.remove).
engine.remove(jobId);
179+
// Let the in-flight run finish; it must not schedule another run.
listenerLatch.countDown();
180+
181+
// NOTE(review): assertBusy presumably returns as soon as the assertion first passes, which
// may be immediately; if so this does not actually wait to observe that no further run
// occurs — confirm and consider a stronger check.
assertBusy(() -> assertEquals(called, calledCount.get()), 5, TimeUnit.MILLISECONDS);
182+
} finally {
183+
engine.stop();
184+
}
185+
}
186+
153187
private void assertFailedListenerLogMessage(Logger mockLogger, int times) {
154188
final ArgumentCaptor<ParameterizedMessage> messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class);
155189
final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);

0 commit comments

Comments
 (0)