Skip to content

Commit 46582eb

Browse files
committed
[ML] Don't treat stale FAILED jobs as OPENING in job allocation (#31800)
Job persistent tasks with stale allocation IDs used to always be considered as OPENING jobs in the ML job node allocation decision. However, FAILED jobs are not relocated to other nodes, which leads to them blocking up the nodes they failed on after node restarts. FAILED jobs should not restrict how many other jobs can open on a node, regardless of whether they are stale or not. Closes #31794
1 parent ea1cb38 commit 46582eb

File tree

2 files changed

+66
-6
lines changed

2 files changed

+66
-6
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -194,16 +194,27 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
194194
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
195195
JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus();
196196
JobState jobState;
197-
if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
198-
// previous executor node failed and current executor node didn't have the chance to set job status to OPENING
199-
jobTaskState.isStatusStale(assignedTask)) {
197+
if (jobTaskState == null) {
198+
// executor node didn't have the chance to set job status to OPENING
200199
++numberOfAllocatingJobs;
201200
jobState = JobState.OPENING;
202201
} else {
203202
jobState = jobTaskState.getState();
203+
if (jobTaskState.isStatusStale(assignedTask)) {
204+
if (jobState == JobState.CLOSING) {
205+
// previous executor node failed while the job was closing - it won't
206+
// be reopened, so consider it CLOSED for resource usage purposes
207+
jobState = JobState.CLOSED;
208+
} else if (jobState != JobState.FAILED) {
209+
// previous executor node failed and current executor node didn't
210+
// have the chance to set job status to OPENING
211+
++numberOfAllocatingJobs;
212+
jobState = JobState.OPENING;
213+
}
214+
}
204215
}
205-
// Don't count FAILED jobs, as they don't consume native memory
206-
if (jobState != JobState.FAILED) {
216+
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
217+
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
207218
++numberOfAssignedJobs;
208219
String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId();
209220
Job assignedJob = mlMetadata.getJobs().get(assignedJobId);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

+50-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
278278
nodeAttr, Collections.emptySet(), Version.CURRENT))
279279
.build();
280280

281-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
281+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
282282
addJobTask("job_id1", "_node_id1", null, tasksBuilder);
283283
addJobTask("job_id2", "_node_id1", null, tasksBuilder);
284284
addJobTask("job_id3", "_node_id2", null, tasksBuilder);
@@ -333,6 +333,55 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
333333
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
334334
}
335335

336+
public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() {
337+
Map<String, String> nodeAttr = new HashMap<>();
338+
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
339+
DiscoveryNodes nodes = DiscoveryNodes.builder()
340+
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
341+
nodeAttr, Collections.emptySet(), Version.CURRENT))
342+
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
343+
nodeAttr, Collections.emptySet(), Version.CURRENT))
344+
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
345+
nodeAttr, Collections.emptySet(), Version.CURRENT))
346+
.build();
347+
348+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
349+
addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder);
350+
// This will make the allocation stale for job_id1
351+
tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id1"), new Assignment("_node_id1", "test assignment"));
352+
addJobTask("job_id2", "_node_id1", null, tasksBuilder);
353+
addJobTask("job_id3", "_node_id2", null, tasksBuilder);
354+
addJobTask("job_id4", "_node_id2", null, tasksBuilder);
355+
addJobTask("job_id5", "_node_id3", null, tasksBuilder);
356+
addJobTask("job_id6", "_node_id3", null, tasksBuilder);
357+
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
358+
359+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
360+
csBuilder.nodes(nodes);
361+
MetaData.Builder metaData = MetaData.builder();
362+
RoutingTable.Builder routingTable = RoutingTable.builder();
363+
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7", "job_id8");
364+
csBuilder.routingTable(routingTable.build());
365+
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
366+
csBuilder.metaData(metaData);
367+
368+
ClusterState cs = csBuilder.build();
369+
// Allocation won't be possible if the stale failed job is treated as opening
370+
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger);
371+
assertEquals("_node_id1", result.getExecutorNode());
372+
373+
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
374+
addJobTask("job_id7", "_node_id1", null, tasksBuilder);
375+
tasks = tasksBuilder.build();
376+
377+
csBuilder = ClusterState.builder(cs);
378+
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
379+
cs = csBuilder.build();
380+
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", cs, 2, 10, 30, logger);
381+
assertNull("no node selected, because OPENING state", result.getExecutorNode());
382+
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
383+
}
384+
336385
public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
337386
Map<String, String> nodeAttr = new HashMap<>();
338387
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");

0 commit comments

Comments
 (0)