Skip to content

Commit c1e1bce

Browse files
committed
[ML] Don't treat stale FAILED jobs as OPENING in job allocation (#31800)
Job persistent tasks with stale allocation IDs were previously always treated as OPENING jobs in the ML job node allocation decision. However, FAILED jobs are not relocated to other nodes, so after a node restart they kept blocking up the nodes they had failed on. FAILED jobs should not restrict how many other jobs can open on a node, regardless of whether their allocation is stale or not.

Closes #31794
1 parent 268c614 commit c1e1bce

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 16 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -203,16 +203,27 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
203203
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
204204
JobTaskState jobTaskState = (JobTaskState) assignedTask.getState();
205205
JobState jobState;
206-
if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
207-
// previous executor node failed and current executor node didn't have the chance to set job status to OPENING
208-
jobTaskState.isStatusStale(assignedTask)) {
206+
if (jobTaskState == null) {
207+
// executor node didn't have the chance to set job status to OPENING
209208
++numberOfAllocatingJobs;
210209
jobState = JobState.OPENING;
211210
} else {
212211
jobState = jobTaskState.getState();
212+
if (jobTaskState.isStatusStale(assignedTask)) {
213+
if (jobState == JobState.CLOSING) {
214+
// previous executor node failed while the job was closing - it won't
215+
// be reopened, so consider it CLOSED for resource usage purposes
216+
jobState = JobState.CLOSED;
217+
} else if (jobState != JobState.FAILED) {
218+
// previous executor node failed and current executor node didn't
219+
// have the chance to set job status to OPENING
220+
++numberOfAllocatingJobs;
221+
jobState = JobState.OPENING;
222+
}
223+
}
213224
}
214-
// Don't count FAILED jobs, as they don't consume native memory
215-
if (jobState != JobState.FAILED) {
225+
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
226+
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
216227
++numberOfAssignedJobs;
217228
String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId();
218229
Job assignedJob = mlMetadata.getJobs().get(assignedJobId);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

Lines changed: 53 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,6 @@
5555
import java.io.IOException;
5656
import java.net.InetAddress;
5757
import java.util.ArrayList;
58-
import java.util.Arrays;
5958
import java.util.Collections;
6059
import java.util.Date;
6160
import java.util.HashMap;
@@ -285,7 +284,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
285284
nodeAttr, Collections.emptySet(), Version.CURRENT))
286285
.build();
287286

288-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
287+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
289288
addJobTask("job_id1", "_node_id1", null, tasksBuilder);
290289
addJobTask("job_id2", "_node_id1", null, tasksBuilder);
291290
addJobTask("job_id3", "_node_id2", null, tasksBuilder);
@@ -340,6 +339,55 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
340339
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
341340
}
342341

342+
public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() {
343+
Map<String, String> nodeAttr = new HashMap<>();
344+
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
345+
DiscoveryNodes nodes = DiscoveryNodes.builder()
346+
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
347+
nodeAttr, Collections.emptySet(), Version.CURRENT))
348+
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
349+
nodeAttr, Collections.emptySet(), Version.CURRENT))
350+
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
351+
nodeAttr, Collections.emptySet(), Version.CURRENT))
352+
.build();
353+
354+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
355+
addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder);
356+
// This will make the allocation stale for job_id1
357+
tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id1"), new Assignment("_node_id1", "test assignment"));
358+
addJobTask("job_id2", "_node_id1", null, tasksBuilder);
359+
addJobTask("job_id3", "_node_id2", null, tasksBuilder);
360+
addJobTask("job_id4", "_node_id2", null, tasksBuilder);
361+
addJobTask("job_id5", "_node_id3", null, tasksBuilder);
362+
addJobTask("job_id6", "_node_id3", null, tasksBuilder);
363+
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
364+
365+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
366+
csBuilder.nodes(nodes);
367+
MetaData.Builder metaData = MetaData.builder();
368+
RoutingTable.Builder routingTable = RoutingTable.builder();
369+
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7", "job_id8");
370+
csBuilder.routingTable(routingTable.build());
371+
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
372+
csBuilder.metaData(metaData);
373+
374+
ClusterState cs = csBuilder.build();
375+
// Allocation won't be possible if the stale failed job is treated as opening
376+
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger);
377+
assertEquals("_node_id1", result.getExecutorNode());
378+
379+
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
380+
addJobTask("job_id7", "_node_id1", null, tasksBuilder);
381+
tasks = tasksBuilder.build();
382+
383+
csBuilder = ClusterState.builder(cs);
384+
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
385+
cs = csBuilder.build();
386+
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", cs, 2, 10, 30, logger);
387+
assertNull("no node selected, because OPENING state", result.getExecutorNode());
388+
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
389+
}
390+
343391
public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
344392
Map<String, String> nodeAttr = new HashMap<>();
345393
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
@@ -678,13 +726,13 @@ private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object>
678726

679727
private static Function<String, Job> jobWithRulesCreator() {
680728
return jobId -> {
681-
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(
729+
DetectionRule rule = new DetectionRule.Builder(Collections.singletonList(
682730
new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0)
683731
)).build();
684732

685733
Detector.Builder detector = new Detector.Builder("count", null);
686-
detector.setRules(Arrays.asList(rule));
687-
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
734+
detector.setRules(Collections.singletonList(rule));
735+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
688736
DataDescription.Builder dataDescription = new DataDescription.Builder();
689737
Job.Builder job = new Job.Builder(jobId);
690738
job.setAnalysisConfig(analysisConfig);

0 commit comments

Comments
 (0)