Skip to content

Commit d451bd1

Browse files
authored
Fix job scheduling for same scheduled time (#64501)
The SchedulerEngine used by SLM uses a custom runnable that will schedule itself for its next execution if there is one to run. For the majority of jobs, this scheduling could be many hours or days away. Due to the scheduling so far in advance, there is a chance that time drifts on the machine or even that time varies core to core so there is no guarantee that the job actually runs on or after the scheduled time. This can cause some jobs to reschedule themselves for the same scheduled time even if they ran only a millisecond prior to the scheduled time, which causes unexpected actions to be taken such as what appears as duplicated snapshots. This change resolves this by checking the triggered time against the scheduled time and using the appropriate value to ensure that we do not have unexpected job runs. Relates #63754
1 parent 281ae6c commit d451bd1

File tree

2 files changed

+54
-4
lines changed

2 files changed

+54
-4
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,14 +188,19 @@ protected void notifyListeners(final String name, final long triggeredTime, fina
188188
}
189189
}
190190

191+
// for testing
192+
ActiveSchedule getSchedule(String jobId) {
193+
return schedules.get(jobId);
194+
}
195+
191196
class ActiveSchedule implements Runnable {
192197

193198
private final String name;
194199
private final Schedule schedule;
195200
private final long startTime;
196201

197202
private ScheduledFuture<?> future;
198-
private long scheduledTime;
203+
private long scheduledTime = -1;
199204

200205
ActiveSchedule(String name, Schedule schedule, long startTime) {
201206
this.name = name;
@@ -223,10 +228,10 @@ public void run() {
223228
scheduleNextRun(triggeredTime);
224229
}
225230

226-
private void scheduleNextRun(long currentTime) {
227-
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, currentTime);
231+
private void scheduleNextRun(long triggeredTime) {
232+
this.scheduledTime = computeNextScheduledTime(triggeredTime);
228233
if (scheduledTime != -1) {
229-
long delay = Math.max(0, scheduledTime - currentTime);
234+
long delay = Math.max(0, scheduledTime - clock.millis());
230235
try {
231236
synchronized (this) {
232237
if (future == null || future.isCancelled() == false) {
@@ -242,6 +247,28 @@ private void scheduleNextRun(long currentTime) {
242247
}
243248
}
244249

250+
// for testing
251+
long getScheduledTime() {
252+
return scheduledTime;
253+
}
254+
255+
long computeNextScheduledTime(long triggeredTime) {
256+
// multiple time sources + multiple cpus + ntp + VMs means you can't trust time ever!
257+
// scheduling happens far enough in advance in most cases that time can drift and we
258+
// may execute at some point before the scheduled time. There can also be time differences
259+
// between the CPU cores and/or the clock used by the threadpool and that used by this class
260+
// for scheduling. Regardless, we shouldn't reschedule to execute again until after the
261+
// scheduled time.
262+
final long scheduleAfterTime;
263+
if (scheduledTime != -1 && triggeredTime < scheduledTime) {
264+
scheduleAfterTime = scheduledTime;
265+
} else {
266+
scheduleAfterTime = triggeredTime;
267+
}
268+
269+
return schedule.nextScheduledTimeAfter(startTime, scheduleAfterTime);
270+
}
271+
245272
public synchronized void cancel() {
246273
FutureUtils.cancel(future);
247274
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
import org.elasticsearch.common.collect.Tuple;
1212
import org.elasticsearch.common.settings.Settings;
1313
import org.elasticsearch.test.ESTestCase;
14+
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.ActiveSchedule;
1415
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Job;
1516
import org.mockito.ArgumentCaptor;
1617

1718
import java.time.Clock;
19+
import java.time.ZoneId;
1820
import java.util.ArrayList;
1921
import java.util.Collections;
2022
import java.util.List;
@@ -184,6 +186,27 @@ public void testCancellingDuringRunPreventsRescheduling() throws Exception {
184186
}
185187
}
186188

189+
public void testNextScheduledTimeAfterCurrentScheduledTime() throws Exception {
190+
final Clock clock = Clock.fixed(Clock.systemUTC().instant(), ZoneId.of("UTC"));
191+
final long oneHourMillis = TimeUnit.HOURS.toMillis(1L);
192+
final String jobId = randomAlphaOfLength(4);
193+
final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, clock);
194+
try {
195+
engine.add(new Job(jobId, ((startTime, now) -> now + oneHourMillis)));
196+
197+
ActiveSchedule activeSchedule = engine.getSchedule(jobId);
198+
assertNotNull(activeSchedule);
199+
assertEquals(clock.millis() + oneHourMillis, activeSchedule.getScheduledTime());
200+
201+
assertEquals(clock.millis() + oneHourMillis + oneHourMillis,
202+
activeSchedule.computeNextScheduledTime(clock.millis() - randomIntBetween(1, 999)));
203+
assertEquals(clock.millis() + oneHourMillis + oneHourMillis,
204+
activeSchedule.computeNextScheduledTime(clock.millis() + TimeUnit.SECONDS.toMillis(10L)));
205+
} finally {
206+
engine.stop();
207+
}
208+
}
209+
187210
private void assertFailedListenerLogMessage(Logger mockLogger, int times) {
188211
final ArgumentCaptor<ParameterizedMessage> messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class);
189212
final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);

0 commit comments

Comments
 (0)