Skip to content

Commit 4d8bba3

Browse files
authored
[7.x] [ML] adding ml autoscaling integration test (#65638) (#65775)
* [ML] adding ml autoscaling integration test (#65638) This adds ml autoscaling integration tests. The test verifies that the scaling requirements adjust according to the current real load on the cluster given machine learning jobs of various sizes. Additionally, there was a bug in the ml scaling service settings. This commit addresses the bug.
1 parent abdd276 commit 4d8bba3

File tree

10 files changed

+242
-24
lines changed

10 files changed

+242
-24
lines changed

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/GetAutoscalingCapacityAction.java

+3
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ public int hashCode() {
112112
return Objects.hash(results);
113113
}
114114

115+
public Map<String, AutoscalingDeciderResults> getResults() {
116+
return results;
117+
}
115118
}
116119

117120
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisLimits.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,17 @@ public AnalysisLimits(Long categorizationExamplesLimit) {
8787
this(DEFAULT_MODEL_MEMORY_LIMIT_MB, categorizationExamplesLimit);
8888
}
8989

90-
public AnalysisLimits(Long modelMemoryLimit, Long categorizationExamplesLimit) {
91-
if (modelMemoryLimit != null && modelMemoryLimit < 1) {
92-
String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW, modelMemoryLimit, "1 MiB");
90+
public AnalysisLimits(Long modelMemoryLimitMb, Long categorizationExamplesLimit) {
91+
if (modelMemoryLimitMb != null && modelMemoryLimitMb < 1) {
92+
String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW, modelMemoryLimitMb, "1 MiB");
9393
throw ExceptionsHelper.badRequestException(msg);
9494
}
9595
if (categorizationExamplesLimit != null && categorizationExamplesLimit < 0) {
9696
String msg = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, CATEGORIZATION_EXAMPLES_LIMIT, 0,
9797
categorizationExamplesLimit);
9898
throw ExceptionsHelper.badRequestException(msg);
9999
}
100-
this.modelMemoryLimit = modelMemoryLimit;
100+
this.modelMemoryLimit = modelMemoryLimitMb;
101101
this.categorizationExamplesLimit = categorizationExamplesLimit;
102102
}
103103

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ml.integration;
8+
9+
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
10+
import org.elasticsearch.cluster.node.DiscoveryNode;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.common.unit.ByteSizeValue;
13+
import org.elasticsearch.common.unit.TimeValue;
14+
import org.elasticsearch.xpack.autoscaling.Autoscaling;
15+
import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingCapacityAction;
16+
import org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction;
17+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
18+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResults;
19+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
20+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
21+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
22+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
23+
import org.elasticsearch.xpack.core.ml.job.config.Job;
24+
import org.elasticsearch.xpack.ml.MachineLearning;
25+
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
26+
import org.elasticsearch.xpack.ml.autoscaling.NativeMemoryCapacity;
27+
28+
import java.util.Arrays;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.SortedMap;
32+
import java.util.TreeMap;
33+
import java.util.TreeSet;
34+
import java.util.stream.Collectors;
35+
36+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
37+
import static org.hamcrest.Matchers.containsString;
38+
import static org.hamcrest.Matchers.equalTo;
39+
import static org.hamcrest.Matchers.hasKey;
40+
41+
public class AutoscalingIT extends MlNativeAutodetectIntegTestCase {
42+
43+
private static final long BASIC_REQUIREMENT_MB = 10;
44+
private static final long NATIVE_PROCESS_OVERHEAD_MB = 30;
45+
private static final long BASELINE_OVERHEAD_MB = BASIC_REQUIREMENT_MB + NATIVE_PROCESS_OVERHEAD_MB;
46+
47+
// This test assumes that xpack.ml.max_machine_memory_percent is 30
48+
// and that xpack.ml.use_auto_machine_memory_percent is false
49+
public void testMLAutoscalingCapacity() {
50+
SortedMap<String, Settings> deciders = new TreeMap<>();
51+
deciders.put(MlAutoscalingDeciderService.NAME,
52+
Settings.builder().put(MlAutoscalingDeciderService.DOWN_SCALE_DELAY.getKey(), TimeValue.ZERO).build());
53+
final PutAutoscalingPolicyAction.Request request = new PutAutoscalingPolicyAction.Request(
54+
"ml_test",
55+
new TreeSet<>(),
56+
deciders
57+
);
58+
assertAcked(client().execute(PutAutoscalingPolicyAction.INSTANCE, request).actionGet());
59+
60+
assertMlCapacity(
61+
client().execute(
62+
GetAutoscalingCapacityAction.INSTANCE,
63+
new GetAutoscalingCapacityAction.Request()
64+
).actionGet(),
65+
"Requesting scale down as tier and/or node size could be smaller",
66+
0L,
67+
0L);
68+
69+
putJob("job1", 100);
70+
putJob("job2", 200);
71+
openJob("job1");
72+
openJob("job2");
73+
long expectedTierBytes = (long)Math.ceil(
74+
ByteSizeValue.ofMb(100 + BASELINE_OVERHEAD_MB + 200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0
75+
);
76+
long expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);
77+
78+
assertMlCapacity(
79+
client().execute(
80+
GetAutoscalingCapacityAction.INSTANCE,
81+
new GetAutoscalingCapacityAction.Request()
82+
).actionGet(),
83+
"Requesting scale down as tier and/or node size could be smaller",
84+
expectedTierBytes,
85+
expectedNodeBytes);
86+
87+
putJob("bigjob1", 60_000);
88+
putJob("bigjob2", 50_000);
89+
openJob("bigjob1");
90+
openJob("bigjob2");
91+
List<DiscoveryNode> mlNodes = admin()
92+
.cluster()
93+
.prepareNodesInfo()
94+
.all()
95+
.get()
96+
.getNodes()
97+
.stream()
98+
.map(NodeInfo::getNode)
99+
.filter(MachineLearning::isMlNode)
100+
.collect(Collectors.toList());
101+
NativeMemoryCapacity currentScale = MlAutoscalingDeciderService.currentScale(mlNodes, 30, false);
102+
expectedTierBytes = (long)Math.ceil(
103+
(ByteSizeValue.ofMb(50_000 + BASIC_REQUIREMENT_MB + 60_000 + BASIC_REQUIREMENT_MB).getBytes()
104+
+ currentScale.getTier()
105+
) * 100 / 30.0
106+
);
107+
expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(60_000 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);
108+
109+
110+
assertMlCapacity(
111+
client().execute(
112+
GetAutoscalingCapacityAction.INSTANCE,
113+
new GetAutoscalingCapacityAction.Request()
114+
).actionGet(),
115+
"requesting scale up as number of jobs in queues exceeded configured limit",
116+
expectedTierBytes,
117+
expectedNodeBytes);
118+
119+
expectedTierBytes = (long)Math.ceil(
120+
ByteSizeValue.ofMb(100 + BASELINE_OVERHEAD_MB + 200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0
121+
);
122+
expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);
123+
closeJob("bigjob1");
124+
closeJob("bigjob2");
125+
126+
assertMlCapacity(
127+
client().execute(
128+
GetAutoscalingCapacityAction.INSTANCE,
129+
new GetAutoscalingCapacityAction.Request()
130+
).actionGet(),
131+
"Requesting scale down as tier and/or node size could be smaller",
132+
expectedTierBytes,
133+
expectedNodeBytes);
134+
closeJob("job1");
135+
closeJob("job2");
136+
137+
assertMlCapacity(
138+
client().execute(
139+
GetAutoscalingCapacityAction.INSTANCE,
140+
new GetAutoscalingCapacityAction.Request()
141+
).actionGet(),
142+
"Requesting scale down as tier and/or node size could be smaller",
143+
0L,
144+
0L);
145+
}
146+
147+
private void assertMlCapacity(GetAutoscalingCapacityAction.Response capacity, String reason, long tierBytes, long nodeBytes) {
148+
assertThat(capacity.getResults(), hasKey("ml_test"));
149+
AutoscalingDeciderResults autoscalingDeciderResults = capacity.getResults().get("ml_test");
150+
assertThat(autoscalingDeciderResults.results(), hasKey("ml"));
151+
152+
AutoscalingDeciderResult autoscalingDeciderResult = autoscalingDeciderResults.results().get("ml");
153+
assertThat(autoscalingDeciderResult.reason().summary(), containsString(reason));
154+
assertThat(autoscalingDeciderResult.requiredCapacity().tier().memory().getBytes(), equalTo(tierBytes));
155+
assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), equalTo(nodeBytes));
156+
}
157+
158+
private void putJob(String jobId, long limitMb) {
159+
Job.Builder job =
160+
new Job.Builder(jobId)
161+
.setAllowLazyOpen(true)
162+
.setAnalysisLimits(new AnalysisLimits(limitMb, null))
163+
.setAnalysisConfig(
164+
new AnalysisConfig.Builder((List<Detector>) null)
165+
.setBucketSpan(TimeValue.timeValueHours(1))
166+
.setDetectors(
167+
Collections.singletonList(
168+
new Detector.Builder("count", null)
169+
.setPartitionFieldName("user")
170+
.build())))
171+
.setDataDescription(
172+
new DataDescription.Builder()
173+
.setTimeFormat("epoch"));
174+
175+
registerJob(job);
176+
putJob(job);
177+
}
178+
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java

