Skip to content

Commit caf35c9

Browse files
authored
[7.x] [ML] Autoscaling for machine learning (elastic#59309) (elastic#65151)
* [ML] Autoscaling for machine learning (elastic#59309) This provides an autoscaling service integration for machine learning. The underlying logic is fairly straightforward, with a few caveats: - When deciding whether to scale up or down, ML automatically translates between node size and the memory that the node will potentially provide for ML after the scaling plan is implemented. - If knowledge of job sizes is out of date, we do a best-effort check for scaling up. If that cannot be determined with our current view of job memory, we attempt a refresh and return a no_scale event. - If the auto memory percent calculation is being used, we treat the JVM sizes on all nodes as being the same. - For scale down, we keep our last scale-down calculation time in memory. So, if the master node changes in the meantime, the scale-down delay is reset.
1 parent 39f5f39 commit caf35c9

25 files changed

+2078
-178
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public static String jobTaskId(String jobId) {
4848
return JOB_TASK_ID_PREFIX + jobId;
4949
}
5050

51+
public static String jobId(String jobTaskId) {
52+
return jobTaskId.substring(JOB_TASK_ID_PREFIX.length());
53+
}
54+
5155
/**
5256
* Namespaces the task ids for datafeeds.
5357
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
@@ -67,6 +71,10 @@ public static String dataFrameAnalyticsTaskId(String id) {
6771
return DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + id;
6872
}
6973

74+
public static String dataFrameAnalyticsId(String taskId) {
75+
return taskId.substring(DATA_FRAME_ANALYTICS_TASK_ID_PREFIX.length());
76+
}
77+
7078
@Nullable
7179
public static PersistentTasksCustomMetadata.PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetadata tasks) {
7280
return tasks == null ? null : tasks.getTask(jobTaskId(jobId));

x-pack/plugin/ml/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ esplugin {
55
description 'Elasticsearch Expanded Pack Plugin - Machine Learning'
66
classname 'org.elasticsearch.xpack.ml.MachineLearning'
77
hasNativeController true
8-
extendedPlugins = ['x-pack-core', 'lang-painless']
8+
extendedPlugins = ['x-pack-autoscaling', 'lang-painless']
99
}
1010

1111

@@ -50,6 +50,7 @@ dependencies {
5050
compileOnly project(path: xpackModule('core'), configuration: 'default')
5151
testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts')
5252
testImplementation project(path: xpackModule('ilm'), configuration: 'default')
53+
compileOnly project(path: xpackModule('autoscaling'), configuration: 'default')
5354
testImplementation project(path: xpackModule('data-streams'), configuration: 'default')
5455
// This should not be here
5556
testImplementation project(path: xpackModule('security'), configuration: 'testArtifacts')

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
6666
import org.elasticsearch.threadpool.ThreadPool;
6767
import org.elasticsearch.watcher.ResourceWatcherService;
68+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;
69+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
6870
import org.elasticsearch.xpack.core.ClientHelper;
6971
import org.elasticsearch.xpack.core.XPackPlugin;
7072
import org.elasticsearch.xpack.core.XPackSettings;
@@ -212,6 +214,8 @@
212214
import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction;
213215
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
214216
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
217+
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
218+
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider;
215219
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
216220
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
217221
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
@@ -413,7 +417,7 @@ public Set<DiscoveryNodeRole> getRoles() {
413417
// controls the types of jobs that can be created, and each job alone is considerably smaller than what each node
414418
// can handle.
415419
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
416-
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
420+
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
417421
/**
418422
* This boolean value indicates if `max_machine_memory_percent` should be ignored and an automatic calculation is used instead.
419423
*
@@ -469,6 +473,17 @@ public Set<DiscoveryNodeRole> getRoles() {
469473
Property.NodeScope
470474
);
471475

476+
/**
477+
* This is the maximum possible node size for a machine learning node. It is useful when determining if a job could ever be opened
478+
* on the cluster.
479+
*
480+
* If the value is the default special case of `0b`, that means the value is ignored when assigning jobs.
481+
*/
482+
public static final Setting<ByteSizeValue> MAX_ML_NODE_SIZE = Setting.byteSizeSetting(
483+
"xpack.ml.max_ml_node_size",
484+
ByteSizeValue.ZERO,
485+
Property.NodeScope);
486+
472487
private static final Logger logger = LogManager.getLogger(MachineLearning.class);
473488

474489
private final Settings settings;
@@ -483,6 +498,7 @@ public Set<DiscoveryNodeRole> getRoles() {
483498
private final SetOnce<ActionFilter> mlUpgradeModeActionFilter = new SetOnce<>();
484499
private final SetOnce<CircuitBreaker> inferenceModelBreaker = new SetOnce<>();
485500
private final SetOnce<ModelLoadingService> modelLoadingService = new SetOnce<>();
501+
private final SetOnce<MlAutoscalingDeciderService> mlAutoscalingDeciderService = new SetOnce<>();
486502

487503
public MachineLearning(Settings settings, Path configPath) {
488504
this.settings = settings;
@@ -520,7 +536,8 @@ public List<Setting<?>> getSettings() {
520536
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
521537
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
522538
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
523-
USE_AUTO_MACHINE_MEMORY_PERCENT));
539+
USE_AUTO_MACHINE_MEMORY_PERCENT,
540+
MAX_ML_NODE_SIZE));
524541
}
525542

