Skip to content

Commit 52f37d0

Browse files
committed
[ML] Stop datafeeds running when their jobs are stale
We already had logic to stop datafeeds running against jobs that were OPENING, but a job that relocates from one node to another while OPENED stays OPENED, and this could cause the datafeed to fail when it sent data to the OPENED job on its new node before it had a corresponding autodetect process. This change extends the check to stop datafeeds running when their job is OPENING _or_ stale (i.e. has not had its status reset since relocating to a different node). Relates elastic#36810
1 parent 054c3bb commit 52f37d0

File tree

6 files changed

+103
-33
lines changed

6 files changed

+103
-33
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ public static PersistentTasksCustomMetaData.PersistentTask<?> getDatafeedTask(St
5555
return tasks == null ? null : tasks.getTask(datafeedTaskId(datafeedId));
5656
}
5757

58+
/**
59+
* Note that the return value of this method does NOT take node relocations into account.
60+
* Use {@link #getJobStateModifiedForReassignments} to return a value
61+
*/
5862
public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
5963
PersistentTasksCustomMetaData.PersistentTask<?> task = getJobTask(jobId, tasks);
6064
if (task != null) {
@@ -68,6 +72,36 @@ public static JobState getJobState(String jobId, @Nullable PersistentTasksCustom
6872
return JobState.CLOSED;
6973
}
7074

75+
public static JobState getJobStateModifiedForReassignments(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
76+
return getJobStateModifiedForReassignments(getJobTask(jobId, tasks));
77+
}
78+
79+
public static JobState getJobStateModifiedForReassignments(@Nullable PersistentTasksCustomMetaData.PersistentTask<?> task) {
80+
if (task == null) {
81+
// If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed
82+
return JobState.CLOSED;
83+
}
84+
JobTaskState jobTaskState = (JobTaskState) task.getState();
85+
if (jobTaskState == null) {
86+
return JobState.OPENING;
87+
}
88+
JobState jobState = jobTaskState.getState();
89+
if (jobTaskState.isStatusStale(task)) {
90+
// the job is re-locating
91+
if (jobState == JobState.CLOSING) {
92+
// previous executor node failed while the job was closing - it won't
93+
// be reopened on another node, so consider it CLOSED for most purposes
94+
return JobState.CLOSED;
95+
}
96+
if (jobState != JobState.FAILED) {
97+
// previous executor node failed and current executor node didn't
98+
// have the chance to set job status to OPENING
99+
return JobState.OPENING;
100+
}
101+
}
102+
return jobState;
103+
}
104+
71105
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
72106
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
73107
if (task != null && task.getState() != null) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,17 @@ public JobState getState() {
6767
return state;
6868
}
6969

70+
/**
71+
* The job state stores the allocation ID at the time it was last set.
72+
* This method compares the allocation ID in the state with the allocation
73+
* ID in the task. If the two are different then the task has been relocated
74+
* to a different node after the last time the state was set. This in turn
75+
* means that the state is not necessarily correct. For example, a job that
76+
* has a state of OPENED but is stale must be considered to be OPENING, because
77+
* it won't yet have a corresponding autodetect process.
78+
* @param task The job task to check.
79+
* @return Has the task been relocated to another node and not had its status set since then?
80+
*/
7081
public boolean isStatusStale(PersistentTask<?> task) {
7182
return allocationId != task.getAllocationId();
7283
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -225,31 +225,13 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
225225
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> assignedTasks = persistentTasks.findTasks(
226226
MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
227227
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
228-
JobTaskState jobTaskState = (JobTaskState) assignedTask.getState();
229-
JobState jobState;
230-
if (jobTaskState == null) {
231-
// executor node didn't have the chance to set job status to OPENING
232-
++numberOfAllocatingJobs;
233-
jobState = JobState.OPENING;
234-
} else {
235-
jobState = jobTaskState.getState();
236-
if (jobTaskState.isStatusStale(assignedTask)) {
237-
// the job is re-locating
238-
if (jobState == JobState.CLOSING) {
239-
// previous executor node failed while the job was closing - it won't
240-
// be reopened, so consider it CLOSED for resource usage purposes
241-
jobState = JobState.CLOSED;
242-
} else if (jobState != JobState.FAILED) {
243-
// previous executor node failed and current executor node didn't
244-
// have the chance to set job status to OPENING
245-
++numberOfAllocatingJobs;
246-
jobState = JobState.OPENING;
247-
}
248-
}
249-
}
228+
JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
250229
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
251230
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
252231
++numberOfAssignedJobs;
232+
if (jobState == JobState.OPENING) {
233+
++numberOfAllocatingJobs;
234+
}
253235
OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
254236
Long jobMemoryRequirement = memoryTracker.getJobMemoryRequirement(params.getJobId());
255237
if (jobMemoryRequirement == null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public void onFailure(Exception e) {
161161
protected void doRun() {
162162
Long next = null;
163163
try {
164-
next = holder.executeLoopBack(startTime, endTime);
164+
next = holder.executeLookBack(startTime, endTime);
165165
} catch (DatafeedJob.ExtractionProblemException e) {
166166
if (endTime == null) {
167167
next = e.nextDelayInMsSinceEpoch;
@@ -253,7 +253,7 @@ private String getJobId(TransportStartDatafeedAction.DatafeedTask task) {
253253
}
254254

255255
private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStartDatafeedAction.DatafeedTask datafeedTask) {
256-
return MlTasks.getJobState(getJobId(datafeedTask), tasks);
256+
return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks);
257257
}
258258

259259
private TimeValue computeNextDelay(long next) {
@@ -272,7 +272,7 @@ public class Holder {
272272
private final TransportStartDatafeedAction.DatafeedTask task;
273273
private final long allocationId;
274274
private final String datafeedId;
275-
// To ensure that we wait until loopback / realtime search has completed before we stop the datafeed
275+
// To ensure that we wait until lookback / realtime search has completed before we stop the datafeed
276276
private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
277277
private final DatafeedJob datafeedJob;
278278
private final boolean autoCloseJob;
@@ -352,7 +352,7 @@ public void setRelocating() {
352352
isRelocating = true;
353353
}
354354

355-
private Long executeLoopBack(long startTime, Long endTime) throws Exception {
355+
private Long executeLookBack(long startTime, Long endTime) throws Exception {
356356
datafeedJobLock.lock();
357357
try {
358358
if (isRunning() && !isIsolated()) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,10 +575,16 @@ public void testJobTaskMatcherMatch() {
575575
}
576576

577577
public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
578+
addJobTask(jobId, nodeId, jobState, builder, false);
579+
}
580+
581+
public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder,
582+
boolean isStale) {
578583
builder.addTask(MlTasks.jobTaskId(jobId), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId),
579-
new Assignment(nodeId, "test assignment"));
584+
new Assignment(nodeId, "test assignment"));
580585
if (jobState != null) {
581-
builder.updateTaskState(MlTasks.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId()));
586+
builder.updateTaskState(MlTasks.jobTaskId(jobId),
587+
new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0)));
582588
}
583589
}
584590

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception {
222222
assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true));
223223
}
224224

225-
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
225+
public void testStart_GivenNewlyCreatedJobLookBackAndRealtime() throws Exception {
226226
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenReturn(1L);
227227
when(datafeedJob.runRealtime()).thenReturn(1L);
228228

@@ -282,8 +282,45 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
282282
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
283283
}
284284

285+
public void testDatafeedTaskWaitsUntilJobIsNotStale() {
286+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
287+
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder, true);
288+
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
289+
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
290+
when(clusterService.state()).thenReturn(cs.build());
291+
292+
Consumer<Exception> handler = mockConsumer();
293+
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
294+
datafeedManager.run(task, handler);
295+
296+
// Verify datafeed has not started running yet as job is stale (i.e. even though opened it is part way through relocating)
297+
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
298+
299+
tasksBuilder = PersistentTasksCustomMetaData.builder();
300+
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder, true);
301+
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
302+
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
303+
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
304+
305+
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build()));
306+
307+
// Still no run
308+
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
309+
310+
tasksBuilder = PersistentTasksCustomMetaData.builder();
311+
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
312+
ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state())
313+
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
314+
315+
capturedClusterStateListener.getValue().clusterChanged(
316+
new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build()));
317+
318+
// Now it should run as the job state chanded to OPENED
319+
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
320+
}
321+
285322
public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {
286-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
323+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
287324
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
288325
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
289326
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
@@ -296,7 +333,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {
296333
// Verify datafeed has not started running yet as job is still opening
297334
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
298335

299-
tasksBuilder = PersistentTasksCustomMetaData.builder();
336+
tasksBuilder = PersistentTasksCustomMetaData.builder();
300337
addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder);
301338
ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state())
302339
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
@@ -309,7 +346,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {
309346
}
310347

311348
public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() {
312-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
349+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
313350
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
314351
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
315352
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
@@ -326,7 +363,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() {
326363
datafeedManager.stopDatafeed(task, "test", StopDatafeedAction.DEFAULT_TIMEOUT);
327364

328365
// Update job state to opened
329-
tasksBuilder = PersistentTasksCustomMetaData.builder();
366+
tasksBuilder = PersistentTasksCustomMetaData.builder();
330367
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
331368
ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state())
332369
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

0 commit comments

Comments
 (0)