Skip to content

Commit 3907867

Browse files
committed
[ML] Change JobManager to work with Job config in index (#33064)
1 parent 6a8bef4 commit 3907867

28 files changed

+1269
-611
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,15 @@
1212
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1313
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
1414

15+
import java.util.Collection;
16+
import java.util.Set;
17+
import java.util.stream.Collectors;
18+
1519
public final class MlTasks {
1620

21+
public static final String JOB_TASK_PREFIX = "job-";
22+
public static final String DATAFEED_TASK_PREFIX = "datafeed-";
23+
1724
private MlTasks() {
1825
}
1926

@@ -22,15 +29,15 @@ private MlTasks() {
2229
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
2330
*/
2431
public static String jobTaskId(String jobId) {
25-
return "job-" + jobId;
32+
return JOB_TASK_PREFIX + jobId;
2633
}
2734

2835
/**
2936
* Namespaces the task ids for datafeeds.
3037
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
3138
*/
3239
public static String datafeedTaskId(String datafeedId) {
33-
return "datafeed-" + datafeedId;
40+
return DATAFEED_TASK_PREFIX + datafeedId;
3441
}
3542

3643
@Nullable
@@ -67,4 +74,17 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
6774
return DatafeedState.STOPPED;
6875
}
6976
}
77+
78+
/**
79+
* The job Ids of anomaly detector job tasks
80+
* @param tasks Active tasks
81+
* @return The job Ids of anomaly detector job tasks
82+
*/
83+
public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
84+
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> activeTasks = tasks.tasks();
85+
86+
return activeTasks.stream().filter(t -> t.getId().startsWith(JOB_TASK_PREFIX))
87+
.map(t -> t.getId().substring(JOB_TASK_PREFIX.length()))
88+
.collect(Collectors.toSet());
89+
}
7090
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
364364
Auditor auditor = new Auditor(client, clusterService.nodeName());
365365
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
366366
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
367-
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier);
367+
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier);
368368

369369
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
370370
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,11 @@ protected void doExecute(DeleteCalendarAction.Request request, ActionListener<Ac
6464
listener.onFailure(new ResourceNotFoundException("No calendar with id [" + calendarId + "]"));
6565
return;
6666
}
67-
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
68-
listener.onResponse(new AcknowledgedResponse(true));
67+
68+
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(
69+
r -> listener.onResponse(new AcknowledgedResponse(true)),
70+
listener::onFailure
71+
));
6972
},
7073
listener::onFailure));
7174
},

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,10 @@ public void onResponse(DeleteResponse response) {
104104
if (response.status() == RestStatus.NOT_FOUND) {
105105
listener.onFailure(new ResourceNotFoundException("No event with id [" + eventId + "]"));
106106
} else {
107-
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
108-
listener.onResponse(new AcknowledgedResponse(true));
107+
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(
108+
r -> listener.onResponse(new AcknowledgedResponse(true)),
109+
listener::onFailure
110+
));
109111
}
110112
}
111113

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1414
import org.elasticsearch.client.Client;
1515
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
16-
import org.elasticsearch.cluster.service.ClusterService;
1716
import org.elasticsearch.common.inject.Inject;
1817
import org.elasticsearch.common.settings.Settings;
1918
import org.elasticsearch.threadpool.ThreadPool;
2019
import org.elasticsearch.transport.TransportService;
2120
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
22-
import org.elasticsearch.xpack.core.ml.job.config.Job;
2321
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
2422
import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
2523
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@@ -34,20 +32,20 @@ public class TransportDeleteModelSnapshotAction extends HandledTransportAction<D
3432
AcknowledgedResponse> {
3533

3634
private final Client client;
35+
private final JobManager jobManager;
3736
private final JobResultsProvider jobResultsProvider;
38-
private final ClusterService clusterService;
3937
private final Auditor auditor;
4038

4139
@Inject
4240
public TransportDeleteModelSnapshotAction(Settings settings, TransportService transportService, ThreadPool threadPool,
4341
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
44-
JobResultsProvider jobResultsProvider, ClusterService clusterService, Client client,
42+
JobManager jobManager, JobResultsProvider jobResultsProvider, Client client,
4543
Auditor auditor) {
4644
super(settings, DeleteModelSnapshotAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
4745
DeleteModelSnapshotAction.Request::new);
4846
this.client = client;
47+
this.jobManager = jobManager;
4948
this.jobResultsProvider = jobResultsProvider;
50-
this.clusterService = clusterService;
5149
this.auditor = auditor;
5250
}
5351

@@ -71,32 +69,40 @@ protected void doExecute(DeleteModelSnapshotAction.Request request, ActionListen
7169
ModelSnapshot deleteCandidate = deleteCandidates.get(0);
7270

7371
// Verify the snapshot is not being used
74-
Job job = JobManager.getJobOrThrowIfUnknown(request.getJobId(), clusterService.state());
75-
String currentModelInUse = job.getModelSnapshotId();
76-
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) {
77-
throw new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY,
78-
request.getSnapshotId(), request.getJobId()));
79-
}
72+
jobManager.getJob(request.getJobId(), ActionListener.wrap(
73+
job -> {
74+
String currentModelInUse = job.getModelSnapshotId();
75+
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) {
76+
listener.onFailure(
77+
new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY,
78+
request.getSnapshotId(), request.getJobId())));
79+
return;
80+
}
81+
82+
// Delete the snapshot and any associated state files
83+
JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId());
84+
deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate),
85+
new ActionListener<BulkResponse>() {
86+
@Override
87+
public void onResponse(BulkResponse bulkResponse) {
88+
String msg = Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED,
89+
deleteCandidate.getSnapshotId(), deleteCandidate.getDescription());
8090

81-
// Delete the snapshot and any associated state files
82-
JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId());
83-
deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate), new ActionListener<BulkResponse>() {
84-
@Override
85-
public void onResponse(BulkResponse bulkResponse) {
86-
String msg = Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED, deleteCandidate.getSnapshotId(),
87-
deleteCandidate.getDescription());
88-
auditor.info(request.getJobId(), msg);
89-
logger.debug("[{}] {}", request.getJobId(), msg);
90-
// We don't care about the bulk response, just that it succeeded
91-
listener.onResponse(new AcknowledgedResponse(true));
92-
}
91+
auditor.info(request.getJobId(), msg);
92+
logger.debug("[{}] {}", request.getJobId(), msg);
93+
// We don't care about the bulk response, just that it succeeded
94+
listener.onResponse(new AcknowledgedResponse(true));
95+
}
9396

