Skip to content

Commit 68358df

Browse files
authored
[ML] adding ml autoscaling integration test (#65638)
This adds ML autoscaling integration tests. The tests verify that the scaling requirements adjust according to the current real load on the cluster, given machine learning jobs of various sizes. Additionally, this commit fixes a bug in the ML autoscaling decider settings: the DOWN_SCALE_DELAY setting was registered under the duplicate key "num_analytics_jobs_in_queue" instead of its own "down_scale_delay" key.
1 parent ff5cb90 commit 68358df

File tree

9 files changed

+235
-21
lines changed

9 files changed

+235
-21
lines changed

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/GetAutoscalingCapacityAction.java

+3
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ public int hashCode() {
112112
return Objects.hash(results);
113113
}
114114

115+
public Map<String, AutoscalingDeciderResults> getResults() {
116+
return results;
117+
}
115118
}
116119

117120
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisLimits.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,17 @@ public AnalysisLimits(Long categorizationExamplesLimit) {
8787
this(DEFAULT_MODEL_MEMORY_LIMIT_MB, categorizationExamplesLimit);
8888
}
8989

90-
public AnalysisLimits(Long modelMemoryLimit, Long categorizationExamplesLimit) {
91-
if (modelMemoryLimit != null && modelMemoryLimit < 1) {
92-
String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW, modelMemoryLimit, "1 MiB");
90+
public AnalysisLimits(Long modelMemoryLimitMb, Long categorizationExamplesLimit) {
91+
if (modelMemoryLimitMb != null && modelMemoryLimitMb < 1) {
92+
String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW, modelMemoryLimitMb, "1 MiB");
9393
throw ExceptionsHelper.badRequestException(msg);
9494
}
9595
if (categorizationExamplesLimit != null && categorizationExamplesLimit < 0) {
9696
String msg = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, CATEGORIZATION_EXAMPLES_LIMIT, 0,
9797
categorizationExamplesLimit);
9898
throw ExceptionsHelper.badRequestException(msg);
9999
}
100-
this.modelMemoryLimit = modelMemoryLimit;
100+
this.modelMemoryLimit = modelMemoryLimitMb;
101101
this.categorizationExamplesLimit = categorizationExamplesLimit;
102102
}
103103

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ml.integration;
8+
9+
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
10+
import org.elasticsearch.cluster.node.DiscoveryNode;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.common.unit.ByteSizeValue;
13+
import org.elasticsearch.common.unit.TimeValue;
14+
import org.elasticsearch.xpack.autoscaling.Autoscaling;
15+
import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingCapacityAction;
16+
import org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction;
17+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
18+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResults;
19+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
20+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
21+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
22+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
23+
import org.elasticsearch.xpack.core.ml.job.config.Job;
24+
import org.elasticsearch.xpack.ml.MachineLearning;
25+
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
26+
import org.elasticsearch.xpack.ml.autoscaling.NativeMemoryCapacity;
27+
28+
import java.util.Arrays;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.SortedMap;
32+
import java.util.TreeMap;
33+
import java.util.TreeSet;
34+
import java.util.stream.Collectors;
35+
36+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
37+
import static org.hamcrest.Matchers.containsString;
38+
import static org.hamcrest.Matchers.equalTo;
39+
import static org.hamcrest.Matchers.hasKey;
40+
41+
public class AutoscalingIT extends MlNativeAutodetectIntegTestCase {
42+
43+
private static final long BASIC_REQUIREMENT_MB = 10;
44+
private static final long NATIVE_PROCESS_OVERHEAD_MB = 30;
45+
private static final long BASELINE_OVERHEAD_MB = BASIC_REQUIREMENT_MB + NATIVE_PROCESS_OVERHEAD_MB;
46+
47+
@Override
48+
protected Settings externalClusterClientSettings() {
49+
return Settings.builder().put(super.externalClusterClientSettings())
50+
.put(Autoscaling.AUTOSCALING_ENABLED_SETTING.getKey(), true)
51+
.build();
52+
}
53+
54+
// This test assumes that xpack.ml.max_machine_memory_percent is 30
55+
// and that xpack.ml.use_auto_machine_memory_percent is false
56+
public void testMLAutoscalingCapacity() {
57+
SortedMap<String, Settings> deciders = new TreeMap<>();
58+
deciders.put(MlAutoscalingDeciderService.NAME,
59+
Settings.builder().put(MlAutoscalingDeciderService.DOWN_SCALE_DELAY.getKey(), TimeValue.ZERO).build());
60+
final PutAutoscalingPolicyAction.Request request = new PutAutoscalingPolicyAction.Request(
61+
"ml_test",
62+
new TreeSet<>(Arrays.asList("ml")),
63+
deciders
64+
);
65+
assertAcked(client().execute(PutAutoscalingPolicyAction.INSTANCE, request).actionGet());
66+
67+
assertMlCapacity(
68+
client().execute(
69+
GetAutoscalingCapacityAction.INSTANCE,
70+
new GetAutoscalingCapacityAction.Request()
71+
).actionGet(),
72+
"Requesting scale down as tier and/or node size could be smaller",
73+
0L,
74+
0L);
75+
76+
putJob("job1", 100);
77+
putJob("job2", 200);
78+
openJob("job1");
79+
openJob("job2");
80+
long expectedTierBytes = (long)Math.ceil(
81+
ByteSizeValue.ofMb(100 + BASELINE_OVERHEAD_MB + 200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0
82+
);
83+
long expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);
84+
85+
assertMlCapacity(
86+
client().execute(
87+
GetAutoscalingCapacityAction.INSTANCE,
88+
new GetAutoscalingCapacityAction.Request()
89+
).actionGet(),
90+
"Requesting scale down as tier and/or node size could be smaller",
91+
expectedTierBytes,
92+
expectedNodeBytes);
93+
94+
putJob("bigjob1", 60_000);
95+
putJob("bigjob2", 50_000);
96+
openJob("bigjob1");
97+
openJob("bigjob2");
98+
List<DiscoveryNode> mlNodes = admin()
99+
.cluster()
100+
.prepareNodesInfo()
101+
.all()
102+
.get()
103+
.getNodes()
104+
.stream()
105+
.map(NodeInfo::getNode)
106+
.filter(MachineLearning::isMlNode)
107+
.collect(Collectors.toList());
108+
NativeMemoryCapacity currentScale = MlAutoscalingDeciderService.currentScale(mlNodes, 30, false);
109+
expectedTierBytes = (long)Math.ceil(
110+
(ByteSizeValue.ofMb(50_000 + BASIC_REQUIREMENT_MB + 60_000 + BASIC_REQUIREMENT_MB).getBytes()
111+
+ currentScale.getTier()
112+
) * 100 / 30.0
113+
);
114+
expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(60_000 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);
115+
116+
117+
assertMlCapacity(
118+
client().execute(
119+
GetAutoscalingCapacityAction.INSTANCE,
120+
new GetAutoscalingCapacityAction.Request()
121+
).actionGet(),
122+
"requesting scale up as number of jobs in queues exceeded configured limit",
123+
expectedTierBytes,
124+
expectedNodeBytes);
125+
126+
expectedTierBytes = (long)Math.ceil(
127+
ByteSizeValue.ofMb(100 + BASELINE_OVERHEAD_MB + 200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0
128+
);
129+
expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);
130+
closeJob("bigjob1");
131+
closeJob("bigjob2");
132+
133+
assertMlCapacity(
134+
client().execute(
135+
GetAutoscalingCapacityAction.INSTANCE,
136+
new GetAutoscalingCapacityAction.Request()
137+
).actionGet(),
138+
"Requesting scale down as tier and/or node size could be smaller",
139+
expectedTierBytes,
140+
expectedNodeBytes);
141+
closeJob("job1");
142+
closeJob("job2");
143+
144+
assertMlCapacity(
145+
client().execute(
146+
GetAutoscalingCapacityAction.INSTANCE,
147+
new GetAutoscalingCapacityAction.Request()
148+
).actionGet(),
149+
"Requesting scale down as tier and/or node size could be smaller",
150+
0L,
151+
0L);
152+
}
153+
154+
private void assertMlCapacity(GetAutoscalingCapacityAction.Response capacity, String reason, long tierBytes, long nodeBytes) {
155+
assertThat(capacity.getResults(), hasKey("ml_test"));
156+
AutoscalingDeciderResults autoscalingDeciderResults = capacity.getResults().get("ml_test");
157+
assertThat(autoscalingDeciderResults.results(), hasKey("ml"));
158+
159+
AutoscalingDeciderResult autoscalingDeciderResult = autoscalingDeciderResults.results().get("ml");
160+
assertThat(autoscalingDeciderResult.reason().summary(), containsString(reason));
161+
assertThat(autoscalingDeciderResult.requiredCapacity().tier().memory().getBytes(), equalTo(tierBytes));
162+
assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), equalTo(nodeBytes));
163+
}
164+
165+
private void putJob(String jobId, long limitMb) {
166+
Job.Builder job =
167+
new Job.Builder(jobId)
168+
.setAllowLazyOpen(true)
169+
.setAnalysisLimits(new AnalysisLimits(limitMb, null))
170+
.setAnalysisConfig(
171+
new AnalysisConfig.Builder((List<Detector>) null)
172+
.setBucketSpan(TimeValue.timeValueHours(1))
173+
.setDetectors(
174+
Collections.singletonList(
175+
new Detector.Builder("count", null)
176+
.setPartitionFieldName("user")
177+
.build())))
178+
.setDataDescription(
179+
new DataDescription.Builder()
180+
.setTimeFormat("epoch"));
181+
182+
registerJob(job);
183+
putJob(job);
184+
}
185+
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
package org.elasticsearch.xpack.ml.integration;
77