+24-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
package org.elasticsearch.xpack.ml.integration;
77

88
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
9+
import org.elasticsearch.cluster.NamedDiff;
10+
import org.elasticsearch.xpack.autoscaling.Autoscaling;
11+
import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata;
12+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
913
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
1014
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
1115
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -72,6 +76,9 @@
7276
import org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry;
7377
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
7478
import org.elasticsearch.xpack.ilm.IndexLifecycle;
79+
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
80+
import org.elasticsearch.xpack.ml.MachineLearning;
81+
import org.elasticsearch.xpack.ml.autoscaling.MlScalingReason;
7582

7683
import java.io.IOException;
7784
import java.net.URISyntaxException;
@@ -106,7 +113,10 @@ protected NamedXContentRegistry xContentRegistry() {
106113
protected Collection<Class<? extends Plugin>> nodePlugins() {
107114
return Arrays.asList(
108115
LocalStateCompositeXPackPlugin.class,
116+
MachineLearning.class,
109117
Netty4Plugin.class,
118+
Autoscaling.class,
119+
ReindexPlugin.class,
110120
// The monitoring plugin requires script and gsub processors to be loaded
111121
IngestCommonPlugin.class,
112122
// The monitoring plugin script processor references painless. Include this for script compilation.
@@ -121,6 +131,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
121131
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
122132
return Arrays.asList(
123133
XPackClientPlugin.class,
134+
Autoscaling.class,
135+
MachineLearning.class,
124136
Netty4Plugin.class,
125137
ReindexPlugin.class,
126138
// ILM is required for .ml-state template index settings
@@ -144,6 +156,7 @@ protected Settings externalClusterClientSettings() {
144156
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
145157
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
146158
builder.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
159+
builder.put(Autoscaling.AUTOSCALING_ENABLED_SETTING.getKey(), true);
147160
builder.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
148161
builder.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
149162
builder.put("xpack.security.transport.ssl.enabled", true);
@@ -236,18 +249,25 @@ protected void ensureClusterStateConsistency() throws IOException {
236249
entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new));
237250
entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new));
238251
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
239-
StartDatafeedAction.DatafeedParams::new));
252+
StartDatafeedAction.DatafeedParams::new));
240253
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
241254
StartDataFrameAnalyticsAction.TaskParams::new));
242255
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
243-
OpenJobAction.JobParams::new));
244-
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_SNAPSHOT_UPGRADE_TASK_NAME,
245-
SnapshotUpgradeTaskParams::new));
256+
OpenJobAction.JobParams::new));
246257
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
247258
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
248259
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DataFrameAnalyticsTaskState.NAME,
249260
DataFrameAnalyticsTaskState::new));
250261
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetadata.TYPE, TokenMetadata::new));
262+
entries.add(new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoscalingMetadata.NAME, AutoscalingMetadata::new));
263+
entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class,
264+
AutoscalingMetadata.NAME,
265+
AutoscalingMetadata.AutoscalingMetadataDiff::new));
266+
entries.add(new NamedWriteableRegistry.Entry(
267+
AutoscalingDeciderResult.Reason.class,
268+
MlScalingReason.NAME,
269+
MlScalingReason::new
270+
));
251271
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
252272
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
253273
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ public Set<DiscoveryNodeRole> getRoles() {
435435
false,
436436
Property.NodeScope);
437437
public static final Setting<Integer> MAX_LAZY_ML_NODES =
438-
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
438+
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, Property.Dynamic, Property.NodeScope);
439439

