Skip to content

Commit e6e2cda

Browse files
committed
Clear Job#finished_time when it is opened (#32605) (#32755)
* Clear Job#finished_time when it is opened (#32605) * not returning failure when Job#finished_time is not reset * Changing error log string and source string
1 parent 19c84b9 commit e6e2cda

File tree

2 files changed

+110
-4
lines changed

2 files changed

+110
-4
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
1919
import org.elasticsearch.client.Client;
2020
import org.elasticsearch.cluster.ClusterState;
21+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2122
import org.elasticsearch.cluster.block.ClusterBlockException;
2223
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2324
import org.elasticsearch.cluster.metadata.AliasOrIndex;
2425
import org.elasticsearch.cluster.metadata.IndexMetaData;
2526
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2627
import org.elasticsearch.cluster.metadata.MappingMetaData;
28+
import org.elasticsearch.cluster.metadata.MetaData;
2729
import org.elasticsearch.cluster.node.DiscoveryNode;
2830
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2931
import org.elasticsearch.cluster.service.ClusterService;
@@ -455,12 +457,25 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste
455457
protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<OpenJobAction.Response> listener) {
456458
OpenJobAction.JobParams jobParams = request.getJobParams();
457459
if (licenseState.isMachineLearningAllowed()) {
458-
// Step 5. Wait for job to be started and respond
459-
ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>> finalListener =
460+
461+
// Step 6. Clear job finished time once the job is started and respond
462+
ActionListener<OpenJobAction.Response> clearJobFinishTime = ActionListener.wrap(
463+
response -> {
464+
if (response.isAcknowledged()) {
465+
clearJobFinishedTime(jobParams.getJobId(), listener);
466+
} else {
467+
listener.onResponse(response);
468+
}
469+
},
470+
listener::onFailure
471+
);
472+
473+
// Step 5. Wait for job to be started
474+
ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>> waitForJobToStart =
460475
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>>() {
461476
@Override
462477
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> task) {
463-
waitForJobStarted(task.getId(), jobParams, listener);
478+
waitForJobStarted(task.getId(), jobParams, clearJobFinishTime);
464479
}
465480

466481
@Override
@@ -476,7 +491,7 @@ public void onFailure(Exception e) {
476491
// Step 4. Start job task
477492
ActionListener<PutJobAction.Response> jobUpateListener = ActionListener.wrap(
478493
response -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()),
479-
OpenJobAction.TASK_NAME, jobParams, finalListener),
494+
OpenJobAction.TASK_NAME, jobParams, waitForJobToStart),
480495
listener::onFailure
481496
);
482497

@@ -579,6 +594,35 @@ public void onTimeout(TimeValue timeout) {
579594
});
580595
}
581596

597+
private void clearJobFinishedTime(String jobId, ActionListener<OpenJobAction.Response> listener) {
598+
clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() {
599+
@Override
600+
public ClusterState execute(ClusterState currentState) {
601+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
602+
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
603+
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
604+
jobBuilder.setFinishedTime(null);
605+
606+
mlMetadataBuilder.putJob(jobBuilder.build(), true);
607+
ClusterState.Builder builder = ClusterState.builder(currentState);
608+
return builder.metaData(new MetaData.Builder(currentState.metaData())
609+
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
610+
.build();
611+
}
612+
613+
@Override
614+
public void onFailure(String source, Exception e) {
615+
logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e);
616+
listener.onResponse(new OpenJobAction.Response(true));
617+
}
618+
619+
@Override
620+
public void clusterStateProcessed(String source, ClusterState oldState,
621+
ClusterState newState) {
622+
listener.onResponse(new OpenJobAction.Response(true));
623+
}
624+
});
625+
}
582626
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
583627
ActionListener<OpenJobAction.Response> listener) {
584628
persistentTasksService.sendRemoveRequest(persistentTask.getId(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
9+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
10+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
11+
import org.elasticsearch.xpack.core.ml.job.config.Job;
12+
import org.junit.After;
13+
14+
import java.util.Collections;
15+
16+
import static org.hamcrest.CoreMatchers.notNullValue;
17+
import static org.hamcrest.CoreMatchers.nullValue;
18+
import static org.hamcrest.Matchers.is;
19+
20+
public class ReopenJobResetsFinishedTimeIT extends MlNativeAutodetectIntegTestCase {
21+
22+
@After
23+
public void cleanUpTest() {
24+
cleanUp();
25+
}
26+
27+
public void test() {
28+
final String jobId = "reset-finished-time-test";
29+
Job.Builder job = createJob(jobId);
30+
31+
registerJob(job);
32+
putJob(job);
33+
openJob(job.getId());
34+
35+
assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue()));
36+
37+
closeJob(jobId);
38+
assertThat(getSingleJob(jobId).getFinishedTime(), is(notNullValue()));
39+
40+
openJob(jobId);
41+
assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue()));
42+
}
43+
44+
private Job getSingleJob(String jobId) {
45+
return getJob(jobId).get(0);
46+
}
47+
48+
private Job.Builder createJob(String id) {
49+
DataDescription.Builder dataDescription = new DataDescription.Builder();
50+
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
51+
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
52+
53+
Detector.Builder d = new Detector.Builder("count", null);
54+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
55+
56+
Job.Builder builder = new Job.Builder();
57+
builder.setId(id);
58+
builder.setAnalysisConfig(analysisConfig);
59+
builder.setDataDescription(dataDescription);
60+
return builder;
61+
}
62+
}

0 commit comments

Comments
 (0)