Skip to content

Commit c9ad768

Browse files
[ML] Refresh results indices before running delete by query (elastic#74292)
Test failures in elastic#74101 revealed that the last documents persisted from the job running on a node before it goes down may not be deleted when the reset action is executed. The reason is that the results index has not been refreshed thus those docs are not visible to the search the delete by query action is doing. This commit adds a call to the refresh API before running delete by query to the results indices. Closes elastic#74101
1 parent b6168e2 commit c9ad768

File tree

2 files changed

+32
-13
lines changed

2 files changed

+32
-13
lines changed

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,6 @@ public void testJobRelocationIsMemoryAware() throws Exception {
464464
});
465465
}
466466

467-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/74101")
468467
public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() throws Exception {
469468
internalCluster().ensureAtMostNumDataNodes(0);
470469
logger.info("Starting dedicated master node...");

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@
77
package org.elasticsearch.xpack.ml.job.persistence;
88

99
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
10+
1011
import org.apache.logging.log4j.LogManager;
1112
import org.apache.logging.log4j.Logger;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1415
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
1516
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
1617
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
18+
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
19+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
20+
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
1721
import org.elasticsearch.action.bulk.BulkItemResponse;
1822
import org.elasticsearch.action.search.MultiSearchAction;
1923
import org.elasticsearch.action.search.MultiSearchRequest;
@@ -26,10 +30,10 @@
2630
import org.elasticsearch.cluster.ClusterState;
2731
import org.elasticsearch.cluster.metadata.AliasMetadata;
2832
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
33+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2934
import org.elasticsearch.core.CheckedConsumer;
3035
import org.elasticsearch.core.Nullable;
3136
import org.elasticsearch.core.TimeValue;
32-
import org.elasticsearch.common.util.concurrent.ThreadContext;
3337
import org.elasticsearch.index.IndexNotFoundException;
3438
import org.elasticsearch.index.query.BoolQueryBuilder;
3539
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
@@ -280,17 +284,7 @@ public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExp
280284
ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
281285
response -> {
282286
if (response && indexNames.get().length > 0) {
283-
logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get()));
284-
ConstantScoreQueryBuilder query =
285-
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
286-
DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get())
287-
.setQuery(query)
288-
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
289-
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
290-
.setAbortOnVersionConflict(false)
291-
.setRefresh(true);
292-
293-
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
287+
deleteResultsByQuery(jobId, indexNames.get(), dbqHandler);
294288
} else { // We did not execute DBQ, no need to delete aliases or check the response
295289
dbqHandler.onResponse(null);
296290
}
@@ -414,6 +408,32 @@ public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExp
414408
deleteModelState(jobId, deleteStateHandler);
415409
}
416410

411+
private void deleteResultsByQuery(String jobId, String[] indices, ActionListener<BulkByScrollResponse> listener) {
412+
assert indices.length > 0;
413+
414+
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
415+
refreshResponse -> {
416+
logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
417+
ConstantScoreQueryBuilder query =
418+
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
419+
DeleteByQueryRequest request = new DeleteByQueryRequest(indices)
420+
.setQuery(query)
421+
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
422+
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
423+
.setAbortOnVersionConflict(false)
424+
.setRefresh(true);
425+
426+
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener);
427+
},
428+
listener::onFailure
429+
);
430+
431+
// First, we refresh the indices to ensure any in-flight docs become visible
432+
RefreshRequest refreshRequest = new RefreshRequest(indices);
433+
refreshRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()));
434+
executeAsyncWithOrigin(client, ML_ORIGIN, RefreshAction.INSTANCE, refreshRequest, refreshListener);
435+
}
436+
417437
private void deleteAliases(String jobId, ActionListener<AcknowledgedResponse> finishedHandler) {
418438
final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
419439
final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);

0 commit comments

Comments
 (0)