Skip to content

Commit 7bc6ba3

Browse files
[ML] DFA result processor should only skip rows and model chunks on cancel (elastic#60113)
When the job is force-closed or shutting down due to a fatal error we clean up all cancellable job operations. This includes cancelling the results processor. However, this means that we might not persist objects that are written from the process like stats, memory usage, etc. In hindsight, we do not gain from cancelling the results processor in its entirety. It makes more sense to skip row results and model chunks but keep stats and instrumentation about the job as the latter may contain useful information to understand what happened to the job.
1 parent 8543197 commit 7bc6ba3

File tree

7 files changed

+269
-69
lines changed

7 files changed

+269
-69
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,6 @@ synchronized void stop() {
434434
if (inferenceRunner.get() != null) {
435435
inferenceRunner.get().cancel();
436436
}
437-
statsPersister.cancel();
438437
if (process.get() != null) {
439438
try {
440439
process.get().kill();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class AnalyticsResultProcessor {
5656
private final ChunkedTrainedModelPersister chunkedTrainedModelPersister;
5757
private volatile String failure;
5858
private volatile boolean isCancelled;
59+
private long processedRows;
5960

6061
private volatile String latestModelId;
6162

@@ -92,31 +93,17 @@ public void awaitForCompletion() {
9293

9394
public void cancel() {
9495
dataFrameRowsJoiner.cancel();
95-
statsPersister.cancel();
9696
isCancelled = true;
9797
}
9898

9999
public void process(AnalyticsProcess<AnalyticsResult> process) {
100100
long totalRows = process.getConfig().rows();
101-
long processedRows = 0;
102101

103102
// TODO When java 9 features can be used, we will not need the local variable here
104103
try (DataFrameRowsJoiner resultsJoiner = dataFrameRowsJoiner) {
105104
Iterator<AnalyticsResult> iterator = process.readAnalyticsResults();
106105
while (iterator.hasNext()) {
107-
if (isCancelled) {
108-
break;
109-
}
110-
AnalyticsResult result = iterator.next();
111-
processResult(result, resultsJoiner);
112-
if (result.getRowResults() != null) {
113-
if (processedRows == 0) {
114-
LOGGER.info("[{}] Started writing results", analytics.getId());
115-
auditor.info(analytics.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS));
116-
}
117-
processedRows++;
118-
updateResultsProgress(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows));
119-
}
106+
processResult(iterator.next(), resultsJoiner, totalRows);
120107
}
121108
} catch (Exception e) {
122109
if (isCancelled) {
@@ -141,10 +128,10 @@ private void completeResultsProgress() {
141128
statsHolder.getProgressTracker().updateWritingResultsProgress(100);
142129
}
143130

144-
private void processResult(AnalyticsResult result, DataFrameRowsJoiner resultsJoiner) {
131+
private void processResult(AnalyticsResult result, DataFrameRowsJoiner resultsJoiner, long totalRows) {
145132
RowResults rowResults = result.getRowResults();
146-
if (rowResults != null) {
147-
resultsJoiner.processRowResults(rowResults);
133+
if (rowResults != null && isCancelled == false) {
134+
processRowResult(resultsJoiner, totalRows, rowResults);
148135
}
149136
PhaseProgress phaseProgress = result.getPhaseProgress();
150137
if (phaseProgress != null) {
@@ -157,7 +144,7 @@ private void processResult(AnalyticsResult result, DataFrameRowsJoiner resultsJo
157144
latestModelId = chunkedTrainedModelPersister.createAndIndexInferenceModelMetadata(modelSize);
158145
}
159146
TrainedModelDefinitionChunk trainedModelDefinitionChunk = result.getTrainedModelDefinitionChunk();
160-
if (trainedModelDefinitionChunk != null) {
147+
if (trainedModelDefinitionChunk != null && isCancelled == false) {
161148
chunkedTrainedModelPersister.createAndIndexInferenceModelDoc(trainedModelDefinitionChunk);
162149
}
163150
MemoryUsage memoryUsage = result.getMemoryUsage();
@@ -181,6 +168,16 @@ private void processResult(AnalyticsResult result, DataFrameRowsJoiner resultsJo
181168
}
182169
}
183170

171+
private void processRowResult(DataFrameRowsJoiner rowsJoiner, long totalRows, RowResults rowResults) {
172+
rowsJoiner.processRowResults(rowResults);
173+
if (processedRows == 0) {
174+
LOGGER.info("[{}] Started writing results", analytics.getId());
175+
auditor.info(analytics.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS));
176+
}
177+
processedRows++;
178+
updateResultsProgress(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows));
179+
}
180+
184181
private void setAndReportFailure(Exception e) {
185182
LOGGER.error(new ParameterizedMessage("[{}] Error processing results; ", analytics.getId()), e);
186183
failure = "error processing results; " + e.getMessage();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResult.java

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ public class AnalyticsResult implements ToXContentObject {
6666
private final ModelSizeInfo modelSizeInfo;
6767
private final TrainedModelDefinitionChunk trainedModelDefinitionChunk;
6868

69-
public AnalyticsResult(@Nullable RowResults rowResults,
70-
@Nullable PhaseProgress phaseProgress,
71-
@Nullable MemoryUsage memoryUsage,
72-
@Nullable OutlierDetectionStats outlierDetectionStats,
73-
@Nullable ClassificationStats classificationStats,
74-
@Nullable RegressionStats regressionStats,
75-
@Nullable ModelSizeInfo modelSizeInfo,
76-
@Nullable TrainedModelDefinitionChunk trainedModelDefinitionChunk) {
69+
private AnalyticsResult(@Nullable RowResults rowResults,
70+
@Nullable PhaseProgress phaseProgress,
71+
@Nullable MemoryUsage memoryUsage,
72+
@Nullable OutlierDetectionStats outlierDetectionStats,
73+
@Nullable ClassificationStats classificationStats,
74+
@Nullable RegressionStats regressionStats,
75+
@Nullable ModelSizeInfo modelSizeInfo,
76+
@Nullable TrainedModelDefinitionChunk trainedModelDefinitionChunk) {
7777
this.rowResults = rowResults;
7878
this.phaseProgress = phaseProgress;
7979
this.memoryUsage = memoryUsage;
@@ -172,4 +172,75 @@ public int hashCode() {
172172
return Objects.hash(rowResults, phaseProgress, memoryUsage, outlierDetectionStats, classificationStats,
173173
regressionStats, modelSizeInfo, trainedModelDefinitionChunk);
174174
}
175+
176+
public static Builder builder() {
177+
return new Builder();
178+
}
179+
180+
public static class Builder {
181+
182+
private RowResults rowResults;
183+
private PhaseProgress phaseProgress;
184+
private MemoryUsage memoryUsage;
185+
private OutlierDetectionStats outlierDetectionStats;
186+
private ClassificationStats classificationStats;
187+
private RegressionStats regressionStats;
188+
private ModelSizeInfo modelSizeInfo;
189+
private TrainedModelDefinitionChunk trainedModelDefinitionChunk;
190+
191+
private Builder() {}
192+
193+
public Builder setRowResults(RowResults rowResults) {
194+
this.rowResults = rowResults;
195+
return this;
196+
}
197+
198+
public Builder setPhaseProgress(PhaseProgress phaseProgress) {
199+
this.phaseProgress = phaseProgress;
200+
return this;
201+
}
202+
203+
public Builder setMemoryUsage(MemoryUsage memoryUsage) {
204+
this.memoryUsage = memoryUsage;
205+
return this;
206+
}
207+
208+
public Builder setOutlierDetectionStats(OutlierDetectionStats outlierDetectionStats) {
209+
this.outlierDetectionStats = outlierDetectionStats;
210+
return this;
211+
}
212+
213+
public Builder setClassificationStats(ClassificationStats classificationStats) {
214+
this.classificationStats = classificationStats;
215+
return this;
216+
}
217+
218+
public Builder setRegressionStats(RegressionStats regressionStats) {
219+
this.regressionStats = regressionStats;
220+
return this;
221+
}
222+
223+
public Builder setModelSizeInfo(ModelSizeInfo modelSizeInfo) {
224+
this.modelSizeInfo = modelSizeInfo;
225+
return this;
226+
}
227+
228+
public Builder setTrainedModelDefinitionChunk(TrainedModelDefinitionChunk trainedModelDefinitionChunk) {
229+
this.trainedModelDefinitionChunk = trainedModelDefinitionChunk;
230+
return this;
231+
}
232+
233+
public AnalyticsResult build() {
234+
return new AnalyticsResult(
235+
rowResults,
236+
phaseProgress,
237+
memoryUsage,
238+
outlierDetectionStats,
239+
classificationStats,
240+
regressionStats,
241+
modelSizeInfo,
242+
trainedModelDefinitionChunk
243+
);
244+
}
245+
}
175246
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public class StatsPersister {
2929
private final String jobId;
3030
private final ResultsPersisterService resultsPersisterService;
3131
private final DataFrameAnalyticsAuditor auditor;
32-
private volatile boolean isCancelled;
3332

3433
public StatsPersister(String jobId, ResultsPersisterService resultsPersisterService, DataFrameAnalyticsAuditor auditor) {
3534
this.jobId = Objects.requireNonNull(jobId);
@@ -38,18 +37,14 @@ public StatsPersister(String jobId, ResultsPersisterService resultsPersisterServ
3837
}
3938

4039
public void persistWithRetry(ToXContentObject result, Function<String, String> docIdSupplier) {
41-
if (isCancelled) {
42-
return;
43-
}
44-
4540
try {
4641
resultsPersisterService.indexWithRetry(jobId,
4742
MlStatsIndex.writeAlias(),
4843
result,
4944
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
5045
WriteRequest.RefreshPolicy.NONE,
5146
docIdSupplier.apply(jobId),
52-
() -> isCancelled == false,
47+
() -> true,
5348
errorMsg -> auditor.error(jobId,
5449
"failed to persist result with id [" + docIdSupplier.apply(jobId) + "]; " + errorMsg)
5550
);
@@ -59,8 +54,4 @@ public void persistWithRetry(ToXContentObject result, Function<String, String> d
5954
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", jobId), e);
6055
}
6156
}
62-
63-
public void cancel() {
64-
isCancelled = true;
65-
}
6657
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
6060
private static final String CONFIG_ID = "config-id";
6161
private static final int NUM_ROWS = 100;
6262
private static final int NUM_COLS = 4;
63-
private static final AnalyticsResult PROCESS_RESULT = new AnalyticsResult(null, null, null, null, null, null, null, null);
63+
private static final AnalyticsResult PROCESS_RESULT = AnalyticsResult.builder().build();
6464

6565
private Client client;
6666
private DataFrameAnalyticsAuditor auditor;

0 commit comments

Comments
 (0)