440440
// Before 8.0.0 this needs to match the max allowed value for xpack.ml.max_open_jobs,
441441
// as the current node could be running in a cluster where some nodes are still using

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
6363
private static final String MEMORY_STALE = "unable to make scaling decision as job memory requirements are stale";
6464
private static final long NO_SCALE_DOWN_POSSIBLE = -1L;
6565

66+
public static final String NAME = "ml";
6667
public static final Setting<Integer> NUM_ANOMALY_JOBS_IN_QUEUE = Setting.intSetting("num_anomaly_jobs_in_queue", 0, 0);
6768
public static final Setting<Integer> NUM_ANALYTICS_JOBS_IN_QUEUE = Setting.intSetting("num_analytics_jobs_in_queue", 0, 0);
68-
public static final Setting<TimeValue> DOWN_SCALE_DELAY = Setting.timeSetting("num_analytics_jobs_in_queue", TimeValue.ZERO);
69+
public static final Setting<TimeValue> DOWN_SCALE_DELAY = Setting.timeSetting("down_scale_delay", TimeValue.timeValueHours(1));
6970

7071
private final NodeLoadDetector nodeLoadDetector;
7172
private final MlMemoryTracker mlMemoryTracker;
@@ -156,7 +157,8 @@ static Optional<NativeMemoryCapacity> requiredCapacityForUnassignedJobs(List<Str
156157
}
157158
jobSizes.sort(Comparator.comparingLong(Long::longValue).reversed());
158159
long tierMemory = 0L;
159-
long nodeMemory = jobSizes.get(0);
160+
// Node memory needs to be AT LEAST the size of the largest job + the required overhead.
161+
long nodeMemory = jobSizes.get(0) + MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
160162
Iterator<Long> iter = jobSizes.iterator();
161163
while (jobSizes.size() > maxNumInQueue && iter.hasNext()) {
162164
tierMemory += iter.next();
@@ -226,7 +228,9 @@ private boolean newScaleDownCheck() {
226228
return scaleDownDetected == NO_SCALE_DOWN_POSSIBLE;
227229
}
228230

229-
NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes) {
231+
public static NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes,
232+
int maxMachineMemoryPercent,
233+
boolean useAuto) {
230234
long[] mlMemory = machineLearningNodes.stream()
231235
.mapToLong(node -> NativeMemoryCalculator.allowedBytesForMl(node, maxMachineMemoryPercent, useAuto).orElse(0L))
232236
.toArray();
@@ -244,6 +248,10 @@ NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes
244248
);
245249
}
246250

