Skip to content

Commit ff7df40

Browse files
authored
[ML] Uplift model memory limit on job migration (#37126)
When a 6.1-6.3 job is opened in a later version we increase the model memory limit by 30% if it's below 0.5GB. The migration of jobs from cluster state to the config index changes the job version, so we need to also do this uplift as part of that config migration. Relates #36961
1 parent 21d52f0 commit ff7df40

File tree

2 files changed

+16
-40
lines changed

2 files changed

+16
-40
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.xpack.core.ml.MlMetadata;
3838
import org.elasticsearch.xpack.core.ml.MlTasks;
3939
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
40+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
4041
import org.elasticsearch.xpack.core.ml.job.config.Job;
4142
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
4243
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
@@ -403,11 +404,23 @@ public static Job updateJobForMigration(Job job) {
403404
Map<String, Object> custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings());
404405
custom.put(MIGRATED_FROM_VERSION, job.getJobVersion());
405406
builder.setCustomSettings(custom);
407+
// Increase the model memory limit for 6.1 - 6.3 jobs
408+
Version jobVersion = job.getJobVersion();
409+
if (jobVersion != null && jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0)) {
410+
// Increase model memory limit if < 512MB
411+
if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null &&
412+
job.getAnalysisLimits().getModelMemoryLimit() < 512L) {
413+
long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3);
414+
AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit,
415+
job.getAnalysisLimits().getCategorizationExamplesLimit());
416+
builder.setAnalysisLimits(limits);
417+
}
418+
}
406419
// Pre v5.5 (ml beta) jobs do not have a version.
407420
// These jobs cannot be opened, we rely on the missing version
408421
// to indicate this.
409422
// See TransportOpenJobAction.validate()
410-
if (job.getJobVersion() != null) {
423+
if (jobVersion != null) {
411424
builder.setJobVersion(Version.CURRENT);
412425
}
413426
return builder.build();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@
5555
import org.elasticsearch.xpack.core.ml.MlTasks;
5656
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
5757
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
58-
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
59-
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
60-
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
6158
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
6259
import org.elasticsearch.xpack.core.ml.job.config.Job;
6360
import org.elasticsearch.xpack.core.ml.job.config.JobState;
@@ -540,50 +537,16 @@ public void onFailure(Exception e) {
540537
);
541538

542539
// Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks
543-
ActionListener<PutJobAction.Response> jobUpdateListener = ActionListener.wrap(
540+
ActionListener<Boolean> jobUpdateListener = ActionListener.wrap(
544541
response -> memoryTracker.refreshJobMemoryAndAllOthers(jobParams.getJobId(), memoryRequirementRefreshListener),
545542
listener::onFailure
546543
);
547544

548-
// Increase the model memory limit for 6.1 - 6.3 jobs
549-
ActionListener<Boolean> missingMappingsListener = ActionListener.wrap(
550-
response -> {
551-
Job job = jobParams.getJob();
552-
if (job != null) {
553-
Version jobVersion = job.getJobVersion();
554-
if (jobVersion != null &&
555-
(jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0))) {
556-
// Increase model memory limit if < 512MB
557-
if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null &&
558-
job.getAnalysisLimits().getModelMemoryLimit() < 512L) {
559-
560-
long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3);
561-
AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit,
562-
job.getAnalysisLimits().getCategorizationExamplesLimit());
563-
564-
JobUpdate update = new JobUpdate.Builder(job.getId()).setJobVersion(Version.CURRENT)
565-
.setAnalysisLimits(limits).build();
566-
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update);
567-
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest,
568-
jobUpdateListener);
569-
} else {
570-
jobUpdateListener.onResponse(null);
571-
}
572-
}
573-
else {
574-
jobUpdateListener.onResponse(null);
575-
}
576-
} else {
577-
jobUpdateListener.onResponse(null);
578-
}
579-
}, listener::onFailure
580-
);
581-
582545
// Try adding state doc mapping
583546
ActionListener<Boolean> resultsPutMappingHandler = ActionListener.wrap(
584547
response -> {
585548
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping,
586-
state, missingMappingsListener);
549+
state, jobUpdateListener);
587550
}, listener::onFailure
588551
);
589552

0 commit comments

Comments
 (0)