Skip to content

Commit 74feca2

Browse files
author
Hendrik Muhs
authored
[ML] Prevent node potentially going out of memory due to loading quantiles (#70376)
Large jobs with lots of partitions can get very big, and retrieving snapshots for such a job can cause a node to go out of memory. With this change, quantiles are no longer fetched when querying for (multiple) modelSnapshots, which avoids the memory overhead. Quantiles aren't needed by the APIs that use JobResultsProvider.modelSnapshots(...). Fixes #70372
1 parent d4599ee commit 74feca2

File tree

4 files changed

+31
-36
lines changed

4 files changed

+31
-36
lines changed

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

+16
Original file line numberDiff line numberDiff line change
@@ -649,19 +649,23 @@ public void testGetSnapshots() {
649649
indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("snap_2")
650650
.setTimestamp(Date.from(Instant.ofEpochMilli(10)))
651651
.setMinVersion(Version.V_7_4_0)
652+
.setQuantiles(new Quantiles(jobId, Date.from(Instant.ofEpochMilli(10)), randomAlphaOfLength(20)))
652653
.build());
653654
indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("snap_1")
654655
.setTimestamp(Date.from(Instant.ofEpochMilli(11)))
655656
.setMinVersion(Version.V_7_2_0)
657+
.setQuantiles(new Quantiles(jobId, Date.from(Instant.ofEpochMilli(11)), randomAlphaOfLength(20)))
656658
.build());
657659
indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("other_snap")
658660
.setTimestamp(Date.from(Instant.ofEpochMilli(12)))
659661
.setMinVersion(Version.V_7_3_0)
662+
.setQuantiles(new Quantiles(jobId, Date.from(Instant.ofEpochMilli(12)), randomAlphaOfLength(20)))
660663
.build());
661664
createJob("other_job");
662665
indexModelSnapshot(new ModelSnapshot.Builder("other_job").setSnapshotId("other_snap")
663666
.setTimestamp(Date.from(Instant.ofEpochMilli(10)))
664667
.setMinVersion(Version.CURRENT)
668+
.setQuantiles(new Quantiles("other_job", Date.from(Instant.ofEpochMilli(10)), randomAlphaOfLength(20)))
665669
.build());
666670
// Add a snapshot WITHOUT a min version.
667671
client().prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName("other_job"))
@@ -677,13 +681,17 @@ public void testGetSnapshots() {
677681
jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_2,snap_1", future::onResponse, future::onFailure);
678682
List<ModelSnapshot> snapshots = future.actionGet().results();
679683
assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2"));
684+
assertNull(snapshots.get(0).getQuantiles());
680685
assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1"));
686+
assertNull(snapshots.get(1).getQuantiles());
681687

682688
future = new PlainActionFuture<>();
683689
jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_*", future::onResponse, future::onFailure);
684690
snapshots = future.actionGet().results();
685691
assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2"));
686692
assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1"));
693+
assertNull(snapshots.get(0).getQuantiles());
694+
assertNull(snapshots.get(1).getQuantiles());
687695

688696
future = new PlainActionFuture<>();
689697
jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_*,other_snap", future::onResponse, future::onFailure);
@@ -716,6 +724,14 @@ public void testGetSnapshots() {
716724
assertThat(snapshots.get(2).getSnapshotId(), equalTo("other_snap"));
717725
assertThat(snapshots.get(3).getSnapshotId(), equalTo("snap_2"));
718726
assertThat(snapshots.get(4).getSnapshotId(), equalTo("other_snap"));
727+
728+
// assert that quantiles are not loaded
729+
assertNull(snapshots.get(0).getQuantiles());
730+
assertNull(snapshots.get(1).getQuantiles());
731+
assertNull(snapshots.get(2).getQuantiles());
732+
assertNull(snapshots.get(3).getQuantiles());
733+
assertNull(snapshots.get(4).getQuantiles());
734+
719735
}
720736

721737
public void testGetAutodetectParams() throws Exception {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java

+1-14
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,9 @@
1717
import org.elasticsearch.tasks.Task;
1818
import org.elasticsearch.transport.TransportService;
1919
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
20-
import org.elasticsearch.xpack.core.action.util.QueryPage;
21-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
2220
import org.elasticsearch.xpack.ml.job.JobManager;
2321
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
2422

25-
import java.util.stream.Collectors;
26-
2723
public class TransportGetModelSnapshotsAction extends HandledTransportAction<GetModelSnapshotsAction.Request,
2824
GetModelSnapshotsAction.Response> {
2925

@@ -74,16 +70,7 @@ private void getModelSnapshots(GetModelSnapshotsAction.Request request, ActionLi
7470
request.getSort(),
7571
request.getDescOrder(),
7672
request.getSnapshotId(),
77-
page -> listener.onResponse(new GetModelSnapshotsAction.Response(clearQuantiles(page))),
73+
page -> listener.onResponse(new GetModelSnapshotsAction.Response(page)),
7874
listener::onFailure);
7975
}
80-
81-
public static QueryPage<ModelSnapshot> clearQuantiles(QueryPage<ModelSnapshot> page) {
82-
if (page.results() == null) {
83-
return page;
84-
}
85-
return new QueryPage<>(page.results().stream().map(snapshot ->
86-
new ModelSnapshot.Builder(snapshot).setQuantiles(null).build())
87-
.collect(Collectors.toList()), page.count(), page.getResultsField());
88-
}
8976
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

+13
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.elasticsearch.search.aggregations.metrics.ExtendedStats;
8181
import org.elasticsearch.search.aggregations.metrics.Stats;
8282
import org.elasticsearch.search.builder.SearchSourceBuilder;
83+
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
8384
import org.elasticsearch.search.sort.FieldSortBuilder;
8485
import org.elasticsearch.search.sort.SortBuilders;
8586
import org.elasticsearch.search.sort.SortOrder;
@@ -154,6 +155,13 @@ public class JobResultsProvider {
154155
public static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20;
155156
private static final double ESTABLISHED_MEMORY_CV_THRESHOLD = 0.1;
156157

158+
// filter for quantiles in modelSnapshots to avoid memory overhead
159+
private static final FetchSourceContext REMOVE_QUANTILES_FROM_SOURCE = new FetchSourceContext(
160+
true,
161+
null,
162+
new String[] { ModelSnapshot.QUANTILES.getPreferredName() }
163+
);
164+
157165
private final Client client;
158166
private final Settings settings;
159167
private final IndexNameExpressionResolver resolver;
@@ -1006,6 +1014,8 @@ public void getModelSnapshot(String jobId, @Nullable String modelSnapshotId, Con
10061014
/**
10071015
* Get model snapshots for the job ordered by descending timestamp (newest first).
10081016
*
1017+
* Note: quantiles are removed from the results.
1018+
*
10091019
* @param jobId the job id
10101020
* @param from number of snapshots to from
10111021
* @param size number of snapshots to retrieve
@@ -1018,6 +1028,8 @@ public void modelSnapshots(String jobId, int from, int size, Consumer<QueryPage<
10181028
/**
10191029
* Get model snapshots for the job ordered by descending restore priority.
10201030
*
1031+
* Note: quantiles are removed from the results.
1032+
*
10211033
* @param jobId the job id
10221034
* @param from number of snapshots to from
10231035
* @param size number of snapshots to retrieve
@@ -1082,6 +1094,7 @@ private void modelSnapshots(String jobId,
10821094
sourceBuilder.from(from);
10831095
sourceBuilder.size(size);
10841096
sourceBuilder.trackTotalHits(true);
1097+
sourceBuilder.fetchSource(REMOVE_QUANTILES_FROM_SOURCE);
10851098
searchRequest.source(sourceBuilder);
10861099
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
10871100
ActionListener.<SearchResponse>wrap(searchResponse -> {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/modelsnapshots/GetModelSnapshotsTests.java

+1-22
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,9 @@
66
*/
77
package org.elasticsearch.xpack.ml.modelsnapshots;
88

9-
import org.elasticsearch.common.ParseField;
109
import org.elasticsearch.test.ESTestCase;
11-
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
12-
import org.elasticsearch.xpack.ml.action.TransportGetModelSnapshotsAction;
1310
import org.elasticsearch.xpack.core.action.util.PageParams;
14-
import org.elasticsearch.xpack.core.action.util.QueryPage;
15-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
16-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
17-
18-
import java.util.Arrays;
19-
import java.util.Date;
11+
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
2012

2113
public class GetModelSnapshotsTests extends ESTestCase {
2214

@@ -31,17 +23,4 @@ public void testModelSnapshots_GivenNegativeSize() {
3123
() -> new GetModelSnapshotsAction.Request("foo", null).setPageParams(new PageParams(10, -5)));
3224
assertEquals("Parameter [size] cannot be < 0", e.getMessage());
3325
}
34-
35-
public void testModelSnapshots_clearQuantiles() {
36-
ModelSnapshot m1 = new ModelSnapshot.Builder("jobId").setQuantiles(
37-
new Quantiles("jobId", new Date(), "quantileState")).build();
38-
ModelSnapshot m2 = new ModelSnapshot.Builder("jobId").build();
39-
40-
QueryPage<ModelSnapshot> page = new QueryPage<>(Arrays.asList(m1, m2), 2, new ParseField("field"));
41-
page = TransportGetModelSnapshotsAction.clearQuantiles(page);
42-
assertEquals(2, page.results().size());
43-
for (ModelSnapshot modelSnapshot : page.results()) {
44-
assertNull(modelSnapshot.getQuantiles());
45-
}
46-
}
4726
}

0 commit comments

Comments
 (0)