88
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
9+
import org.elasticsearch.cluster.NamedDiff;
10+
import org.elasticsearch.xpack.autoscaling.Autoscaling;
11+
import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata;
12+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
913
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
1014
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
1115
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -73,6 +77,7 @@
7377
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
7478
import org.elasticsearch.xpack.ilm.IndexLifecycle;
7579
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
80+
import org.elasticsearch.xpack.ml.autoscaling.MlScalingReason;
7681

7782
import java.io.IOException;
7883
import java.io.UncheckedIOException;
@@ -113,6 +118,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
113118
return Arrays.asList(
114119
LocalStateMachineLearning.class,
115120
Netty4Plugin.class,
121+
Autoscaling.class,
116122
ReindexPlugin.class,
117123
// The monitoring plugin requires script and gsub processors to be loaded
118124
IngestCommonPlugin.class,
@@ -257,16 +263,25 @@ protected void ensureClusterStateConsistency() throws IOException {
257263
entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new));
258264
entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new));
259265
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
260-
StartDatafeedAction.DatafeedParams::new));
266+
StartDatafeedAction.DatafeedParams::new));
261267
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
262268
StartDataFrameAnalyticsAction.TaskParams::new));
263269
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
264-
OpenJobAction.JobParams::new));
270+
OpenJobAction.JobParams::new));
265271
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
266272
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
267273
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DataFrameAnalyticsTaskState.NAME,
268274
DataFrameAnalyticsTaskState::new));
269275
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetadata.TYPE, TokenMetadata::new));
276+
entries.add(new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoscalingMetadata.NAME, AutoscalingMetadata::new));
277+
entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class,
278+
AutoscalingMetadata.NAME,
279+
AutoscalingMetadata.AutoscalingMetadataDiff::new));
280+
entries.add(new NamedWriteableRegistry.Entry(
281+
AutoscalingDeciderResult.Reason.class,
282+
MlScalingReason.NAME,
283+
MlScalingReason::new
284+
));
270285
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
271286
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
272287
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ public Set<DiscoveryNodeRole> getRoles() {
450450
false,
451451
Property.NodeScope);
452452
public static final Setting<Integer> MAX_LAZY_ML_NODES =
453-
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
453+
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, Property.Dynamic, Property.NodeScope);
454454

