Skip to content

Commit e0489fc

Browse files
[7.x][ML] Always refresh dest index before starting analytics process (#48090) (#48196)
If a job stops right after reindexing has finished but before we refresh the destination index, the refresh never happens at all. If the job is then started again right away, it jumps straight into the analyzing state even though the reindexed data is still not searchable. This is why we were seeing test failures in which we started the process expecting X rows (where X is lower than the expected number of docs) and ended up getting X+. We fix this by moving the refresh of the dest index to right before we start the process, so that the data is always searchable by the time the process reads it. Closes #47612 Backport of #48090
1 parent eb7969e commit e0489fc

File tree

6 files changed

+15
-28
lines changed

6 files changed

+15
-28
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception
238238
"Finished analysis");
239239
}
240240

241-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
242241
public void testStopAndRestart() throws Exception {
243242
initialize("regression_stop_and_restart");
244243

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,6 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws
550550
"Stopped analytics");
551551
}
552552

553-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
554553
public void testOutlierDetectionStopAndRestart() throws Exception {
555554
String sourceIndex = "test-outlier-detection-stop-and-restart";
556555

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

+2-21
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
1616
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
1717
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
18-
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
19-
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
20-
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
2118
import org.elasticsearch.action.support.ContextPreservingActionListener;
2219
import org.elasticsearch.client.node.NodeClient;
2320
import org.elasticsearch.cluster.ClusterState;
@@ -160,34 +157,18 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
160157
}
161158

162159
// Reindexing is complete; start analytics
163-
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
160+
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
164161
refreshResponse -> {
165162
if (task.isStopping()) {
166163
LOGGER.debug("[{}] Stopping before starting analytics process", config.getId());
167164
return;
168165
}
169166
task.setReindexingTaskId(null);
170-
startAnalytics(task, config, false);
171-
},
172-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
173-
);
174-
175-
// Refresh to ensure copied index is fully searchable
176-
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
177-
bulkResponse -> {
178-
if (task.isStopping()) {
179-
LOGGER.debug("[{}] Stopping before refreshing destination index", config.getId());
180-
return;
181-
}
182167
task.setReindexingFinished();
183168
auditor.info(
184169
config.getId(),
185170
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
186-
ClientHelper.executeAsyncWithOrigin(client,
187-
ClientHelper.ML_ORIGIN,
188-
RefreshAction.INSTANCE,
189-
new RefreshRequest(config.getDest().getIndex()),
190-
refreshListener);
171+
startAnalytics(task, config, false);
191172
},
192173
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
193174
);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

+1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public StartDataFrameAnalyticsAction.TaskParams getParams() {
7979
}
8080

8181
public void setReindexingTaskId(Long reindexingTaskId) {
82+
LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", taskParams.getId(), reindexingTaskId, this.reindexingTaskId);
8283
this.reindexingTaskId = reindexingTaskId;
8384
}
8485

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,9 @@ public List<String> getFieldNames() {
237237
public DataSummary collectDataSummary() {
238238
SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
239239
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
240-
return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
240+
long rows = searchResponse.getHits().getTotalHits().value;
241+
LOGGER.debug("[{}] Data summary rows [{}]", context.jobId, rows);
242+
return new DataSummary(rows, context.extractedFields.getAllFields().size());
241243
}
242244

243245
public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
8181
return;
8282
}
8383

84+
// First we refresh the dest index to ensure data is searchable
85+
refreshDest(config);
86+
8487
ProcessContext processContext = new ProcessContext(config.getId());
8588
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
8689
finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id
@@ -147,10 +150,12 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c
147150
refreshDest(config);
148151
LOGGER.info("[{}] Result processor has completed", config.getId());
149152
} catch (Exception e) {
150-
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
151-
.getFormattedMessage();
152-
LOGGER.error(errorMsg, e);
153-
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
153+
if (task.isStopping() == false) {
154+
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
155+
.getFormattedMessage();
156+
LOGGER.error(errorMsg, e);
157+
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
158+
}
154159
} finally {
155160
closeProcess(task);
156161

0 commit comments

Comments
 (0)