Skip to content

Commit 76fa500

Browse files
[ML] Update reindexing task progress before persisting job progress (#61868)
This fixes a bug introduced by #61782. In that PR I thought I could simplify the persistence of progress by using the progress straight from the stats holder in the task instead of calling the get stats action. However, I overlooked that the reindexing task progress can then be stale, because it is only updated when the get stats API is called. This commit fixes that by updating the reindexing task progress before persisting the job progress. This seems to be much more lightweight than executing a get stats request. Closes #61852
1 parent 8c37d05 commit 76fa500

File tree

3 files changed

+30
-13
lines changed

3 files changed

+30
-13
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ protected void taskOperation(GetDataFrameAnalyticsStatsAction.Request request, D
119119
}, listener::onFailure
120120
);
121121

122+
// We must update the progress of the reindexing task as it might be stale
122123
task.updateReindexTaskProgress(reindexingProgressListener);
123124
}
124125

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

+22-10
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
290290

291291
String progressDocId = StoredProgress.documentId(jobId);
292292

293-
// Step 3: Run the runnable provided as the argument
293+
// Step 4: Run the runnable provided as the argument
294294
ActionListener<IndexResponse> indexProgressDocListener = ActionListener.wrap(
295295
indexResponse -> {
296296
LOGGER.debug("[{}] Successfully indexed progress document", jobId);
@@ -303,7 +303,7 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
303303
}
304304
);
305305

306-
// Step 2: Create or update the progress document:
306+
// Step 3: Create or update the progress document:
307307
// - if the document did not exist, create the new one in the current write index
308308
// - if the document did exist, update it in the index where it resides (not necessarily the current write index)
309309
ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap(
@@ -331,14 +331,26 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
331331
}
332332
);
333333

334-
// Step 1: Search for existing progress document in .ml-state*
335-
SearchRequest searchRequest =
336-
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
337-
.source(
338-
new SearchSourceBuilder()
339-
.size(1)
340-
.query(new IdsQueryBuilder().addIds(progressDocId)));
341-
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
334+
// Step 2: Search for existing progress document in .ml-state*
335+
ActionListener<Void> reindexProgressUpdateListener = ActionListener.wrap(
336+
aVoid -> {
337+
SearchRequest searchRequest =
338+
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
339+
.source(
340+
new SearchSourceBuilder()
341+
.size(1)
342+
.query(new IdsQueryBuilder().addIds(progressDocId)));
343+
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
344+
},
345+
e -> {
346+
LOGGER.error(new ParameterizedMessage(
347+
"[{}] cannot persist progress as an error occurred while updating reindexing task progress", taskParams.getId()), e);
348+
runnable.run();
349+
}
350+
);
351+
352+
// Step 1: Update reindexing progress as it could be stale
353+
updateReindexTaskProgress(reindexProgressUpdateListener);
342354
}
343355

344356
/**

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.Map;
4747

4848
import static org.hamcrest.Matchers.equalTo;
49+
import static org.hamcrest.Matchers.hasSize;
4950
import static org.mockito.Matchers.any;
5051
import static org.mockito.Matchers.eq;
5152
import static org.mockito.Matchers.same;
@@ -216,8 +217,9 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
216217
PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, mock(ThreadPool.class), client);
217218
TaskManager taskManager = mock(TaskManager.class);
218219

220+
// We leave reindexing progress here to zero in order to check it is updated before it is persisted
219221
List<PhaseProgress> progress = List.of(
220-
new PhaseProgress(ProgressTracker.REINDEXING, 100),
222+
new PhaseProgress(ProgressTracker.REINDEXING, 0),
221223
new PhaseProgress(ProgressTracker.LOADING_DATA, 100),
222224
new PhaseProgress(ProgressTracker.WRITING_RESULTS, 30));
223225

@@ -239,6 +241,7 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
239241
new DataFrameAnalyticsTask(
240242
123, "type", "action", null, Map.of(), client, clusterService, analyticsManager, auditor, taskParams);
241243
task.init(persistentTasksService, taskManager, "task-id", 42);
244+
task.setReindexingFinished();
242245
Exception exception = new Exception("some exception");
243246

244247
task.setFailed(exception);
@@ -260,7 +263,8 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
260263
try (XContentParser parser = JsonXContent.jsonXContent.createParser(
261264
NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, indexRequest.source().utf8ToString())) {
262265
StoredProgress parsedProgress = StoredProgress.PARSER.apply(parser, null);
263-
assertThat(parsedProgress.get(), equalTo(progress));
266+
assertThat(parsedProgress.get(), hasSize(3));
267+
assertThat(parsedProgress.get().get(0), equalTo(new PhaseProgress("reindexing", 100)));
264268
}
265269

266270
verify(client).execute(
@@ -269,7 +273,7 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
269273
"task-id", 42, new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, 42, "some exception"))),
270274
any());
271275
}
272-
verifyNoMoreInteractions(client, clusterService, analyticsManager, auditor, taskManager);
276+
verifyNoMoreInteractions(client, analyticsManager, auditor, taskManager);
273277
}
274278

275279
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)