From be2966ccd7b9a3c18ba48a3c31c8ab3d59a80e49 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 9 Dec 2020 14:51:53 -0500 Subject: [PATCH 1/2] [ML] handle largest possible node size settings at the node level --- .../xpack/ml/MachineLearning.java | 10 +++- ...ransportStartDataFrameAnalyticsAction.java | 1 - .../xpack/ml/job/JobNodeSelector.java | 3 +- .../elasticsearch/xpack/ml/job/NodeLoad.java | 46 +++++++++++++++---- .../xpack/ml/job/NodeLoadDetector.java | 11 ++++- .../upgrader/SnapshotUpgradeTaskExecutor.java | 1 - .../task/OpenJobPersistentTasksExecutor.java | 1 - .../AbstractJobPersistentTasksExecutor.java | 3 -- .../xpack/ml/job/JobNodeSelectorTests.java | 25 ++-------- 9 files changed, 62 insertions(+), 39 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 440ea8461986d..d794352c49bf4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -417,6 +417,7 @@ public Set getRoles() { public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs"; public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory"; public static final String MAX_JVM_SIZE_NODE_ATTR = "ml.max_jvm_size"; + public static final String MAX_NODE_SIZE_NODE_ATTR = "ml.max_node_size"; public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); /** @@ -560,6 +561,7 @@ public Settings additionalSettings() { String maxOpenJobsPerNodeNodeAttrName = "node.attr." + MAX_OPEN_JOBS_NODE_ATTR; String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR; String jvmSizeAttrName = "node.attr." + MAX_JVM_SIZE_NODE_ATTR; + String maxNodeSizeAttrName = "node.attr." + MAX_NODE_SIZE_NODE_ATTR; if (enabled == false) { disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName); @@ -576,10 +578,16 @@ public Settings additionalSettings() { addMlNodeAttribute(additionalSettings, machineMemoryAttrName, Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats()))); addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory())); + addMlNodeAttribute(additionalSettings, maxNodeSizeAttrName, Long.toString(MAX_ML_NODE_SIZE.get(settings).getBytes())); // This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion disallowMlNodeAttributes(mlEnabledNodeAttrName); } else { - disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName); + disallowMlNodeAttributes(mlEnabledNodeAttrName, + maxOpenJobsPerNodeNodeAttrName, + machineMemoryAttrName, + jvmSizeAttrName, + maxNodeSizeAttrName + ); } return additionalSettings.build(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 6996169327050..4b1126f196f6f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -640,7 +640,6 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, maxOpenJobs, Integer.MAX_VALUE, maxMachineMemoryPercent, - maxNodeMemory, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index a58574eea052f..cb0d9e65eab8a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -85,7 +85,6 @@ public JobNodeSelector(ClusterState clusterState, public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJobs, int maxConcurrentJobAllocations, int maxMachineMemoryPercent, - long maxNodeSize, boolean isMemoryTrackerRecentlyRefreshed, boolean useAutoMemoryPercentage) { // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe @@ -99,6 +98,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob List reasons = new LinkedList<>(); long maxAvailableCount = Long.MIN_VALUE; long maxAvailableMemory = Long.MIN_VALUE; + long maxNodeSize = 0L; DiscoveryNode minLoadedNodeByCount = null; DiscoveryNode minLoadedNodeByMemory = null; for (DiscoveryNode node : clusterState.getNodes()) { @@ -126,6 +126,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob reasons.add(reason); continue; } + maxNodeSize = Math.max(currentLoad.getMaxNodeSize(), maxNodeSize); // Assuming the node is eligible at all, check loading allocateByMemory = currentLoad.isUseMemory(); int maxNumberOfOpenJobs = currentLoad.getMaxJobs(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java index 21ff5dc9442c2..860bfbddb227e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java @@ -31,6 +31,7 @@ public class NodeLoad { private final long numAssignedJobs; private final long assignedJobMemory; private final long numAllocatingJobs; + private final long maxNodeSize; NodeLoad(long maxMemory, int maxJobs, @@ -39,7 +40,8 @@ public class NodeLoad { String error, long numAssignedJobs, long assignedJobMemory, - long numAllocatingJobs) { + long numAllocatingJobs, + long maxNodeSize) { this.maxMemory = maxMemory; this.maxJobs = maxJobs; this.nodeId = nodeId; @@ -48,6 +50,7 @@ public class NodeLoad { this.numAssignedJobs = numAssignedJobs; this.assignedJobMemory = assignedJobMemory; this.numAllocatingJobs = numAllocatingJobs; + this.maxNodeSize = maxNodeSize; } /** @@ -107,6 +110,13 @@ public long getNumAllocatingJobs() { return numAllocatingJobs; } + /** + * @return The max node size determined via setting `xpack.ml.max_ml_node_size` + */ + public long getMaxNodeSize() { + return maxNodeSize; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -118,13 +128,24 @@ public boolean equals(Object o) { numAssignedJobs == nodeLoad.numAssignedJobs && assignedJobMemory == nodeLoad.assignedJobMemory && numAllocatingJobs == nodeLoad.numAllocatingJobs && + maxNodeSize == nodeLoad.maxNodeSize && Objects.equals(nodeId, nodeLoad.nodeId) && Objects.equals(error, nodeLoad.error); } @Override public int hashCode() { - return Objects.hash(maxMemory, maxJobs, nodeId, useMemory, error, numAssignedJobs, assignedJobMemory, numAllocatingJobs); + return Objects.hash( + maxMemory, + maxJobs, + nodeId, + useMemory, + error, + numAssignedJobs, + assignedJobMemory, + numAllocatingJobs, + maxNodeSize + ); } public static Builder builder(String nodeId) { @@ -140,6 +161,7 @@ public static class Builder { private long numAssignedJobs; private long assignedJobMemory; private long numAllocatingJobs; + private long maxNodeSize; public Builder(String nodeId) { this.nodeId = nodeId; @@ -188,6 +210,11 @@ public Builder incNumAllocatingJobs() { return this; } + public Builder setMaxMlNodeSize(long maxNodeSize) { + this.maxNodeSize = maxNodeSize; + return this; + } + void adjustForAnomalyJob(JobState jobState, String jobId, MlMemoryTracker mlMemoryTracker) { @@ -236,13 +263,14 @@ void adjustForAnalyticsJob(PersistentTasksCustomMetadata.PersistentTask assig public NodeLoad build() { return new NodeLoad(maxMemory, - maxJobs, - nodeId, - useMemory, - error, - numAssignedJobs, - assignedJobMemory, - numAllocatingJobs); + maxJobs, + nodeId, + useMemory, + error, + numAssignedJobs, + assignedJobMemory, + numAllocatingJobs, + maxNodeSize); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java index 7cc7c001365ac..88c953d5e1633 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java @@ -67,10 +67,19 @@ public NodeLoad detectNodeLoad(ClusterState clusterState, + "] is not a long"); } + long maxNodeSize = 0L; + String maxNodeSizeStr = nodeAttributes.get(MachineLearning.MAX_NODE_SIZE_NODE_ATTR); + try { + maxNodeSize = Long.parseLong(maxNodeSizeStr); + } catch (NumberFormatException e) { + // Do nothing, just treat it as not set + } + NodeLoad.Builder nodeLoad = NodeLoad.builder(node.getId()) .setMaxMemory(maxMlMemory.orElse(-1L)) .setMaxJobs(maxNumberOfOpenJobs) - .setUseMemory(isMemoryTrackerRecentlyRefreshed); + .setUseMemory(isMemoryTrackerRecentlyRefreshed) + .setMaxMlNodeSize(maxNodeSize); if (errors.isEmpty() == false) { return nodeLoad.setError(Strings.collectionToCommaDelimitedString(errors)).build(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java index 82dbe427f093c..6d1efb77afbcb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java @@ -92,7 +92,6 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(SnapshotUpgradeTas Integer.MAX_VALUE, Integer.MAX_VALUE, maxMachineMemoryPercent, - Long.MAX_VALUE, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index d39fa1e8eb6c2..efe6607931b62 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -110,7 +110,6 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(OpenJobAction.JobP maxOpenJobs, maxConcurrentJobAllocations, maxMachineMemoryPercent, - maxNodeMemory, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java index b20f0cd3294b4..c58cc9f785f10 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java @@ -27,7 +27,6 @@ import java.util.Optional; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; -import static org.elasticsearch.xpack.ml.MachineLearning.MAX_ML_NODE_SIZE; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE; import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT; @@ -69,7 +68,6 @@ public static List verifyIndicesPrimaryShardsAreActive(ClusterState clus protected volatile int maxMachineMemoryPercent; protected volatile int maxLazyMLNodes; protected volatile int maxOpenJobs; - protected final long maxNodeMemory; protected AbstractJobPersistentTasksExecutor(String taskName, String executor, @@ -85,7 +83,6 @@ protected AbstractJobPersistentTasksExecutor(String taskName, this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings); this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings); this.useAutoMemoryPercentage = USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings); - this.maxNodeMemory = MAX_ML_NODE_SIZE.get(settings).getBytes(); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations); clusterService.getClusterSettings() diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index 9467edaa7b610..ae2a84a549fa4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -49,8 +49,6 @@ // TODO: in 8.0.0 remove all instances of MAX_OPEN_JOBS_NODE_ATTR from this file public class JobNodeSelectorTests extends ESTestCase { - // To simplify the logic in this class all jobs have the same memory requirement - private static final long MAX_JOB_BYTES = ByteSizeValue.ofGb(1).getBytes(); private static final ByteSizeValue JOB_MEMORY_REQUIREMENT = ByteSizeValue.ofMb(10); private MlMemoryTracker memoryTracker; @@ -124,7 +122,6 @@ public void testSelectLeastLoadedMlNode_byCount() { PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertEquals("", result.getExplanation()); @@ -150,7 +147,6 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityCountLim PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -178,7 +174,6 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityCount PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -211,7 +206,6 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -239,7 +233,6 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_givenTaskHasNull PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNotNull(result.getExecutorNode()); @@ -265,7 +258,6 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemor PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -301,7 +293,6 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor maxRunningJobsPerNode, 2, maxMachineMemoryPercent, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -332,7 +323,6 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMe maxRunningJobsPerNode, 2, maxMachineMemoryPercent, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -367,7 +357,6 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { 20, 2, 30, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); @@ -411,7 +400,6 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { 10, 2, 30, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertEquals("_node_id3", result.getExecutorNode()); @@ -431,7 +419,6 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { result = jobNodeSelector.selectNode(10, 2, 30, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because OPENING state", result.getExecutorNode()); @@ -447,7 +434,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { cs = csBuilder.build(); jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job7)); - result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -461,7 +448,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job7)); - result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -507,7 +494,6 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertEquals("_node_id1", result.getExecutorNode()); @@ -523,7 +509,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() jobNodeSelector = new JobNodeSelector(cs, job8.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job8)); - result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -561,7 +547,6 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); @@ -598,7 +583,6 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertThat(result.getExplanation(), containsString( @@ -633,7 +617,6 @@ public void testSelectLeastLoadedMlNode_jobWithRules() { PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, - MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNotNull(result.getExecutorNode()); @@ -688,6 +671,7 @@ public void testMaximumPossibleNodeMemoryTooSmall() { Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory)); + nodeAttr.put(MachineLearning.MAX_NODE_SIZE_NODE_ATTR, Long.toString(10L)); ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, maxRunningJobsPerNode); @@ -700,7 +684,6 @@ public void testMaximumPossibleNodeMemoryTooSmall() { PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, - 10L, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); From 3d678d515fbffdd8c1d4faf9b9de7423f736011b Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 10 Dec 2020 11:23:35 -0500 Subject: [PATCH 2/2] making some settings dynamic --- .../xpack/ml/MachineLearning.java | 8 ++-- ...ransportStartDataFrameAnalyticsAction.java | 1 + .../MlAutoscalingDeciderService.java | 7 ++- .../xpack/ml/job/JobNodeSelector.java | 3 +- .../elasticsearch/xpack/ml/job/NodeLoad.java | 46 ++++--------------- .../xpack/ml/job/NodeLoadDetector.java | 11 +---- .../upgrader/SnapshotUpgradeTaskExecutor.java | 1 + .../task/OpenJobPersistentTasksExecutor.java | 1 + .../AbstractJobPersistentTasksExecutor.java | 16 ++++++- ...ortStartDataFrameAnalyticsActionTests.java | 1 + .../MlAutoscalingDeciderServiceTests.java | 4 +- .../xpack/ml/job/JobNodeSelectorTests.java | 25 ++++++++-- .../OpenJobPersistentTasksExecutorTests.java | 27 ++++++++--- 13 files changed, 84 insertions(+), 67 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index d794352c49bf4..70a4b6382d7e8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -417,7 +417,6 @@ public Set getRoles() { public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs"; public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory"; public static final String MAX_JVM_SIZE_NODE_ATTR = "ml.max_jvm_size"; - public static final String MAX_NODE_SIZE_NODE_ATTR = "ml.max_node_size"; public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); /** @@ -449,6 +448,7 @@ public Set getRoles() { public static final Setting USE_AUTO_MACHINE_MEMORY_PERCENT = Setting.boolSetting( "xpack.ml.use_auto_machine_memory_percent", false, + Property.Dynamic, Property.NodeScope); public static final Setting MAX_LAZY_ML_NODES = Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, Property.Dynamic, Property.NodeScope); @@ -498,6 +498,7 @@ public Set getRoles() { public static final Setting MAX_ML_NODE_SIZE = Setting.byteSizeSetting( "xpack.ml.max_ml_node_size", ByteSizeValue.ZERO, + Property.Dynamic, Property.NodeScope); private static final Logger logger = LogManager.getLogger(MachineLearning.class); @@ -561,7 +562,6 @@ public Settings additionalSettings() { String maxOpenJobsPerNodeNodeAttrName = "node.attr." + MAX_OPEN_JOBS_NODE_ATTR; String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR; String jvmSizeAttrName = "node.attr." + MAX_JVM_SIZE_NODE_ATTR; - String maxNodeSizeAttrName = "node.attr." + MAX_NODE_SIZE_NODE_ATTR; if (enabled == false) { disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName); @@ -578,15 +578,13 @@ public Settings additionalSettings() { addMlNodeAttribute(additionalSettings, machineMemoryAttrName, Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats()))); addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory())); - addMlNodeAttribute(additionalSettings, maxNodeSizeAttrName, Long.toString(MAX_ML_NODE_SIZE.get(settings).getBytes())); // This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion disallowMlNodeAttributes(mlEnabledNodeAttrName); } else { disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName, - jvmSizeAttrName, - maxNodeSizeAttrName + jvmSizeAttrName ); } return additionalSettings.build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 4b1126f196f6f..6996169327050 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -640,6 +640,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, maxOpenJobs, Integer.MAX_VALUE, maxMachineMemoryPercent, + maxNodeMemory, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java index 4f093e976c01e..310b4a1f2cdc5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -72,12 +72,12 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, private final NodeLoadDetector nodeLoadDetector; private final MlMemoryTracker mlMemoryTracker; private final Supplier timeSupplier; - private final boolean useAuto; private volatile boolean isMaster; private volatile boolean running; private volatile int maxMachineMemoryPercent; private volatile int maxOpenJobs; + private volatile boolean useAuto; private volatile long lastTimeToScale; private volatile long scaleDownDetected; @@ -99,6 +99,7 @@ public MlAutoscalingDeciderService(MlMemoryTracker memoryTracker, Settings setti clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT, this::setUseAuto); clusterService.addLocalNodeMasterListener(this); clusterService.addLifecycleListener(new LifecycleListener() { @Override @@ -206,6 +207,10 @@ void setMaxOpenJobs(int maxOpenJobs) { this.maxOpenJobs = maxOpenJobs; } + void setUseAuto(boolean useAuto) { + this.useAuto = useAuto; + } + @Override public void onMaster() { isMaster = true; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index cb0d9e65eab8a..a58574eea052f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -85,6 +85,7 @@ public JobNodeSelector(ClusterState clusterState, public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJobs, int maxConcurrentJobAllocations, int maxMachineMemoryPercent, + long maxNodeSize, boolean isMemoryTrackerRecentlyRefreshed, boolean useAutoMemoryPercentage) { // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe @@ -98,7 +99,6 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob List reasons = new LinkedList<>(); long maxAvailableCount = Long.MIN_VALUE; long maxAvailableMemory = Long.MIN_VALUE; - long maxNodeSize = 0L; DiscoveryNode minLoadedNodeByCount = null; DiscoveryNode minLoadedNodeByMemory = null; for (DiscoveryNode node : clusterState.getNodes()) { @@ -126,7 +126,6 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob reasons.add(reason); continue; } - maxNodeSize = Math.max(currentLoad.getMaxNodeSize(), maxNodeSize); // Assuming the node is eligible at all, check loading allocateByMemory = currentLoad.isUseMemory(); int maxNumberOfOpenJobs = currentLoad.getMaxJobs(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java index 860bfbddb227e..21ff5dc9442c2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java @@ -31,7 +31,6 @@ public class NodeLoad { private final long numAssignedJobs; private final long assignedJobMemory; private final long numAllocatingJobs; - private final long maxNodeSize; NodeLoad(long maxMemory, int maxJobs, @@ -40,8 +39,7 @@ public class NodeLoad { String error, long numAssignedJobs, long assignedJobMemory, - long numAllocatingJobs, - long maxNodeSize) { + long numAllocatingJobs) { this.maxMemory = maxMemory; this.maxJobs = maxJobs; this.nodeId = nodeId; @@ -50,7 +48,6 @@ public class NodeLoad { this.numAssignedJobs = numAssignedJobs; this.assignedJobMemory = assignedJobMemory; this.numAllocatingJobs = numAllocatingJobs; - this.maxNodeSize = maxNodeSize; } /** @@ -110,13 +107,6 @@ public long getNumAllocatingJobs() { return numAllocatingJobs; } - /** - * @return The max node size determined via setting `xpack.ml.max_ml_node_size` - */ - public long getMaxNodeSize() { - return maxNodeSize; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -128,24 +118,13 @@ public boolean equals(Object o) { numAssignedJobs == nodeLoad.numAssignedJobs && assignedJobMemory == nodeLoad.assignedJobMemory && numAllocatingJobs == nodeLoad.numAllocatingJobs && - maxNodeSize == nodeLoad.maxNodeSize && Objects.equals(nodeId, nodeLoad.nodeId) && Objects.equals(error, nodeLoad.error); } @Override public int hashCode() { - return Objects.hash( - maxMemory, - maxJobs, - nodeId, - useMemory, - error, - numAssignedJobs, - assignedJobMemory, - numAllocatingJobs, - maxNodeSize - ); + return Objects.hash(maxMemory, maxJobs, nodeId, useMemory, error, numAssignedJobs, assignedJobMemory, numAllocatingJobs); } public static Builder builder(String nodeId) { @@ -161,7 +140,6 @@ public static class Builder { private long numAssignedJobs; private long assignedJobMemory; private long numAllocatingJobs; - private long maxNodeSize; public Builder(String nodeId) { this.nodeId = nodeId; @@ -210,11 +188,6 @@ public Builder incNumAllocatingJobs() { return this; } - public Builder setMaxMlNodeSize(long maxNodeSize) { - this.maxNodeSize = maxNodeSize; - return this; - } - void adjustForAnomalyJob(JobState jobState, String jobId, MlMemoryTracker mlMemoryTracker) { @@ -263,14 +236,13 @@ void adjustForAnalyticsJob(PersistentTasksCustomMetadata.PersistentTask assig public NodeLoad build() { return new NodeLoad(maxMemory, - maxJobs, - nodeId, - useMemory, - error, - numAssignedJobs, - assignedJobMemory, - numAllocatingJobs, - maxNodeSize); + maxJobs, + nodeId, + useMemory, + error, + numAssignedJobs, + assignedJobMemory, + numAllocatingJobs); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java index 88c953d5e1633..7cc7c001365ac 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java @@ -67,19 +67,10 @@ public NodeLoad detectNodeLoad(ClusterState clusterState, + "] is not a long"); } - long maxNodeSize = 0L; - String maxNodeSizeStr = nodeAttributes.get(MachineLearning.MAX_NODE_SIZE_NODE_ATTR); - try { - maxNodeSize = Long.parseLong(maxNodeSizeStr); - } catch (NumberFormatException e) { - // Do nothing, just treat it as not set - } - NodeLoad.Builder nodeLoad = NodeLoad.builder(node.getId()) .setMaxMemory(maxMlMemory.orElse(-1L)) .setMaxJobs(maxNumberOfOpenJobs) - .setUseMemory(isMemoryTrackerRecentlyRefreshed) - .setMaxMlNodeSize(maxNodeSize); + .setUseMemory(isMemoryTrackerRecentlyRefreshed); if (errors.isEmpty() == false) { return nodeLoad.setError(Strings.collectionToCommaDelimitedString(errors)).build(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java index 6d1efb77afbcb..82dbe427f093c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java @@ -92,6 +92,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(SnapshotUpgradeTas Integer.MAX_VALUE, Integer.MAX_VALUE, maxMachineMemoryPercent, + Long.MAX_VALUE, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index efe6607931b62..d39fa1e8eb6c2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -110,6 +110,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(OpenJobAction.JobP maxOpenJobs, maxConcurrentJobAllocations, maxMachineMemoryPercent, + maxNodeMemory, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java index c58cc9f785f10..d90d466b80e3f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksExecutor; @@ -27,6 +28,7 @@ import java.util.Optional; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; +import static org.elasticsearch.xpack.ml.MachineLearning.MAX_ML_NODE_SIZE; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE; import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT; @@ -59,7 +61,6 @@ public static List verifyIndicesPrimaryShardsAreActive(ClusterState clus return unavailableIndices; } - protected final boolean useAutoMemoryPercentage; protected final MlMemoryTracker memoryTracker; protected final IndexNameExpressionResolver expressionResolver; @@ -67,6 +68,8 @@ public static List verifyIndicesPrimaryShardsAreActive(ClusterState clus protected volatile int maxConcurrentJobAllocations; protected volatile int maxMachineMemoryPercent; protected volatile int maxLazyMLNodes; + protected volatile boolean useAutoMemoryPercentage; + protected volatile long maxNodeMemory; protected volatile int maxOpenJobs; protected AbstractJobPersistentTasksExecutor(String taskName, @@ -83,12 +86,15 @@ protected AbstractJobPersistentTasksExecutor(String taskName, this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings); this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings); this.useAutoMemoryPercentage = USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings); + this.maxNodeMemory = MAX_ML_NODE_SIZE.get(settings).getBytes(); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs); + clusterService.getClusterSettings().addSettingsUpdateConsumer(USE_AUTO_MACHINE_MEMORY_PERCENT, this::setUseAutoMemoryPercentage); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ML_NODE_SIZE, this::setMaxNodeSize); } protected abstract String[] indicesOfInterest(Params params); @@ -133,6 +139,14 @@ void setMaxOpenJobs(int maxOpenJobs) { this.maxOpenJobs = maxOpenJobs; } + void setUseAutoMemoryPercentage(boolean useAutoMemoryPercentage) { + this.useAutoMemoryPercentage = useAutoMemoryPercentage; + } + + void setMaxNodeSize(ByteSizeValue maxNodeSize) { + this.maxNodeMemory = maxNodeSize.getBytes(); + } + public Optional checkRequiredIndices(String jobId, ClusterState clusterState, String... indicesOfInterest) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java index 7b3ed55e63acd..f49eb74ace686 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java @@ -153,6 +153,7 @@ private static TaskExecutor createTaskExecutor() { MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_ML_NODE_SIZE, MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java index 38a1b4a1f5a5f..7f15d88458e69 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java @@ -67,7 +67,9 @@ public void setup() { timeSupplier = System::currentTimeMillis; ClusterSettings cSettings = new ClusterSettings( Settings.EMPTY, - Set.of(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, MachineLearning.MAX_OPEN_JOBS_PER_NODE)); + Set.of(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_OPEN_JOBS_PER_NODE, + MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT)); when(clusterService.getClusterSettings()).thenReturn(cSettings); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index ae2a84a549fa4..9467edaa7b610 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -49,6 +49,8 @@ // TODO: in 8.0.0 remove all instances of MAX_OPEN_JOBS_NODE_ATTR from this file public class JobNodeSelectorTests extends ESTestCase { + // To simplify the logic in this class all jobs have the same memory requirement + private static final long MAX_JOB_BYTES = ByteSizeValue.ofGb(1).getBytes(); private static final ByteSizeValue JOB_MEMORY_REQUIREMENT = ByteSizeValue.ofMb(10); private MlMemoryTracker memoryTracker; @@ -122,6 +124,7 @@ public void testSelectLeastLoadedMlNode_byCount() { PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertEquals("", result.getExplanation()); @@ -147,6 +150,7 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityCountLim PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -174,6 +178,7 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityCount PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -206,6 +211,7 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -233,6 +239,7 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_givenTaskHasNull PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNotNull(result.getExecutorNode()); @@ -258,6 +265,7 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemor PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -293,6 +301,7 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor maxRunningJobsPerNode, 2, maxMachineMemoryPercent, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -323,6 +332,7 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMe maxRunningJobsPerNode, 2, maxMachineMemoryPercent, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); @@ -357,6 +367,7 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { 20, 2, 30, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); @@ -400,6 +411,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { 10, 2, 30, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertEquals("_node_id3", result.getExecutorNode()); @@ -419,6 +431,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { result = jobNodeSelector.selectNode(10, 2, 30, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because OPENING state", result.getExecutorNode()); @@ -434,7 +447,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { cs = csBuilder.build(); jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job7)); - result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -448,7 +461,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job7)); - result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -494,6 +507,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertEquals("_node_id1", result.getExecutorNode()); @@ -509,7 +523,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() jobNodeSelector = new JobNodeSelector(cs, job8.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job8)); - result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -547,6 +561,7 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); @@ -583,6 +598,7 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertThat(result.getExplanation(), containsString( @@ -617,6 +633,7 @@ public void testSelectLeastLoadedMlNode_jobWithRules() { PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, + MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNotNull(result.getExecutorNode()); @@ -671,7 +688,6 @@ public void testMaximumPossibleNodeMemoryTooSmall() { Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory)); - nodeAttr.put(MachineLearning.MAX_NODE_SIZE_NODE_ATTR, Long.toString(10L)); ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, maxRunningJobsPerNode); @@ -684,6 +700,7 @@ public void testMaximumPossibleNodeMemoryTooSmall() { PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, + 10L, isMemoryTrackerRecentlyRefreshed, false); assertNull(result.getExecutorNode()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index fe657a72374fb..73f7da9106a4a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -110,8 +110,13 @@ public void testValidate_givenValidJob() { public void testGetAssignment_GivenJobThatRequiresMigration() { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, - Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, - MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE, MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT) + Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, + MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_LAZY_ML_NODES, + MachineLearning.MAX_ML_NODE_SIZE, + MachineLearning.MAX_OPEN_JOBS_PER_NODE, + MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT + ) ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); @@ -125,8 +130,13 @@ public void testGetAssignment_GivenJobThatRequiresMigration() { public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() { Settings settings = Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 1).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, - Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, - MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE, MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT) + Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, + MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_LAZY_ML_NODES, + MachineLearning.MAX_ML_NODE_SIZE, + MachineLearning.MAX_OPEN_JOBS_PER_NODE, + MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT + ) ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); @@ -150,8 +160,13 @@ public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() { public void testGetAssignment_GivenLazyJobAndNoGlobalLazyNodes() { Settings settings = Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 0).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, - Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, - MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE, MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT) + Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, + MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_LAZY_ML_NODES, + MachineLearning.MAX_ML_NODE_SIZE, + MachineLearning.MAX_OPEN_JOBS_PER_NODE, + MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT + ) ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings);