Skip to content

Commit c8707ea

Browse files
authored
[ML] only persist progress if it has changed (#62123)
* [ML] only persist progress if it has changed. We already search for the previously stored progress document. For optimization purposes, and to prevent restoring the same progress after a failed analytics job is stopped, this commit does an equality check between the previously stored progress and the current progress. If the progress has changed, persistence continues as normal.
1 parent 6008a74 commit c8707ea

File tree

3 files changed

+17
-4
lines changed

3 files changed

+17
-4
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,6 @@ public void testUpdateAnalytics() throws Exception {
779779
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2")));
780780
}
781781

782-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61913")
783782
public void testTooLowConfiguredMemoryStillStarts() throws Exception {
784783
initialize("low_memory_analysis");
785784
indexData(sourceIndex, 10_000, 0, NESTED_FIELD);

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ private void cleanUpAnalytics() {
9393
for (DataFrameAnalyticsConfig config : analytics) {
9494
try {
9595
assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true));
96-
assertThat(searchStoredProgress(config.getId()).getHits().getTotalHits().value, equalTo(0L));
9796
} catch (Exception e) {
9897
// just log and ignore
9998
logger.error(new ParameterizedMessage("[{}] Could not clean up analytics job config", config.getId()), e);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
5050
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
5151
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
52+
import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils;
5253

5354
import java.util.List;
5455
import java.util.Map;
@@ -309,17 +310,31 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
309310
ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap(
310311
searchResponse -> {
311312
String indexOrAlias = AnomalyDetectorsIndex.jobStateIndexWriteAlias();
313+
StoredProgress previous = null;
312314
if (searchResponse.getHits().getHits().length > 0) {
313315
indexOrAlias = searchResponse.getHits().getHits()[0].getIndex();
316+
try {
317+
previous = MlParserUtils.parse(searchResponse.getHits().getHits()[0], StoredProgress.PARSER);
318+
} catch (Exception ex) {
319+
LOGGER.warn(new ParameterizedMessage("[{}] failed to parse previously stored progress", jobId), ex);
320+
}
314321
}
322+
323+
List<PhaseProgress> progress = statsHolder.getProgressTracker().report();
324+
final StoredProgress progressToStore = new StoredProgress(progress);
325+
if (progressToStore.equals(previous)) {
326+
LOGGER.debug("[{}] new progress is the same as previously persisted progress. Skipping storage.", jobId);
327+
runnable.run();
328+
return;
329+
}
330+
315331
IndexRequest indexRequest = new IndexRequest(indexOrAlias)
316332
.id(progressDocId)
317333
.setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias))
318334
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
319-
List<PhaseProgress> progress = statsHolder.getProgressTracker().report();
320335
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
321336
LOGGER.debug("[{}] Persisting progress is: {}", jobId, progress);
322-
new StoredProgress(progress).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
337+
progressToStore.toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
323338
indexRequest.source(jsonBuilder);
324339
}
325340
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, indexProgressDocListener);

0 commit comments

Comments
 (0)