Skip to content

Commit b08416b

Browse files
authored
Clear Job#finished_time when it is opened (#32605) (#32755)
* Clear Job#finished_time when it is opened (#32605) * not returning failure when Job#finished_time is not reset * Changing error log string and source string
1 parent f5ba801 commit b08416b

File tree

2 files changed

+110
-4
lines changed

2 files changed

+110
-4
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

+48-4
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
1919
import org.elasticsearch.client.Client;
2020
import org.elasticsearch.cluster.ClusterState;
21+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2122
import org.elasticsearch.cluster.block.ClusterBlockException;
2223
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2324
import org.elasticsearch.cluster.metadata.AliasOrIndex;
2425
import org.elasticsearch.cluster.metadata.IndexMetaData;
2526
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2627
import org.elasticsearch.cluster.metadata.MappingMetaData;
28+
import org.elasticsearch.cluster.metadata.MetaData;
2729
import org.elasticsearch.cluster.node.DiscoveryNode;
2830
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2931
import org.elasticsearch.cluster.service.ClusterService;
@@ -471,12 +473,25 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste
471473
protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<OpenJobAction.Response> listener) {
472474
OpenJobAction.JobParams jobParams = request.getJobParams();
473475
if (licenseState.isMachineLearningAllowed()) {
474-
// Step 5. Wait for job to be started and respond
475-
ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>> finalListener =
476+
477+
// Step 6. Clear job finished time once the job is started and respond
478+
ActionListener<OpenJobAction.Response> clearJobFinishTime = ActionListener.wrap(
479+
response -> {
480+
if (response.isAcknowledged()) {
481+
clearJobFinishedTime(jobParams.getJobId(), listener);
482+
} else {
483+
listener.onResponse(response);
484+
}
485+
},
486+
listener::onFailure
487+
);
488+
489+
// Step 5. Wait for job to be started
490+
ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>> waitForJobToStart =
476491
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>>() {
477492
@Override
478493
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> task) {
479-
waitForJobStarted(task.getId(), jobParams, listener);
494+
waitForJobStarted(task.getId(), jobParams, clearJobFinishTime);
480495
}
481496

482497
@Override
@@ -492,7 +507,7 @@ public void onFailure(Exception e) {
492507
// Step 4. Start job task
493508
ActionListener<PutJobAction.Response> establishedMemoryUpdateListener = ActionListener.wrap(
494509
response -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()),
495-
OpenJobAction.TASK_NAME, jobParams, finalListener),
510+
OpenJobAction.TASK_NAME, jobParams, waitForJobToStart),
496511
listener::onFailure
497512
);
498513

@@ -574,6 +589,35 @@ public void onTimeout(TimeValue timeout) {
574589
});
575590
}
576591

592+
private void clearJobFinishedTime(String jobId, ActionListener<OpenJobAction.Response> listener) {
593+
clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() {
594+
@Override
595+
public ClusterState execute(ClusterState currentState) {
596+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
597+
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
598+
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
599+
jobBuilder.setFinishedTime(null);
600+
601+
mlMetadataBuilder.putJob(jobBuilder.build(), true);
602+
ClusterState.Builder builder = ClusterState.builder(currentState);
603+
return builder.metaData(new MetaData.Builder(currentState.metaData())
604+
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
605+
.build();
606+
}
607+
608+
@Override
609+
public void onFailure(String source, Exception e) {
610+
logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e);
611+
listener.onResponse(new OpenJobAction.Response(true));
612+
}
613+
614+
@Override
615+
public void clusterStateProcessed(String source, ClusterState oldState,
616+
ClusterState newState) {
617+
listener.onResponse(new OpenJobAction.Response(true));
618+
}
619+
});
620+
}
577621
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
578622
ActionListener<OpenJobAction.Response> listener) {
579623
persistentTasksService.sendRemoveRequest(persistentTask.getId(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
9+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
10+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
11+
import org.elasticsearch.xpack.core.ml.job.config.Job;
12+
import org.junit.After;
13+
14+
import java.util.Collections;
15+
16+
import static org.hamcrest.CoreMatchers.notNullValue;
17+
import static org.hamcrest.CoreMatchers.nullValue;
18+
import static org.hamcrest.Matchers.is;
19+
20+
public class ReopenJobResetsFinishedTimeIT extends MlNativeAutodetectIntegTestCase {
21+
22+
@After
23+
public void cleanUpTest() {
24+
cleanUp();
25+
}
26+
27+
public void test() {
28+
final String jobId = "reset-finished-time-test";
29+
Job.Builder job = createJob(jobId);
30+
31+
registerJob(job);
32+
putJob(job);
33+
openJob(job.getId());
34+
35+
assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue()));
36+
37+
closeJob(jobId);
38+
assertThat(getSingleJob(jobId).getFinishedTime(), is(notNullValue()));
39+
40+
openJob(jobId);
41+
assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue()));
42+
}
43+
44+
private Job getSingleJob(String jobId) {
45+
return getJob(jobId).get(0);
46+
}
47+
48+
private Job.Builder createJob(String id) {
49+
DataDescription.Builder dataDescription = new DataDescription.Builder();
50+
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
51+
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
52+
53+
Detector.Builder d = new Detector.Builder("count", null);
54+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
55+
56+
Job.Builder builder = new Job.Builder();
57+
builder.setId(id);
58+
builder.setAnalysisConfig(analysisConfig);
59+
builder.setDataDescription(dataDescription);
60+
return builder;
61+
}
62+
}

0 commit comments

Comments
 (0)