Skip to content

Commit 7fba568

Browse files
[7.5][ML] Always refresh dest index before starting analytics process (#48090) (#48197)
If a job stops right after reindexing is finished but before we refresh the destination index, we don't refresh at all. If the job is started again right after, it jumps into the analyzing state. However, the data is still not searchable. This is why we were seeing test failures that we start the process expecting X rows (where X is lower than the expected number of docs) and we end up getting X+. We fix this by moving the refresh of the dest index right before we start the process so it always ensures the data is searchable. Closes #47612 Backport of #48090
1 parent 7da51d7 commit 7fba568

File tree

6 files changed

+15
-28
lines changed

6 files changed

+15
-28
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception
236236
"Finished analysis");
237237
}
238238

239-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
240239
public void testStopAndRestart() throws Exception {
241240
initialize("regression_stop_and_restart");
242241

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,6 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws
548548
"Stopped analytics");
549549
}
550550

551-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
552551
public void testOutlierDetectionStopAndRestart() throws Exception {
553552
String sourceIndex = "test-outlier-detection-stop-and-restart";
554553

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

+2-21
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
1616
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
1717
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
18-
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
19-
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
20-
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
2118
import org.elasticsearch.action.support.ContextPreservingActionListener;
2219
import org.elasticsearch.client.node.NodeClient;
2320
import org.elasticsearch.cluster.ClusterState;
@@ -160,34 +157,18 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
160157
}
161158

162159
// Reindexing is complete; start analytics
163-
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
160+
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
164161
refreshResponse -> {
165162
if (task.isStopping()) {
166163
LOGGER.debug("[{}] Stopping before starting analytics process", config.getId());
167164
return;
168165
}
169166
task.setReindexingTaskId(null);
170-
startAnalytics(task, config, false);
171-
},
172-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
173-
);
174-
175-
// Refresh to ensure copied index is fully searchable
176-
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
177-
bulkResponse -> {
178-
if (task.isStopping()) {
179-
LOGGER.debug("[{}] Stopping before refreshing destination index", config.getId());
180-
return;
181-
}
182167
task.setReindexingFinished();
183168
auditor.info(
184169
config.getId(),
185170
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
186-
ClientHelper.executeAsyncWithOrigin(client,
187-
ClientHelper.ML_ORIGIN,
188-
RefreshAction.INSTANCE,
189-
new RefreshRequest(config.getDest().getIndex()),
190-
refreshListener);
171+
startAnalytics(task, config, false);
191172
},
192173
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
193174
);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

+1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public StartDataFrameAnalyticsAction.TaskParams getParams() {
7979
}
8080

8181
public void setReindexingTaskId(Long reindexingTaskId) {
82+
LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", taskParams.getId(), reindexingTaskId, this.reindexingTaskId);
8283
this.reindexingTaskId = reindexingTaskId;
8384
}
8485

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,9 @@ public List<String> getFieldNames() {
237237
public DataSummary collectDataSummary() {
238238
SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
239239
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
240-
return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
240+
long rows = searchResponse.getHits().getTotalHits().value;
241+
LOGGER.debug("[{}] Data summary rows [{}]", context.jobId, rows);
242+
return new DataSummary(rows, context.extractedFields.getAllFields().size());
241243
}
242244

243245
public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
7777
return;
7878
}
7979

80+
// First we refresh the dest index to ensure data is searchable
81+
refreshDest(config);
82+
8083
ProcessContext processContext = new ProcessContext(config.getId());
8184
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
8285
finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id
@@ -143,10 +146,12 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c
143146
refreshDest(config);
144147
LOGGER.info("[{}] Result processor has completed", config.getId());
145148
} catch (Exception e) {
146-
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
147-
.getFormattedMessage();
148-
LOGGER.error(errorMsg, e);
149-
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
149+
if (task.isStopping() == false) {
150+
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
151+
.getFormattedMessage();
152+
LOGGER.error(errorMsg, e);
153+
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
154+
}
150155
} finally {
151156
closeProcess(task);
152157

0 commit comments

Comments
 (0)