Skip to content

Commit d2aee62

Browse files
[ML] Delete unused data frame analytics state (#50243)
This commit adds removal of unused data frame analytics state from the _delete_expired_data API (and in extend th ML daily maintenance task). At the moment the potential state docs include the progress document and state for regression and classification analyses.
1 parent 230d476 commit d2aee62

25 files changed

+235
-22
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java

+10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.core.ml.dataframe;
77

88
import org.elasticsearch.Version;
9+
import org.elasticsearch.common.Nullable;
910
import org.elasticsearch.common.ParseField;
1011
import org.elasticsearch.common.Strings;
1112
import org.elasticsearch.common.io.stream.StreamInput;
@@ -310,6 +311,15 @@ public static String documentId(String id) {
310311
return TYPE + "-" + id;
311312
}
312313

314+
/**
315+
* Returns the job id from the doc id. Returns {@code null} if the doc id is invalid.
316+
*/
317+
@Nullable
318+
public static String extractJobIdFromDocId(String docId) {
319+
String jobId = docId.replaceAll("^" + TYPE +"-", "");
320+
return jobId.equals(docId) ? null : jobId;
321+
}
322+
313323
public static class Builder {
314324

315325
private String id;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class Classification implements DataFrameAnalysis {
3939
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
4040
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");
4141

42+
private static final String STATE_DOC_ID_SUFFIX = "_classification_state#1";
43+
4244
private static final ConstructingObjectParser<Classification, Void> LENIENT_PARSER = createParser(true);
4345
private static final ConstructingObjectParser<Classification, Void> STRICT_PARSER = createParser(false);
4446

@@ -256,7 +258,12 @@ public boolean persistsState() {
256258

257259
@Override
258260
public String getStateDocId(String jobId) {
259-
return jobId + "_classification_state#1";
261+
return jobId + STATE_DOC_ID_SUFFIX;
262+
}
263+
264+
public static String extractJobIdFromStateDoc(String stateDocId) {
265+
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
266+
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
260267
}
261268

262269
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class Regression implements DataFrameAnalysis {
3636
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
3737
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");
3838

39+
private static final String STATE_DOC_ID_SUFFIX = "_regression_state#1";
40+
3941
private static final ConstructingObjectParser<Regression, Void> LENIENT_PARSER = createParser(true);
4042
private static final ConstructingObjectParser<Regression, Void> STRICT_PARSER = createParser(false);
4143

@@ -196,7 +198,12 @@ public boolean persistsState() {
196198

197199
@Override
198200
public String getStateDocId(String jobId) {
199-
return jobId + "_regression_state#1";
201+
return jobId + STATE_DOC_ID_SUFFIX;
202+
}
203+
204+
public static String extractJobIdFromStateDoc(String stateDocId) {
205+
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
206+
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
200207
}
201208

202209
@Override

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java

+8
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import static org.hamcrest.Matchers.is;
5454
import static org.hamcrest.Matchers.not;
5555
import static org.hamcrest.Matchers.notNullValue;
56+
import static org.hamcrest.Matchers.nullValue;
5657
import static org.hamcrest.Matchers.startsWith;
5758

5859
public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<DataFrameAnalyticsConfig> {
@@ -384,6 +385,13 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsBeforeItWas
384385
}
385386
}
386387

388+
public void testExtractJobIdFromDocId() {
389+
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-foo"), equalTo("foo"));
390+
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-data_frame_analytics_config-foo"),
391+
equalTo("data_frame_analytics_config-foo"));
392+
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue()));
393+
}
394+
387395
private static void assertTooSmall(ElasticsearchStatusException e) {
388396
assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb."));
389397
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,9 @@ public void testGetStateDocId() {
216216
String randomId = randomAlphaOfLength(10);
217217
assertThat(classification.getStateDocId(randomId), equalTo(randomId + "_classification_state#1"));
218218
}
219+
220+
public void testExtractJobIdFromStateDoc() {
221+
assertThat(Classification.extractJobIdFromStateDoc("foo_bar-1_classification_state#1"), equalTo("foo_bar-1"));
222+
assertThat(Classification.extractJobIdFromStateDoc("noop"), is(nullValue()));
223+
}
219224
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public void testGetStateDocId() {
110110
assertThat(regression.getStateDocId(randomId), equalTo(randomId + "_regression_state#1"));
111111
}
112112

113+
public void testExtractJobIdFromStateDoc() {
114+
assertThat(Regression.extractJobIdFromStateDoc("foo_bar-1_regression_state#1"), equalTo("foo_bar-1"));
115+
assertThat(Regression.extractJobIdFromStateDoc("noop"), is(nullValue()));
116+
}
117+
113118
public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException {
114119
Regression regression = createRandom();
115120
assertThat(regression.getRandomizeSeed(), is(notNullValue()));

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

+34
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1111
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1212
import org.elasticsearch.action.bulk.BulkResponse;
13+
import org.elasticsearch.action.delete.DeleteResponse;
1314
import org.elasticsearch.action.get.GetResponse;
1415
import org.elasticsearch.action.index.IndexAction;
1516
import org.elasticsearch.action.index.IndexRequest;
1617
import org.elasticsearch.action.search.SearchResponse;
1718
import org.elasticsearch.action.support.WriteRequest;
1819
import org.elasticsearch.index.query.QueryBuilder;
1920
import org.elasticsearch.index.query.QueryBuilders;
21+
import org.elasticsearch.rest.RestStatus;
2022
import org.elasticsearch.search.SearchHit;
2123
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
2224
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -315,6 +317,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
315317
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
316318
}
317319

320+
public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
321+
initialize("classification_delete_expired_data");
322+
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);
323+
324+
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
325+
registerAnalytics(config);
326+
putAnalytics(config);
327+
startAnalytics(jobId);
328+
waitUntilAnalyticsIsStopped(jobId);
329+
330+
assertProgress(jobId, 100, 100, 100, 100);
331+
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
332+
assertModelStatePersisted(stateDocId());
333+
assertInferenceModelPersisted(jobId);
334+
335+
// Call _delete_expired_data API and check nothing was deleted
336+
assertThat(deleteExpiredData().isDeleted(), is(true));
337+
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
338+
assertModelStatePersisted(stateDocId());
339+
340+
// Delete the config straight from the config index
341+
DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
342+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
343+
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));
344+
345+
// Now calling the _delete_expired_data API should remove unused state
346+
assertThat(deleteExpiredData().isDeleted(), is(true));
347+
348+
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
349+
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
350+
}
351+
318352
private void initialize(String jobId) {
319353
this.jobId = jobId;
320354
this.sourceIndex = jobId + "_source_index";

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void tearDownData() {
8787
cleanUp();
8888
}
8989

90-
public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
90+
public void testDeleteExpiredData_GivenNothingToDelete() throws Exception {
9191
// Tests that nothing goes wrong when there's nothing to delete
9292
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
9393
}
@@ -201,10 +201,7 @@ public void testDeleteExpiredData() throws Exception {
201201
assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));
202202