251+
NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes) {
252+
return currentScale(machineLearningNodes, maxMachineMemoryPercent, useAuto);
253+
}
254+
247255
@Override
248256
public void offMaster() {
249257
isMaster = false;
@@ -567,14 +575,16 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<DiscoveryNode> nodes,
567575
return Optional.empty();
568576
}
569577
long currentlyNecessaryTier = nodeLoads.stream().mapToLong(NodeLoad::getAssignedJobMemory).sum();
578+
// The required NATIVE node memory is the largest job and our static overhead.
579+
long currentlyNecessaryNode = largestJob == 0 ? 0 : largestJob + MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
570580
// We consider a scale down if we are not fully utilizing the tier
571581
// Or our largest job could be on a smaller node (meaning the same size tier but smaller nodes are possible).
572-
if (currentlyNecessaryTier < currentCapacity.getTier() || largestJob < currentCapacity.getNode()) {
582+
if (currentlyNecessaryTier < currentCapacity.getTier() || currentlyNecessaryNode < currentCapacity.getNode()) {
573583
NativeMemoryCapacity nativeMemoryCapacity = new NativeMemoryCapacity(
574584
currentlyNecessaryTier,
575-
largestJob,
585+
currentlyNecessaryNode,
576586
// If our newly suggested native capacity is the same, we can use the previously stored jvm size
577-
largestJob == currentCapacity.getNode() ? currentCapacity.getJvmSize() : null);
587+
currentlyNecessaryNode == currentCapacity.getNode() ? currentCapacity.getJvmSize() : null);
578588
return Optional.of(
579589
new AutoscalingDeciderResult(
580590
nativeMemoryCapacity.autoscalingCapacity(maxMachineMemoryPercent, useAuto),
@@ -588,7 +598,7 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<DiscoveryNode> nodes,
588598

589599
@Override
590600
public String name() {
591-
return "ml";
601+
return NAME;
592602
}
593603

594604
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReason.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
public class MlScalingReason implements AutoscalingDeciderResult.Reason {
2323

24-
static final String NAME = "ml";
24+
public static final String NAME = MlAutoscalingDeciderService.NAME;
2525
static final String WAITING_ANALYTICS_JOBS = "waiting_analytics_jobs";
2626
static final String WAITING_ANOMALY_JOBS = "waiting_anomaly_jobs";
2727
static final String CONFIGURATION = "configuration";

0 commit comments

Comments
 (0)