526543
public Settings additionalSettings() {
@@ -754,6 +771,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
754771
// Perform node startup operations
755772
nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();
756773

774+
mlAutoscalingDeciderService.set(new MlAutoscalingDeciderService(memoryTracker, settings, clusterService));
775+
757776
return Arrays.asList(
758777
mlLifeCycleService,
759778
jobResultsProvider,
@@ -1085,6 +1104,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
10851104
namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers());
10861105
namedXContent.addAll(new MlInferenceNamedXContentProvider().getNamedXContentParsers());
10871106
namedXContent.addAll(new MlModelSizeNamedXContentProvider().getNamedXContentParsers());
1107+
namedXContent.addAll(MlAutoscalingNamedWritableProvider.getXContentParsers());
10881108
return namedXContent;
10891109
}
10901110

@@ -1115,4 +1135,13 @@ public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
11151135
assert circuitBreaker.getName().equals(TRAINED_MODEL_CIRCUIT_BREAKER_NAME);
11161136
this.inferenceModelBreaker.set(circuitBreaker);
11171137
}
1138+
1139+
public Collection<AutoscalingDeciderService<? extends AutoscalingDeciderConfiguration>> deciders() {
1140+
if (enabled) {
1141+
assert mlAutoscalingDeciderService.get() != null;
1142+
return Collections.singletonList(mlAutoscalingDeciderService.get());
1143+
} else {
1144+
return Collections.emptyList();
1145+
}
1146+
}
11181147
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params,
635635
maxOpenJobs,
636636
Integer.MAX_VALUE,
637637
maxMachineMemoryPercent,
638+
maxNodeMemory,
638639
isMemoryTrackerRecentlyRefreshed,
639640
useAutoMemoryPercentage
640641
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ml.autoscaling;
8+
9+
import org.elasticsearch.common.ParseField;
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.unit.TimeValue;
13+
import org.elasticsearch.common.xcontent.ObjectParser;
14+
import org.elasticsearch.common.xcontent.XContentBuilder;
15+
import org.elasticsearch.common.xcontent.XContentParser;
16+
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;
17+
18+
import java.io.IOException;
19+
import java.util.Objects;
20+
21+
public class MlAutoscalingDeciderConfiguration implements AutoscalingDeciderConfiguration {
22+
static final String NAME = "ml";
23+
24+
private static final int DEFAULT_ANOMALY_JOBS_IN_QUEUE = 0;
25+
private static final int DEFAULT_ANALYTICS_JOBS_IN_QUEUE = 0;
26+
27+
private static final ParseField NUM_ANOMALY_JOBS_IN_QUEUE = new ParseField("num_anomaly_jobs_in_queue");
28+
private static final ParseField NUM_ANALYTICS_JOBS_IN_QUEUE = new ParseField("num_analytics_jobs_in_queue");
29+
private static final ParseField DOWN_SCALE_DELAY = new ParseField("down_scale_delay");
30+
31+
private static final ObjectParser<MlAutoscalingDeciderConfiguration.Builder, Void> PARSER = new ObjectParser<>(NAME,
32+
MlAutoscalingDeciderConfiguration.Builder::new);
33+
34+
static {
35+
PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnomalyJobsInQueue, NUM_ANOMALY_JOBS_IN_QUEUE);
36+
PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnalyticsJobsInQueue, NUM_ANALYTICS_JOBS_IN_QUEUE);
37+
PARSER.declareString(MlAutoscalingDeciderConfiguration.Builder::setDownScaleDelay, DOWN_SCALE_DELAY);
38+
}
39+
40+
public static MlAutoscalingDeciderConfiguration parse(final XContentParser parser) {
41+
return PARSER.apply(parser, null).build();
42+
}
43+
44+
private final int numAnomalyJobsInQueue;
45+
private final int numAnalyticsJobsInQueue;
46+
private final TimeValue downScaleDelay;
47+
48+
MlAutoscalingDeciderConfiguration(int numAnomalyJobsInQueue, int numAnalyticsJobsInQueue, TimeValue downScaleDelay) {
49+
if (numAnomalyJobsInQueue < 0) {
50+
throw new IllegalArgumentException("[" + NUM_ANOMALY_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative");
51+
}
52+
if (numAnalyticsJobsInQueue < 0) {
53+
throw new IllegalArgumentException("[" + NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative");
54+
}
55+
this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue;
56+
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue;
57+
this.downScaleDelay = downScaleDelay;
58+
}
59+
60+
public MlAutoscalingDeciderConfiguration(StreamInput in) throws IOException {
61+
numAnomalyJobsInQueue = in.readVInt();
62+
numAnalyticsJobsInQueue = in.readVInt();
63+
downScaleDelay = in.readTimeValue();
64+
}
65+
66+
@Override
67+
public String name() {
68+
return NAME;
69+
}
70+
71+
@Override
72+
public String getWriteableName() {
73+
return NAME;
74+
}
75+
76+
@Override
77+
public void writeTo(StreamOutput out) throws IOException {
78+
out.writeVInt(numAnomalyJobsInQueue);
79+
out.writeVInt(numAnalyticsJobsInQueue);
80+
out.writeTimeValue(downScaleDelay);
81+
}
82+
83+
public int getNumAnomalyJobsInQueue() {
84+
return numAnomalyJobsInQueue;
85+
}
86+
87+
public int getNumAnalyticsJobsInQueue() {
88+
return numAnalyticsJobsInQueue;
89+
}
90+
91+
public TimeValue getDownScaleDelay() {
92+
return downScaleDelay;
93+
}
94+
95+
@Override
96+
public boolean equals(Object o) {
97+
if (this == o) return true;
98+
if (o == null || getClass() != o.getClass()) return false;
99+
MlAutoscalingDeciderConfiguration that = (MlAutoscalingDeciderConfiguration) o;
100+
return numAnomalyJobsInQueue == that.numAnomalyJobsInQueue &&
101+
numAnalyticsJobsInQueue == that.numAnalyticsJobsInQueue &&
102+
Objects.equals(downScaleDelay, that.downScaleDelay);
103+
}
104+
105+
@Override
106+
public int hashCode() {
107+
return Objects.hash(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay);
108+
}
109+
110+
@Override
111+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
112+
builder.startObject();
113+
builder.field(NUM_ANOMALY_JOBS_IN_QUEUE .getPreferredName(), numAnomalyJobsInQueue);
114+
builder.field(NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName(), numAnalyticsJobsInQueue);
115+
builder.field(DOWN_SCALE_DELAY.getPreferredName(), downScaleDelay.getStringRep());
116+
builder.endObject();
117+
return builder;
118+
}
119+
120+
public static Builder builder() {
121+
return new Builder();
122+
}
123+
124+
public static class Builder {
125+
126+
private int numAnomalyJobsInQueue = DEFAULT_ANOMALY_JOBS_IN_QUEUE;
127+
private int numAnalyticsJobsInQueue = DEFAULT_ANALYTICS_JOBS_IN_QUEUE;
128+
private TimeValue downScaleDelay = TimeValue.ZERO;
129+
130+
public Builder setNumAnomalyJobsInQueue(int numAnomalyJobsInQueue) {
131+
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue;
132+
return this;
133+
}
134+
135+
public Builder setNumAnalyticsJobsInQueue(int numAnalyticsJobsInQueue) {
136+
this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue;
137+
return this;
138+
}
139+
140+
Builder setDownScaleDelay(String unparsedTimeValue) {
141+
return setDownScaleDelay(TimeValue.parseTimeValue(unparsedTimeValue, DOWN_SCALE_DELAY.getPreferredName()));
142+
}
143+
144+
public Builder setDownScaleDelay(TimeValue downScaleDelay) {
145+
this.downScaleDelay = Objects.requireNonNull(downScaleDelay);
146+
return this;
147+
}
148+
149+
public MlAutoscalingDeciderConfiguration build() {
150+
return new MlAutoscalingDeciderConfiguration(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay);
151+
}
152+
}
153+
154+
}

0 commit comments

Comments
 (0)