Skip to content

Commit 9358a61

Browse files
authored
[ML] allow unran/incomplete forecasts to be deleted for stopped/failed jobs (#57152) (#57173)
If a job is NOT opened, forecasts should be able to be deleted, no matter their state. This also fixes a bug with expanding forecast IDs. We should check for wildcard `*` and `_all` when expanding the ids closes #56419
1 parent 0a9041c commit 9358a61

File tree

2 files changed

+111
-15
lines changed

2 files changed

+111
-15
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteForecastAction.java

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
import org.elasticsearch.action.support.HandledTransportAction;
2020
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2121
import org.elasticsearch.client.Client;
22-
import org.elasticsearch.cluster.metadata.Metadata;
22+
import org.elasticsearch.cluster.ClusterState;
23+
import org.elasticsearch.cluster.service.ClusterService;
2324
import org.elasticsearch.common.Strings;
2425
import org.elasticsearch.common.collect.Tuple;
2526
import org.elasticsearch.common.inject.Inject;
@@ -36,13 +37,16 @@
3637
import org.elasticsearch.index.reindex.DeleteByQueryAction;
3738
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
3839
import org.elasticsearch.index.reindex.ScrollableHitSource;
40+
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
3941
import org.elasticsearch.rest.RestStatus;
4042
import org.elasticsearch.search.SearchHit;
4143
import org.elasticsearch.search.SearchHits;
4244
import org.elasticsearch.search.builder.SearchSourceBuilder;
4345
import org.elasticsearch.tasks.Task;
4446
import org.elasticsearch.transport.TransportService;
47+
import org.elasticsearch.xpack.core.ml.MlTasks;
4548
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
49+
import org.elasticsearch.xpack.core.ml.job.config.JobState;
4650
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
4751
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
4852
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
@@ -55,6 +59,7 @@
5559
import java.io.InputStream;
5660
import java.util.ArrayList;
5761
import java.util.Arrays;
62+
import java.util.Collection;
5863
import java.util.EnumSet;
5964
import java.util.HashSet;
6065
import java.util.List;
@@ -71,21 +76,28 @@ public class TransportDeleteForecastAction extends HandledTransportAction<Delete
7176
private static final Logger logger = LogManager.getLogger(TransportDeleteForecastAction.class);
7277

7378
private final Client client;
79+
private final ClusterService clusterService;
7480
private static final int MAX_FORECAST_TO_SEARCH = 10_000;
7581

7682
private static final Set<ForecastRequestStatus> DELETABLE_STATUSES =
7783
EnumSet.of(ForecastRequestStatus.FINISHED, ForecastRequestStatus.FAILED);
7884

7985
@Inject
80-
public TransportDeleteForecastAction(TransportService transportService, ActionFilters actionFilters, Client client) {
86+
public TransportDeleteForecastAction(TransportService transportService,
87+
ActionFilters actionFilters,
88+
Client client,
89+
ClusterService clusterService) {
8190
super(DeleteForecastAction.NAME, transportService, actionFilters, DeleteForecastAction.Request::new);
8291
this.client = client;
92+
this.clusterService = clusterService;
8393
}
8494

8595
@Override
8696
protected void doExecute(Task task, DeleteForecastAction.Request request, ActionListener<AcknowledgedResponse> listener) {
8797
final String jobId = request.getJobId();
88-
final String forecastsExpression = request.getForecastId();
98+
99+
String forecastsExpression = request.getForecastId();
100+
final String[] forecastIds = Strings.tokenizeToStringArray(forecastsExpression, ",");
89101
ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
90102
searchResponse -> deleteForecasts(searchResponse, request, listener),
91103
e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));
@@ -95,10 +107,8 @@ protected void doExecute(Task task, DeleteForecastAction.Request request, Action
95107
BoolQueryBuilder builder = QueryBuilders.boolQuery();
96108
BoolQueryBuilder innerBool = QueryBuilders.boolQuery().must(
97109
QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE));
98-
99-
if (Metadata.ALL.equals(request.getForecastId()) == false) {
100-
Set<String> forcastIds = new HashSet<>(Arrays.asList(Strings.tokenizeToStringArray(forecastsExpression, ",")));
101-
innerBool.must(QueryBuilders.termsQuery(Forecast.FORECAST_ID.getPreferredName(), forcastIds));
110+
if (Strings.isAllOrWildcard(forecastIds) == false) {
111+
innerBool.must(QueryBuilders.termsQuery(Forecast.FORECAST_ID.getPreferredName(), new HashSet<>(Arrays.asList(forecastIds))));
102112
}
103113

104114
source.query(builder.filter(innerBool));
@@ -109,6 +119,17 @@ protected void doExecute(Task task, DeleteForecastAction.Request request, Action
109119
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, forecastStatsHandler);
110120
}
111121

122+
static void validateForecastState(Collection<ForecastRequestStats> forecastsToDelete, JobState jobState, String jobId) {
123+
List<String> badStatusForecasts = forecastsToDelete.stream()
124+
.filter((f) -> DELETABLE_STATUSES.contains(f.getStatus()) == false)
125+
.map(ForecastRequestStats::getForecastId)
126+
.collect(Collectors.toList());
127+
if (badStatusForecasts.size() > 0 && JobState.OPENED.equals(jobState)) {
128+
throw ExceptionsHelper.conflictStatusException(
129+
Messages.getMessage(Messages.REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE, badStatusForecasts, jobId));
130+
}
131+
}
132+
112133
private void deleteForecasts(SearchResponse searchResponse,
113134
DeleteForecastAction.Request request,
114135
ActionListener<AcknowledgedResponse> listener) {
@@ -122,7 +143,7 @@ private void deleteForecasts(SearchResponse searchResponse,
122143
}
123144

124145
if (forecastsToDelete.isEmpty()) {
125-
if (Metadata.ALL.equals(request.getForecastId()) &&
146+
if (Strings.isAllOrWildcard(new String[]{request.getForecastId()}) &&
126147
request.isAllowNoForecasts()) {
127148
listener.onResponse(new AcknowledgedResponse(true));
128149
} else {
@@ -131,13 +152,13 @@ private void deleteForecasts(SearchResponse searchResponse,
131152
}
132153
return;
133154
}
134-
List<String> badStatusForecasts = forecastsToDelete.stream()
135-
.filter((f) -> !DELETABLE_STATUSES.contains(f.getStatus()))
136-
.map(ForecastRequestStats::getForecastId).collect(Collectors.toList());
137-
if (badStatusForecasts.size() > 0) {
138-
listener.onFailure(
139-
ExceptionsHelper.conflictStatusException(
140-
Messages.getMessage(Messages.REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE, badStatusForecasts, jobId)));
155+
final ClusterState state = clusterService.state();
156+
PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
157+
JobState jobState = MlTasks.getJobState(jobId, persistentTasks);
158+
try {
159+
validateForecastState(forecastsToDelete, jobState, jobId);
160+
} catch (ElasticsearchException ex) {
161+
listener.onFailure(ex);
141162
return;
142163
}
143164

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.action;
7+
8+
import org.elasticsearch.ElasticsearchStatusException;
9+
import org.elasticsearch.test.ESTestCase;
10+
import org.elasticsearch.xpack.core.ml.job.config.JobState;
11+
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
12+
import java.util.List;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.Stream;
15+
16+
17+
public class TransportDeleteForecastActionTests extends ESTestCase {
18+
19+
private static final int TEST_RUNS = 10;
20+
21+
public void testValidateForecastStateWithAllFailedFinished() {
22+
for (int i = 0; i < TEST_RUNS; ++i) {
23+
List<ForecastRequestStats> forecastRequestStats = Stream.generate(
24+
() -> createForecastStats(randomFrom(
25+
ForecastRequestStats.ForecastRequestStatus.FAILED,
26+
ForecastRequestStats.ForecastRequestStatus.FINISHED
27+
)))
28+
.limit(randomInt(10))
29+
.collect(Collectors.toList());
30+
31+
// This should not throw.
32+
TransportDeleteForecastAction.validateForecastState(
33+
forecastRequestStats,
34+
randomFrom(JobState.values()),
35+
randomAlphaOfLength(10));
36+
}
37+
}
38+
39+
public void testValidateForecastStateWithSomeFailedFinished() {
40+
for (int i = 0; i < TEST_RUNS; ++i) {
41+
List<ForecastRequestStats> forecastRequestStats = Stream.generate(
42+
() -> createForecastStats(randomFrom(
43+
ForecastRequestStats.ForecastRequestStatus.values()
44+
)))
45+
.limit(randomInt(10))
46+
.collect(Collectors.toList());
47+
48+
forecastRequestStats.add(createForecastStats(ForecastRequestStats.ForecastRequestStatus.STARTED));
49+
50+
{
51+
JobState jobState = randomFrom(JobState.CLOSED, JobState.CLOSING, JobState.FAILED);
52+
try {
53+
TransportDeleteForecastAction.validateForecastState(forecastRequestStats, jobState, randomAlphaOfLength(10));
54+
} catch (Exception ex) {
55+
fail("Should not have thrown: " + ex.getMessage());
56+
}
57+
}
58+
{
59+
JobState jobState = JobState.OPENED;
60+
expectThrows(
61+
ElasticsearchStatusException.class,
62+
() -> TransportDeleteForecastAction.validateForecastState(forecastRequestStats, jobState, randomAlphaOfLength(10))
63+
);
64+
}
65+
}
66+
}
67+
68+
69+
private static ForecastRequestStats createForecastStats(ForecastRequestStats.ForecastRequestStatus status) {
70+
ForecastRequestStats forecastRequestStats = new ForecastRequestStats(randomAlphaOfLength(10), randomAlphaOfLength(10));
71+
forecastRequestStats.setStatus(status);
72+
return forecastRequestStats;
73+
}
74+
75+
}

0 commit comments

Comments
 (0)