Skip to content

Commit 5a28e14

Browse files
committed
[ML] Throttle the delete-by-query of expired results (#47177)
Due to #47003 many clusters will have built up a large backlog of expired results. On upgrading to a version where that bug is fixed, users could find that the first ML daily maintenance task deletes a very large number of documents. This change introduces throttling to the delete-by-query that the ML daily maintenance uses to delete expired results to limit it to deleting an average of 200 documents per second. (There is no throttling for state/forecast documents as these are expected to be lower volume.) Additionally, a rough time limit of 8 hours is applied to the whole delete expired data action. (This is only rough as it won't stop partway through a single operation - it only checks the timeout between operations.) Relates #47103
1 parent 93c3da6 commit 5a28e14

14 files changed

+316
-50
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java

+4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ public Response(StreamInput in) throws IOException {
6565
deleted = in.readBoolean();
6666
}
6767

68+
public boolean isDeleted() {
69+
return deleted;
70+
}
71+
6872
@Override
6973
public void writeTo(StreamOutput out) throws IOException {
7074
out.writeBoolean(deleted);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,13 @@ private void triggerTasks() {
116116
LOGGER.info("triggering scheduled [ML] maintenance tasks");
117117
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(),
118118
ActionListener.wrap(
119-
response -> LOGGER.info("Successfully completed [ML] maintenance tasks"),
119+
response -> {
120+
if (response.isDeleted()) {
121+
LOGGER.info("Successfully completed [ML] maintenance tasks");
122+
} else {
123+
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
124+
}
125+
},
120126
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
121127
scheduleNext();
122128
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

+39-12
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,54 @@
2626
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
2727
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
2828

29+
import java.time.Clock;
30+
import java.time.Duration;
31+
import java.time.Instant;
2932
import java.util.Arrays;
3033
import java.util.Iterator;
3134
import java.util.List;
35+
import java.util.function.Supplier;
3236

3337
public class TransportDeleteExpiredDataAction extends HandledTransportAction<DeleteExpiredDataAction.Request,
3438
DeleteExpiredDataAction.Response> {
3539

40+
// TODO: make configurable in the request
41+
static final Duration MAX_DURATION = Duration.ofHours(8);
42+
3643
private final ThreadPool threadPool;
44+
private final String executor;
3745
private final Client client;
3846
private final ClusterService clusterService;
47+
private final Clock clock;
3948

4049
@Inject
4150
public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
4251
ActionFilters actionFilters, Client client, ClusterService clusterService) {
43-
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new);
52+
this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
53+
Clock.systemUTC());
54+
}
55+
56+
TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
57+
ActionFilters actionFilters, Client client, ClusterService clusterService, Clock clock) {
58+
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
4459
this.threadPool = threadPool;
60+
this.executor = executor;
4561
this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN);
4662
this.clusterService = clusterService;
63+
this.clock = clock;
4764
}
4865

4966
@Override
5067
protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
5168
ActionListener<DeleteExpiredDataAction.Response> listener) {
5269
logger.info("Deleting expired data");
53-
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener));
70+
Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION);
71+
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
72+
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier));
5473
}
5574

56-
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
75+
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener,
76+
Supplier<Boolean> isTimedOutSupplier) {
5777
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
5878
List<MlDataRemover> dataRemovers = Arrays.asList(
5979
new ExpiredResultsRemover(client, auditor),
@@ -62,25 +82,32 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
6282
new UnusedStateRemover(client, clusterService)
6383
);
6484
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
65-
deleteExpiredData(dataRemoversIterator, listener);
85+
deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true);
6686
}
6787

68-
private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
69-
ActionListener<DeleteExpiredDataAction.Response> listener) {
70-
if (mlDataRemoversIterator.hasNext()) {
88+
void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
89+
ActionListener<DeleteExpiredDataAction.Response> listener,
90+
Supplier<Boolean> isTimedOutSupplier,
91+
boolean haveAllPreviousDeletionsCompleted) {
92+
if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) {
7193
MlDataRemover remover = mlDataRemoversIterator.next();
7294
ActionListener<Boolean> nextListener = ActionListener.wrap(
73-
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), listener::onFailure);
95+
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse),
96+
listener::onFailure);
7497
// Removing expired ML data and artifacts requires multiple operations.
7598
// These are queued up and executed sequentially in the action listener,
7699
// the chained calls must all run on the ML utility thread pool, NOT the thread
77100
// the previous action returned on, which in the case of a transport_client_boss
78101
// thread is a disaster.
79-
remover.remove(new ThreadedActionListener<>(logger, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, nextListener,
80-
false));
102+
remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false),
103+
isTimedOutSupplier);
81104
} else {
82-
logger.info("Completed deletion of expired data");
83-
listener.onResponse(new DeleteExpiredDataAction.Response(true));
105+
if (haveAllPreviousDeletionsCompleted) {
106+
logger.info("Completed deletion of expired ML data");
107+
} else {
108+
logger.info("Halted deletion of expired ML data until next invocation");
109+
}
110+
listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted));
84111
}
85112
}
86113
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

+14
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
2424
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
2525
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
26+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
2627
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
2728
import org.elasticsearch.xpack.core.ml.job.results.Result;
2829

@@ -79,6 +80,9 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
7980
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
8081
.setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));
8182