203203
// Now call the action under test
204-
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
205-
206-
// We need to refresh to ensure the deletion is visible
207-
client().admin().indices().prepareRefresh("*").get();
204+
assertThat(deleteExpiredData().isDeleted(), is(true));
208205

209206
// no-retention job should have kept all data
210207
assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70)));

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.search.SearchHits;
2222
import org.elasticsearch.search.sort.SortBuilders;
2323
import org.elasticsearch.search.sort.SortOrder;
24+
import org.elasticsearch.xpack.core.action.util.PageParams;
2425
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
2526
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
2627
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@@ -45,7 +46,6 @@
4546
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
4647
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
4748
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
48-
import org.elasticsearch.xpack.core.action.util.PageParams;
4949
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
5050
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
5151
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
4141
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
4242
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
43-
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
43+
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
4444
import org.hamcrest.Matcher;
4545
import org.hamcrest.Matchers;
4646

@@ -205,7 +205,7 @@ protected void assertProgress(String id, int reindexing, int loadingData, int an
205205
}
206206

207207
protected SearchResponse searchStoredProgress(String jobId) {
208-
String docId = DataFrameAnalyticsTask.progressDocId(jobId);
208+
String docId = StoredProgress.documentId(jobId);
209209
return client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
210210
.setQuery(QueryBuilders.idsQuery().addIds(docId))
211211
.get();

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java

+11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.xpack.core.ml.MachineLearningField;
3131
import org.elasticsearch.xpack.core.ml.MlMetadata;
3232
import org.elasticsearch.xpack.core.ml.MlTasks;
33+
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
3334
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
3435
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
3536
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
@@ -142,6 +143,16 @@ private void waitForPendingTasks() {
142143
}
143144
}
144145