94-
@Override
95-
public void onFailure(Exception e) {
96-
listener.onFailure(e);
97-
}
98-
});
97+
@Override
98+
public void onFailure(Exception e) {
99+
listener.onFailure(e);
100+
}
101+
});
99102

103+
},
104+
listener::onFailure
105+
));
100106
}, listener::onFailure);
101107
}
102108
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.elasticsearch.Version;
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.ActionFilters;
12-
import org.elasticsearch.cluster.ClusterState;
1312
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1413
import org.elasticsearch.cluster.service.ClusterService;
1514
import org.elasticsearch.common.inject.Inject;
@@ -42,15 +41,17 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
4241
private static final ByteSizeValue FORECAST_LOCAL_STORAGE_LIMIT = new ByteSizeValue(500, ByteSizeUnit.MB);
4342

4443
private final JobResultsProvider jobResultsProvider;
44+
private final JobManager jobManager;
4545
@Inject
4646
public TransportForecastJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
4747
ClusterService clusterService, ActionFilters actionFilters,
4848
IndexNameExpressionResolver indexNameExpressionResolver, JobResultsProvider jobResultsProvider,
49-
AutodetectProcessManager processManager) {
49+
JobManager jobManager, AutodetectProcessManager processManager) {
5050
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
5151
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new,
5252
ThreadPool.Names.SAME, processManager);
5353
this.jobResultsProvider = jobResultsProvider;
54+
this.jobManager = jobManager;
5455
// ThreadPool.Names.SAME, because the operation is executed by the autodetect worker thread
5556
}
5657

