Skip to content

Commit ec40597

Browse files
[7.x][ML] Update reindexing task progress before persisting job progress (#61868) (#61875)
This fixes a bug introduced by #61782. In that PR I thought I could simplify the persistence of progress by using the progress straight from the stats holder in the task instead of calling the get stats action. However, I overlooked that it is then possible to have stale progress for the reindexing task as that is only updated when the get stats API is called. In this commit this is fixed by updating reindexing task progress before persisting the job progress. This seems to be much more lightweight than calling the get stats request. Closes #61852 Backport of #61868
1 parent c22415c commit ec40597

File tree

3 files changed

+30
-13
lines changed

3 files changed

+30
-13
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ protected void taskOperation(GetDataFrameAnalyticsStatsAction.Request request, D
119119
}, listener::onFailure
120120
);
121121

122+
// We must update the progress of the reindexing task as it might be stale
122123
task.updateReindexTaskProgress(reindexingProgressListener);
123124
}
124125

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
290290

291291
String progressDocId = StoredProgress.documentId(jobId);
292292

293-
// Step 3: Run the runnable provided as the argument
293+
// Step 4: Run the runnable provided as the argument
294294
ActionListener<IndexResponse> indexProgressDocListener = ActionListener.wrap(
295295
indexResponse -> {
296296
LOGGER.debug("[{}] Successfully indexed progress document", jobId);
@@ -303,7 +303,7 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
303303
}
304304
);
305305

306-
// Step 2: Create or update the progress document:
306+
// Step 3: Create or update the progress document:
307307
// - if the document did not exist, create the new one in the current write index
308308
// - if the document did exist, update it in the index where it resides (not necessarily the current write index)
309309
ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap(
@@ -331,14 +331,26 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
331331
}
332332
);
333333

334-
// Step 1: Search for existing progress document in .ml-state*
335-
SearchRequest searchRequest =
336-
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
337-
.source(
338-
new SearchSourceBuilder()
339-
.size(1)
340-
.query(new IdsQueryBuilder().addIds(progressDocId)));
341-
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
334+
// Step 2: Search for existing progress document in .ml-state*
335+
ActionListener<Void> reindexProgressUpdateListener = ActionListener.wrap(
336+
aVoid -> {
337+
SearchRequest searchRequest =
338+
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
339+
.source(
340+
new SearchSourceBuilder()
341+
.size(1)
342+
.query(new IdsQueryBuilder().addIds(progressDocId)));
343+
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
344+
},
345+
e -> {
346+
LOGGER.error(new ParameterizedMessage(
347+
"[{}] cannot persist progress as an error occurred while updating reindexing task progress", taskParams.getId()), e);
348+
runnable.run();
349+
}
350+
);
351+
352+
// Step 1: Update reindexing progress as it could be stale
353+
updateReindexTaskProgress(reindexProgressUpdateListener);
342354
}
343355

344356
/**

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.List;
4646

4747
import static org.hamcrest.Matchers.equalTo;
48+
import static org.hamcrest.Matchers.hasSize;
4849
import static org.mockito.Matchers.any;
4950
import static org.mockito.Matchers.eq;
5051
import static org.mockito.Matchers.same;
@@ -217,8 +218,9 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
217218
PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, mock(ThreadPool.class), client);
218219
TaskManager taskManager = mock(TaskManager.class);
219220

221+
// We leave reindexing progress here to zero in order to check it is updated before it is persisted
220222
List<PhaseProgress> progress = Arrays.asList(
221-
new PhaseProgress(ProgressTracker.REINDEXING, 100),
223+
new PhaseProgress(ProgressTracker.REINDEXING, 0),
222224
new PhaseProgress(ProgressTracker.LOADING_DATA, 100),
223225
new PhaseProgress(ProgressTracker.WRITING_RESULTS, 30));
224226

@@ -240,6 +242,7 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
240242
new DataFrameAnalyticsTask(
241243
123, "type", "action", null, Collections.emptyMap(), client, clusterService, analyticsManager, auditor, taskParams);
242244
task.init(persistentTasksService, taskManager, "task-id", 42);
245+
task.setReindexingFinished();
243246
Exception exception = new Exception("some exception");
244247

245248
task.setFailed(exception);
@@ -261,7 +264,8 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
261264
try (XContentParser parser = JsonXContent.jsonXContent.createParser(
262265
NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, indexRequest.source().utf8ToString())) {
263266
StoredProgress parsedProgress = StoredProgress.PARSER.apply(parser, null);
264-
assertThat(parsedProgress.get(), equalTo(progress));
267+
assertThat(parsedProgress.get(), hasSize(3));
268+
assertThat(parsedProgress.get().get(0), equalTo(new PhaseProgress("reindexing", 100)));
265269
}
266270

267271
verify(client).execute(
@@ -270,7 +274,7 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
270274
"task-id", 42, new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, 42, "some exception"))),
271275
any());
272276
}
273-
verifyNoMoreInteractions(client, clusterService, analyticsManager, auditor, taskManager);
277+
verifyNoMoreInteractions(client, analyticsManager, auditor, taskManager);
274278
}
275279

276280
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)