Skip to content

[7.x][ML] Always refresh dest index before starting analytics process #48196

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception
"Finished analysis");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
public void testStopAndRestart() throws Exception {
initialize("regression_stop_and_restart");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,6 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws
"Stopped analytics");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
public void testOutlierDetectionStopAndRestart() throws Exception {
String sourceIndex = "test-outlier-detection-stop-and-restart";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -160,34 +157,18 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
}

// Reindexing is complete; start analytics
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
refreshResponse -> {
if (task.isStopping()) {
LOGGER.debug("[{}] Stopping before starting analytics process", config.getId());
return;
}
task.setReindexingTaskId(null);
startAnalytics(task, config, false);
},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);

// Refresh to ensure copied index is fully searchable
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
bulkResponse -> {
if (task.isStopping()) {
LOGGER.debug("[{}] Stopping before refreshing destination index", config.getId());
return;
}
task.setReindexingFinished();
auditor.info(
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.ML_ORIGIN,
RefreshAction.INSTANCE,
new RefreshRequest(config.getDest().getIndex()),
refreshListener);
startAnalytics(task, config, false);
},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public StartDataFrameAnalyticsAction.TaskParams getParams() {
}

/**
 * Records the id of the reindexing task for this analytics task, logging the
 * transition for debugging. Passing {@code null} clears the id (done once
 * reindexing has completed — see the reindex-completed listener in this PR).
 *
 * @param reindexingTaskId the id of the reindex task, or {@code null} to clear it
 */
public void setReindexingTaskId(Long reindexingTaskId) {
// Log the previous value before it is overwritten so id transitions can be traced.
LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", taskParams.getId(), reindexingTaskId, this.reindexingTaskId);
this.reindexingTaskId = reindexingTaskId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ public List<String> getFieldNames() {
/**
 * Collects a summary of the data that will be fed to the analytics process:
 * the number of rows matching the data summary search and the number of
 * extracted fields (columns).
 *
 * <p>NOTE(review): the extracted diff contained both the pre-change and
 * post-change bodies of this method (a stale {@code return} statement before
 * the new logging lines), which is unreachable code; this version keeps only
 * the post-change implementation.
 *
 * @return a {@code DataSummary} holding the row and column counts
 */
public DataSummary collectDataSummary() {
    SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
    SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
    // Total hits of the summary search = number of rows available to the process.
    long rows = searchResponse.getHits().getTotalHits().value;
    LOGGER.debug("[{}] Data summary rows [{}]", context.jobId, rows);
    return new DataSummary(rows, context.extractedFields.getAllFields().size());
}

public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
return;
}

// First we refresh the dest index to ensure data is searchable
refreshDest(config);

ProcessContext processContext = new ProcessContext(config.getId());
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id
Expand Down Expand Up @@ -147,10 +150,12 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c
refreshDest(config);
LOGGER.info("[{}] Result processor has completed", config.getId());
} catch (Exception e) {
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
.getFormattedMessage();
LOGGER.error(errorMsg, e);
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
if (task.isStopping() == false) {
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
.getFormattedMessage();
LOGGER.error(errorMsg, e);
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
}
} finally {
closeProcess(task);

Expand Down