@@ -64,57 +65,63 @@ protected ForecastJobAction.Response readTaskResponse(StreamInput in) throws IOE
6465
@Override
6566
protected void taskOperation(ForecastJobAction.Request request, TransportOpenJobAction.JobTask task,
6667
ActionListener<ForecastJobAction.Response> listener) {
67-
ClusterState state = clusterService.state();
68-
Job job = JobManager.getJobOrThrowIfUnknown(task.getJobId(), state);
69-
validate(job, request);
68+
jobManager.getJob(task.getJobId(), ActionListener.wrap(
69+
job -> {
70+
validate(job, request);
7071

71-
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
72+
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
7273

73-
if (request.getDuration() != null) {
74-
paramsBuilder.duration(request.getDuration());
75-
}
74+
if (request.getDuration() != null) {
75+
paramsBuilder.duration(request.getDuration());
76+
}
7677

77-
if (request.getExpiresIn() != null) {
78-
paramsBuilder.expiresIn(request.getExpiresIn());
79-
}
78+
if (request.getExpiresIn() != null) {
79+
paramsBuilder.expiresIn(request.getExpiresIn());
80+
}
8081

81-
// tmp storage might be null, we do not log here, because it might not be
82-
// required
83-
Path tmpStorage = processManager.tryGetTmpStorage(task, FORECAST_LOCAL_STORAGE_LIMIT);
84-
if (tmpStorage != null) {
85-
paramsBuilder.tmpStorage(tmpStorage.toString());
86-
}
82+
// tmp storage might be null, we do not log here, because it might not be
83+
// required
84+
Path tmpStorage = processManager.tryGetTmpStorage(task, FORECAST_LOCAL_STORAGE_LIMIT);
85+
if (tmpStorage != null) {
86+
paramsBuilder.tmpStorage(tmpStorage.toString());
87+
}
8788

88-
ForecastParams params = paramsBuilder.build();
89-
processManager.forecastJob(task, params, e -> {
90-
if (e == null) {
91-
Consumer<ForecastRequestStats> forecastRequestStatsHandler = forecastRequestStats -> {
92-
if (forecastRequestStats == null) {
93-
// paranoia case, it should not happen that we do not retrieve a result
94-
listener.onFailure(new ElasticsearchException(
95-
"Cannot run forecast: internal error, please check the logs"));
96-
} else if (forecastRequestStats.getStatus() == ForecastRequestStats.ForecastRequestStatus.FAILED) {
97-
List<String> messages = forecastRequestStats.getMessages();
98-
if (messages.size() > 0) {
99-
listener.onFailure(ExceptionsHelper.badRequestException("Cannot run forecast: "
100-
+ messages.get(0)));
89+
ForecastParams params = paramsBuilder.build();
90+
processManager.forecastJob(task, params, e -> {
91+
if (e == null) {
92+
getForecastRequestStats(request.getJobId(), params.getForecastId(), listener);
10193
} else {
102-
// paranoia case, it should not be possible to have an empty message list
103-
listener.onFailure(
104-
new ElasticsearchException(
105-
"Cannot run forecast: internal error, please check the logs"));
94+
listener.onFailure(e);
10695
}
107-
} else {
108-
listener.onResponse(new ForecastJobAction.Response(true, params.getForecastId()));
109-
}
110-
};
96+
});
97+
},
98+
listener::onFailure
99+
));
100+
}
111101

112-
jobResultsProvider.getForecastRequestStats(request.getJobId(), params.getForecastId(),
113-
forecastRequestStatsHandler, listener::onFailure);
102+
private void getForecastRequestStats(String jobId, String forecastId, ActionListener<ForecastJobAction.Response> listener) {
103+
Consumer<ForecastRequestStats> forecastRequestStatsHandler = forecastRequestStats -> {
104+
if (forecastRequestStats == null) {
105+
// paranoia case, it should not happen that we do not retrieve a result
106+
listener.onFailure(new ElasticsearchException(
107+
"Cannot run forecast: internal error, please check the logs"));
108+
} else if (forecastRequestStats.getStatus() == ForecastRequestStats.ForecastRequestStatus.FAILED) {
109+
List<String> messages = forecastRequestStats.getMessages();
110+
if (messages.size() > 0) {
111+
listener.onFailure(ExceptionsHelper.badRequestException("Cannot run forecast: "
112+
+ messages.get(0)));
113+
} else {
114+
// paranoia case, it should not be possible to have an empty message list
115+
listener.onFailure(
116+
new ElasticsearchException(
117+
"Cannot run forecast: internal error, please check the logs"));
118+
}
114119
} else {
115-
listener.onFailure(e);
120+
listener.onResponse(new ForecastJobAction.Response(true, forecastId));
116121
}
117-
});
122+
};
123+
124+
jobResultsProvider.getForecastRequestStats(jobId, forecastId, forecastRequestStatsHandler, listener::onFailure);
118125
}
119126

120127
static void validate(Job job, ForecastJobAction.Request request) {

0 commit comments

Comments
 (0)