[7.x] [ML] adding ml autoscaling integration test (#65638) #65775

Merged
merged 3 commits on Dec 3, 2020
GetAutoscalingCapacityAction.java
@@ -112,6 +112,9 @@ public int hashCode() {
return Objects.hash(results);
}

public Map<String, AutoscalingDeciderResults> getResults() {
return results;
}
}

}
AnalysisLimits.java
@@ -87,17 +87,17 @@ public AnalysisLimits(Long categorizationExamplesLimit) {
this(DEFAULT_MODEL_MEMORY_LIMIT_MB, categorizationExamplesLimit);
}

- public AnalysisLimits(Long modelMemoryLimit, Long categorizationExamplesLimit) {
- if (modelMemoryLimit != null && modelMemoryLimit < 1) {
- String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW, modelMemoryLimit, "1 MiB");
+ public AnalysisLimits(Long modelMemoryLimitMb, Long categorizationExamplesLimit) {
+ if (modelMemoryLimitMb != null && modelMemoryLimitMb < 1) {
+ String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_MEMORY_LIMIT_TOO_LOW, modelMemoryLimitMb, "1 MiB");
throw ExceptionsHelper.badRequestException(msg);
}
if (categorizationExamplesLimit != null && categorizationExamplesLimit < 0) {
String msg = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, CATEGORIZATION_EXAMPLES_LIMIT, 0,
categorizationExamplesLimit);
throw ExceptionsHelper.badRequestException(msg);
}
- this.modelMemoryLimit = modelMemoryLimit;
+ this.modelMemoryLimit = modelMemoryLimitMb;
this.categorizationExamplesLimit = categorizationExamplesLimit;
}

AutoscalingIT.java (new file)
@@ -0,0 +1,178 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.autoscaling.Autoscaling;
import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingCapacityAction;
import org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResults;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
import org.elasticsearch.xpack.ml.autoscaling.NativeMemoryCapacity;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;

public class AutoscalingIT extends MlNativeAutodetectIntegTestCase {

private static final long BASIC_REQUIREMENT_MB = 10;
private static final long NATIVE_PROCESS_OVERHEAD_MB = 30;
private static final long BASELINE_OVERHEAD_MB = BASIC_REQUIREMENT_MB + NATIVE_PROCESS_OVERHEAD_MB;

// This test assumes that xpack.ml.max_machine_memory_percent is 30
// and that xpack.ml.use_auto_machine_memory_percent is false
public void testMLAutoscalingCapacity() {
SortedMap<String, Settings> deciders = new TreeMap<>();
deciders.put(MlAutoscalingDeciderService.NAME,
Settings.builder().put(MlAutoscalingDeciderService.DOWN_SCALE_DELAY.getKey(), TimeValue.ZERO).build());
final PutAutoscalingPolicyAction.Request request = new PutAutoscalingPolicyAction.Request(
"ml_test",
new TreeSet<>(),
Contributor (henningandersen):

@benwtrent do you know why this did not work? It is preventing me from getting #66082 merged.

Member Author (benwtrent):

@henningandersen when I tried to supply "ml" it complained saying it was not really a valid node role. Now, this may have been due to the ML plugin not being loaded correctly? The easiest way to check is to simply put ml in there in 7.x and try to run the test. I will double check.

Contributor (henningandersen):

Yeah, I did that and got the validation error. I only wanted to be sure I was not chasing something you had already investigated deeply.

Member Author (benwtrent):

@henningandersen

 REPRODUCE WITH: ./gradlew ':x-pack:plugin:ml:qa:native-multi-node-tests:javaRestTest' --tests "org.elasticsearch.xpack.ml.integration.AutoscalingIT.testMLAutoscalingCapacity" -Dtests.seed=93A328B876AF5108 -Dtests.security.manager=true -Dtests.locale=pl -Dtests.timezone=Atlantic/Faroe -Druntime.java=11
  2> org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: ml;
        at __randomizedtesting.SeedInfo.seed([93A328B876AF5108:D856B3907EB37506]:0)
        at org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction$Request.validate(PutAutoscalingPolicyAction.java:150)
        at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:42)
        at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:55)
        at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:253)
        at org.elasticsearch.client.transport.TransportProxyClient.execute(TransportProxyClient.java:55)
        at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:391)
        at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:412)
        at org.elasticsearch.client.FilterClient.doExecute(FilterClient.java:65)
        at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:412)
        at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:401)
        at org.elasticsearch.xpack.ml.integration.AutoscalingIT.testMLAutoscalingCapacity(AutoscalingIT.java:57)

I THINK the node role format has changed in master vs 7.x. Passing ml as the only role works fine in master.

Contributor (henningandersen):

Ahhhh, the validation runs in the transport client, and this causes the issue, since the client does not know about the ml plugin. Using a core role like master passes that validation (but with my PR the test then fails validation).

Contributor (henningandersen):

Let me open a PR to move the validation out of the request in 7.x.
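The mechanism described above can be shown in isolation. A minimal sketch of the failure mode, assuming 7.x validates requested role names against the set of roles the local JVM knows about: built-in roles are always present, while plugin roles such as ml exist only where the ML plugin is loaded, which is never the case inside the transport client. The role set, method names, and exception below are illustrative stand-ins, not the actual Elasticsearch code.

    import java.util.Set;

    public class RoleValidationSketch {
        // Stand-in for the role names known to a transport client without the
        // ML plugin on its classpath (illustrative subset of built-in roles).
        private static final Set<String> KNOWN_ROLE_NAMES =
            Set.of("master", "data", "ingest", "remote_cluster_client");

        static void validate(Set<String> requestedRoles) {
            for (String role : requestedRoles) {
                if (KNOWN_ROLE_NAMES.contains(role) == false) {
                    // Same shape as the "Validation Failed: 1: ml;" message in the log above
                    throw new IllegalArgumentException("Validation Failed: 1: " + role + ";");
                }
            }
        }

        public static void main(String[] args) {
            validate(Set.of("master")); // passes: core roles are always known
            try {
                validate(Set.of("ml")); // throws: the client never registered the ml role
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage()); // Validation Failed: 1: ml;
            }
        }
    }

This is consistent with the observation above that passing ml as the only role works fine on master, where the request is validated on a node that has the plugin loaded rather than in the transport client.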

deciders
);
assertAcked(client().execute(PutAutoscalingPolicyAction.INSTANCE, request).actionGet());

assertMlCapacity(
client().execute(
GetAutoscalingCapacityAction.INSTANCE,
new GetAutoscalingCapacityAction.Request()
).actionGet(),
"Requesting scale down as tier and/or node size could be smaller",
0L,
0L);

putJob("job1", 100);
putJob("job2", 200);
openJob("job1");
openJob("job2");
long expectedTierBytes = (long)Math.ceil(
ByteSizeValue.ofMb(100 + BASELINE_OVERHEAD_MB + 200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0
);
long expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);

assertMlCapacity(
client().execute(
GetAutoscalingCapacityAction.INSTANCE,
new GetAutoscalingCapacityAction.Request()
).actionGet(),
"Requesting scale down as tier and/or node size could be smaller",
expectedTierBytes,
expectedNodeBytes);

putJob("bigjob1", 60_000);
putJob("bigjob2", 50_000);
openJob("bigjob1");
openJob("bigjob2");
List<DiscoveryNode> mlNodes = admin()
.cluster()
.prepareNodesInfo()
.all()
.get()
.getNodes()
.stream()
.map(NodeInfo::getNode)
.filter(MachineLearning::isMlNode)
.collect(Collectors.toList());
NativeMemoryCapacity currentScale = MlAutoscalingDeciderService.currentScale(mlNodes, 30, false);
expectedTierBytes = (long)Math.ceil(
(ByteSizeValue.ofMb(50_000 + BASIC_REQUIREMENT_MB + 60_000 + BASIC_REQUIREMENT_MB).getBytes()
+ currentScale.getTier()
) * 100 / 30.0
);
expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(60_000 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);


assertMlCapacity(
client().execute(
GetAutoscalingCapacityAction.INSTANCE,
new GetAutoscalingCapacityAction.Request()
).actionGet(),
"requesting scale up as number of jobs in queues exceeded configured limit",
expectedTierBytes,
expectedNodeBytes);

expectedTierBytes = (long)Math.ceil(
ByteSizeValue.ofMb(100 + BASELINE_OVERHEAD_MB + 200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0
);
expectedNodeBytes = (long)Math.ceil(ByteSizeValue.ofMb(200 + BASELINE_OVERHEAD_MB).getBytes() * 100 / 30.0);
closeJob("bigjob1");
closeJob("bigjob2");

assertMlCapacity(
client().execute(
GetAutoscalingCapacityAction.INSTANCE,
new GetAutoscalingCapacityAction.Request()
).actionGet(),
"Requesting scale down as tier and/or node size could be smaller",
expectedTierBytes,
expectedNodeBytes);
closeJob("job1");
closeJob("job2");

assertMlCapacity(
client().execute(
GetAutoscalingCapacityAction.INSTANCE,
new GetAutoscalingCapacityAction.Request()
).actionGet(),
"Requesting scale down as tier and/or node size could be smaller",
0L,
0L);
}

private void assertMlCapacity(GetAutoscalingCapacityAction.Response capacity, String reason, long tierBytes, long nodeBytes) {
assertThat(capacity.getResults(), hasKey("ml_test"));
AutoscalingDeciderResults autoscalingDeciderResults = capacity.getResults().get("ml_test");
assertThat(autoscalingDeciderResults.results(), hasKey("ml"));

AutoscalingDeciderResult autoscalingDeciderResult = autoscalingDeciderResults.results().get("ml");
assertThat(autoscalingDeciderResult.reason().summary(), containsString(reason));
assertThat(autoscalingDeciderResult.requiredCapacity().tier().memory().getBytes(), equalTo(tierBytes));
assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), equalTo(nodeBytes));
}

private void putJob(String jobId, long limitMb) {
Job.Builder job =
new Job.Builder(jobId)
.setAllowLazyOpen(true)
.setAnalysisLimits(new AnalysisLimits(limitMb, null))
.setAnalysisConfig(
new AnalysisConfig.Builder((List<Detector>) null)
.setBucketSpan(TimeValue.timeValueHours(1))
.setDetectors(
Collections.singletonList(
new Detector.Builder("count", null)
.setPartitionFieldName("user")
.build())))
.setDataDescription(
new DataDescription.Builder()
.setTimeFormat("epoch"));

registerJob(job);
putJob(job);
}
}
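For reference, the expected-capacity arithmetic in the test above can be reproduced by hand: job model-memory limits are native ML memory, while the decider answers in total node and tier memory, so the test scales native requirements up by 100/30 to undo the 30% xpack.ml.max_machine_memory_percent cut. A standalone sketch of the numbers for the two small jobs follows; the constants mirror the test, and this is an illustration, not Elasticsearch code.

    public class CapacityMath {
        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            long baselineOverheadMb = 10 + 30; // per-job requirement + native process overhead

            // Tier: job1 (100 MB limit) and job2 (200 MB limit), each plus baseline overhead
            long nativeTierBytes = (100 + baselineOverheadMb + 200 + baselineOverheadMb) * mb;
            long expectedTierBytes = (long) Math.ceil(nativeTierBytes * 100 / 30.0);

            // Node: must fit the single largest job plus its overhead
            long nativeNodeBytes = (200 + baselineOverheadMb) * mb;
            long expectedNodeBytes = (long) Math.ceil(nativeNodeBytes * 100 / 30.0);

            System.out.println(expectedTierBytes); // 1328196267 bytes (~1.24 GiB)
            System.out.println(expectedNodeBytes); // 838860800 bytes (800 MiB)
        }
    }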
MlNativeIntegTestCase.java
@@ -6,6 +6,10 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.xpack.autoscaling.Autoscaling;
import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -72,6 +76,9 @@
import org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry;
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.autoscaling.MlScalingReason;

import java.io.IOException;
import java.net.URISyntaxException;
@@ -106,7 +113,10 @@ protected NamedXContentRegistry xContentRegistry() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(
LocalStateCompositeXPackPlugin.class,
MachineLearning.class,
Netty4Plugin.class,
Autoscaling.class,
ReindexPlugin.class,
// The monitoring plugin requires script and gsub processors to be loaded
IngestCommonPlugin.class,
// The monitoring plugin script processor references painless. Include this for script compilation.
@@ -121,6 +131,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(
XPackClientPlugin.class,
Autoscaling.class,
MachineLearning.class,
Netty4Plugin.class,
ReindexPlugin.class,
// ILM is required for .ml-state template index settings
@@ -144,6 +156,7 @@ protected Settings externalClusterClientSettings() {
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
builder.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
builder.put(Autoscaling.AUTOSCALING_ENABLED_SETTING.getKey(), true);
builder.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
builder.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
builder.put("xpack.security.transport.ssl.enabled", true);
@@ -236,18 +249,25 @@ protected void ensureClusterStateConsistency() throws IOException {
entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new));
entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
StartDatafeedAction.DatafeedParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
StartDataFrameAnalyticsAction.TaskParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
OpenJobAction.JobParams::new));
- entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_SNAPSHOT_UPGRADE_TASK_NAME,
- SnapshotUpgradeTaskParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DataFrameAnalyticsTaskState.NAME,
DataFrameAnalyticsTaskState::new));
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetadata.TYPE, TokenMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoscalingMetadata.NAME, AutoscalingMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class,
AutoscalingMetadata.NAME,
AutoscalingMetadata.AutoscalingMetadataDiff::new));
entries.add(new NamedWriteableRegistry.Entry(
AutoscalingDeciderResult.Reason.class,
MlScalingReason.NAME,
MlScalingReason::new
));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
MachineLearning.java
@@ -435,7 +435,7 @@ public Set<DiscoveryNodeRole> getRoles() {
false,
Property.NodeScope);
public static final Setting<Integer> MAX_LAZY_ML_NODES =
- Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
+ Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, Property.Dynamic, Property.NodeScope);

// Before 8.0.0 this needs to match the max allowed value for xpack.ml.max_open_jobs,
// as the current node could be running in a cluster where some nodes are still using
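The one-line change above switches from the bounded Setting.intSetting overload (key, default, min, max, properties) to the unbounded one (key, default, min, properties), lifting the previous hard cap of 3 on xpack.ml.max_lazy_ml_nodes, presumably so an autoscaled cluster is not limited to three lazily added ML nodes. A toy sketch of the difference in accepted values (illustrative, not the Setting class itself):

    public class BoundedSettingSketch {
        // Mimics an int setting with a lower bound and an optional upper bound.
        static int parse(int value, int min, Integer max) {
            if (value < min) throw new IllegalArgumentException("must be >= " + min);
            if (max != null && value > max) throw new IllegalArgumentException("must be <= " + max);
            return value;
        }

        public static void main(String[] args) {
            System.out.println(parse(5, 0, null)); // accepted once the cap is gone
            try {
                parse(5, 0, 3); // rejected under the old bounded setting
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage()); // must be <= 3
            }
        }
    }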
MlAutoscalingDeciderService.java
@@ -63,9 +63,10 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
private static final String MEMORY_STALE = "unable to make scaling decision as job memory requirements are stale";
private static final long NO_SCALE_DOWN_POSSIBLE = -1L;

+ public static final String NAME = "ml";
public static final Setting<Integer> NUM_ANOMALY_JOBS_IN_QUEUE = Setting.intSetting("num_anomaly_jobs_in_queue", 0, 0);
public static final Setting<Integer> NUM_ANALYTICS_JOBS_IN_QUEUE = Setting.intSetting("num_analytics_jobs_in_queue", 0, 0);
- public static final Setting<TimeValue> DOWN_SCALE_DELAY = Setting.timeSetting("num_analytics_jobs_in_queue", TimeValue.ZERO);
+ public static final Setting<TimeValue> DOWN_SCALE_DELAY = Setting.timeSetting("down_scale_delay", TimeValue.timeValueHours(1));

private final NodeLoadDetector nodeLoadDetector;
private final MlMemoryTracker mlMemoryTracker;
@@ -156,7 +157,8 @@ static Optional<NativeMemoryCapacity> requiredCapacityForUnassignedJobs(List<Str
}
jobSizes.sort(Comparator.comparingLong(Long::longValue).reversed());
long tierMemory = 0L;
- long nodeMemory = jobSizes.get(0);
+ // Node memory needs to be AT LEAST the size of the largest job + the required overhead.
+ long nodeMemory = jobSizes.get(0) + MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
Iterator<Long> iter = jobSizes.iterator();
while (jobSizes.size() > maxNumInQueue && iter.hasNext()) {
tierMemory += iter.next();
@@ -226,7 +228,9 @@ private boolean newScaleDownCheck() {
return scaleDownDetected == NO_SCALE_DOWN_POSSIBLE;
}

- NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes) {
+ public static NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes,
+ int maxMachineMemoryPercent,
+ boolean useAuto) {
long[] mlMemory = machineLearningNodes.stream()
.mapToLong(node -> NativeMemoryCalculator.allowedBytesForMl(node, maxMachineMemoryPercent, useAuto).orElse(0L))
.toArray();
@@ -244,6 +248,10 @@ NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes
);
}

NativeMemoryCapacity currentScale(final List<DiscoveryNode> machineLearningNodes) {
return currentScale(machineLearningNodes, maxMachineMemoryPercent, useAuto);
}

@Override
public void offMaster() {
isMaster = false;
@@ -567,14 +575,16 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<DiscoveryNode> nodes,
return Optional.empty();
}
long currentlyNecessaryTier = nodeLoads.stream().mapToLong(NodeLoad::getAssignedJobMemory).sum();
+ // The required NATIVE node memory is the largest job and our static overhead.
+ long currentlyNecessaryNode = largestJob == 0 ? 0 : largestJob + MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
// We consider a scale down if we are not fully utilizing the tier
// Or our largest job could be on a smaller node (meaning the same size tier but smaller nodes are possible).
- if (currentlyNecessaryTier < currentCapacity.getTier() || largestJob < currentCapacity.getNode()) {
+ if (currentlyNecessaryTier < currentCapacity.getTier() || currentlyNecessaryNode < currentCapacity.getNode()) {
NativeMemoryCapacity nativeMemoryCapacity = new NativeMemoryCapacity(
currentlyNecessaryTier,
- largestJob,
+ currentlyNecessaryNode,
// If our newly suggested native capacity is the same, we can use the previously stored jvm size
- largestJob == currentCapacity.getNode() ? currentCapacity.getJvmSize() : null);
+ currentlyNecessaryNode == currentCapacity.getNode() ? currentCapacity.getJvmSize() : null);
return Optional.of(
new AutoscalingDeciderResult(
nativeMemoryCapacity.autoscalingCapacity(maxMachineMemoryPercent, useAuto),
@@ -588,7 +598,7 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<DiscoveryNode> nodes,

@Override
public String name() {
- return "ml";
+ return NAME;
}

@Override
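Both behavioral fixes in this file fold MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD into the per-node requirement: a node must fit its single largest job plus the static native-code overhead, whether sizing for queued jobs or deciding on a scale down. A self-contained sketch of the queue-based sizing rule, assuming a 30 MB overhead to match the test constants (names and structure are illustrative, not the actual decider code):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class QueueSizingSketch {
        private static final long NATIVE_CODE_OVERHEAD_BYTES = 30L * 1024 * 1024;

        // Tier must hold every job that may not stay queued; the node must hold
        // the single largest job plus the native executable overhead.
        static long[] requiredCapacity(List<Long> unassignedJobBytes, int maxNumInQueue) {
            List<Long> sizes = new ArrayList<>(unassignedJobBytes);
            sizes.sort(Comparator.reverseOrder());
            long nodeBytes = sizes.isEmpty() ? 0 : sizes.get(0) + NATIVE_CODE_OVERHEAD_BYTES;
            long tierBytes = 0;
            // Pull jobs off the queue (largest first) until only maxNumInQueue remain.
            while (sizes.size() > maxNumInQueue) {
                tierBytes += sizes.remove(0);
            }
            return new long[] { tierBytes, nodeBytes };
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            long[] cap = requiredCapacity(List.of(60_000 * mb, 50_000 * mb), 0);
            System.out.println(cap[0]); // tier: both jobs' native memory
            System.out.println(cap[1]); // node: largest job + 30 MB overhead
        }
    }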
MlScalingReason.java
@@ -21,7 +21,7 @@

public class MlScalingReason implements AutoscalingDeciderResult.Reason {

- static final String NAME = "ml";
+ public static final String NAME = MlAutoscalingDeciderService.NAME;
static final String WAITING_ANALYTICS_JOBS = "waiting_analytics_jobs";
static final String WAITING_ANOMALY_JOBS = "waiting_anomaly_jobs";
static final String CONFIGURATION = "configuration";