146+
protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception {
147+
DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE,
148+
new DeleteExpiredDataAction.Request()).get();
149+
150+
// We need to refresh to ensure the deletion is visible
151+
client().admin().indices().prepareRefresh("*").get();
152+
153+
return response;
154+
}
155+
145156
@Override
146157
protected void ensureClusterStateConsistency() throws IOException {
147158
if (cluster() != null && cluster().size() > 0) {

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

+34
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77

88
import org.elasticsearch.action.bulk.BulkRequestBuilder;
99
import org.elasticsearch.action.bulk.BulkResponse;
10+
import org.elasticsearch.action.delete.DeleteResponse;
1011
import org.elasticsearch.action.get.GetResponse;
1112
import org.elasticsearch.action.index.IndexRequest;
1213
import org.elasticsearch.action.search.SearchResponse;
1314
import org.elasticsearch.action.support.WriteRequest;
1415
import org.elasticsearch.common.unit.TimeValue;
16+
import org.elasticsearch.rest.RestStatus;
1517
import org.elasticsearch.search.SearchHit;
1618
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
1719
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
@@ -272,6 +274,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
272274
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
273275
}
274276

277+
public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
278+
initialize("regression_delete_expired_data");
279+
indexData(sourceIndex, 100, 0);
280+
281+
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
282+
registerAnalytics(config);
283+
putAnalytics(config);
284+
startAnalytics(jobId);
285+
waitUntilAnalyticsIsStopped(jobId);
286+
287+
assertProgress(jobId, 100, 100, 100, 100);
288+
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
289+
assertModelStatePersisted(stateDocId());
290+
assertInferenceModelPersisted(jobId);
291+
292+
// Call _delete_expired_data API and check nothing was deleted
293+
assertThat(deleteExpiredData().isDeleted(), is(true));
294+
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
295+
assertModelStatePersisted(stateDocId());
296+
297+
// Delete the config straight from the config index
298+
DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
299+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
300+
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));
301+
302+
// Now calling the _delete_expired_data API should remove unused state
303+
assertThat(deleteExpiredData().isDeleted(), is(true));
304+
305+
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
306+
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
307+
}
308+
275309
private void initialize(String jobId) {
276310
this.jobId = jobId;
277311
this.sourceIndex = jobId + "_source_index";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
4444
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
4545
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
46-
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
46+
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
4747
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
4848
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
4949
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -165,7 +165,7 @@ private void deleteState(ParentTaskAssigningClient parentTaskClient,
165165
DataFrameAnalyticsConfig config,
166166
ActionListener<BulkByScrollResponse> listener) {
167167
List<String> ids = new ArrayList<>();
168-
ids.add(DataFrameAnalyticsTask.progressDocId(config.getId()));
168+
ids.add(StoredProgress.documentId(config.getId()));
169169
if (config.getAnalysis().persistsState()) {
170170
ids.add(config.getAnalysis().getStateDocId(config.getId()));
171171
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ private void searchStoredProgresses(List<String> configIds, ActionListener<List<
187187
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
188188
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
189189
searchRequest.source().size(1);
190-
searchRequest.source().query(QueryBuilders.idsQuery().addIds(DataFrameAnalyticsTask.progressDocId(configId)));
190+
searchRequest.source().query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId)));
191191
multiSearchRequest.add(searchRequest);
192192
}
193193

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ private void persistProgress(Runnable runnable) {
244244
statsResponse -> {
245245
GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0);
246246
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
247-
indexRequest.id(progressDocId(taskParams.getId()));
247+
indexRequest.id(StoredProgress.documentId(taskParams.getId()));
248248
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
249249
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
250250
new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
@@ -310,10 +310,6 @@ public static StartingState determineStartingState(String jobId, List<PhaseProgr
310310
}
311311
}
312312

313-
public static String progressDocId(String id) {
314-
return "data_frame_analytics-" + id + "-progress";
315-
}
316-
317313
public static class ProgressTracker {
318314

319315
public static final String REINDEXING = "reindexing";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/StoredProgress.java

+14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.ml.dataframe;
77

8+
import org.elasticsearch.common.Nullable;
89
import org.elasticsearch.common.ParseField;
910
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
1011
import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -14,6 +15,8 @@
1415
import java.io.IOException;
1516
import java.util.List;
1617
import java.util.Objects;
18+
import java.util.regex.Matcher;
19+
import java.util.regex.Pattern;
1720

1821
public class StoredProgress implements ToXContentObject {
1922

@@ -57,4 +60,15 @@ public boolean equals(Object o) {
5760
public int hashCode() {
5861
return Objects.hash(progress);
5962
}
63+
64+
public static String documentId(String id) {
65+
return "data_frame_analytics-" + id + "-progress";
66+
}
67+
68+
@Nullable
69+
public static String extractJobIdFromDocId(String docId) {
70+
Pattern pattern = Pattern.compile("^data_frame_analytics-(.*)-progress$");
71+
Matcher matcher = pattern.matcher(docId);
72+
return matcher.find() ? matcher.group(1) : null;
73+
}
6074
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.index.query.TermQueryBuilder;
1717
import org.elasticsearch.search.SearchHit;
1818
import org.elasticsearch.xpack.core.ml.job.config.Job;
19+
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
1920

2021
import java.io.IOException;
2122
import java.io.InputStream;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.index.query.TermsQueryBuilder;
1111
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
1212
import org.elasticsearch.xpack.core.ml.job.results.Result;
13+
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
1314

1415
public abstract class BatchedResultsIterator<T> extends BatchedDocumentsIterator<Result<T>> {
1516

0 commit comments

Comments
 (0)