From 2de33551de709facbd63eb76f2feb271100b5620 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 17 Oct 2019 16:26:22 +0100 Subject: [PATCH] [7.x][ML] Always refresh dest index before starting analytics process (#48090) If a job stops right after reindexing is finished but before we refresh the destination index, we don't refresh at all. If the job is started again right after, it jumps into the analyzing state. However, the data is still not searchable. This is why we were seeing test failures that we start the process expecting X rows (where X is lower than the expected number of docs) and we end up getting X+. We fix this by moving the refresh of the dest index right before we start the process so it always ensures the data is searchable. Closes #47612 Backport of #48090 --- .../xpack/ml/integration/RegressionIT.java | 1 - .../integration/RunDataFrameAnalyticsIT.java | 1 - .../dataframe/DataFrameAnalyticsManager.java | 23 ++----------------- .../ml/dataframe/DataFrameAnalyticsTask.java | 1 + .../extractor/DataFrameDataExtractor.java | 4 +++- .../process/AnalyticsProcessManager.java | 13 +++++++---- 6 files changed, 15 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index e5afb04c953bb..cc51f0a7c1a76 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -238,7 +238,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception "Finished analysis"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612") public void testStopAndRestart() throws Exception { initialize("regression_stop_and_restart"); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 7bac43cfad960..c3d27b09fde98 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -550,7 +550,6 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws "Stopped analytics"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612") public void testOutlierDetectionStopAndRestart() throws Exception { String sourceIndex = "test-outlier-detection-stop-and-restart"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index fea9753314d7a..c53238dc425d7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -15,9 +15,6 @@ import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; -import org.elasticsearch.action.admin.indices.refresh.RefreshAction; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; @@ -160,34 +157,18 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF } // Reindexing is complete; start analytics - ActionListener refreshListener = ActionListener.wrap( + ActionListener reindexCompletedListener = ActionListener.wrap( refreshResponse -> { if (task.isStopping()) { LOGGER.debug("[{}] Stopping before starting analytics process", config.getId()); return; } task.setReindexingTaskId(null); - startAnalytics(task, config, false); - }, - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) - ); - - // Refresh to ensure copied index is fully searchable - ActionListener reindexCompletedListener = ActionListener.wrap( - bulkResponse -> { - if (task.isStopping()) { - LOGGER.debug("[{}] Stopping before refreshing destination index", config.getId()); - return; - } task.setReindexingFinished(); auditor.info( config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex())); - ClientHelper.executeAsyncWithOrigin(client, - ClientHelper.ML_ORIGIN, - RefreshAction.INSTANCE, - new RefreshRequest(config.getDest().getIndex()), - refreshListener); + startAnalytics(task, config, false); }, error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index d6be817804b30..55f5ef6be5353 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -79,6 +79,7 @@ public StartDataFrameAnalyticsAction.TaskParams getParams() { } public void setReindexingTaskId(Long reindexingTaskId) { + LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", taskParams.getId(), reindexingTaskId, this.reindexingTaskId); this.reindexingTaskId = reindexingTaskId; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java index 6b6983ce739de..1b887278c41b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java @@ -237,7 +237,9 @@ public List getFieldNames() { public DataSummary collectDataSummary() { SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder(); SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder); - return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size()); + long rows = searchResponse.getHits().getTotalHits().value; + LOGGER.debug("[{}] Data summary rows [{}]", context.jobId, rows); + return new DataSummary(rows, context.extractedFields.getAllFields().size()); } public void collectDataSummaryAsync(ActionListener dataSummaryActionListener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 485b9d9d60501..85b40bd64934f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -81,6 +81,9 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, return; } + // First we refresh the dest index to ensure data is searchable + refreshDest(config); + ProcessContext processContext = new ProcessContext(config.getId()); if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) { finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id @@ -147,10 +150,12 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c refreshDest(config); LOGGER.info("[{}] Result processor has completed", config.getId()); } catch (Exception e) { - String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()) - .getFormattedMessage(); - LOGGER.error(errorMsg, e); - processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); + if (task.isStopping() == false) { + String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()) + .getFormattedMessage(); + LOGGER.error(errorMsg, e); + processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); + } } finally { closeProcess(task);