Skip to content

Commit 617db2e

Browse files
authored
[ML] Write header to autodetect before it is visible to other calls (#41486)
Backport of #41085
1 parent 36ad069 commit 617db2e

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,8 @@ public class AutodetectCommunicator implements Closeable {
8989
&& job.getAnalysisConfig().getCategorizationFieldName() != null;
9090
}
9191

92-
public void init(ModelSnapshot modelSnapshot) throws IOException {
92+
public void restoreState(ModelSnapshot modelSnapshot) {
9393
autodetectProcess.restoreState(stateStreamer, modelSnapshot);
94-
createProcessWriter(Optional.empty()).writeHeader();
9594
}
9695

9796
private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDescription) {
@@ -100,6 +99,17 @@ private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDe
10099
dataCountsReporter, xContentRegistry);
101100
}
102101

102+
/**
103+
* This must be called once before {@link #writeToJob(InputStream, AnalysisRegistry, XContentType, DataLoadParams, BiConsumer)}
104+
* can be used
105+
*/
106+
public void writeHeader() throws IOException {
107+
createProcessWriter(Optional.empty()).writeHeader();
108+
}
109+
110+
/**
111+
* Call {@link #writeHeader()} exactly once before using this method
112+
*/
103113
public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistry, XContentType xContentType,
104114
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
105115
submitOperation(() -> {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ protected void doRun() {
467467

468468
try {
469469
createProcessAndSetRunning(processContext, job, params, closeHandler);
470-
processContext.getAutodetectCommunicator().init(params.modelSnapshot());
470+
processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
471471
setJobState(jobTask, JobState.OPENED);
472472
} catch (Exception e1) {
473473
// No need to log here as the persistent task framework will log it
@@ -508,14 +508,15 @@ protected void doRun() {
508508
private void createProcessAndSetRunning(ProcessContext processContext,
509509
Job job,
510510
AutodetectParams params,
511-
BiConsumer<Exception, Boolean> handler) {
511+
BiConsumer<Exception, Boolean> handler) throws IOException {
512512
// At this point we lock the process context until the process has been started.
513513
// The reason behind this is to ensure closing the job does not happen before
514514
// the process is started as that can result to the job getting seemingly closed
515515
// but the actual process is hanging alive.
516516
processContext.tryLock();
517517
try {
518518
AutodetectCommunicator communicator = create(processContext.getJobTask(), job, params, handler);
519+
communicator.writeHeader();
519520
processContext.setRunning(communicator);
520521
} finally {
521522
// Now that the process is running and we have updated its state we can unlock.
@@ -646,7 +647,7 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
646647
processContext.tryLock();
647648
try {
648649
if (processContext.setDying() == false) {
649-
logger.debug("Cannot close job [{}] as it has already been closed", jobId);
650+
logger.debug("Cannot close job [{}] as it has been marked as dying", jobId);
650651
return;
651652
}
652653

0 commit comments

Comments
 (0)