455455
// Before 8.0.0 this needs to match the max allowed value for xpack.ml.max_open_jobs,
456456
// as the current node could be running in a cluster where some nodes are still using

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
6363
private static final String MEMORY_STALE = "unable to make scaling decision as job memory requirements are stale";
6464
private static final long NO_SCALE_DOWN_POSSIBLE = -1L;
6565

66+
public static final String NAME = "ml";
6667
public static final Setting<Integer> NUM_ANOMALY_JOBS_IN_QUEUE = Setting.intSetting("num_anomaly_jobs_in_queue", 0, 0);
6768
public static final Setting<Integer> NUM_ANALYTICS_JOBS_IN_QUEUE = Setting.intSetting("num_analytics_jobs_in_queue", 0, 0);
68-
public static final Setting<TimeValue> DOWN_SCALE_DELAY = Setting.timeSetting("num_analytics_jobs_in_queue", TimeValue.ZERO);
69+
public static final Setting<TimeValue> DOWN_SCALE_DELAY = Setting.timeSetting("down_scale_delay", TimeValue.timeValueHours(1));
6970

7071
private final NodeLoadDetector nodeLoadDetector;
7172
private final MlMemoryTracker mlMemoryTracker;
@@ -156,7 +157,8 @@ static Optional<NativeMemoryCapacity> requiredCapacityForUnassignedJobs(List<Str
156157
}
157158
jobSizes.sort(Comparator.comparingLong(Long::longValue).reversed());
158159
long tierMemory = 0L;
159-
long nodeMemory = jobSizes.get(0);
160+
// Node memory needs to be AT LEAST the size of the largest job + the required overhead.
161+
long nodeMemory = jobSizes.get(0) + MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
160162
Iterator<Long> iter = jobSizes.iterator();
161163
while (jobSizes.size() > maxNumInQueue && iter.hasNext()) {
162164
tierMemory += iter.next();
@@ -226,7 +228,9 @@ private boolean newScaleDownCheck() {
226228
return scaleDownDetected == NO_SCALE_DOWN_POSSIBLE;
227229
}
228230

229-
NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes) {
231+
public static NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes,
232+
int maxMachineMemoryPercent,
233+
boolean useAuto) {
230234
long[] mlMemory = machineLearningNodes.stream()
231235
.mapToLong(node -> NativeMemoryCalculator.allowedBytesForMl(node, maxMachineMemoryPercent, useAuto).orElse(0L))
232236
.toArray();
@@ -244,6 +248,10 @@ NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes
244248
);
245249
}
246250

