
Commit 022e5f5

A few improvements to AnalyticsProcessManager class that make the code more readable. (#50026)
1 parent 133b34c

File tree

3 files changed: +134 -122 lines changed


x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 1 addition & 9 deletions

@@ -233,15 +233,7 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
         DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING,
             task.getAllocationId(), null);
         task.updatePersistentTaskState(analyzingState, ActionListener.wrap(
-            updatedTask -> processManager.runJob(task, config, dataExtractorFactory,
-                error -> {
-                    if (error != null) {
-                        task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
-                    } else {
-                        auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
-                        task.markAsCompleted();
-                    }
-                }),
+            updatedTask -> processManager.runJob(task, config, dataExtractorFactory),
             error -> {
                 if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
                     // Task has stopped
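
In short, runJob no longer takes a finishHandler callback; AnalyticsProcessManager now reports the outcome through the task itself. A condensed, hypothetical caller-side sketch of the contract after this change (illustrative only, not code from the commit):

    // The caller simply starts the job; no completion callback is passed.
    processManager.runJob(task, config, dataExtractorFactory);
    // On success the manager audits DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS and
    // calls task.markAsCompleted(); on failure it calls
    // task.updateState(DataFrameAnalyticsState.FAILED, reason).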

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

Lines changed: 78 additions & 72 deletions

@@ -8,6 +8,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.search.SearchResponse;
@@ -90,19 +91,19 @@ public AnalyticsProcessManager(Client client,
         this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider);
     }
 
-    public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory,
-                       Consumer<Exception> finishHandler) {
+    public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory) {
         executorServiceForJob.execute(() -> {
-            ProcessContext processContext = new ProcessContext(config.getId());
+            ProcessContext processContext = new ProcessContext(config);
             synchronized (processContextByAllocation) {
                 if (task.isStopping()) {
                     // The task was requested to stop before we created the process context
-                    finishHandler.accept(null);
+                    auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
+                    task.markAsCompleted();
                     return;
                 }
                 if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
-                    finishHandler.accept(
-                        ExceptionsHelper.serverError("[" + config.getId() + "] Could not create process as one already exists"));
+                    task.updateState(
+                        DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists");
                     return;
                 }
             }
@@ -113,13 +114,13 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
             // Fetch existing model state (if any)
            BytesReference state = getModelState(config);
 
-            if (processContext.startProcess(dataExtractorFactory, config, task, state)) {
-                executorServiceForProcess.execute(() -> processResults(processContext));
-                executorServiceForProcess.execute(() -> processData(task, config, processContext.dataExtractor,
-                    processContext.process, processContext.resultProcessor, finishHandler, state));
+            if (processContext.startProcess(dataExtractorFactory, task, state)) {
+                executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get()));
+                executorServiceForProcess.execute(() -> processData(task, processContext, state));
             } else {
                 processContextByAllocation.remove(task.getAllocationId());
-                finishHandler.accept(null);
+                auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
+                task.markAsCompleted();
             }
         });
     }
@@ -140,26 +141,18 @@ private BytesReference getModelState(DataFrameAnalyticsConfig config) {
         }
     }
 
-    private void processResults(ProcessContext processContext) {
+    private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) {
+        DataFrameAnalyticsConfig config = processContext.config;
+        DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get();
+        AnalyticsProcess<AnalyticsResult> process = processContext.process.get();
+        AnalyticsResultProcessor resultProcessor = processContext.resultProcessor.get();
         try {
-            processContext.resultProcessor.process(processContext.process);
-        } catch (Exception e) {
-            processContext.setFailureReason(e.getMessage());
-        }
-    }
-
-    private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor,
-                             AnalyticsProcess<AnalyticsResult> process, AnalyticsResultProcessor resultProcessor,
-                             Consumer<Exception> finishHandler, BytesReference state) {
-
-        try {
-            ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
             writeHeaderRecord(dataExtractor, process);
             writeDataRows(dataExtractor, process, config.getAnalysis(), task.getProgressTracker());
             process.writeEndOfDataMessage();
             process.flushStream();
 
-            restoreState(config, state, process, finishHandler);
+            restoreState(task, config, state, process);
 
             LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
             resultProcessor.awaitForCompletion();
@@ -168,26 +161,34 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c
             refreshDest(config);
             LOGGER.info("[{}] Result processor has completed", config.getId());
         } catch (Exception e) {
-            if (task.isStopping() == false) {
-                String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
-                    .getFormattedMessage();
+            if (task.isStopping()) {
+                // Errors during task stopping are expected but we still want to log them just in case.
+                String errorMsg =
+                    new ParameterizedMessage(
+                        "[{}] Error while processing data [{}]; task is stopping", config.getId(), e.getMessage()).getFormattedMessage();
+                LOGGER.debug(errorMsg, e);
+            } else {
+                String errorMsg =
+                    new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()).getFormattedMessage();
                 LOGGER.error(errorMsg, e);
-                processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
+                processContext.setFailureReason(errorMsg);
             }
         } finally {
             closeProcess(task);
 
-            ProcessContext processContext = processContextByAllocation.remove(task.getAllocationId());
+            processContextByAllocation.remove(task.getAllocationId());
             LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(),
                 processContextByAllocation.size());
 
             if (processContext.getFailureReason() == null) {
                 // This results in marking the persistent task as complete
                 LOGGER.info("[{}] Marking task completed", config.getId());
-                finishHandler.accept(null);
+                auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
+                task.markAsCompleted();
             } else {
                 LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
                 task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
+                // Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason.
             }
         }
     }
@@ -239,8 +240,8 @@ private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsPr
         process.writeRecord(headerRecord);
     }
 
-    private void restoreState(DataFrameAnalyticsConfig config, @Nullable BytesReference state, AnalyticsProcess<AnalyticsResult> process,
-                              Consumer<Exception> failureHandler) {
+    private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, @Nullable BytesReference state,
+                              AnalyticsProcess<AnalyticsResult> process) {
         if (config.getAnalysis().persistsState() == false) {
             LOGGER.debug("[{}] Analysis does not support state", config.getId());
             return;
@@ -258,7 +259,7 @@ private void restoreState(DataFrameAnalyticsConfig config, @Nullable BytesRefere
             process.restoreState(state);
         } catch (Exception e) {
             LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e);
-            failureHandler.accept(ExceptionsHelper.serverError("Failed to restore state", e));
+            task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage());
         }
     }
 
@@ -293,9 +294,10 @@ private void closeProcess(DataFrameAnalyticsTask task) {
 
         ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
         try {
-            processContext.process.close();
+            processContext.process.get().close();
             LOGGER.info("[{}] Closed process", configId);
         } catch (Exception e) {
+            LOGGER.error("[" + configId + "] Error closing data frame analyzer process", e);
             String errorMsg = new ParameterizedMessage(
                 "[{}] Error closing data frame analyzer process [{}]", configId, e.getMessage()).getFormattedMessage();
             processContext.setFailureReason(errorMsg);
@@ -323,79 +325,76 @@ int getProcessContextCount() {
 
     class ProcessContext {
 
-        private final String id;
-        private volatile AnalyticsProcess<AnalyticsResult> process;
-        private volatile DataFrameDataExtractor dataExtractor;
-        private volatile AnalyticsResultProcessor resultProcessor;
-        private volatile boolean processKilled;
-        private volatile String failureReason;
+        private final DataFrameAnalyticsConfig config;
+        private final SetOnce<AnalyticsProcess<AnalyticsResult>> process = new SetOnce<>();
+        private final SetOnce<DataFrameDataExtractor> dataExtractor = new SetOnce<>();
+        private final SetOnce<AnalyticsResultProcessor> resultProcessor = new SetOnce<>();
+        private final SetOnce<String> failureReason = new SetOnce<>();
 
-        ProcessContext(String id) {
-            this.id = Objects.requireNonNull(id);
+        ProcessContext(DataFrameAnalyticsConfig config) {
+            this.config = Objects.requireNonNull(config);
         }
 
-        synchronized String getFailureReason() {
-            return failureReason;
+        String getFailureReason() {
+            return failureReason.get();
         }
 
-        synchronized void setFailureReason(String failureReason) {
-            // Only set the new reason if there isn't one already as we want to keep the first reason
-            if (this.failureReason == null && failureReason != null) {
-                this.failureReason = failureReason;
+        void setFailureReason(String failureReason) {
+            if (failureReason == null) {
+                return;
             }
+            // Only set the new reason if there isn't one already as we want to keep the first reason (most likely the root cause).
+            this.failureReason.trySet(failureReason);
         }
 
         synchronized void stop() {
-            LOGGER.debug("[{}] Stopping process", id);
-            processKilled = true;
-            if (dataExtractor != null) {
-                dataExtractor.cancel();
+            LOGGER.debug("[{}] Stopping process", config.getId());
+            if (dataExtractor.get() != null) {
+                dataExtractor.get().cancel();
             }
-            if (resultProcessor != null) {
-                resultProcessor.cancel();
+            if (resultProcessor.get() != null) {
+                resultProcessor.get().cancel();
             }
-            if (process != null) {
+            if (process.get() != null) {
                 try {
-                    process.kill();
+                    process.get().kill();
                 } catch (IOException e) {
-                    LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", id), e);
+                    LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", config.getId()), e);
                 }
             }
         }
 
         /**
         * @return {@code true} if the process was started or {@code false} if it was not because it was stopped in the meantime
         */
-        synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config,
-                                          DataFrameAnalyticsTask task, @Nullable BytesReference state) {
-            if (processKilled) {
+        synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory,
+                                          DataFrameAnalyticsTask task,
+                                          @Nullable BytesReference state) {
+            if (task.isStopping()) {
                 // The job was stopped before we started the process so no need to start it
                 return false;
             }
 
-            dataExtractor = dataExtractorFactory.newExtractor(false);
+            dataExtractor.set(dataExtractorFactory.newExtractor(false));
             AnalyticsProcessConfig analyticsProcessConfig =
-                createProcessConfig(config, dataExtractor, dataExtractorFactory.getExtractedFields());
+                createProcessConfig(dataExtractor.get(), dataExtractorFactory.getExtractedFields());
             LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig));
             // If we have no rows, that means there is no data so no point in starting the native process
             // just finish the task
             if (analyticsProcessConfig.rows() == 0) {
                 LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
                 return false;
             }
-            process = createProcess(task, config, analyticsProcessConfig, state);
-            DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
-                dataExtractorFactory.newExtractor(true));
-            resultProcessor = new AnalyticsResultProcessor(
-                config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.getFieldNames());
+            process.set(createProcess(task, config, analyticsProcessConfig, state));
+            resultProcessor.set(createResultProcessor(task, dataExtractorFactory));
             return true;
         }
 
-        private AnalyticsProcessConfig createProcessConfig(
-            DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, ExtractedFields extractedFields) {
+        private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataExtractor,
+                                                           ExtractedFields extractedFields) {
             DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
             Set<String> categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis());
-            AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig(
+            return new AnalyticsProcessConfig(
                 config.getId(),
                 dataSummary.rows,
                 dataSummary.cols,
@@ -405,7 +404,14 @@ private AnalyticsProcessConfig createProcessConfig(
                 categoricalFields,
                 config.getAnalysis(),
                 extractedFields);
-            return processConfig;
+        }
+
+        private AnalyticsResultProcessor createResultProcessor(DataFrameAnalyticsTask task,
+                                                               DataFrameDataExtractorFactory dataExtractorFactory) {
+            DataFrameRowsJoiner dataFrameRowsJoiner =
+                new DataFrameRowsJoiner(config.getId(), client, dataExtractorFactory.newExtractor(true));
+            return new AnalyticsResultProcessor(
+                config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.get().getFieldNames());
        }
    }
 }
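
The switch from volatile fields behind synchronized accessors to SetOnce leans on the write-once, thread-safe contract of org.apache.lucene.util.SetOnce, which the diff imports and uses via set(), get() and trySet(). A minimal standalone sketch of that contract (illustrative only; the SetOnceSketch class is hypothetical and not part of this commit):

    import org.apache.lucene.util.SetOnce;

    class SetOnceSketch {
        public static void main(String[] args) {
            SetOnce<String> failureReason = new SetOnce<>();

            // get() returns null while no value has been set.
            assert failureReason.get() == null;

            // trySet() stores a value only if none is present, so the first
            // reason wins -- this is why setFailureReason no longer needs a
            // check-and-assign under a lock to preserve the root cause.
            failureReason.trySet("first error");
            failureReason.trySet("second error");   // ignored, a value is already set
            assert "first error".equals(failureReason.get());

            // set(), in contrast, throws SetOnce.AlreadySetException on a second
            // assignment, so accidental re-initialisation of fields such as
            // process, dataExtractor and resultProcessor fails loudly.
        }
    }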
