diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index e5afb04c953bb..cc51f0a7c1a76 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -238,7 +238,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception "Finished analysis"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612") public void testStopAndRestart() throws Exception { initialize("regression_stop_and_restart"); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 7bac43cfad960..c3d27b09fde98 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -550,7 +550,6 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws "Stopped analytics"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612") public void testOutlierDetectionStopAndRestart() throws Exception { String sourceIndex = "test-outlier-detection-stop-and-restart"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index fea9753314d7a..c53238dc425d7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -15,9 +15,6 @@ import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; -import org.elasticsearch.action.admin.indices.refresh.RefreshAction; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; @@ -160,34 +157,18 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF } // Reindexing is complete; start analytics - ActionListener refreshListener = ActionListener.wrap( + ActionListener reindexCompletedListener = ActionListener.wrap( refreshResponse -> { if (task.isStopping()) { LOGGER.debug("[{}] Stopping before starting analytics process", config.getId()); return; } task.setReindexingTaskId(null); - startAnalytics(task, config, false); - }, - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) - ); - - // Refresh to ensure copied index is fully searchable - ActionListener reindexCompletedListener = ActionListener.wrap( - bulkResponse -> { - if (task.isStopping()) { - LOGGER.debug("[{}] Stopping before refreshing destination index", config.getId()); - return; - } task.setReindexingFinished(); auditor.info( config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex())); - ClientHelper.executeAsyncWithOrigin(client, - ClientHelper.ML_ORIGIN, - RefreshAction.INSTANCE, - new RefreshRequest(config.getDest().getIndex()), - refreshListener); + startAnalytics(task, config, false); }, error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index d6be817804b30..55f5ef6be5353 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -79,6 +79,7 @@ public StartDataFrameAnalyticsAction.TaskParams getParams() { } public void setReindexingTaskId(Long reindexingTaskId) { + LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", taskParams.getId(), reindexingTaskId, this.reindexingTaskId); this.reindexingTaskId = reindexingTaskId; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java index 6b6983ce739de..1b887278c41b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java @@ -237,7 +237,9 @@ public List getFieldNames() { public DataSummary collectDataSummary() { SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder(); SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder); - return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size()); + long rows = searchResponse.getHits().getTotalHits().value; + LOGGER.debug("[{}] Data summary rows [{}]", context.jobId, rows); + return new DataSummary(rows, context.extractedFields.getAllFields().size()); } public void collectDataSummaryAsync(ActionListener dataSummaryActionListener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 485b9d9d60501..85b40bd64934f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -81,6 +81,9 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, return; } + // First we refresh the dest index to ensure data is searchable + refreshDest(config); + ProcessContext processContext = new ProcessContext(config.getId()); if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) { finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id @@ -147,10 +150,12 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c refreshDest(config); LOGGER.info("[{}] Result processor has completed", config.getId()); } catch (Exception e) { - String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()) - .getFormattedMessage(); - LOGGER.error(errorMsg, e); - processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); + if (task.isStopping() == false) { + String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()) + .getFormattedMessage(); + LOGGER.error(errorMsg, e); + processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); + } } finally { closeProcess(task);