Skip to content

Commit 2e36ba1

Browse files
authored
[ML] ensure results index is updated before writing forecast documents (#68748)
When a job opens, we update the current forecasts that are opened to failed. But, before doing so, we need to make sure the results index is up to date. This is to protect us from any forecast document changes in the future.
1 parent 82253a3 commit 2e36ba1

File tree

1 file changed

+20
-5
lines changed

1 file changed

+20
-5
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.xpack.core.ml.job.config.JobState;
4141
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
4242
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
43+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
4344
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
4445
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
4546
import org.elasticsearch.xpack.ml.MachineLearning;
@@ -195,13 +196,27 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
195196
jobTask.setAutodetectProcessManager(autodetectProcessManager);
196197
JobTaskState jobTaskState = (JobTaskState) state;
197198
JobState jobState = jobTaskState == null ? null : jobTaskState.getState();
198-
jobResultsProvider.setRunningForecastsToFailed(params.getJobId(), ActionListener.wrap(
199-
r -> runJob(jobTask, jobState, params),
199+
ActionListener<Boolean> resultsMappingUpdateHandler = ActionListener.wrap(
200+
mappingsUpdate -> jobResultsProvider.setRunningForecastsToFailed(params.getJobId(), ActionListener.wrap(
201+
r -> runJob(jobTask, jobState, params),
202+
e -> {
203+
logger.warn(new ParameterizedMessage("[{}] failed to set forecasts to failed", params.getJobId()), e);
204+
runJob(jobTask, jobState, params);
205+
}
206+
)),
200207
e -> {
201-
logger.warn(new ParameterizedMessage("[{}] failed to set forecasts to failed", params.getJobId()), e);
202-
runJob(jobTask, jobState, params);
208+
logger.error(new ParameterizedMessage("[{}] Failed to update results mapping", params.getJobId()), e);
209+
jobTask.markAsFailed(e);
203210
}
204-
));
211+
);
212+
// We need to update the results index as we MAY update the current forecast results, setting the running forcasts to failed
213+
// This writes to the results index, which might need updating
214+
ElasticsearchMappings.addDocMappingIfMissing(
215+
AnomalyDetectorsIndex.jobResultsAliasedName(params.getJobId()),
216+
AnomalyDetectorsIndex::resultsMapping,
217+
client,
218+
clusterState,
219+
resultsMappingUpdateHandler);
205220
}
206221

207222
private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams params) {

0 commit comments

Comments
 (0)