83+
// _doc is the most efficient sort order and will also disable scoring
84+
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
85+
8286
try {
8387
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
8488
} catch (Exception e) {
@@ -101,6 +105,10 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
101105
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
102106
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
103107
deleteByQueryHolder.dbqRequest.setQuery(query);
108+
109+
// _doc is the most efficient sort order and will also disable scoring
110+
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
111+
104112
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest,
105113
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
106114
}
@@ -116,6 +124,9 @@ public void deleteInterimResults() {
116124
QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
117125
deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb));
118126

127+
// _doc is the most efficient sort order and will also disable scoring
128+
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
129+
119130
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
120131
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get();
121132
} catch (Exception e) {
@@ -134,6 +145,9 @@ public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> liste
134145
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
135146
.setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId)));
136147

148+
// _doc is the most efficient sort order and will also disable scoring
149+
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
150+
137151
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
138152
}
139153

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Iterator;
2323
import java.util.List;
2424
import java.util.concurrent.TimeUnit;
25+
import java.util.function.Supplier;
2526
import java.util.stream.Collectors;
2627

2728
/**
@@ -40,11 +41,12 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
4041
}
4142

4243
@Override
43-
public void remove(ActionListener<Boolean> listener) {
44-
removeData(newJobIterator(), listener);
44+
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
45+
removeData(newJobIterator(), listener, isTimedOutSupplier);
4546
}
4647

47-
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener) {
48+
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener,
49+
Supplier<Boolean> isTimedOutSupplier) {
4850
if (jobIterator.hasNext() == false) {
4951
listener.onResponse(true);
5052
return;
@@ -56,13 +58,19 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
5658
return;
5759
}
5860

61+
if (isTimedOutSupplier.get()) {
62+
listener.onResponse(false);
63+
return;
64+
}
65+
5966
Long retentionDays = getRetentionDays(job);
6067
if (retentionDays == null) {
61-
removeData(jobIterator, listener);
68+
removeData(jobIterator, listener, isTimedOutSupplier);
6269
return;
6370
}
6471
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
65-
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
72+
removeDataBefore(job, cutoffEpochMs,
73+
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));
6674
}
6775

6876
private WrappedBatchedJobsIterator newJobIterator() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.threadpool.ThreadPool;
3333
import org.elasticsearch.xpack.core.ml.job.config.Job;
3434
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
35+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
3536
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
3637
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
3738
import org.elasticsearch.xpack.core.ml.job.results.Result;
@@ -44,6 +45,7 @@
4445
import java.util.ArrayList;
4546
import java.util.List;
4647
import java.util.Objects;
48+
import java.util.function.Supplier;
4749

4850
/**
4951
* Removes up to {@link #MAX_FORECASTS} forecasts (stats + forecasts docs) that have expired.
@@ -71,10 +73,10 @@ public ExpiredForecastsRemover(Client client, ThreadPool threadPool) {
7173
}
7274

7375
@Override
74-
public void remove(ActionListener<Boolean> listener) {
76+
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
7577
LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs);
7678
ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
77-
searchResponse -> deleteForecasts(searchResponse, listener),
79+
searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier),
7880
e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));
7981

8082
SearchSourceBuilder source = new SearchSourceBuilder();
@@ -84,13 +86,16 @@ public void remove(ActionListener<Boolean> listener) {
8486
source.size(MAX_FORECASTS);
8587
source.trackTotalHits(true);
8688

89+
// _doc is the most efficient sort order and will also disable scoring
90+
source.sort(ElasticsearchMappings.ES_DOC);
91+
8792
SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
8893
searchRequest.source(source);
8994
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
9095
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
9196
}
9297

93-
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener) {
98+
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
9499
List<ForecastRequestStats> forecastsToDelete;
95100
try {
96101
forecastsToDelete = findForecastsToDelete(searchResponse);
@@ -99,6 +104,11 @@ private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boole
99104
return;
100105
}
101106

107+
if (isTimedOutSupplier.get()) {
108+
listener.onResponse(false);
109+
return;
110+
}
111+
102112
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete);
103113
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
104114
@Override
@@ -157,6 +167,10 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec
157167
}
158168
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
159169
request.setQuery(query);
170+
171+
// _doc is the most efficient sort order and will also disable scoring
172+
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
173+
160174
return request;
161175
}
162176
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
2424
import org.elasticsearch.xpack.core.ml.job.config.Job;
2525
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
26+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
2627
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
2728
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
2829
import org.elasticsearch.xpack.ml.MachineLearning;
@@ -88,7 +89,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
8889
.mustNot(activeSnapshotFilter)
8990
.mustNot(retainFilter);
9091

91-
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE));
92+
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));
9293

9394
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
9495
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

+9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.xpack.core.ml.job.config.Job;
2020
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
2121
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
22+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
2223
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
2324
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
2425
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
@@ -88,13 +89,21 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
8889
DeleteByQueryRequest request = new DeleteByQueryRequest();
8990
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
9091

92+
// Delete the documents gradually.
93+
// With DEFAULT_SCROLL_SIZE = 1000 this implies we spread deletion of 1 million documents over 5000 seconds ~= 83 minutes.
94+
request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE);
95+
request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5);
96+
9197
request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
9298
QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
9399
ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE);
94100
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
95101
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
96102
.mustNot(excludeFilter);
97103
request.setQuery(query);
104+
105+
// _doc is the most efficient sort order and will also disable scoring
106+
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
98107
return request;
99108
}
100109

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import org.elasticsearch.action.ActionListener;
99

10+
import java.util.function.Supplier;
11+
1012
public interface MlDataRemover {
11-
void remove(ActionListener<Boolean> listener);
13+
void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
1214
}

0 commit comments

Comments
 (0)