
Commit 28758b0

[ML] allow autoscaling to work when vertical scaling is possible (#84242)
When an NLP model is deployed, or a DFA/anomaly detection job is assigned, we have historically relied only on xpack.ml.max_lazy_ml_nodes to determine whether scaling is possible. In certain scenarios, however, scaling may still be available even when xpack.ml.max_lazy_ml_nodes is fully satisfied. We now also check xpack.ml.max_ml_node_size: if the current ML nodes are smaller than this size, we assume vertical scaling is possible and allow the tasks to be created.

Closes #84198
1 parent c468612 commit 28758b0
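
The decision logic can be summarized in a short, self-contained sketch. This is illustrative only, not the Elasticsearch code itself: the class ScalingCheckSketch and the method scalingPossible are hypothetical names, and the real checks live in TransportStartTrainedModelDeploymentAction and JobNodeSelector (see the diffs below).

import java.util.List;
import java.util.OptionalLong;

// Illustrative sketch of the scaling check described above; names here are
// hypothetical, the real logic is in the diffs below.
public class ScalingCheckSketch {

    /**
     * Scaling is treated as possible when either:
     *  - fewer ML nodes exist than xpack.ml.max_lazy_ml_nodes allows (horizontal scaling), or
     *  - the smallest current ML node is below xpack.ml.max_ml_node_size (vertical scaling).
     */
    static boolean scalingPossible(List<OptionalLong> mlNodeSizes, int maxLazyMlNodes, long maxMlNodeSize) {
        // Horizontal: lazy node capacity is not yet exhausted.
        if (mlNodeSizes.size() < maxLazyMlNodes) {
            return true;
        }
        // Vertical: the smallest known ML node could still be replaced by a bigger one.
        OptionalLong smallest = mlNodeSizes.stream().flatMapToLong(OptionalLong::stream).min();
        return smallest.isPresent() && smallest.getAsLong() < maxMlNodeSize;
    }

    public static void main(String[] args) {
        long oneGb = 1L << 30;
        long sixtyFourGb = 64L << 30;
        // Two 1 GB ML nodes, no lazy nodes left, but nodes may grow to 64 GB: vertical scaling is possible.
        System.out.println(scalingPossible(List.of(OptionalLong.of(oneGb), OptionalLong.of(oneGb)), 2, sixtyFourGb));
    }
}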

File tree: 5 files changed, +116 -12 lines changed

docs/changelog/84242.yaml (+6)

@@ -0,0 +1,6 @@
+pr: 84242
+summary: Allow autoscaling to work when vertical scaling is possible
+area: Machine Learning
+type: bug
+issues:
+ - 84198

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java (+20, -3)

@@ -27,6 +27,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -62,6 +63,7 @@
 import org.elasticsearch.xpack.ml.inference.allocation.TrainedModelAllocationService;
 import org.elasticsearch.xpack.ml.inference.persistence.ChunkedTrainedModelRestorer;
 import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelDefinitionDoc;
+import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
 import java.util.Collections;
@@ -70,6 +72,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -89,6 +92,7 @@ public class TransportStartTrainedModelDeploymentAction extends TransportMasterN
     private final NamedXContentRegistry xContentRegistry;
     private final MlMemoryTracker memoryTracker;
     protected volatile int maxLazyMLNodes;
+    protected volatile long maxMLNodeSize;
 
     @Inject
     public TransportStartTrainedModelDeploymentAction(
@@ -121,13 +125,19 @@ public TransportStartTrainedModelDeploymentAction(
         this.memoryTracker = Objects.requireNonNull(memoryTracker);
         this.trainedModelAllocationService = Objects.requireNonNull(trainedModelAllocationService);
         this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
+        this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes();
         clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_ML_NODE_SIZE, this::setMaxMLNodeSize);
     }
 
     private void setMaxLazyMLNodes(int value) {
         this.maxLazyMLNodes = value;
     }
 
+    private void setMaxMLNodeSize(ByteSizeValue value) {
+        this.maxMLNodeSize = value.getBytes();
+    }
+
     @Override
     protected void masterOperation(
         Task task,
@@ -241,7 +251,7 @@ private void waitForDeploymentState(
         AllocationStatus.State state,
         ActionListener<CreateTrainedModelAllocationAction.Response> listener
     ) {
-        DeploymentStartedPredicate predicate = new DeploymentStartedPredicate(modelId, state, maxLazyMLNodes);
+        DeploymentStartedPredicate predicate = new DeploymentStartedPredicate(modelId, state, maxLazyMLNodes, maxMLNodeSize);
         trainedModelAllocationService.waitForAllocationCondition(
             modelId,
             predicate,
@@ -402,11 +412,13 @@ private static class DeploymentStartedPredicate implements Predicate<ClusterStat
         private final String modelId;
         private final AllocationStatus.State waitForState;
         private final int maxLazyMLNodes;
+        private final long maxMLNodeSize;
 
-        DeploymentStartedPredicate(String modelId, AllocationStatus.State waitForState, int maxLazyMLNodes) {
+        DeploymentStartedPredicate(String modelId, AllocationStatus.State waitForState, int maxLazyMLNodes, long maxMLNodeSize) {
             this.modelId = ExceptionsHelper.requireNonNull(modelId, "model_id");
             this.waitForState = waitForState;
             this.maxLazyMLNodes = maxLazyMLNodes;
+            this.maxMLNodeSize = maxMLNodeSize;
         }
 
         @Override
@@ -445,9 +457,14 @@ public boolean test(ClusterState clusterState) {
                 .filter(d -> nodesShuttingDown.contains(d.getId()) == false)
                 .filter(TaskParams::mayAllocateToNode)
                 .collect(Collectors.toList());
+            OptionalLong smallestMLNode = nodes.stream().map(NodeLoadDetector::getNodeSize).flatMapToLong(OptionalLong::stream).min();
 
             // No nodes allocated at all!
-            if (nodesAndState.isEmpty() && maxLazyMLNodes <= nodes.size()) {
+            if (nodesAndState.isEmpty()
+                // We cannot scale horizontally
+                && maxLazyMLNodes <= nodes.size()
+                // We cannot scale vertically
+                && (smallestMLNode.isEmpty() || smallestMLNode.getAsLong() >= maxMLNodeSize)) {
                 String msg = "Could not start deployment because no suitable nodes were found, allocation explanation ["
                     + trainedModelAllocation.getReason()
                     + "]";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java (+24, -7)

@@ -27,6 +27,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -272,40 +273,45 @@ public PersistentTasksCustomMetadata.Assignment selectNode(
             reasons.values(),
             maxNodeSize > 0L
                 ? NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage)
-                : Long.MAX_VALUE
+                : Long.MAX_VALUE,
+            maxNodeSize
         );
     }
 
     PersistentTasksCustomMetadata.Assignment createAssignment(
         long estimatedMemoryUsage,
         DiscoveryNode minLoadedNode,
         Collection<String> reasons,
-        long biggestPossibleJob
+        long mostAvailableMemoryForML,
+        long maxNodeSize
     ) {
         if (minLoadedNode == null) {
             String explanation = String.join("|", reasons);
             PersistentTasksCustomMetadata.Assignment currentAssignment = new PersistentTasksCustomMetadata.Assignment(null, explanation);
             logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation);
-            if ((MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage) > biggestPossibleJob) {
+            if ((MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage) > mostAvailableMemoryForML) {
                 ParameterizedMessage message = new ParameterizedMessage(
                     "[{}] not waiting for node assignment as estimated job size [{}] is greater than largest possible job size [{}]",
                     jobId,
                     MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage,
-                    biggestPossibleJob
+                    mostAvailableMemoryForML
                 );
                 logger.info(message);
                 List<String> newReasons = new ArrayList<>(reasons);
                 newReasons.add(message.getFormattedMessage());
                 explanation = String.join("|", newReasons);
                 return new PersistentTasksCustomMetadata.Assignment(null, explanation);
             }
-            return considerLazyAssignment(currentAssignment);
+            return considerLazyAssignment(currentAssignment, maxNodeSize);
         }
         logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId);
         return new PersistentTasksCustomMetadata.Assignment(minLoadedNode.getId(), "");
     }
 
-    PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksCustomMetadata.Assignment currentAssignment) {
+    PersistentTasksCustomMetadata.Assignment considerLazyAssignment(
+        PersistentTasksCustomMetadata.Assignment currentAssignment,
+        long maxNodeSize
+    ) {
 
         assert currentAssignment.getExecutorNode() == null;
 
@@ -316,10 +322,21 @@ PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksC
             }
         }
 
+        // Can we scale horizontally?
         if (numMlNodes < maxLazyNodes) { // Means we have lazy nodes left to allocate
             return AWAITING_LAZY_ASSIGNMENT;
         }
-
+        // Can we scale vertically and is scaling possible?
+        if (maxNodeSize > 0L && maxLazyNodes > 0) {
+            OptionalLong smallestMLNode = candidateNodes.stream()
+                .filter(MachineLearning::isMlNode)
+                .map(NodeLoadDetector::getNodeSize)
+                .flatMapToLong(OptionalLong::stream)
+                .min();
+            if (smallestMLNode.isPresent() && smallestMLNode.getAsLong() < maxNodeSize) {
+                return AWAITING_LAZY_ASSIGNMENT;
+            }
+        }
         return currentAssignment;
     }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java (+17)

@@ -29,10 +29,27 @@
 import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.xpack.ml.MachineLearning.MACHINE_MEMORY_NODE_ATTR;
+
 public class NodeLoadDetector {
 
     private final MlMemoryTracker mlMemoryTracker;
 
+    /**
+     * Returns the node's total memory size.
+     * @param node The node whose size to grab
+     * @return maybe the answer, will be empty if size cannot be determined
+     */
+    public static OptionalLong getNodeSize(DiscoveryNode node) {
+        String memoryString = node.getAttributes().get(MACHINE_MEMORY_NODE_ATTR);
+        try {
+            return OptionalLong.of(Long.parseLong(memoryString));
+        } catch (NumberFormatException e) {
+            assert e == null : "ml.machine_memory should parse because we set it internally: invalid value was " + memoryString;
+            return OptionalLong.empty();
+        }
+    }
+
     public NodeLoadDetector(MlMemoryTracker memoryTracker) {
         this.mlMemoryTracker = memoryTracker;
     }

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java (+49, -2)

@@ -1009,7 +1009,8 @@ public void testConsiderLazyAssignmentWithNoLazyNodes() {
             node -> nodeFilter(node, job)
         );
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
-            new PersistentTasksCustomMetadata.Assignment(null, "foo")
+            new PersistentTasksCustomMetadata.Assignment(null, "foo"),
+            ByteSizeValue.ofGb(1).getBytes()
         );
         assertEquals("foo", result.getExplanation());
         assertNull(result.getExecutorNode());
@@ -1053,7 +1054,53 @@ public void testConsiderLazyAssignmentWithLazyNodes() {
             node -> nodeFilter(node, job)
         );
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
-            new PersistentTasksCustomMetadata.Assignment(null, "foo")
+            new PersistentTasksCustomMetadata.Assignment(null, "foo"),
+            ByteSizeValue.ofGb(1).getBytes()
+        );
+        assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), result.getExplanation());
+        assertNull(result.getExecutorNode());
+    }
+
+    public void testConsiderLazyAssignmentWithFilledLazyNodesAndVerticalScale() {
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(
+                new DiscoveryNode(
+                    "_node_name1",
+                    "_node_id1",
+                    new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                    Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(ByteSizeValue.ofGb(1).getBytes())),
+                    ROLES_WITH_ML,
+                    Version.CURRENT
+                )
+            )
+            .add(
+                new DiscoveryNode(
+                    "_node_name2",
+                    "_node_id2",
+                    new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
+                    Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(ByteSizeValue.ofGb(1).getBytes())),
+                    ROLES_WITH_ML,
+                    Version.CURRENT
+                )
+            )
+            .build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        cs.nodes(nodes);
+
+        Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
+            memoryTracker,
+            randomIntBetween(1, 3),
+            node -> nodeFilter(node, job)
+        );
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
+            new PersistentTasksCustomMetadata.Assignment(null, "foo"),
+            ByteSizeValue.ofGb(64).getBytes()
         );
         assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), result.getExplanation());
         assertNull(result.getExecutorNode());
