
Commit cbfb9b0

[ML] Fix forecasts deletion action when there are many forecast documents. (#75127)
1 parent 563e1fb commit cbfb9b0

5 files changed: +149 -87 lines changed
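
In short: before this change the delete-forecast action searched for ForecastRequestStats documents without an explicit size (so only the default 10 hits came back), parsed each hit's _source, and capped its delete-by-query at 10,000 documents, which could leave forecast documents behind for jobs with many forecasts. The action now asks for up to 10,000 stats hits, reads only the forecast id and status from doc values, and lets the delete-by-query remove every matching document. Short, hedged sketches follow each file's diff below.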

server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java

Lines changed: 1 addition & 1 deletion
@@ -151,7 +151,7 @@ public String toString() {
     //delete by query deletes all documents that match a query. The indices and indices options that affect how
     //indices are resolved depend entirely on the inner search request. That's why the following methods delegate to it.
     @Override
-    public IndicesRequest indices(String... indices) {
+    public DeleteByQueryRequest indices(String... indices) {
         assert getSearchRequest() != null;
         getSearchRequest().indices(indices);
         return this;
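
The only change here is the return type: indices(...) now returns DeleteByQueryRequest instead of IndicesRequest, so callers can keep chaining the request's other setters, as buildDeleteByQuery in TransportDeleteForecastAction now does. A minimal sketch of the fluent usage this enables; the index name and match-all query are placeholders, not taken from the commit:

    DeleteByQueryRequest request = new DeleteByQueryRequest()
        .setAbortOnVersionConflict(false)
        .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
        .indices("my-index")                      // before this commit the chain stopped here (IndicesRequest)
        .setQuery(QueryBuilders.matchAllQuery())
        .setRefresh(true);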

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java

Lines changed: 64 additions & 0 deletions
@@ -26,16 +26,20 @@
 
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST_NATIVE_PROCESS_KILLED;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 
 public class ForecastIT extends MlNativeAutodetectIntegTestCase {
 
@@ -343,7 +347,61 @@ public void testDeleteWildCard() throws Exception {
             assertNull(getForecastStats(job.getId(), forecastId2Duration1HourNoExpiry));
             assertNull(getForecastStats(job.getId(), forecastId2Duration1HourNoExpiry2));
         }
+    }
+
+    public void testDeleteAll() throws Exception {
+        Detector.Builder detector1 = new Detector.Builder("sum", "value").setPartitionFieldName("category");
+        Detector.Builder detector2 = new Detector.Builder("mean", "value").setPartitionFieldName("category");
+
+        TimeValue bucketSpan = TimeValue.timeValueHours(1);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector1.build(), detector2.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeFormat("epoch");
+
+        Job.Builder job = new Job.Builder("forecast-it-test-delete-wildcard-2");
+        job.setAnalysisConfig(analysisConfig);
+        job.setDataDescription(dataDescription);
+        putJob(job);
+        openJob(job.getId());
+
+        long now = Instant.now().getEpochSecond();
+        long timestamp = now - 50 * bucketSpan.seconds();
+        List<String> data = new ArrayList<>();
+        String[] partitionFieldValues = IntStream.range(0, 20).mapToObj(i -> "category_" + i).toArray(String[]::new);
+        while (timestamp < now) {
+            for (String partitionFieldValue : partitionFieldValues) {
+                data.add(createJsonRecord(createRecord(timestamp, partitionFieldValue, 10.0)));
+                data.add(createJsonRecord(createRecord(timestamp, partitionFieldValue, 30.0)));
+            }
+            timestamp += bucketSpan.seconds();
+        }
 
+        postData(job.getId(), data.stream().collect(Collectors.joining()));
+        flushJob(job.getId(), false);
+
+        long noForecasts = 11; // We want to make sure we set the search size instead of relying on the default
+        List<String> forecastIds = new ArrayList<>();
+        for (int i = 0; i < noForecasts; ++i) {
+            String forecastId = forecast(job.getId(), TimeValue.timeValueHours(100), TimeValue.ZERO);
+            forecastIds.add(forecastId);
+            waitForecastToFinish(job.getId(), forecastId);
+        }
+        closeJob(job.getId());
+
+        assertThat(getJobStats(job.getId()).get(0).getForecastStats().getTotal(), is(equalTo(noForecasts)));
+        for (String forecastId : forecastIds) {
+            assertNotNull(getForecastStats(job.getId(), forecastId));
+        }
+
+        DeleteForecastAction.Request request = new DeleteForecastAction.Request(job.getId(), randomBoolean() ? "*" : "_all");
+        AcknowledgedResponse response = client().execute(DeleteForecastAction.INSTANCE, request).actionGet();
+        assertTrue(response.isAcknowledged());
+
+        assertThat(getJobStats(job.getId()).get(0).getForecastStats().getTotal(), is(equalTo(0L)));
+        for (String forecastId : forecastIds) {
+            assertNull(getForecastStats(job.getId(), forecastId));
+        }
     }
 
     public void testDelete() throws Exception {
@@ -561,4 +619,10 @@ private static Map<String, Object> createRecord(long timestamp, double value) {
         record.put("value", value);
         return record;
     }
+
+    private static Map<String, Object> createRecord(long timestamp, String partitionFieldValue, double value) {
+        Map<String, Object> record = createRecord(timestamp, value);
+        record.put("category", partitionFieldValue);
+        return record;
+    }
 }
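
The noForecasts = 11 comment is the crux of the test: a search without an explicit size returns at most 10 hits (the Elasticsearch default), so an eleventh forecast would previously never be found by the stats search and its documents would survive deletion. A hedged sketch of that pitfall, not code from the commit; the literal field name and value mirror the Result.RESULT_TYPE and ForecastRequestStats.RESULT_TYPE_VALUE constants used in the action:

    // Default size: at most 10 ForecastRequestStats hits come back, so forecast #11 is never collected.
    SearchSourceBuilder defaultSized = new SearchSourceBuilder()
        .query(QueryBuilders.termQuery("result_type", "model_forecast_request_stats"));
    // The fix sets the size explicitly (MAX_FORECAST_TO_SEARCH, i.e. 10_000, in the action).
    SearchSourceBuilder explicitlySized = new SearchSourceBuilder()
        .size(10_000)
        .query(QueryBuilders.termQuery("result_type", "model_forecast_request_stats"));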

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteForecastAction.java

Lines changed: 52 additions & 69 deletions
@@ -23,13 +23,8 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.xcontent.DeprecationHandler;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -57,17 +52,13 @@
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.utils.QueryBuilderHelper;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static java.util.stream.Collectors.toSet;
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
@@ -80,8 +71,10 @@ public class TransportDeleteForecastAction extends HandledTransportAction<Delete
     private final ClusterService clusterService;
     private static final int MAX_FORECAST_TO_SEARCH = 10_000;
 
-    private static final Set<ForecastRequestStatus> DELETABLE_STATUSES =
-        EnumSet.of(ForecastRequestStatus.FINISHED, ForecastRequestStatus.FAILED);
+    private static final Set<String> DELETABLE_STATUSES =
+        Stream.of(ForecastRequestStatus.FINISHED, ForecastRequestStatus.FAILED)
+            .map(ForecastRequestStatus::toString)
+            .collect(toSet());
 
     @Inject
     public TransportDeleteForecastAction(TransportService transportService,
@@ -105,47 +98,54 @@ protected void doExecute(Task task, DeleteForecastAction.Request request, Action
             e -> handleFailure(e, request, listener)
         );
 
-        SearchSourceBuilder source = new SearchSourceBuilder();
-
-        BoolQueryBuilder builder = QueryBuilders.boolQuery()
-            .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE));
+        BoolQueryBuilder query =
+            QueryBuilders.boolQuery()
+                .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE));
         QueryBuilderHelper
             .buildTokenFilterQuery(Forecast.FORECAST_ID.getPreferredName(), forecastIds)
-            .ifPresent(builder::filter);
-        source.query(builder);
-
-        SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
-        searchRequest.source(source);
+            .ifPresent(query::filter);
+        SearchSourceBuilder source =
+            new SearchSourceBuilder()
+                .size(MAX_FORECAST_TO_SEARCH)
+                // We only need forecast id and status, there is no need fetching the whole source
+                .fetchSource(false)
+                .docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName())
+                .docValueField(ForecastRequestStats.STATUS.getPreferredName())
+                .query(query);
+        SearchRequest searchRequest =
+            new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
+                .source(source);
 
         executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, forecastStatsHandler);
     }
 
-    static void validateForecastState(Collection<ForecastRequestStats> forecastsToDelete, JobState jobState, String jobId) {
-        List<String> badStatusForecasts = forecastsToDelete.stream()
-            .filter((f) -> DELETABLE_STATUSES.contains(f.getStatus()) == false)
-            .map(ForecastRequestStats::getForecastId)
-            .collect(Collectors.toList());
-        if (badStatusForecasts.size() > 0 && JobState.OPENED.equals(jobState)) {
+    static List<String> extractForecastIds(SearchHit[] forecastsToDelete, JobState jobState, String jobId) {
+        List<String> forecastIds = new ArrayList<>(forecastsToDelete.length);
+        List<String> badStatusForecastIds = new ArrayList<>();
+        for (SearchHit hit : forecastsToDelete) {
+            String forecastId = hit.field(ForecastRequestStats.FORECAST_ID.getPreferredName()).getValue();
+            String forecastStatus = hit.field(ForecastRequestStats.STATUS.getPreferredName()).getValue();
+            if (DELETABLE_STATUSES.contains(forecastStatus)) {
+                forecastIds.add(forecastId);
+            } else {
+                badStatusForecastIds.add(forecastId);
+            }
+        }
+        if (badStatusForecastIds.size() > 0 && JobState.OPENED.equals(jobState)) {
             throw ExceptionsHelper.conflictStatusException(
-                Messages.getMessage(Messages.REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE, badStatusForecasts, jobId));
+                Messages.getMessage(Messages.REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE, badStatusForecastIds, jobId));
         }
+        return forecastIds;
    }
 
     private void deleteForecasts(SearchResponse searchResponse,
                                  DeleteForecastAction.Request request,
                                  ActionListener<AcknowledgedResponse> listener) {
         final String jobId = request.getJobId();
-        Set<ForecastRequestStats> forecastsToDelete;
-        try {
-            forecastsToDelete = parseForecastsFromSearch(searchResponse);
-        } catch (IOException e) {
-            listener.onFailure(e);
-            return;
-        }
+        SearchHits forecastsToDelete = searchResponse.getHits();
 
-        if (forecastsToDelete.isEmpty()) {
-            if (Strings.isAllOrWildcard(request.getForecastId()) &&
-                request.isAllowNoForecasts()) {
+        if (forecastsToDelete.getHits().length == 0) {
+            if (Strings.isAllOrWildcard(request.getForecastId()) && request.isAllowNoForecasts()) {
                 listener.onResponse(AcknowledgedResponse.TRUE);
             } else {
                 listener.onFailure(
@@ -156,16 +156,15 @@ private void deleteForecasts(SearchResponse searchResponse,
         final ClusterState state = clusterService.state();
         PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
         JobState jobState = MlTasks.getJobState(jobId, persistentTasks);
+        final List<String> forecastIds;
         try {
-            validateForecastState(forecastsToDelete, jobState, jobId);
+            forecastIds = extractForecastIds(forecastsToDelete.getHits(), jobState, jobId);
         } catch (ElasticsearchException ex) {
             listener.onFailure(ex);
             return;
         }
 
-        final List<String> forecastIds = forecastsToDelete.stream().map(ForecastRequestStats::getForecastId).collect(Collectors.toList());
         DeleteByQueryRequest deleteByQueryRequest = buildDeleteByQuery(jobId, forecastIds);
-
         executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
             response -> {
                 if (response.isTimedOut()) {
@@ -208,45 +207,29 @@ private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrol
         return new Tuple<>(status, reason);
     }
 
-    private static Set<ForecastRequestStats> parseForecastsFromSearch(SearchResponse searchResponse) throws IOException {
-        SearchHits hits = searchResponse.getHits();
-        List<ForecastRequestStats> allStats = new ArrayList<>(hits.getHits().length);
-        for (SearchHit hit : hits) {
-            try (InputStream stream = hit.getSourceRef().streamInput();
-                 XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
-                     NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
-                allStats.add(ForecastRequestStats.STRICT_PARSER.apply(parser, null));
-            }
-        }
-        return new HashSet<>(allStats);
-    }
-
     private DeleteByQueryRequest buildDeleteByQuery(String jobId, List<String> forecastsToDelete) {
-        DeleteByQueryRequest request = new DeleteByQueryRequest()
-            .setAbortOnVersionConflict(false) //since these documents are not updated, a conflict just means it was deleted previously
-            .setMaxDocs(MAX_FORECAST_TO_SEARCH)
-            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
-
-        request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
-        BoolQueryBuilder innerBoolQuery = QueryBuilders.boolQuery();
-        innerBoolQuery
+        BoolQueryBuilder innerBoolQuery = QueryBuilders.boolQuery()
             .must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
                 ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE))
            .must(QueryBuilders.termsQuery(Forecast.FORECAST_ID.getPreferredName(),
                 forecastsToDelete));
-
         QueryBuilder query = QueryBuilders.boolQuery().filter(innerBoolQuery);
-        request.setQuery(query);
-        request.setRefresh(true);
-        return request;
+
+        // We want *all* of the docs to be deleted. Hence, we rely on the default value of max_docs.
+        return new DeleteByQueryRequest()
+            .setAbortOnVersionConflict(false) // since these documents are not updated, a conflict just means it was deleted previously
+            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+            .indices(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
+            .setQuery(query)
+            .setRefresh(true);
     }
 
     private static void handleFailure(Exception e,
                                       DeleteForecastAction.Request request,
                                       ActionListener<AcknowledgedResponse> listener) {
         if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
             if (request.isAllowNoForecasts() && Strings.isAllOrWildcard(request.getForecastId())) {
-                listener.onResponse(AcknowledgedResponse.of(true));
+                listener.onResponse(AcknowledgedResponse.TRUE);
             } else {
                 listener.onFailure(new ResourceNotFoundException(
                     Messages.getMessage(Messages.REST_NO_SUCH_FORECAST, request.getForecastId(), request.getJobId())

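Two sizing knobs interact in this action: the stats search size (how many forecasts the action can see, now explicitly MAX_FORECAST_TO_SEARCH with only doc value fields fetched) and the delete-by-query max_docs (how many result documents it may remove, now left at its unlimited default). A forecast writes many model_forecast result documents (roughly one per bucket and partition), so the old setMaxDocs(10_000) cap could stop deletion part-way. A hedged before/after sketch, with a placeholder index and query rather than the real ones from the action:

    // Before: deletion stopped once 10_000 matching documents had been removed.
    DeleteByQueryRequest capped = new DeleteByQueryRequest()
        .setMaxDocs(10_000)
        .indices("placeholder-results-index")
        .setQuery(QueryBuilders.matchAllQuery());

    // After: max_docs stays at its default (-1, i.e. all matches), so every forecast
    // document matching the query is deleted; only the stats search size is bounded.
    DeleteByQueryRequest unbounded = new DeleteByQueryRequest()
        .indices("placeholder-results-index")
        .setQuery(QueryBuilders.matchAllQuery());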