|
10 | 10 | import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
11 | 11 | import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
12 | 12 | import org.elasticsearch.action.bulk.BulkResponse;
|
| 13 | +import org.elasticsearch.action.delete.DeleteResponse; |
13 | 14 | import org.elasticsearch.action.get.GetResponse;
|
14 | 15 | import org.elasticsearch.action.index.IndexAction;
|
15 | 16 | import org.elasticsearch.action.index.IndexRequest;
|
16 | 17 | import org.elasticsearch.action.search.SearchResponse;
|
17 | 18 | import org.elasticsearch.action.support.WriteRequest;
|
18 | 19 | import org.elasticsearch.index.query.QueryBuilder;
|
19 | 20 | import org.elasticsearch.index.query.QueryBuilders;
|
| 21 | +import org.elasticsearch.rest.RestStatus; |
20 | 22 | import org.elasticsearch.search.SearchHit;
|
21 | 23 | import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
|
22 | 24 | import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
@@ -314,6 +316,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
|
314 | 316 | assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
|
315 | 317 | }
|
316 | 318 |
|
| 319 | + public void testDeleteExpiredData_RemovesUnusedState() throws Exception { |
| 320 | + initialize("classification_delete_expired_data"); |
| 321 | + indexData(sourceIndex, 100, 0, KEYWORD_FIELD); |
| 322 | + |
| 323 | + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD)); |
| 324 | + registerAnalytics(config); |
| 325 | + putAnalytics(config); |
| 326 | + startAnalytics(jobId); |
| 327 | + waitUntilAnalyticsIsStopped(jobId); |
| 328 | + |
| 329 | + assertProgress(jobId, 100, 100, 100, 100); |
| 330 | + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); |
| 331 | + assertModelStatePersisted(stateDocId()); |
| 332 | + assertInferenceModelPersisted(jobId); |
| 333 | + |
| 334 | + // Call _delete_expired_data API and check nothing was deleted |
| 335 | + assertThat(deleteExpiredData().isDeleted(), is(true)); |
| 336 | + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); |
| 337 | + assertModelStatePersisted(stateDocId()); |
| 338 | + |
| 339 | + // Delete the config straight from the config index |
| 340 | + DeleteResponse deleteResponse = client().prepareDelete().setIndex(".ml-config").setId(DataFrameAnalyticsConfig.documentId(jobId)) |
| 341 | + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet(); |
| 342 | + assertThat(deleteResponse.status(), equalTo(RestStatus.OK)); |
| 343 | + |
| 344 | + // Now calling the _delete_expired_data API should remove unused state |
| 345 | + assertThat(deleteExpiredData().isDeleted(), is(true)); |
| 346 | + |
| 347 | + SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet(); |
| 348 | + assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L)); |
| 349 | + } |
| 350 | + |
317 | 351 | private void initialize(String jobId) {
|
318 | 352 | this.jobId = jobId;
|
319 | 353 | this.sourceIndex = jobId + "_source_index";
|
|
0 commit comments