251+
NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes) {
252+
return currentScale(machineLearningNodes, maxMachineMemoryPercent, useAuto);
253+
}
254+
247255
@Override
248256
public void offMaster() {
249257
isMaster = false;
@@ -567,14 +575,16 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<DiscoveryNode> nodes,
567575
return Optional.empty();
568576
}
569577
long currentlyNecessaryTier = nodeLoads.stream().mapToLong(NodeLoad::getAssignedJobMemory).sum();
578+
// The required NATIVE node memory is the largest job and our static overhead.
579+
long currentlyNecessaryNode = largestJob == 0 ? 0 : largestJob + MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
570580
// We consider a scale down if we are not fully utilizing the tier
571581
// Or our largest job could be on a smaller node (meaning the same size tier but smaller nodes are possible).
572-
if (currentlyNecessaryTier < currentCapacity.getTier() || largestJob < currentCapacity.getNode()) {
582+
if (currentlyNecessaryTier < currentCapacity.getTier() || currentlyNecessaryNode < currentCapacity.getNode()) {
573583
NativeMemoryCapacity nativeMemoryCapacity = new NativeMemoryCapacity(
574584
currentlyNecessaryTier,
575-
largestJob,
585+
currentlyNecessaryNode,
576586
// If our newly suggested native capacity is the same, we can use the previously stored jvm size
577-
largestJob == currentCapacity.getNode() ? currentCapacity.getJvmSize() : null);
587+
currentlyNecessaryNode == currentCapacity.getNode() ? currentCapacity.getJvmSize() : null);
578588
return Optional.of(
579589
new AutoscalingDeciderResult(
580590
nativeMemoryCapacity.autoscalingCapacity(maxMachineMemoryPercent, useAuto),
@@ -588,7 +598,7 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<DiscoveryNode> nodes,
588598

589599
@Override
590600
public String name() {
591-
return "ml";
601+
return NAME;
592602
}
593603

594604
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReason.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
public class MlScalingReason implements AutoscalingDeciderResult.Reason {
2323

24-
static final String NAME = "ml";
24+
public static final String NAME = MlAutoscalingDeciderService.NAME;
2525
static final String WAITING_ANALYTICS_JOBS = "waiting_analytics_jobs";
2626
static final String WAITING_ANOMALY_JOBS = "waiting_anomaly_jobs";
2727
static final String CONFIGURATION = "configuration";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacity.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ AutoscalingCapacity autoscalingCapacity(int maxMemoryPercent, boolean useAuto) {
5454
// will have the same dynamic memory calculation. And thus the tier is simply the sum of the memory necessary
5555
// times that scaling factor.
5656
NativeMemoryCalculator.modelMemoryPercent(node, jvmSize, maxMemoryPercent, useAuto);
57-
double inverseScale = maxMemoryPercent <= 0 ? 0 : 100.0 / memoryPercentForMl;
57+
double inverseScale = memoryPercentForMl <= 0 ? 0 : 100.0 / memoryPercentForMl;
5858
return new AutoscalingCapacity(
5959
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes((long)Math.ceil(tier * inverseScale))),
6060
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes((long)Math.ceil(node * inverseScale))));

0 commit comments

Comments (0)